java基于shedlock和数据库实现简单分布式动态定时任务
分布式定时任务调度系统技术文档(附完整代码)
文档说明
概述
在实现分布式定时任务之前,建议优先考虑业界成熟的调度平台PowerJob,它提供了完善的 管理界面、监控告警等功能,可以避免重复造轮子。
代码组织:
-
配置类 – ShedLock 分布式锁配置及达梦数据库适配说明
-
实体类(PO) – Cron任务、单次任务主表、明细表
-
Repository 层 – 数据访问接口(含乐观锁更新)
-
Service 层 – Cron调度器、单次调度器、业务执行器接口
1. 概述与设计原则
1.1 业务背景
-
部署架构:多个个应用节点 + Redis集群
-
数据库:达梦8(信创/国产化要求)
-
核心要求:强一致性、任务不能丢失、支持动态增删改
1.2 核心设计原则
| 原则 | 说明 |
|---|---|
| 数据库唯一真理源 | 所有任务定义、状态、下次执行时间均持久化在数据库中,节点间不直接通信。 |
| 分布式锁 + 乐观锁 | ShedLock 保证同一任务只在一个节点执行;乐观锁(next_fire_time / status)保证状态推进的原子性。 |
| 先执行后更新 | 执行业务后再更新状态或下次执行时间。即使节点宕机,其他节点可重试,任务不丢失。 |
| 本地延迟调度 + 内存窗口 | 每个节点独立调度,只加载未来5分钟内的任务到内存,远期任务不加载,避免内存膨胀。 |
| 定时轮询补偿 | 每30秒扫描未来1分钟窗口,补注册因节点宕机等原因遗漏的任务。 |
| 容错设计 | Cron任务5ms滞后容错;单次任务失败明细直接标记已处理,不重试。 |
1.3 整体架构
text
┌─────────────────────────────────────────────────────────────┐ │ 6个应用节点 │ ├─────────────────────────────────────────────────────────────┤ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │ │ │ 任务加载层 │ │ 本地延迟调度 │ │ 分布式锁+乐观锁 │ │ │ │ 启动+轮询 │ │ ThreadPool │ │ ShedLock │ │ │ └──────────────┘ └──────────────┘ └──────────────────┘ │ │ ┌──────────────────────────────────────────────────────┐ │ │ │ 业务执行器(CronTaskExecutor) │ │ │ └──────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 达梦数据库 │ ├─────────────────────────────────────────────────────────────┤ │ • wb_cron_scheduled_task (Cron任务主表) │ │ • wb_once_scheduled_task (单次任务主表) │ │ • wb_once_task_detail (单次任务明细表) │ │ • shedlock (ShedLock锁表) │ └─────────────────────────────────────────────────────────────┘
2. 配置类
2.1 ShedLock 配置 ShedLockConfig.java
设计说明:
-
@EnableScheduling开启 Spring 定时任务能力(用于轮询补偿)。 -
@EnableSchedulerLock(defaultLockAtMostFor = "10m")启用 ShedLock,默认锁最长持有10分钟(可被具体任务覆盖)。 -
LockProvider使用JdbcTemplateLockProvider,基于数据库表实现分布式锁。 -
表名指定为
shedlock(可根据实际 schema 调整)。 -
usingDbTime()要求数据库支持获取当前时间的函数,达梦需额外适配(见下节)。
java
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider;
import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import javax.sql.DataSource;
@Configuration
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor = "10m")
public class ShedLockConfig {
@Bean
public LockProvider lockProvider(DataSource dataSource) {
return new JdbcTemplateLockProvider(
JdbcTemplateLockProvider.Configuration.builder()
.withJdbcTemplate(new JdbcTemplate(dataSource))
.withTableName("shedlock")
.usingDbTime()
.build()
);
}
}
2.2 达梦数据库适配说明
为什么需要适配? ShedLock 的 usingDbTime() 会调用数据库的当前时间函数(如 MySQL 的 NOW()),但达梦需要使用 SYSTIMESTAMP 和 DATEADD 函数。原生不支持会抛出 UnsupportedOperationException。
解决方案: 在项目中创建同包名覆盖类: net.javacrumbs.shedlock.provider.jdbctemplate.DamengServerTimeStatementsSource
实现达梦特定的 SQL 语句,例如:
-
获取当前时间:
SYSTIMESTAMP -
时间加法:
DATEADD(MICROSECOND, :lockAtMostForMicros, SYSTIMESTAMP)
具体代码参考设计方案中的适配实现(此处不重复贴出)。确保该类被类加载器优先加载。
3. 实体类(PO)
3.1 Cron任务表实体 WbCronScheduledTask.java
设计说明:
-
id:自增主键。 -
cron:Cron 表达式。 -
enabled:是否启用。 -
ref_count:引用计数,支持多个子任务共享同一个 cron 表达式(例如多个模型任务使用相同调度周期)。 -
next_fire_time:下次执行时间,作为乐观锁字段。 -
@PrePersist/@PreUpdate:自动填充创建/更新时间。
java
import lombok.Getter;
import lombok.Setter;
import javax.persistence.*;
import java.time.LocalDateTime;
@Getter
@Setter
@Entity
@Table(name = "wb_cron_scheduled_task")
@org.hibernate.annotations.Table(appliesTo = "wb_cron_scheduled_task", comment = "cron定时任务表")
public class WbCronScheduledTask {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "id", unique = true)
private Integer id;
@Column(name = "name", columnDefinition = "varchar(256) comment '名称' ")
private String name;
@Column(name = "cron", columnDefinition = "varchar(256) comment 'cron表达式' ")
private String cron;
@Column(name = "enabled", columnDefinition = "boolean comment '是否启用 true:是 false:否' ")
private Boolean enabled;
@Column(name = "ref_count", columnDefinition = "int comment '计数器'")
private Integer refCount = 0;
@Column(name = "next_fire_time", columnDefinition = "TIMESTAMP(6) NULL")
private LocalDateTime nextFireTime;
@Column(name = "updated_at", columnDefinition = "TIMESTAMP(6) NULL")
private LocalDateTime updatedAt;
@Column(name = "created_at", columnDefinition = "TIMESTAMP(6) NULL", updatable = false)
private LocalDateTime createdAt;
}
3.2 单次任务主表实体 WbOnceScheduledTask.java
设计说明:
-
task_type:REMINDER(提醒)或 URGE(催办)。 -
scheduled_time:计划执行时间(微秒精度)。 -
status:PENDING / COMPLETED / EXPIRED。 -
唯一约束
(task_type, scheduled_time):保证相同类型和时间的任务只有一个主任务,实现幂等创建。 -
@PrePersist:初始化状态为PENDING,填充时间戳。
java
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
import javax.persistence.*;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
@Data
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "wb_once_scheduled_task", uniqueConstraints = {
@UniqueConstraint(columnNames = {"task_type", "scheduled_time"})
})
@org.hibernate.annotations.Table(appliesTo = "wb_once_scheduled_task", comment = "单次批量任务主表")
public class WbOnceScheduledTask {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "id", unique = true)
private Integer id;
@Column(name = "task_type", nullable = false, columnDefinition = "varchar(256) comment 'taskType' ")
private String taskType;
@Column(name = "scheduled_time", columnDefinition = "TIMESTAMP(6) NULL")
private LocalDateTime scheduledTime;
@Column(name = "status", nullable = false, columnDefinition = "varchar(256) comment '状态' ")
private String status;
@Column(name = "created_at", columnDefinition = "TIMESTAMP(6) NULL", updatable = false)
private LocalDateTime createdAt;
@Column(name = "updated_at", columnDefinition = "TIMESTAMP(6) NULL")
private LocalDateTime updatedAt;
@Transient
private List<WbOnceTaskDetail> details = new ArrayList<>();
@PrePersist
protected void onCreate() {
createdAt = LocalDateTime.now();
updatedAt = LocalDateTime.now();
if (status == null) {
status = "PENDING";
}
}
@PreUpdate
protected void onUpdate() {
updatedAt = LocalDateTime.now();
}
}
3.3 单次任务明细表实体 WbOnceTaskDetail.java
设计说明:
-
task_id:关联主表 ID。 -
personal_schedule_code:日程业务编码。 -
processed:0-未处理,1-已处理(无论成功或失败)。 -
索引:
task_id用于快速查询任务关联的明细;personal_schedule_code用于反查。
java
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
import javax.persistence.*;
@Data
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "wb_once_task_detail", indexes = {
@Index(name = "idx_detail_task_id", columnList = "task_id"),
@Index(name = "idx_detail_schedule_code", columnList = "personal_schedule_code")
})
@org.hibernate.annotations.Table(appliesTo = "wb_once_task_detail", comment = "单次任务与日程关联表")
public class WbOnceTaskDetail {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "id", unique = true)
private Integer id;
@Column(name = "task_id", nullable = false, columnDefinition = "int comment 'taskId'")
private Integer taskId;
@Column(name = "personal_schedule_code", nullable = false, columnDefinition = "varchar(256) comment 'personalScheduleCode' ")
private String personalScheduleCode;
@Column(name = "processed", nullable = false, columnDefinition = "int comment 'processed'")
private Integer processed = 0;
}
4. Repository 层
4.1 Cron任务Repository WbCronScheduledTaskRepository.java
设计说明:
-
findEnabledTasksByNextFireTimeBetween:查询时间窗口内需要执行的启用任务,用于初始化加载和轮询补偿。 -
updateNextFireTime:乐观锁更新,条件next_fire_time = oldNextFire,返回影响行数(0表示乐观锁冲突)。 -
disableTask:禁用任务(用于过期或无效任务)。 -
findByCron:根据 cron 表达式查询任务,用于引用计数管理。
所有 @Modifying 方法都加了 @Transactional,确保在无外层事务时也能正常提交。
java
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
public interface WbCronScheduledTaskRepository extends JpaRepository<WbCronScheduledTask, Integer> {
@Query("SELECT t FROM WbCronScheduledTask t WHERE t.enabled = true AND t.nextFireTime BETWEEN :start AND :end")
List<WbCronScheduledTask> findEnabledTasksByNextFireTimeBetween(@Param("start") LocalDateTime start, @Param("end") LocalDateTime end);
@Modifying
@Transactional(rollbackFor = Exception.class)
@Query("UPDATE WbCronScheduledTask t SET t.nextFireTime = :newNextFire, t.updatedAt = CURRENT_TIMESTAMP " +
"WHERE t.id = :id AND t.nextFireTime = :oldNextFire")
int updateNextFireTime(@Param("id") Integer id,
@Param("oldNextFire") LocalDateTime oldNextFire,
@Param("newNextFire") LocalDateTime newNextFire);
@Modifying
@Transactional(rollbackFor = Exception.class)
@Query("UPDATE WbCronScheduledTask t SET t.enabled = false WHERE t.id = :id")
int disableTask(@Param("id") Integer id);
Optional<WbCronScheduledTask> findByCron(String cron);
@Query("SELECT t FROM WbCronScheduledTask t WHERE t.enabled = true AND t.nextFireTime < :now")
List<WbCronScheduledTask> findEnabledTasksWithNextFireBefore(@Param("now") LocalDateTime now);
}
4.2 单次任务主表Repository WbOnceScheduledTaskRepository.java
设计说明:
-
findPendingTasksInWindow:查询未来1分钟内待执行的PENDING任务,用于轮询补偿。 -
updateStatus:乐观锁更新状态,条件status = oldStatus。 -
findByTaskTypeAndScheduledTimeAndStatus:用于幂等创建,检查是否已存在相同类型和时间的任务。
java
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
public interface WbOnceScheduledTaskRepository extends JpaRepository<WbOnceScheduledTask, Integer>, JpaSpecificationExecutor<WbOnceScheduledTask> {
@Query("SELECT t FROM WbOnceScheduledTask t WHERE t.status = 'PENDING' AND t.scheduledTime BETWEEN :start AND :end")
List<WbOnceScheduledTask> findPendingTasksInWindow(@Param("start") LocalDateTime start, @Param("end") LocalDateTime end);
List<WbOnceScheduledTask> findAllByStatus(String status);
Optional<WbOnceScheduledTask> findById(Long id);
@Modifying
@Transactional(rollbackFor = Exception.class)
@Query("UPDATE WbOnceScheduledTask t SET t.status = :newStatus WHERE t.id = :id AND t.status = :oldStatus")
int updateStatus(@Param("id") Integer id, @Param("oldStatus") String oldStatus, @Param("newStatus") String newStatus);
WbOnceScheduledTask findByTaskTypeAndScheduledTimeAndStatus(String taskType, LocalDateTime scheduledTime, String status);
}
4.3 单次任务明细Repository WbOnceTaskDetailRepository.java
设计说明:
-
findByTaskId:查询任务下所有明细。 -
markProcessed:将明细标记为已处理。 -
existsByTaskIdAndPersonalScheduleCode:避免重复关联同一日程到同一任务。
java
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
public interface WbOnceTaskDetailRepository extends JpaRepository<WbOnceTaskDetail, Integer>, JpaSpecificationExecutor<WbOnceTaskDetail> {
List<WbOnceTaskDetail> findByTaskId(Integer taskId);
@Modifying
@Transactional(rollbackFor = Exception.class)
@Query("UPDATE WbOnceTaskDetail d SET d.processed = 1 WHERE d.id = :id")
int markProcessed(@Param("id") Integer id);
boolean existsByTaskIdAndPersonalScheduleCode(Integer taskId, String personalScheduleCode);
}
5. Service 层
5.1 Cron任务调度器 CronExecutionScheduledTask.java
设计说明:
-
初始化:启动时加载未来5分钟内需执行的 Cron 任务,注册到本地调度器。
-
任务管理:
-
addTask:增加引用计数,若 cron 首次出现则创建任务并计算next_fire_time。 -
removeTask:减少引用计数,归零时删除任务。 -
deleteTask/setTaskEnabled:物理删除或禁用任务。
-
-
本地注册:
registerLocalTask只注册未来5分钟内的任务,超出窗口则暂不注册;过期任务直接禁用。 -
执行流程(
executeTask):-
获取 ShedLock 锁。
-
检查
next_fire_time是否在当前时间 ±5ms 内(容错窗口)。 -
调用业务执行器
CronTaskExecutor.execute。 -
计算下一次执行时间(基于
oldNextFire)。 -
乐观锁更新
next_fire_time。 -
重新注册下次执行。
-
业务异常时仍推进时间,避免卡死。
-
-
轮询补偿:
syncPendingTasks每30秒扫描未来1分钟窗口,补注册遗漏任务(提前5秒避免边界遗漏)。
关键点:
-
使用
ConcurrentHashMap存储本地任务注册表,支持线程安全。 -
锁超时时间可配置(
scheduled.cron.lock-at-most-for)。 -
容错窗口 5ms 防止时钟抖动导致漏执行。
java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.core.SimpleLock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronExpression;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
@Slf4j
@Service
@RequiredArgsConstructor
public class CronExecutionScheduledTask {
private final WbCronScheduledTaskRepository taskRepository;
private final LockProvider lockProvider;
private final CronTaskExecutor cronTaskExecutor;
private final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
private final Map<Integer, ScheduledFuture<?>> localTasks = new ConcurrentHashMap<>();
@Value("${scheduled.cron.lock-at-most-for:30000}")
private long lockAtMostFor;
@Value("${scheduled.cron.lock-at-least-for:5000}")
private long lockAtLeastFor;
private static final long TOLERANCE_NANOS = 5_000_000L;
@PostConstruct
public void init() {
taskScheduler.initialize();
taskScheduler.setPoolSize(10);
taskScheduler.setThreadNamePrefix("cron-task-");
loadPendingTasksInWindow(LocalDateTime.now(), LocalDateTime.now().plusMinutes(5));
compensateOverdueCronTasks();
log.info("Cron调度器初始化完成");
}
private void compensateOverdueCronTasks() {
String lockName = "compensate_overdue_cron_tasks";
LockConfiguration lockConfig = new LockConfiguration(
Instant.now(),
lockName,
Duration.ofSeconds(60),
Duration.ofSeconds(5)
);
Optional<SimpleLock> lockOpt = lockProvider.lock(lockConfig);
if (!lockOpt.isPresent()) {
log.info("其他节点正在执行过期任务补偿,本节点跳过");
return;
}
SimpleLock lock = lockOpt.get();
try {
LocalDateTime now = LocalDateTime.now();
List<WbCronScheduledTask> overdueTasks = taskRepository.findEnabledTasksWithNextFireBefore(now);
if (overdueTasks.isEmpty()) return;
log.info("发现 {} 个过期的Cron任务,开始补偿", overdueTasks.size());
for (WbCronScheduledTask task : overdueTasks) {
LocalDateTime newNext = computeNextFireTime(task.getCron(), now);
if (newNext != null) {
task.setNextFireTime(newNext);
taskRepository.save(task);
log.debug("补偿任务 {},原时间 {},新时间 {}", task.getId(), task.getNextFireTime(), newNext);
// 如果新时间在未来5分钟内,立即注册
if (newNext.isBefore(now.plusMinutes(5))) {
registerLocalTask(task);
}
} else {
taskRepository.disableTask(task.getId());
}
}
log.info("过期Cron任务补偿完成");
} catch (Exception e) {
log.error("补偿过期Cron任务时发生异常", e);
} finally {
lock.unlock();
}
}
private void loadPendingTasksInWindow(LocalDateTime start, LocalDateTime end) {
List<WbCronScheduledTask> tasks = taskRepository.findEnabledTasksByNextFireTimeBetween(start, end);
for (WbCronScheduledTask task : tasks) {
registerLocalTask(task);
}
log.info("加载了 {} 个Cron任务(窗口:{} ~ {})", tasks.size(), start, end);
}
public void addTask(String cronExpression) {
WbCronScheduledTask task = taskRepository.findByCron(cronExpression)
.orElseGet(() -> {
WbCronScheduledTask newTask = new WbCronScheduledTask();
newTask.setName("任务-" + cronExpression);
newTask.setCron(cronExpression);
newTask.setEnabled(true);
newTask.setRefCount(0);
LocalDateTime next = computeNextFireTime(cronExpression, LocalDateTime.now());
newTask.setNextFireTime(next);
return newTask;
});
task.setRefCount(task.getRefCount() + 1);
task.setEnabled(true);
this.saveOrUpdateTask(task);
}
@Transactional
public void saveOrUpdateTask(WbCronScheduledTask task) {
if (task.getEnabled() && task.getNextFireTime() == null) {
LocalDateTime next = computeNextFireTime(task.getCron(), LocalDateTime.now());
task.setNextFireTime(next);
}
taskRepository.save(task);
if (task.getEnabled()) {
registerLocalTask(task);
} else {
cancelLocalTask(task.getId());
}
}
@Transactional
public void deleteTask(Integer id) {
cancelLocalTask(id);
taskRepository.deleteById(id);
}
@Transactional
public void setTaskEnabled(Integer id, boolean enabled) {
WbCronScheduledTask task = taskRepository.findById(id).orElse(null);
if (task != null) {
task.setEnabled(enabled);
taskRepository.save(task);
if (enabled) {
registerLocalTask(task);
} else {
cancelLocalTask(id);
}
}
}
private void registerLocalTask(WbCronScheduledTask task) {
if (!task.getEnabled() || task.getNextFireTime() == null) {
return;
}
// 先取消旧的本地任务(无论新时间是否在窗口内)
cancelLocalTask(task.getId());
LocalDateTime now = LocalDateTime.now();
LocalDateTime nextFire = task.getNextFireTime();
// 只注册未来5分钟内的任务
if (nextFire.isAfter(now.plusMinutes(5))) {
log.info("任务 {} 执行时间较远,暂不注册", task.getId());
return;
}
if (nextFire.plusNanos(TOLERANCE_NANOS).isBefore(now)) {
log.warn("任务 {} 已过期(超出容错窗口),标记为过期", task.getId());
taskRepository.disableTask(task.getId());
return;
}
long delay = now.until(nextFire, ChronoUnit.MILLIS);
ScheduledFuture<?> future = taskScheduler.schedule(
() -> executeTask(task.getId()),
Instant.now().plus(delay, ChronoUnit.MILLIS)
);
localTasks.put(task.getId(), future);
log.info("注册Cron任务 id={}, 延迟={}ms", task.getId(), delay);
}
private void cancelLocalTask(Integer id) {
ScheduledFuture<?> future = localTasks.remove(id);
if (future != null) {
future.cancel(false);
}
}
private void cancelLocalTask(Integer id) {
ScheduledFuture<?> future = localTasks.remove(id);
if (future != null) future.cancel(false);
}
private void executeTask(Integer id) {
String lockName = "cron_task_" + id;
LockConfiguration lockConfig = new LockConfiguration(
Instant.now(), lockName,
Duration.ofMillis(lockAtMostFor),
Duration.ofMillis(lockAtLeastFor)
);
Optional<SimpleLock> lockOpt = lockProvider.lock(lockConfig);
if (!lockOpt.isPresent()) {
log.debug("获取锁失败,任务 {} 由其他节点执行", id);
return;
}
SimpleLock lock = lockOpt.get();
try {
WbCronScheduledTask task = taskRepository.findById(id).orElse(null);
if (task == null || !task.getEnabled()) return;
LocalDateTime oldNextFire = task.getNextFireTime();
LocalDateTime now = LocalDateTime.now();
if (oldNextFire == null || oldNextFire.plusNanos(TOLERANCE_NANOS).isBefore(now)) {
log.info("任务 {} 的 next_fire_time 未到或已过期(超出容错),跳过", id);
return;
}
cronTaskExecutor.execute(task);
LocalDateTime newNextFire = computeNextFireTime(task.getCron(), oldNextFire);
if (newNextFire == null) {
log.error("任务 {} cron表达式无效,禁用任务", id);
taskRepository.disableTask(id);
return;
}
int updated = taskRepository.updateNextFireTime(id, oldNextFire, newNextFire);
if (updated == 0) {
log.warn("乐观锁更新失败,任务 {} 可能已被其他节点修改", id);
taskRepository.findById(id).ifPresent(this::registerLocalTask);
} else {
log.info("任务 {} 执行成功,下次执行时间 {}", id, newNextFire);
task.setNextFireTime(newNextFire);
registerLocalTask(task);
}
} catch (Exception e) {
log.error("执行Cron任务 {} 业务失败,推进next_fire_time避免卡死", id, e);
try {
WbCronScheduledTask task = taskRepository.findById(id).orElse(null);
if (task != null && task.getEnabled()) {
LocalDateTime oldNextFire = task.getNextFireTime();
LocalDateTime newNextFire = computeNextFireTime(task.getCron(), oldNextFire);
if (newNextFire != null) {
int updated = taskRepository.updateNextFireTime(id, oldNextFire, newNextFire);
if (updated > 0) {
log.info("任务 {} 业务失败但已推进到下次执行时间 {}", id, newNextFire);
}
}
}
} catch (Exception ex) {
log.error("推进任务 {} next_fire_time 失败", id, ex);
}
} finally {
lock.unlock();
}
}
private LocalDateTime computeNextFireTime(String cron, LocalDateTime from) {
try {
CronExpression expression = CronExpression.parse(cron);
return expression.next(from);
} catch (Exception e) {
log.error("解析cron表达式失败: {}", cron, e);
return null;
}
}
public void removeTask(String cronExpression) {
taskRepository.findByCron(cronExpression).ifPresent(task -> {
int newCount = task.getRefCount() - 1;
if (newCount <= 0) {
deleteTask(task.getId());
log.info("Cron表达式 {} 已无引用,删除调度", cronExpression);
} else {
task.setRefCount(newCount);
taskRepository.save(task);
log.info("Cron表达式 {} 引用计数减为 {}, 仍保留调度", cronExpression, newCount);
}
});
}
@org.springframework.scheduling.annotation.Scheduled(fixedDelay = 30000)
public void syncPendingTasks() {
LocalDateTime now = LocalDateTime.now();
LocalDateTime start = now.minusSeconds(5);
LocalDateTime windowEnd = now.plusMinutes(1);
List<WbCronScheduledTask> tasks = taskRepository.findEnabledTasksByNextFireTimeBetween(start, windowEnd);
for (WbCronScheduledTask task : tasks) {
if (!localTasks.containsKey(task.getId())) {
registerLocalTask(task);
log.debug("轮询补偿注册Cron任务 id={}", task.getId());
}
}
}
}
5.2 单次任务调度器 OnceScheduleDelayTask.java
设计说明:
-
任务创建:
addTask(PersonalScheduleInformation)根据日程的开始/结束时间,生成提醒(提前5分钟)和催办(根据提前量)任务。 -
幂等追加:
createOnceTask先查询是否存在相同(taskType, scheduledTime)的PENDING任务,若不存在则创建新主任务;若存在则复用,并检查明细是否已关联,避免重复。 -
本地注册:
registerLocalTask只注册未来5分钟内的任务,过期任务标记为EXPIRED。 -
执行流程(
executeTask):-
获取 ShedLock 锁。
-
检查任务状态是否为
PENDING。 -
遍历所有明细,调用
executeBusiness处理每个日程。 -
无论成功或失败,明细都标记为
processed=1(不重试)。 -
乐观锁更新主任务状态为
COMPLETED。
-
-
轮询补偿:同 Cron 任务。
业务说明:
-
提醒时间 = 开始时间 - 5分钟。
-
催办时间 = 结束时间 - 提前量(分钟),多个提前量去重。
-
发送通知失败时直接标记明细已处理,不重试,主任务仍然完成。
java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.core.SimpleLock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
/**
* 个人日程延迟任务管理(一体化分布式调度)
* 包含:任务创建、本地延迟调度、分布式锁、定时轮询、业务执行
*
* @author
* @date 2024/1/24 8:53
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class OnceScheduleDelayTask {
// 数据库操作
private final WbOnceScheduledTaskRepository onceTaskRepository;
private final WbOnceTaskDetailRepository onceTaskDetailRepository;
// 分布式锁 需实现返回 Optional<SimpleLock>
private final LockProvider lockProvider;
// 消息服务
private final MsNotificationDataService messageNotificationDataService;
// 本地延迟调度器
private final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
// 本地任务注册表:taskId -> ScheduledFuture
private final Map<Integer, ScheduledFuture<?>> localTasks = new ConcurrentHashMap<>();
// 锁参数(毫秒)
@Value("${scheduled.once.lock-at-most-for:30000}")
private long lockAtMostFor;
@Value("${scheduled.once.lock-at-least-for:5000}")
private long lockAtLeastFor;
// ==================== 初始化 ====================
@PostConstruct
public void init() {
taskScheduler.initialize();
taskScheduler.setPoolSize(10);
taskScheduler.setThreadNamePrefix("personal-schedule-");
// 改为只加载未来5分钟内要执行的任务
loadPendingTasksInWindow(LocalDateTime.now(), LocalDateTime.now().plusMinutes(5));
log.info("PersonalScheduleDelayTask 初始化完成");
}
/**
* 只加载指定时间窗口内的 PENDING 任务
*/
private void loadPendingTasksInWindow(LocalDateTime start, LocalDateTime end) {
List<WbOnceScheduledTask> pendingTasks = onceTaskRepository.findPendingTasksInWindow(start, end);
for (WbOnceScheduledTask task : pendingTasks) {
registerLocalTask(task);
}
log.info("加载了 {} 个待执行任务(时间窗口:{} ~ {})", pendingTasks.size(), start, end);
}
/**
* 启动时加载所有状态为 PENDING 的任务,注册到本地调度器
*/
private void loadAllPendingTasks() {
List<WbOnceScheduledTask> pendingTasks = onceTaskRepository.findAllByStatus("PENDING");
for (WbOnceScheduledTask task : pendingTasks) {
registerLocalTask(task);
}
log.info("加载了 {} 个待执行任务", pendingTasks.size());
}
// ==================== 任务创建(对外接口) ====================
/**
* 为单个日程创建所有延迟任务(提醒 + 多个催办)
* 这是原有 addTask 方法的替代实现
*/
@Transactional
public void addTask(ScheduleInformation schedule) {
String code = schedule.getCode();
String startTime = schedule.getStartTime();
String endTime = schedule.getEndTime();
Long callAdvanceTime1 = schedule.getCallAdvanceTime1();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
// 1. 提醒:开始时间 - 5分钟
Date startDate = sdf.parse(startTime);
long remindTime = startDate.getTime() - 5 * 60 * 1000;
if (System.currentTimeMillis() <= remindTime) {
LocalDateTime remindLdt = LocalDateTime.ofInstant(new Date(remindTime).toInstant(), ZoneId.systemDefault());
createOnceTask(code, "REMINDER", remindLdt);
}
// 2. 催办:结束时间 - 提前量(去重)
Date endDate = sdf.parse(endTime);
long endTimeMs = endDate.getTime();
Set<Long> advances = new HashSet<>();
if (callAdvanceTime1 != null) {
advances.add(callAdvanceTime1);
}
for (Long advance : advances) {
long urgeTime = endTimeMs - advance * 60 * 1000;
if (System.currentTimeMillis() <= urgeTime) {
LocalDateTime urgeLdt = LocalDateTime.ofInstant(new Date(urgeTime).toInstant(), ZoneId.systemDefault());
createOnceTask(code, "URGE", urgeLdt);
}
}
} catch (Exception e) {
log.error("创建日程任务失败,code={}", code, e);
}
}
/**
* 创建或追加单次任务(一个任务可对应多个日程编码)
* 幂等性:相同 taskType + scheduledTime 的任务只会创建一次,后续调用只追加明细
*/
private void createOnceTask(String scheduleCode, String taskType, LocalDateTime scheduledTime) {
// 1. 查找是否存在相同类型和时间的未完成任务
WbOnceScheduledTask task = onceTaskRepository
.findByTaskTypeAndScheduledTimeAndStatus(taskType, scheduledTime, "PENDING");
if (task == null) {
// 2. 不存在则创建新任务
task = new WbOnceScheduledTask();
task.setTaskType(taskType);
task.setScheduledTime(scheduledTime);
task.setStatus("PENDING");
onceTaskRepository.save(task);
log.info("创建新的批量任务:类型={}, 时间={}", taskType, scheduledTime);
} else {
log.debug("复用已存在的批量任务:id={}, 类型={}, 时间={}", task.getId(), taskType, scheduledTime);
}
// 3. 检查该日程是否已关联到此任务(避免重复关联)
boolean alreadyExists = onceTaskDetailRepository.existsByTaskIdAndPersonalScheduleCode(task.getId(), scheduleCode);
if (!alreadyExists) {
WbOnceTaskDetail detail = new WbOnceTaskDetail();
detail.setTaskId(task.getId());
detail.setPersonalScheduleCode(scheduleCode);
detail.setProcessed(0);
onceTaskDetailRepository.save(detail);
log.debug("添加日程 {} 到任务 {}", scheduleCode, task.getId());
} else {
log.debug("日程 {} 已存在于任务 {},跳过", scheduleCode, task.getId());
}
// 4. 注册到本地调度器(如果还没注册)
registerLocalTask(task);
}
// ==================== 本地调度注册 ====================
/**
* 注册任务到本地延迟调度器
*/
private void registerLocalTask(WbOnceScheduledTask task) {
LocalDateTime now = LocalDateTime.now();
// 只注册未来5分钟内要执行的任务
if (task.getScheduledTime().isAfter(now.plusMinutes(5))) {
log.debug("任务执行时间超过5分钟,暂不注册:id={}, 时间={}", task.getId(), task.getScheduledTime());
return;
}
// 如果任务已过期,标记并返回
if (task.getScheduledTime().isBefore(now)) {
log.warn("任务已过期,不注册:id={}, 时间={}", task.getId(), task.getScheduledTime());
onceTaskRepository.updateStatus(task.getId(), "PENDING", "EXPIRED");
return;
}
cancelLocalTask(task.getId());
long delay = now.until(task.getScheduledTime(), ChronoUnit.MILLIS);
ScheduledFuture<?> future = taskScheduler.schedule(
() -> executeTask(task.getId()),
Instant.now().plus(delay, ChronoUnit.MILLIS)
);
localTasks.put(task.getId(), future);
log.debug("注册任务 id={}, 延迟={}ms", task.getId(), delay);
}
private void cancelLocalTask(Integer taskId) {
ScheduledFuture<?> future = localTasks.remove(taskId);
if (future != null) {
future.cancel(false);
}
}
// ==================== 任务执行(分布式锁 + 业务) ====================
/**
* 执行单个任务(由本地调度器触发)
*/
private void executeTask(Integer taskId) {
String lockName = "once_task_" + taskId;
LockConfiguration lockConfig = new LockConfiguration(
Instant.now(),
lockName,
Duration.ofMillis(lockAtMostFor),
Duration.ofMillis(lockAtLeastFor)
);
Optional<SimpleLock> lockOpt = lockProvider.lock(lockConfig);
if (!lockOpt.isPresent()) {
log.debug("获取锁失败,任务 {} 由其他节点执行", taskId);
return;
}
SimpleLock lock = lockOpt.get();
try {
// 再次检查任务状态(乐观锁)
WbOnceScheduledTask task = onceTaskRepository.findById(taskId).orElse(null);
if (task == null || !"PENDING".equals(task.getStatus())) {
log.info("任务 {} 状态不是 PENDING,跳过", taskId);
return;
}
// 执行具体业务:处理所有关联的日程
executeBusiness(task);
// 更新主任务状态为 COMPLETED
int updated = onceTaskRepository.updateStatus(taskId, "PENDING", "COMPLETED");
if (updated == 0) {
log.warn("乐观锁更新失败,任务 {} 可能已被其他节点完成", taskId);
} else {
localTasks.remove(taskId);
log.info("任务 {} 执行成功", taskId);
}
} catch (Exception e) {
log.error("执行任务 {} 失败", taskId, e);
// 保持 PENDING,等待下次轮询重试
} finally {
lock.unlock();
}
}
/**
* 业务逻辑:遍历任务关联的所有日程,发送通知
*/
private void executeBusiness(WbOnceScheduledTask task) {
List<WbOnceTaskDetail> details = onceTaskDetailRepository.findByTaskId(task.getId());
for (WbOnceTaskDetail detail : details) {
if (detail.getProcessed() == 1) {
continue;
}
//真实业务代码
}
}
// ==================== 定时轮询(补偿机制) ====================
/**
* 每30秒轮询一次,补偿因节点宕机等原因未注册的任务
*/
@Scheduled(fixedDelay = 30000)
public void syncPendingTasks() {
LocalDateTime now = LocalDateTime.now();
LocalDateTime windowEnd = now.plusMinutes(1);
List<WbOnceScheduledTask> pendingTasks = onceTaskRepository.findPendingTasksInWindow(now, windowEnd);
for (WbOnceScheduledTask task : pendingTasks) {
if (!localTasks.containsKey(task.getId())) {
registerLocalTask(task);
log.debug("轮询补偿注册任务 id={}", task.getId());
}
}
}
}
5.3 业务执行器接口 CronTaskExecutor.java
设计说明:
-
用户需自行实现该接口,将具体的业务逻辑(如模型数据抽取、计算、推送等)注入到调度器中。
-
调度器在 Cron 任务触发时会调用
execute方法。
java
public interface CronTaskExecutor {
void execute(WbCronScheduledTask task);
}
实现示例: 在
ModelTaskExecutionScheduledTask原业务类中实现该接口,根据task.getCron()查询所有使用该 cron 表达式的任务周期,然后执行demoss等原有逻辑。
6. 使用示例
6.1 添加 Cron 任务
java
@Autowired
private ModelTaskExecutionScheduledTask cronTaskService;
// 添加一个每分钟执行一次的任务
cronTaskService.addTask("0 * * * * ?");
// 删除(引用计数减1,归零时删除调度)
cronTaskService.removeTask("0 * * * * ?");
// 禁用任务
cronTaskService.setTaskEnabled(taskId, false);
6.2 添加单次任务
java
@Autowired private PersonalScheduleDelayTask onceTaskService; // 假设从数据库或 API 获取 PersonalScheduleInformation 对象 PersonalScheduleInformation schedule = ...; onceTaskService.addTask(schedule); // 系统自动根据日程的开始/结束时间生成提醒和催办任务
6.3 实现 Cron 业务执行器
java
@Component
public class MyCronTaskExecutor implements CronTaskExecutor {
@Override
public void execute(WbCronScheduledTask task) {
String cron = task.getCron();
// 查询使用该 cron 的所有模型任务周期
List<DataDrivenModelTaskCycle> cycles = modelTaskCycleRepository.findAllByCronExpressionAndState(cron, "1");
// 执行业务逻辑(如数据抽取、计算)
demoss(...);
}
}
7. 参数调优与监控
7.1 关键配置参数(application.yml)
yaml
scheduled: cron: lock-at-most-for: 30000 # Cron任务锁最长持有时间(毫秒),建议 = P99执行时间 × 2 + 5秒 lock-at-least-for: 5000 # 最短持有时间,建议 = 节点间最大时钟差 × 2 + 1秒 once: lock-at-most-for: 30000 lock-at-least-for: 5000
7.2 调优公式
| 参数 | 推荐值 | 说明 |
|---|---|---|
lockAtMostFor |
30秒 | 根据业务执行时间的 P99 值调整,避免锁提前释放 |
lockAtLeastFor |
5秒 | 保证网络延迟或时钟抖动下锁仍有效 |
| 内存窗口 | 5分钟 | 任务执行时间较远时不加载,减少内存占用 |
| 轮询间隔 | 30秒 | 平衡实时性和数据库压力 |
| Cron容错窗口 | 5毫秒 | 避免时钟抖动导致漏执行 |
7.3 监控建议
-
锁获取失败次数:
shedlock.lock.error(可通过日志或 Micrometer 统计) -
任务执行耗时、成功/失败率
-
调度延迟:实际触发时间与计划时间的差值
-
乐观锁更新失败次数(日志关键字 “乐观锁更新失败”)
8. 已知限制与注意事项
-
单次任务不重试:失败明细直接标记为已处理,仅记录日志,不会重试。
-
Cron任务业务异常会推进时间:避免卡死,但业务失败不重试,需业务层保证幂等。
-
主键类型:所有表主键均为
Integer(自增),Repository 方法参数类型需匹配。 -
达梦适配:必须完成
DamengServerTimeStatementsSource同包名覆盖,否则 ShedLock 无法获取数据库时间。 -
锁表清理:
shedlock表数据不会自动清理,可定期执行:sql
DELETE FROM shedlock WHERE lock_until < SYSTIMESTAMP - INTERVAL '7' DAY;
-
调度器线程池:默认
poolSize=10,可根据任务数量调整。
9. 总结
本系统实现了分布式环境下 Cron 任务和单次任务的高可靠调度,具备以下特点:
-
✅ 强一致性(ShedLock 分布式锁 + 乐观锁)
-
✅ 任务不丢失(先执行后更新)
-
✅ 节点宕机恢复(锁超时 + 轮询补偿)
-
✅ 内存可控(5分钟窗口)
-
✅ 国产化数据库适配(达梦)
-
✅ 动态增删启停任务(支持引用计数)
-
✅ 代码简洁,无独立调度中心
附录
10.1 达梦shedlock建表语句
CREATE TABLE "shedlock"
(
"name" VARCHAR(64) NOT NULL,
"lock_until" TIMESTAMP(6) NOT NULL,
"locked_at" TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP
NOT NULL,
"locked_by" VARCHAR(255) NOT NULL
);
ALTER TABLE "shedlock" ADD CONSTRAINT NOT CLUSTER PRIMARY KEY("name") ;
10.2 shedlcok达梦方言适配
package net.javacrumbs.shedlock.provider.jdbctemplate;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.support.annotation.NonNull;
import java.util.HashMap;
import java.util.Map;
/**
* sf
*/
class DamengServerTimeStatementsSource extends SqlStatementsSource {
// 1. 获取当前时间:使用 SYSDATE
// private static final String now = "SYSDATE";
// 2. 时间运算:使用 DATEADD 函数,单位 MICROSECOND
// private static final String lockAtMostFor = "DATEADD(MICROSECOND, :lockAtMostForMicros, " + now + ")";
private static final String now = "SYSTIMESTAMP";
private static final String lockAtMostFor = "DATEADD(MICROSECOND, :lockAtMostForMicros, " + now + ")";
DamengServerTimeStatementsSource(JdbcTemplateLockProvider.Configuration configuration) {
super(configuration);
}
@Override
String getInsertStatement() {
// 3. "插入或更新"逻辑:使用 MERGE INTO 语句
return "MERGE INTO " + tableName() + " t USING (SELECT :name AS name) s ON (t." + name() + " = s.name) " +
"WHEN NOT MATCHED THEN INSERT (" + name() + ", " + lockUntil() + ", " + lockedAt() + ", " + lockedBy() + ") " +
"VALUES (s.name, " + lockAtMostFor + ", " + now + ", :lockedBy)";
}
@Override
public String getUpdateStatement() {
return "UPDATE " + tableName() + " SET " + lockUntil() + " = " + lockAtMostFor + ", " +
lockedAt() + " = " + now + ", " + lockedBy() + " = :lockedBy " +
"WHERE " + name() + " = :name AND " + lockUntil() + " <= " + now;
}
@Override
public String getUnlockStatement() {
String lockAtLeastFor = "DATEADD(MICROSECOND, :lockAtLeastForMicros, " + lockedAt() + ")";
// 4. 条件更新:使用 CASE WHEN 表达式
return "UPDATE " + tableName() + " SET " + lockUntil() + " = " +
"CASE WHEN " + lockAtLeastFor + " > " + now + " THEN " + lockAtLeastFor + " ELSE " + now + " END " +
"WHERE " + name() + " = :name AND " + lockedBy() + " = :lockedBy";
}
@Override
public String getExtendStatement() {
return "UPDATE " + tableName() + " SET " + lockUntil() + " = " + lockAtMostFor + " " +
"WHERE " + name() + " = :name AND " + lockedBy() + " = :lockedBy AND " + lockUntil() + " > " + now;
}
@Override
@NonNull
Map<String, Object> params(@NonNull LockConfiguration lockConfiguration) {
Map<String, Object> params = new HashMap<>();
params.put("name", lockConfiguration.getName());
params.put("lockedBy", configuration.getLockedByValue());
params.put("lockAtMostForMicros", lockConfiguration.getLockAtMostFor().toNanos() / 1_000);
params.put("lockAtLeastForMicros", lockConfiguration.getLockAtLeastFor().toNanos() / 1_000);
return params;
}
}
/**
* Copyright 2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.javacrumbs.shedlock.provider.jdbctemplate;
import net.javacrumbs.shedlock.core.ClockProvider;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider.Configuration;
import net.javacrumbs.shedlock.support.annotation.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.ConnectionCallback;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
class SqlStatementsSource {
protected final Configuration configuration;
private static final Logger logger = LoggerFactory.getLogger(SqlStatementsSource.class);
SqlStatementsSource(Configuration configuration) {
this.configuration = configuration;
}
private static final String POSTGRESQL = "postgresql";
private static final String MSSQL = "microsoft sql server";
private static final String ORACLE = "oracle";
private static final String MYSQL = "mysql";
private static final String MARIADB = "mariadb";
private static final String HSQL = "hsql database engine";
private static final String H2 = "h2";
// 新增达梦数据库标识(产品名可能为 "DM DBMS" 或 "Dameng")
private static final String DAMENG = "dm";
private static final String DAMENG_DBMS = "dm dbms";
private static final String DAMENG_FULL = "dameng";
static SqlStatementsSource create(Configuration configuration) {
String databaseProductName = getDatabaseProductName(configuration).toLowerCase();
if (configuration.getUseDbTime()) {
switch (databaseProductName) {
case POSTGRESQL:
logger.debug("Using PostgresSqlServerTimeStatementsSource");
return new PostgresSqlServerTimeStatementsSource(configuration);
case MSSQL:
logger.debug("Using MsSqlServerTimeStatementsSource");
return new MsSqlServerTimeStatementsSource(configuration);
case ORACLE:
logger.debug("Using OracleServerTimeStatementsSource");
return new OracleServerTimeStatementsSource(configuration);
case MYSQL:
logger.debug("Using MySqlServerTimeStatementsSource");
return new MySqlServerTimeStatementsSource(configuration);
case MARIADB:
logger.debug("Using MySqlServerTimeStatementsSource (for MariaDB)");
return new MySqlServerTimeStatementsSource(configuration);
case HSQL:
logger.debug("Using HsqlServerTimeStatementsSource");
return new HsqlServerTimeStatementsSource(configuration);
case H2:
logger.debug("Using H2ServerTimeStatementsSource");
return new H2ServerTimeStatementsSource(configuration);
// ========== 新增对达梦数据库的支持 ==========
case DAMENG:
case DAMENG_DBMS:
case DAMENG_FULL:
logger.debug("Using DamengServerTimeStatementsSource");
return new DamengServerTimeStatementsSource(configuration);
// ==========================================
default:
if (databaseProductName.startsWith("db2")) {
logger.debug("Using Db2ServerTimeStatementsSource");
return new Db2ServerTimeStatementsSource(configuration);
}
throw new UnsupportedOperationException("DB time is not supported for '" + databaseProductName + "'");
}
} else {
if (POSTGRESQL.equals(databaseProductName)) {
logger.debug("Using PostgresSqlServerTimeStatementsSource");
return new PostgresSqlStatementsSource(configuration);
} else {
logger.debug("Using SqlStatementsSource");
return new SqlStatementsSource(configuration);
}
}
}
private static String getDatabaseProductName(Configuration configuration) {
try {
return configuration.getJdbcTemplate().execute((ConnectionCallback<String>) connection -> connection.getMetaData().getDatabaseProductName());
} catch (Exception e) {
logger.debug("Can not determine database product name " + e.getMessage());
return "Unknown";
}
}
@NonNull
Map<String, Object> params(@NonNull LockConfiguration lockConfiguration) {
Map<String, Object> params = new HashMap<>();
params.put("name", lockConfiguration.getName());
params.put("lockUntil", timestamp(lockConfiguration.getLockAtMostUntil()));
params.put("now", timestamp(ClockProvider.now()));
params.put("lockedBy", configuration.getLockedByValue());
params.put("unlockTime", timestamp(lockConfiguration.getUnlockTime()));
return params;
}
@NonNull
private Object timestamp(Instant time) {
TimeZone timeZone = configuration.getTimeZone();
if (timeZone == null) {
return Timestamp.from(time);
} else {
Calendar calendar = Calendar.getInstance();
calendar.setTime(Date.from(time));
calendar.setTimeZone(timeZone);
return calendar;
}
}
String getInsertStatement() {
return "INSERT INTO " + tableName() + "(" + name() + ", " + lockUntil() + ", " + lockedAt() + ", " + lockedBy() + ") VALUES(:name, :lockUntil, :now, :lockedBy)";
}
public String getUpdateStatement() {
return "UPDATE " + tableName() + " SET " + lockUntil() + " = :lockUntil, " + lockedAt() + " = :now, " + lockedBy() + " = :lockedBy WHERE " + name() + " = :name AND " + lockUntil() + " <= :now";
}
public String getExtendStatement() {
return "UPDATE " + tableName() + " SET " + lockUntil() + " = :lockUntil WHERE " + name() + " = :name AND " + lockedBy() + " = :lockedBy AND " + lockUntil() + " > :now";
}
public String getUnlockStatement() {
return "UPDATE " + tableName() + " SET " + lockUntil() + " = :unlockTime WHERE " + name() + " = :name";
}
String name() {
return configuration.getColumnNames().getName();
}
String lockUntil() {
return configuration.getColumnNames().getLockUntil();
}
String lockedAt() {
return configuration.getColumnNames().getLockedAt();
}
String lockedBy() {
return configuration.getColumnNames().getLockedBy();
}
String tableName() {
return configuration.getTableName();
}
}
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐




所有评论(0)