分布式定时任务调度系统技术文档(附完整代码)

文档说明

概述

在实现分布式定时任务之前,建议优先考虑业界成熟的调度平台PowerJob,它提供了完善的 管理界面、监控告警等功能,可以避免重复造轮子。

代码组织:

  1. 配置类 – ShedLock 分布式锁配置及达梦数据库适配说明

  2. 实体类(PO) – Cron任务、单次任务主表、明细表

  3. Repository 层 – 数据访问接口(含乐观锁更新)

  4. Service 层 – Cron调度器、单次调度器、业务执行器接口


1. 概述与设计原则

1.1 业务背景

  • 部署架构:多个个应用节点 + Redis集群

  • 数据库:达梦8(信创/国产化要求)

  • 核心要求:强一致性、任务不能丢失、支持动态增删改

1.2 核心设计原则

原则 说明
数据库唯一真理源 所有任务定义、状态、下次执行时间均持久化在数据库中,节点间不直接通信。
分布式锁 + 乐观锁 ShedLock 保证同一任务只在一个节点执行;乐观锁(next_fire_time / status)保证状态推进的原子性。
先执行后更新 执行业务后再更新状态或下次执行时间。即使节点宕机,其他节点可重试,任务不丢失。
本地延迟调度 + 内存窗口 每个节点独立调度,只加载未来5分钟内的任务到内存,远期任务不加载,避免内存膨胀。
定时轮询补偿 每30秒扫描未来1分钟窗口,补注册因节点宕机等原因遗漏的任务。
容错设计 Cron任务5ms滞后容错;单次任务失败明细直接标记已处理,不重试。

1.3 整体架构

text

┌─────────────────────────────────────────────────────────────┐
│                        6个应用节点                            │
├─────────────────────────────────────────────────────────────┤
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────────┐  │
│  │ 任务加载层    │  │ 本地延迟调度  │  │ 分布式锁+乐观锁  │  │
│  │ 启动+轮询    │  │ ThreadPool   │  │   ShedLock       │  │
│  └──────────────┘  └──────────────┘  └──────────────────┘  │
│  ┌──────────────────────────────────────────────────────┐  │
│  │              业务执行器(CronTaskExecutor)           │  │
│  └──────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                      达梦数据库                              │
├─────────────────────────────────────────────────────────────┤
│  • wb_cron_scheduled_task   (Cron任务主表)                 │
│  • wb_once_scheduled_task   (单次任务主表)                 │
│  • wb_once_task_detail      (单次任务明细表)               │
│  • shedlock  (ShedLock锁表)                │
└─────────────────────────────────────────────────────────────┘


2. 配置类

2.1 ShedLock 配置 ShedLockConfig.java

设计说明:

  • @EnableScheduling 开启 Spring 定时任务能力(用于轮询补偿)。

  • @EnableSchedulerLock(defaultLockAtMostFor = "10m") 启用 ShedLock,默认锁最长持有10分钟(可被具体任务覆盖)。

  • LockProvider 使用 JdbcTemplateLockProvider,基于数据库表实现分布式锁。

  • 表名指定为 shedlock(可根据实际 schema 调整)。

  • usingDbTime() 要求数据库支持获取当前时间的函数,达梦需额外适配(见下节)。

java

import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider;
import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
​
import javax.sql.DataSource;
​
@Configuration
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor = "10m")
public class ShedLockConfig {
    @Bean
    public LockProvider lockProvider(DataSource dataSource) {
        return new JdbcTemplateLockProvider(
                JdbcTemplateLockProvider.Configuration.builder()
                        .withJdbcTemplate(new JdbcTemplate(dataSource))
                        .withTableName("shedlock")
                         .usingDbTime()
                        .build()
        );
    }
}

2.2 达梦数据库适配说明

为什么需要适配? ShedLock 的 usingDbTime() 会调用数据库的当前时间函数(如 MySQL 的 NOW()),但达梦需要使用 SYSTIMESTAMPDATEADD 函数。原生不支持会抛出 UnsupportedOperationException

解决方案: 在项目中创建同包名覆盖类: net.javacrumbs.shedlock.provider.jdbctemplate.DamengServerTimeStatementsSource

实现达梦特定的 SQL 语句,例如:

  • 获取当前时间:SYSTIMESTAMP

  • 时间加法:DATEADD(MICROSECOND, :lockAtMostForMicros, SYSTIMESTAMP)

具体代码参考设计方案中的适配实现(此处不重复贴出)。确保该类被类加载器优先加载。


3. 实体类(PO)

3.1 Cron任务表实体 WbCronScheduledTask.java

设计说明:

  • id:自增主键。

  • cron:Cron 表达式。

  • enabled:是否启用。

  • ref_count:引用计数,支持多个子任务共享同一个 cron 表达式(例如多个模型任务使用相同调度周期)。

  • next_fire_time:下次执行时间,作为乐观锁字段。

  • @PrePersist / @PreUpdate:自动填充创建/更新时间。

java

import lombok.Getter;
import lombok.Setter;
​
import javax.persistence.*;
import java.time.LocalDateTime;
​
@Getter
@Setter
@Entity
@Table(name = "wb_cron_scheduled_task")
@org.hibernate.annotations.Table(appliesTo = "wb_cron_scheduled_task", comment = "cron定时任务表")
public class WbCronScheduledTask {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "id", unique = true)
    private Integer id;
​
    @Column(name = "name", columnDefinition = "varchar(256) comment '名称' ")
    private String name;
​
    @Column(name = "cron", columnDefinition = "varchar(256) comment 'cron表达式' ")
    private String cron;
​
    @Column(name = "enabled", columnDefinition = "boolean comment '是否启用 true:是 false:否' ")
    private Boolean enabled;
​
    @Column(name = "ref_count", columnDefinition = "int comment '计数器'")
    private Integer refCount = 0;
​
    @Column(name = "next_fire_time", columnDefinition = "TIMESTAMP(6) NULL")
    private LocalDateTime nextFireTime;
​
    @Column(name = "updated_at", columnDefinition = "TIMESTAMP(6) NULL")
    private LocalDateTime updatedAt;
​
    @Column(name = "created_at", columnDefinition = "TIMESTAMP(6) NULL", updatable = false)
    private LocalDateTime createdAt;
}

3.2 单次任务主表实体 WbOnceScheduledTask.java

设计说明:

  • task_type:REMINDER(提醒)或 URGE(催办)。

  • scheduled_time:计划执行时间(微秒精度)。

  • status:PENDING / COMPLETED / EXPIRED。

  • 唯一约束 (task_type, scheduled_time):保证相同类型和时间的任务只有一个主任务,实现幂等创建。

  • @PrePersist:初始化状态为 PENDING,填充时间戳。

java

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
​
import javax.persistence.*;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
​
@Data
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "wb_once_scheduled_task", uniqueConstraints = {
        @UniqueConstraint(columnNames = {"task_type", "scheduled_time"})
})
@org.hibernate.annotations.Table(appliesTo = "wb_once_scheduled_task", comment = "单次批量任务主表")
public class WbOnceScheduledTask {
​
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "id", unique = true)
    private Integer id;
​
    @Column(name = "task_type", nullable = false, columnDefinition = "varchar(256) comment 'taskType' ")
    private String taskType;
​
    @Column(name = "scheduled_time", columnDefinition = "TIMESTAMP(6)  NULL")
    private LocalDateTime scheduledTime;
​
    @Column(name = "status", nullable = false, columnDefinition = "varchar(256) comment '状态' ")
    private String status;
​
    @Column(name = "created_at", columnDefinition = "TIMESTAMP(6) NULL", updatable = false)
    private LocalDateTime createdAt;
​
    @Column(name = "updated_at", columnDefinition = "TIMESTAMP(6) NULL")
    private LocalDateTime updatedAt;
​
    @Transient
    private List<WbOnceTaskDetail> details = new ArrayList<>();
​
    @PrePersist
    protected void onCreate() {
        createdAt = LocalDateTime.now();
        updatedAt = LocalDateTime.now();
        if (status == null) {
            status = "PENDING";
        }
    }
​
    @PreUpdate
    protected void onUpdate() {
        updatedAt = LocalDateTime.now();
    }
}

3.3 单次任务明细表实体 WbOnceTaskDetail.java

设计说明:

  • task_id:关联主表 ID。

  • personal_schedule_code:日程业务编码。

  • processed:0-未处理,1-已处理(无论成功或失败)。

  • 索引:task_id 用于快速查询任务关联的明细;personal_schedule_code 用于反查。

java

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
​
import javax.persistence.*;
​
@Data
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "wb_once_task_detail", indexes = {
        @Index(name = "idx_detail_task_id", columnList = "task_id"),
        @Index(name = "idx_detail_schedule_code", columnList = "personal_schedule_code")
})
@org.hibernate.annotations.Table(appliesTo = "wb_once_task_detail", comment = "单次任务与日程关联表")
public class WbOnceTaskDetail {
​
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "id", unique = true)
    private Integer id;
​
    @Column(name = "task_id", nullable = false, columnDefinition = "int comment 'taskId'")
    private Integer taskId;
​
    @Column(name = "personal_schedule_code", nullable = false, columnDefinition = "varchar(256) comment 'personalScheduleCode' ")
    private String personalScheduleCode;
​
    @Column(name = "processed", nullable = false, columnDefinition = "int comment 'processed'")
    private Integer processed = 0;
}


4. Repository 层

4.1 Cron任务Repository WbCronScheduledTaskRepository.java

设计说明:

  • findEnabledTasksByNextFireTimeBetween:查询时间窗口内需要执行的启用任务,用于初始化加载和轮询补偿。

  • updateNextFireTime:乐观锁更新,条件 next_fire_time = oldNextFire,返回影响行数(0表示乐观锁冲突)。

  • disableTask:禁用任务(用于过期或无效任务)。

  • findByCron:根据 cron 表达式查询任务,用于引用计数管理。

所有 @Modifying 方法都加了 @Transactional,确保在无外层事务时也能正常提交。

java

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
​
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
​
public interface WbCronScheduledTaskRepository extends JpaRepository<WbCronScheduledTask, Integer> {
​
    @Query("SELECT t FROM WbCronScheduledTask t WHERE t.enabled = true AND t.nextFireTime BETWEEN :start AND :end")
    List<WbCronScheduledTask> findEnabledTasksByNextFireTimeBetween(@Param("start") LocalDateTime start, @Param("end") LocalDateTime end);
​
    @Modifying
    @Transactional(rollbackFor = Exception.class)
    @Query("UPDATE WbCronScheduledTask t SET t.nextFireTime = :newNextFire, t.updatedAt = CURRENT_TIMESTAMP " +
            "WHERE t.id = :id AND t.nextFireTime = :oldNextFire")
    int updateNextFireTime(@Param("id") Integer id,
                           @Param("oldNextFire") LocalDateTime oldNextFire,
                           @Param("newNextFire") LocalDateTime newNextFire);
​
    @Modifying
    @Transactional(rollbackFor = Exception.class)
    @Query("UPDATE WbCronScheduledTask t SET t.enabled = false WHERE t.id = :id")
    int disableTask(@Param("id") Integer id);
​
    Optional<WbCronScheduledTask> findByCron(String cron);
   
@Query("SELECT t FROM WbCronScheduledTask t WHERE t.enabled = true AND t.nextFireTime < :now")
List<WbCronScheduledTask> findEnabledTasksWithNextFireBefore(@Param("now") LocalDateTime now);
}

4.2 单次任务主表Repository WbOnceScheduledTaskRepository.java

设计说明:

  • findPendingTasksInWindow:查询未来1分钟内待执行的 PENDING 任务,用于轮询补偿。

  • updateStatus:乐观锁更新状态,条件 status = oldStatus

  • findByTaskTypeAndScheduledTimeAndStatus:用于幂等创建,检查是否已存在相同类型和时间的任务。

java

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
​
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
​
public interface WbOnceScheduledTaskRepository extends JpaRepository<WbOnceScheduledTask, Integer>, JpaSpecificationExecutor<WbOnceScheduledTask> {
​
    @Query("SELECT t FROM WbOnceScheduledTask t WHERE t.status = 'PENDING' AND t.scheduledTime BETWEEN :start AND :end")
    List<WbOnceScheduledTask> findPendingTasksInWindow(@Param("start") LocalDateTime start, @Param("end") LocalDateTime end);
​
    List<WbOnceScheduledTask> findAllByStatus(String status);
​
    Optional<WbOnceScheduledTask> findById(Long id);
​
    @Modifying
    @Transactional(rollbackFor = Exception.class)
    @Query("UPDATE WbOnceScheduledTask t SET t.status = :newStatus WHERE t.id = :id AND t.status = :oldStatus")
    int updateStatus(@Param("id") Integer id, @Param("oldStatus") String oldStatus, @Param("newStatus") String newStatus);
​
    WbOnceScheduledTask findByTaskTypeAndScheduledTimeAndStatus(String taskType, LocalDateTime scheduledTime, String status);
}

4.3 单次任务明细Repository WbOnceTaskDetailRepository.java

设计说明:

  • findByTaskId:查询任务下所有明细。

  • markProcessed:将明细标记为已处理。

  • existsByTaskIdAndPersonalScheduleCode:避免重复关联同一日程到同一任务。

java

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
​
import java.util.List;
​
public interface WbOnceTaskDetailRepository extends JpaRepository<WbOnceTaskDetail, Integer>, JpaSpecificationExecutor<WbOnceTaskDetail> {
​
    List<WbOnceTaskDetail> findByTaskId(Integer taskId);
​
    @Modifying
    @Transactional(rollbackFor = Exception.class)
    @Query("UPDATE WbOnceTaskDetail d SET d.processed = 1 WHERE d.id = :id")
    int markProcessed(@Param("id") Integer id);
​
    boolean existsByTaskIdAndPersonalScheduleCode(Integer taskId, String personalScheduleCode);
}


5. Service 层

5.1 Cron任务调度器 CronExecutionScheduledTask.java

设计说明:

  • 初始化:启动时加载未来5分钟内需执行的 Cron 任务,注册到本地调度器。

  • 任务管理

    • addTask:增加引用计数,若 cron 首次出现则创建任务并计算 next_fire_time

    • removeTask:减少引用计数,归零时删除任务。

    • deleteTask / setTaskEnabled:物理删除或禁用任务。

  • 本地注册registerLocalTask 只注册未来5分钟内的任务,超出窗口则暂不注册;过期任务直接禁用。

  • 执行流程executeTask):

    1. 获取 ShedLock 锁。

    2. 检查 next_fire_time 是否在当前时间 ±5ms 内(容错窗口)。

    3. 调用业务执行器 CronTaskExecutor.execute

    4. 计算下一次执行时间(基于 oldNextFire)。

    5. 乐观锁更新 next_fire_time

    6. 重新注册下次执行。

    7. 业务异常时仍推进时间,避免卡死。

  • 轮询补偿syncPendingTasks 每30秒扫描未来1分钟窗口,补注册遗漏任务(提前5秒避免边界遗漏)。

关键点:

  • 使用 ConcurrentHashMap 存储本地任务注册表,支持线程安全。

  • 锁超时时间可配置(scheduled.cron.lock-at-most-for)。

  • 容错窗口 5ms 防止时钟抖动导致漏执行。

java

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.core.SimpleLock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronExpression;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
​
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
​
@Slf4j
@Service
@RequiredArgsConstructor
public class CronExecutionScheduledTask {
​
    private final WbCronScheduledTaskRepository taskRepository;
    private final LockProvider lockProvider;
    private final CronTaskExecutor cronTaskExecutor;
​
    private final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
    private final Map<Integer, ScheduledFuture<?>> localTasks = new ConcurrentHashMap<>();
​
    @Value("${scheduled.cron.lock-at-most-for:30000}")
    private long lockAtMostFor;
​
    @Value("${scheduled.cron.lock-at-least-for:5000}")
    private long lockAtLeastFor;
​
    private static final long TOLERANCE_NANOS = 5_000_000L;
​
   
@PostConstruct
public void init() {
    taskScheduler.initialize();
    taskScheduler.setPoolSize(10);
    taskScheduler.setThreadNamePrefix("cron-task-");
    loadPendingTasksInWindow(LocalDateTime.now(), LocalDateTime.now().plusMinutes(5));
    compensateOverdueCronTasks();
    log.info("Cron调度器初始化完成");
}


private void compensateOverdueCronTasks() {
    String lockName = "compensate_overdue_cron_tasks";
    LockConfiguration lockConfig = new LockConfiguration(
            Instant.now(),
            lockName,
            Duration.ofSeconds(60),
            Duration.ofSeconds(5)
    );
    Optional<SimpleLock> lockOpt = lockProvider.lock(lockConfig);
    if (!lockOpt.isPresent()) {
        log.info("其他节点正在执行过期任务补偿,本节点跳过");
        return;
    }
    SimpleLock lock = lockOpt.get();
    try {
        LocalDateTime now = LocalDateTime.now();
        List<WbCronScheduledTask> overdueTasks = taskRepository.findEnabledTasksWithNextFireBefore(now);
        if (overdueTasks.isEmpty()) return;
        log.info("发现 {} 个过期的Cron任务,开始补偿", overdueTasks.size());
        for (WbCronScheduledTask task : overdueTasks) {
            LocalDateTime newNext = computeNextFireTime(task.getCron(), now);
            if (newNext != null) {
                task.setNextFireTime(newNext);
                taskRepository.save(task);
                log.debug("补偿任务 {},原时间 {},新时间 {}", task.getId(), task.getNextFireTime(), newNext);
                // 如果新时间在未来5分钟内,立即注册
                if (newNext.isBefore(now.plusMinutes(5))) {
                    registerLocalTask(task);
                }
            } else {
                taskRepository.disableTask(task.getId());
            }
        }
        log.info("过期Cron任务补偿完成");
    } catch (Exception e) {
        log.error("补偿过期Cron任务时发生异常", e);
    } finally {
        lock.unlock();
    }
}
​
    private void loadPendingTasksInWindow(LocalDateTime start, LocalDateTime end) {
        List<WbCronScheduledTask> tasks = taskRepository.findEnabledTasksByNextFireTimeBetween(start, end);
        for (WbCronScheduledTask task : tasks) {
            registerLocalTask(task);
        }
        log.info("加载了 {} 个Cron任务(窗口:{} ~ {})", tasks.size(), start, end);
    }
​
    public void addTask(String cronExpression) {
        WbCronScheduledTask task = taskRepository.findByCron(cronExpression)
                .orElseGet(() -> {
                    WbCronScheduledTask newTask = new WbCronScheduledTask();
                    newTask.setName("任务-" + cronExpression);
                    newTask.setCron(cronExpression);
                    newTask.setEnabled(true);
                    newTask.setRefCount(0);
                    LocalDateTime next = computeNextFireTime(cronExpression, LocalDateTime.now());
                    newTask.setNextFireTime(next);
                    return newTask;
                });
        task.setRefCount(task.getRefCount() + 1);
        task.setEnabled(true);
        this.saveOrUpdateTask(task);
    }
​
    @Transactional
    public void saveOrUpdateTask(WbCronScheduledTask task) {
        if (task.getEnabled() && task.getNextFireTime() == null) {
            LocalDateTime next = computeNextFireTime(task.getCron(), LocalDateTime.now());
            task.setNextFireTime(next);
        }
        taskRepository.save(task);
        if (task.getEnabled()) {
            registerLocalTask(task);
        } else {
            cancelLocalTask(task.getId());
        }
    }
​
    @Transactional
    public void deleteTask(Integer id) {
        cancelLocalTask(id);
        taskRepository.deleteById(id);
    }
​
    @Transactional
    public void setTaskEnabled(Integer id, boolean enabled) {
        WbCronScheduledTask task = taskRepository.findById(id).orElse(null);
        if (task != null) {
            task.setEnabled(enabled);
            taskRepository.save(task);
            if (enabled) {
                registerLocalTask(task);
            } else {
                cancelLocalTask(id);
            }
        }
    }
​
    
private void registerLocalTask(WbCronScheduledTask task) {
    if (!task.getEnabled() || task.getNextFireTime() == null) {
        return;
    }
    // 先取消旧的本地任务(无论新时间是否在窗口内)
    cancelLocalTask(task.getId());
    LocalDateTime now = LocalDateTime.now();
    LocalDateTime nextFire = task.getNextFireTime();

    // 只注册未来5分钟内的任务
    if (nextFire.isAfter(now.plusMinutes(5))) {
        log.info("任务 {} 执行时间较远,暂不注册", task.getId());
        return;
    }
    if (nextFire.plusNanos(TOLERANCE_NANOS).isBefore(now)) {
        log.warn("任务 {} 已过期(超出容错窗口),标记为过期", task.getId());
        taskRepository.disableTask(task.getId());
        return;
    }

    long delay = now.until(nextFire, ChronoUnit.MILLIS);
    ScheduledFuture<?> future = taskScheduler.schedule(
            () -> executeTask(task.getId()),
            Instant.now().plus(delay, ChronoUnit.MILLIS)
    );
    localTasks.put(task.getId(), future);
    log.info("注册Cron任务 id={}, 延迟={}ms", task.getId(), delay);
}

private void cancelLocalTask(Integer id) {
    ScheduledFuture<?> future = localTasks.remove(id);
    if (future != null) {
        future.cancel(false);
    }
}
​
    private void cancelLocalTask(Integer id) {
        ScheduledFuture<?> future = localTasks.remove(id);
        if (future != null) future.cancel(false);
    }
​
    private void executeTask(Integer id) {
        String lockName = "cron_task_" + id;
        LockConfiguration lockConfig = new LockConfiguration(
                Instant.now(), lockName,
                Duration.ofMillis(lockAtMostFor),
                Duration.ofMillis(lockAtLeastFor)
        );
        Optional<SimpleLock> lockOpt = lockProvider.lock(lockConfig);
        if (!lockOpt.isPresent()) {
            log.debug("获取锁失败,任务 {} 由其他节点执行", id);
            return;
        }
        SimpleLock lock = lockOpt.get();
        try {
            WbCronScheduledTask task = taskRepository.findById(id).orElse(null);
            if (task == null || !task.getEnabled()) return;
            LocalDateTime oldNextFire = task.getNextFireTime();
            LocalDateTime now = LocalDateTime.now();
            if (oldNextFire == null || oldNextFire.plusNanos(TOLERANCE_NANOS).isBefore(now)) {
                log.info("任务 {} 的 next_fire_time 未到或已过期(超出容错),跳过", id);
                return;
            }
​
            cronTaskExecutor.execute(task);
​
            LocalDateTime newNextFire = computeNextFireTime(task.getCron(), oldNextFire);
            if (newNextFire == null) {
                log.error("任务 {} cron表达式无效,禁用任务", id);
                taskRepository.disableTask(id);
                return;
            }
​
            int updated = taskRepository.updateNextFireTime(id, oldNextFire, newNextFire);
            if (updated == 0) {
                log.warn("乐观锁更新失败,任务 {} 可能已被其他节点修改", id);
                taskRepository.findById(id).ifPresent(this::registerLocalTask);
            } else {
                log.info("任务 {} 执行成功,下次执行时间 {}", id, newNextFire);
                task.setNextFireTime(newNextFire);
                registerLocalTask(task);
            }
        } catch (Exception e) {
            log.error("执行Cron任务 {} 业务失败,推进next_fire_time避免卡死", id, e);
            try {
                WbCronScheduledTask task = taskRepository.findById(id).orElse(null);
                if (task != null && task.getEnabled()) {
                    LocalDateTime oldNextFire = task.getNextFireTime();
                    LocalDateTime newNextFire = computeNextFireTime(task.getCron(), oldNextFire);
                    if (newNextFire != null) {
                        int updated = taskRepository.updateNextFireTime(id, oldNextFire, newNextFire);
                        if (updated > 0) {
                            log.info("任务 {} 业务失败但已推进到下次执行时间 {}", id, newNextFire);
                        }
                    }
                }
            } catch (Exception ex) {
                log.error("推进任务 {} next_fire_time 失败", id, ex);
            }
        } finally {
            lock.unlock();
        }
    }
​
    private LocalDateTime computeNextFireTime(String cron, LocalDateTime from) {
        try {
            CronExpression expression = CronExpression.parse(cron);
            return expression.next(from);
        } catch (Exception e) {
            log.error("解析cron表达式失败: {}", cron, e);
            return null;
        }
    }
​
    public void removeTask(String cronExpression) {
        taskRepository.findByCron(cronExpression).ifPresent(task -> {
            int newCount = task.getRefCount() - 1;
            if (newCount <= 0) {
                deleteTask(task.getId());
                log.info("Cron表达式 {} 已无引用,删除调度", cronExpression);
            } else {
                task.setRefCount(newCount);
                taskRepository.save(task);
                log.info("Cron表达式 {} 引用计数减为 {}, 仍保留调度", cronExpression, newCount);
            }
        });
    }
​
    @org.springframework.scheduling.annotation.Scheduled(fixedDelay = 30000)
    public void syncPendingTasks() {
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime start = now.minusSeconds(5);
        LocalDateTime windowEnd = now.plusMinutes(1);
        List<WbCronScheduledTask> tasks = taskRepository.findEnabledTasksByNextFireTimeBetween(start, windowEnd);
        for (WbCronScheduledTask task : tasks) {
            if (!localTasks.containsKey(task.getId())) {
                registerLocalTask(task);
                log.debug("轮询补偿注册Cron任务 id={}", task.getId());
            }
        }
    }
}

5.2 单次任务调度器 OnceScheduleDelayTask.java

设计说明:

  • 任务创建addTask(PersonalScheduleInformation) 根据日程的开始/结束时间,生成提醒(提前5分钟)和催办(根据提前量)任务。

  • 幂等追加createOnceTask 先查询是否存在相同 (taskType, scheduledTime)PENDING 任务,若不存在则创建新主任务;若存在则复用,并检查明细是否已关联,避免重复。

  • 本地注册registerLocalTask 只注册未来5分钟内的任务,过期任务标记为 EXPIRED

  • 执行流程executeTask):

    1. 获取 ShedLock 锁。

    2. 检查任务状态是否为 PENDING

    3. 遍历所有明细,调用 executeBusiness 处理每个日程。

    4. 无论成功或失败,明细都标记为 processed=1(不重试)。

    5. 乐观锁更新主任务状态为 COMPLETED

  • 轮询补偿:同 Cron 任务。

业务说明:

  • 提醒时间 = 开始时间 - 5分钟。

  • 催办时间 = 结束时间 - 提前量(分钟),多个提前量去重。

  • 发送通知失败时直接标记明细已处理,不重试,主任务仍然完成。

java

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.core.SimpleLock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
​
import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
​
/**
 * 个人日程延迟任务管理(一体化分布式调度)
 * 包含:任务创建、本地延迟调度、分布式锁、定时轮询、业务执行
 *
 * @author 
 * @date 2024/1/24 8:53
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class OnceScheduleDelayTask {
​
    // 数据库操作
    private final WbOnceScheduledTaskRepository onceTaskRepository;
    private final WbOnceTaskDetailRepository onceTaskDetailRepository;
​
    // 分布式锁 需实现返回 Optional<SimpleLock>
    private final LockProvider lockProvider;
​
    // 消息服务
    private final MsNotificationDataService messageNotificationDataService;
​
​
    // 本地延迟调度器
    private final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
    // 本地任务注册表:taskId -> ScheduledFuture
    private final Map<Integer, ScheduledFuture<?>> localTasks = new ConcurrentHashMap<>();
​
    // 锁参数(毫秒)
    @Value("${scheduled.once.lock-at-most-for:30000}")
    private long lockAtMostFor;
​
    @Value("${scheduled.once.lock-at-least-for:5000}")
    private long lockAtLeastFor;
​
    // ==================== 初始化 ====================
    @PostConstruct
    public void init() {
        taskScheduler.initialize();
        taskScheduler.setPoolSize(10);
        taskScheduler.setThreadNamePrefix("personal-schedule-");
        // 改为只加载未来5分钟内要执行的任务
        loadPendingTasksInWindow(LocalDateTime.now(), LocalDateTime.now().plusMinutes(5));
        log.info("PersonalScheduleDelayTask 初始化完成");
    }
​
    /**
     * 只加载指定时间窗口内的 PENDING 任务
     */
    private void loadPendingTasksInWindow(LocalDateTime start, LocalDateTime end) {
        List<WbOnceScheduledTask> pendingTasks = onceTaskRepository.findPendingTasksInWindow(start, end);
        for (WbOnceScheduledTask task : pendingTasks) {
            registerLocalTask(task);
        }
        log.info("加载了 {} 个待执行任务(时间窗口:{} ~ {})", pendingTasks.size(), start, end);
    }
​
    /**
     * 启动时加载所有状态为 PENDING 的任务,注册到本地调度器
     */
    private void loadAllPendingTasks() {
        List<WbOnceScheduledTask> pendingTasks = onceTaskRepository.findAllByStatus("PENDING");
        for (WbOnceScheduledTask task : pendingTasks) {
            registerLocalTask(task);
        }
        log.info("加载了 {} 个待执行任务", pendingTasks.size());
    }
​
    // ==================== 任务创建(对外接口) ====================
​
    /**
     * 为单个日程创建所有延迟任务(提醒 + 多个催办)
     * 这是原有 addTask 方法的替代实现
     */
    @Transactional
    public void addTask(ScheduleInformation schedule) {
        String code = schedule.getCode();
        String startTime = schedule.getStartTime();
        String endTime = schedule.getEndTime();
        Long callAdvanceTime1 = schedule.getCallAdvanceTime1();
​
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            // 1. 提醒:开始时间 - 5分钟
            Date startDate = sdf.parse(startTime);
            long remindTime = startDate.getTime() - 5 * 60 * 1000;
            if (System.currentTimeMillis() <= remindTime) {
                LocalDateTime remindLdt = LocalDateTime.ofInstant(new Date(remindTime).toInstant(), ZoneId.systemDefault());
                createOnceTask(code, "REMINDER", remindLdt);
            }
​
            // 2. 催办:结束时间 - 提前量(去重)
            Date endDate = sdf.parse(endTime);
            long endTimeMs = endDate.getTime();
            Set<Long> advances = new HashSet<>();
            if (callAdvanceTime1 != null) {
                advances.add(callAdvanceTime1);
            }
            for (Long advance : advances) {
                long urgeTime = endTimeMs - advance * 60 * 1000;
                if (System.currentTimeMillis() <= urgeTime) {
                    LocalDateTime urgeLdt = LocalDateTime.ofInstant(new Date(urgeTime).toInstant(), ZoneId.systemDefault());
                    createOnceTask(code, "URGE", urgeLdt);
                }
            }
        } catch (Exception e) {
            log.error("创建日程任务失败,code={}", code, e);
        }
    }
​
    /**
     * 创建或追加单次任务(一个任务可对应多个日程编码)
     * 幂等性:相同 taskType + scheduledTime 的任务只会创建一次,后续调用只追加明细
     */
    private void createOnceTask(String scheduleCode, String taskType, LocalDateTime scheduledTime) {
        // 1. 查找是否存在相同类型和时间的未完成任务
        WbOnceScheduledTask task = onceTaskRepository
                .findByTaskTypeAndScheduledTimeAndStatus(taskType, scheduledTime, "PENDING");
​
        if (task == null) {
            // 2. 不存在则创建新任务
            task = new WbOnceScheduledTask();
            task.setTaskType(taskType);
            task.setScheduledTime(scheduledTime);
            task.setStatus("PENDING");
            onceTaskRepository.save(task);
            log.info("创建新的批量任务:类型={}, 时间={}", taskType, scheduledTime);
        } else {
            log.debug("复用已存在的批量任务:id={}, 类型={}, 时间={}", task.getId(), taskType, scheduledTime);
        }
​
        // 3. 检查该日程是否已关联到此任务(避免重复关联)
        boolean alreadyExists = onceTaskDetailRepository.existsByTaskIdAndPersonalScheduleCode(task.getId(), scheduleCode);
        if (!alreadyExists) {
            WbOnceTaskDetail detail = new WbOnceTaskDetail();
            detail.setTaskId(task.getId());
            detail.setPersonalScheduleCode(scheduleCode);
            detail.setProcessed(0);
            onceTaskDetailRepository.save(detail);
            log.debug("添加日程 {} 到任务 {}", scheduleCode, task.getId());
        } else {
            log.debug("日程 {} 已存在于任务 {},跳过", scheduleCode, task.getId());
        }
​
        // 4. 注册到本地调度器(如果还没注册)
        registerLocalTask(task);
    }
​
    // ==================== 本地调度注册 ====================
​
    /**
     * 注册任务到本地延迟调度器
     */
    private void registerLocalTask(WbOnceScheduledTask task) {
        LocalDateTime now = LocalDateTime.now();
        // 只注册未来5分钟内要执行的任务
        if (task.getScheduledTime().isAfter(now.plusMinutes(5))) {
            log.debug("任务执行时间超过5分钟,暂不注册:id={}, 时间={}", task.getId(), task.getScheduledTime());
            return;
        }
        // 如果任务已过期,标记并返回
        if (task.getScheduledTime().isBefore(now)) {
            log.warn("任务已过期,不注册:id={}, 时间={}", task.getId(), task.getScheduledTime());
            onceTaskRepository.updateStatus(task.getId(), "PENDING", "EXPIRED");
            return;
        }
        cancelLocalTask(task.getId());
        long delay = now.until(task.getScheduledTime(), ChronoUnit.MILLIS);
        ScheduledFuture<?> future = taskScheduler.schedule(
                () -> executeTask(task.getId()),
                Instant.now().plus(delay, ChronoUnit.MILLIS)
        );
        localTasks.put(task.getId(), future);
        log.debug("注册任务 id={}, 延迟={}ms", task.getId(), delay);
    }
​
    private void cancelLocalTask(Integer taskId) {
        ScheduledFuture<?> future = localTasks.remove(taskId);
        if (future != null) {
            future.cancel(false);
        }
    }
​
    // ==================== 任务执行(分布式锁 + 业务) ====================
​
    /**
     * 执行单个任务(由本地调度器触发)
     */
    private void executeTask(Integer taskId) {
        String lockName = "once_task_" + taskId;
        LockConfiguration lockConfig = new LockConfiguration(
                Instant.now(),
                lockName,
                Duration.ofMillis(lockAtMostFor),
                Duration.ofMillis(lockAtLeastFor)
        );
        Optional<SimpleLock> lockOpt = lockProvider.lock(lockConfig);
        if (!lockOpt.isPresent()) {
            log.debug("获取锁失败,任务 {} 由其他节点执行", taskId);
            return;
        }
        SimpleLock lock = lockOpt.get();
        try {
            // 再次检查任务状态(乐观锁)
            WbOnceScheduledTask task = onceTaskRepository.findById(taskId).orElse(null);
            if (task == null || !"PENDING".equals(task.getStatus())) {
                log.info("任务 {} 状态不是 PENDING,跳过", taskId);
                return;
            }
​
            // 执行具体业务:处理所有关联的日程
            executeBusiness(task);
​
            // 更新主任务状态为 COMPLETED
            int updated = onceTaskRepository.updateStatus(taskId, "PENDING", "COMPLETED");
            if (updated == 0) {
                log.warn("乐观锁更新失败,任务 {} 可能已被其他节点完成", taskId);
            } else {
                localTasks.remove(taskId);
                log.info("任务 {} 执行成功", taskId);
            }
        } catch (Exception e) {
            log.error("执行任务 {} 失败", taskId, e);
            // 保持 PENDING,等待下次轮询重试
        } finally {
            lock.unlock();
        }
    }
​
    /**
     * 业务逻辑:遍历任务关联的所有日程,发送通知
     */
    private void executeBusiness(WbOnceScheduledTask task) {
        List<WbOnceTaskDetail> details = onceTaskDetailRepository.findByTaskId(task.getId());
        for (WbOnceTaskDetail detail : details) {
            if (detail.getProcessed() == 1) {
                continue;
            }
           //真实业务代码
        }
    }
    
    // ==================== 定时轮询(补偿机制) ====================
​
    /**
     * 每30秒轮询一次,补偿因节点宕机等原因未注册的任务
     */
    @Scheduled(fixedDelay = 30000)
    public void syncPendingTasks() {
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime windowEnd = now.plusMinutes(1);
        List<WbOnceScheduledTask> pendingTasks = onceTaskRepository.findPendingTasksInWindow(now, windowEnd);
        for (WbOnceScheduledTask task : pendingTasks) {
            if (!localTasks.containsKey(task.getId())) {
                registerLocalTask(task);
                log.debug("轮询补偿注册任务 id={}", task.getId());
            }
        }
    }
}

5.3 业务执行器接口 CronTaskExecutor.java

设计说明:

  • 用户需自行实现该接口,将具体的业务逻辑(如模型数据抽取、计算、推送等)注入到调度器中。

  • 调度器在 Cron 任务触发时会调用 execute 方法。

java

public interface CronTaskExecutor {
    void execute(WbCronScheduledTask task);
}

实现示例: 在 ModelTaskExecutionScheduledTask 原业务类中实现该接口,根据 task.getCron() 查询所有使用该 cron 表达式的任务周期,然后执行 demoss 等原有逻辑。


6. 使用示例

6.1 添加 Cron 任务

java

@Autowired
private ModelTaskExecutionScheduledTask cronTaskService;
​
// 添加一个每分钟执行一次的任务
cronTaskService.addTask("0 * * * * ?");
​
// 删除(引用计数减1,归零时删除调度)
cronTaskService.removeTask("0 * * * * ?");
​
// 禁用任务
cronTaskService.setTaskEnabled(taskId, false);

6.2 添加单次任务

java

@Autowired
private PersonalScheduleDelayTask onceTaskService;
​
// 假设从数据库或 API 获取 PersonalScheduleInformation 对象
PersonalScheduleInformation schedule = ...;
onceTaskService.addTask(schedule);
// 系统自动根据日程的开始/结束时间生成提醒和催办任务

6.3 实现 Cron 业务执行器

java

@Component
public class MyCronTaskExecutor implements CronTaskExecutor {
    @Override
    public void execute(WbCronScheduledTask task) {
        String cron = task.getCron();
        // 查询使用该 cron 的所有模型任务周期
        List<DataDrivenModelTaskCycle> cycles = modelTaskCycleRepository.findAllByCronExpressionAndState(cron, "1");
        // 执行业务逻辑(如数据抽取、计算)
        demoss(...);
    }
}


7. 参数调优与监控

7.1 关键配置参数(application.yml)

yaml

scheduled:
  cron:
    lock-at-most-for: 30000   # Cron任务锁最长持有时间(毫秒),建议 = P99执行时间 × 2 + 5秒
    lock-at-least-for: 5000    # 最短持有时间,建议 = 节点间最大时钟差 × 2 + 1秒
  once:
    lock-at-most-for: 30000
    lock-at-least-for: 5000

7.2 调优公式

参数 推荐值 说明
lockAtMostFor 30秒 根据业务执行时间的 P99 值调整,避免锁提前释放
lockAtLeastFor 5秒 保证网络延迟或时钟抖动下锁仍有效
内存窗口 5分钟 任务执行时间较远时不加载,减少内存占用
轮询间隔 30秒 平衡实时性和数据库压力
Cron容错窗口 5毫秒 避免时钟抖动导致漏执行

7.3 监控建议

  • 锁获取失败次数:shedlock.lock.error(可通过日志或 Micrometer 统计)

  • 任务执行耗时、成功/失败率

  • 调度延迟:实际触发时间与计划时间的差值

  • 乐观锁更新失败次数(日志关键字 “乐观锁更新失败”)


8. 已知限制与注意事项

  1. 单次任务不重试:失败明细直接标记为已处理,仅记录日志,不会重试。

  2. Cron任务业务异常会推进时间:避免卡死,但业务失败不重试,需业务层保证幂等。

  3. 主键类型:所有表主键均为 Integer(自增),Repository 方法参数类型需匹配。

  4. 达梦适配:必须完成 DamengServerTimeStatementsSource 同包名覆盖,否则 ShedLock 无法获取数据库时间。

  5. 锁表清理shedlock 表数据不会自动清理,可定期执行:

    sql

    DELETE FROM shedlock WHERE lock_until < SYSTIMESTAMP - INTERVAL '7' DAY;

  6. 调度器线程池:默认 poolSize=10,可根据任务数量调整。


9. 总结

本系统实现了分布式环境下 Cron 任务和单次任务的高可靠调度,具备以下特点:

  • 强一致性(ShedLock 分布式锁 + 乐观锁)

  • 任务不丢失(先执行后更新)

  • 节点宕机恢复(锁超时 + 轮询补偿)

  • 内存可控(5分钟窗口)

  • 国产化数据库适配(达梦)

  • 动态增删启停任务(支持引用计数)

  • 代码简洁,无独立调度中心

附录

10.1 达梦shedlock建表语句

CREATE TABLE  "shedlock"
(
 "name" VARCHAR(64) NOT NULL,
 "lock_until" TIMESTAMP(6) NOT NULL,
 "locked_at" TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP
 NOT NULL,
 "locked_by" VARCHAR(255) NOT NULL
);
ALTER TABLE  "shedlock" ADD CONSTRAINT  NOT CLUSTER  PRIMARY KEY("name") ;
​

10.2 shedlcok达梦方言适配

package net.javacrumbs.shedlock.provider.jdbctemplate;
​
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.support.annotation.NonNull;
​
import java.util.HashMap;
import java.util.Map;
​
/**
 * sf
 */
 class DamengServerTimeStatementsSource extends SqlStatementsSource {
    // 1. 获取当前时间:使用 SYSDATE
  //  private static final String now = "SYSDATE";
    // 2. 时间运算:使用 DATEADD 函数,单位 MICROSECOND
   // private static final String lockAtMostFor = "DATEADD(MICROSECOND, :lockAtMostForMicros, " + now + ")";
​
    private static final String now = "SYSTIMESTAMP";
    private static final String lockAtMostFor = "DATEADD(MICROSECOND, :lockAtMostForMicros, " + now + ")";
​
    DamengServerTimeStatementsSource(JdbcTemplateLockProvider.Configuration configuration) {
        super(configuration);
    }
​
    @Override
    String getInsertStatement() {
        // 3. "插入或更新"逻辑:使用 MERGE INTO 语句
        return "MERGE INTO " + tableName() + " t USING (SELECT :name AS name) s ON (t." + name() + " = s.name) " +
                "WHEN NOT MATCHED THEN INSERT (" + name() + ", " + lockUntil() + ", " + lockedAt() + ", " + lockedBy() + ") " +
                "VALUES (s.name, " + lockAtMostFor + ", " + now + ", :lockedBy)";
    }
​
    @Override
    public String getUpdateStatement() {
        return "UPDATE " + tableName() + " SET " + lockUntil() + " = " + lockAtMostFor + ", " +
                lockedAt() + " = " + now + ", " + lockedBy() + " = :lockedBy " +
                "WHERE " + name() + " = :name AND " + lockUntil() + " <= " + now;
    }
​
    @Override
    public String getUnlockStatement() {
        String lockAtLeastFor = "DATEADD(MICROSECOND, :lockAtLeastForMicros, " + lockedAt() + ")";
        // 4. 条件更新:使用 CASE WHEN 表达式
        return "UPDATE " + tableName() + " SET " + lockUntil() + " = " +
                "CASE WHEN " + lockAtLeastFor + " > " + now + " THEN " + lockAtLeastFor + " ELSE " + now + " END " +
                "WHERE " + name() + " = :name AND " + lockedBy() + " = :lockedBy";
    }
​
    @Override
    public String getExtendStatement() {
        return "UPDATE " + tableName() + " SET " + lockUntil() + " = " + lockAtMostFor + " " +
                "WHERE " + name() + " = :name AND " + lockedBy() + " = :lockedBy AND " + lockUntil() + " > " + now;
    }
​
    @Override
    @NonNull
    Map<String, Object> params(@NonNull LockConfiguration lockConfiguration) {
        Map<String, Object> params = new HashMap<>();
        params.put("name", lockConfiguration.getName());
        params.put("lockedBy", configuration.getLockedByValue());
        params.put("lockAtMostForMicros", lockConfiguration.getLockAtMostFor().toNanos() / 1_000);
        params.put("lockAtLeastForMicros", lockConfiguration.getLockAtLeastFor().toNanos() / 1_000);
        return params;
    }
}
/**
 * Copyright 2009 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package net.javacrumbs.shedlock.provider.jdbctemplate;
​
import net.javacrumbs.shedlock.core.ClockProvider;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider.Configuration;
import net.javacrumbs.shedlock.support.annotation.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.ConnectionCallback;
​
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
​
 class SqlStatementsSource {
    protected final Configuration configuration;
​
    private static final Logger logger = LoggerFactory.getLogger(SqlStatementsSource.class);
​
    SqlStatementsSource(Configuration configuration) {
        this.configuration = configuration;
    }
​
    private static final String POSTGRESQL = "postgresql";
    private static final String MSSQL = "microsoft sql server";
    private static final String ORACLE = "oracle";
    private static final String MYSQL = "mysql";
    private static final String MARIADB = "mariadb";
    private static final String HSQL = "hsql database engine";
    private static final String H2 = "h2";
​
    // 新增达梦数据库标识(产品名可能为 "DM DBMS" 或 "Dameng")
    private static final String DAMENG = "dm";
     private static final String DAMENG_DBMS = "dm dbms";
    private static final String DAMENG_FULL = "dameng";
​
​
​
    static SqlStatementsSource create(Configuration configuration) {
        String databaseProductName = getDatabaseProductName(configuration).toLowerCase();
​
        if (configuration.getUseDbTime()) {
            switch (databaseProductName) {
                case POSTGRESQL:
                    logger.debug("Using PostgresSqlServerTimeStatementsSource");
                    return new PostgresSqlServerTimeStatementsSource(configuration);
                case MSSQL:
                    logger.debug("Using MsSqlServerTimeStatementsSource");
                    return new MsSqlServerTimeStatementsSource(configuration);
                case ORACLE:
                    logger.debug("Using OracleServerTimeStatementsSource");
                    return new OracleServerTimeStatementsSource(configuration);
                case MYSQL:
                    logger.debug("Using MySqlServerTimeStatementsSource");
                    return new MySqlServerTimeStatementsSource(configuration);
                case MARIADB:
                    logger.debug("Using MySqlServerTimeStatementsSource (for MariaDB)");
                    return new MySqlServerTimeStatementsSource(configuration);
                case HSQL:
                    logger.debug("Using HsqlServerTimeStatementsSource");
                    return new HsqlServerTimeStatementsSource(configuration);
                case H2:
                    logger.debug("Using H2ServerTimeStatementsSource");
                    return new H2ServerTimeStatementsSource(configuration);
                // ========== 新增对达梦数据库的支持 ==========
                case DAMENG:
                case DAMENG_DBMS:
                case DAMENG_FULL:
                    logger.debug("Using DamengServerTimeStatementsSource");
                    return new DamengServerTimeStatementsSource(configuration);
                // ==========================================
                default:
                    if (databaseProductName.startsWith("db2")) {
                        logger.debug("Using Db2ServerTimeStatementsSource");
                        return new Db2ServerTimeStatementsSource(configuration);
                    }
                    throw new UnsupportedOperationException("DB time is not supported for '" + databaseProductName + "'");
            }
        } else {
            if (POSTGRESQL.equals(databaseProductName)) {
                logger.debug("Using PostgresSqlServerTimeStatementsSource");
                return new PostgresSqlStatementsSource(configuration);
            } else {
                logger.debug("Using SqlStatementsSource");
                return new SqlStatementsSource(configuration);
            }
        }
    }
​
    private static String getDatabaseProductName(Configuration configuration) {
        try {
            return configuration.getJdbcTemplate().execute((ConnectionCallback<String>) connection -> connection.getMetaData().getDatabaseProductName());
        } catch (Exception e) {
            logger.debug("Can not determine database product name " + e.getMessage());
            return "Unknown";
        }
    }
​
    @NonNull
    Map<String, Object> params(@NonNull LockConfiguration lockConfiguration) {
        Map<String, Object> params = new HashMap<>();
        params.put("name", lockConfiguration.getName());
        params.put("lockUntil", timestamp(lockConfiguration.getLockAtMostUntil()));
        params.put("now", timestamp(ClockProvider.now()));
        params.put("lockedBy", configuration.getLockedByValue());
        params.put("unlockTime", timestamp(lockConfiguration.getUnlockTime()));
        return params;
    }
​
    @NonNull
    private Object timestamp(Instant time) {
        TimeZone timeZone = configuration.getTimeZone();
        if (timeZone == null) {
            return Timestamp.from(time);
        } else {
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(Date.from(time));
            calendar.setTimeZone(timeZone);
            return calendar;
        }
    }
​
​
    String getInsertStatement() {
        return "INSERT INTO " + tableName() + "(" + name() + ", " + lockUntil() + ", " + lockedAt() + ", " + lockedBy() + ") VALUES(:name, :lockUntil, :now, :lockedBy)";
    }
​
​
    public String getUpdateStatement() {
        return "UPDATE " + tableName() + " SET " + lockUntil() + " = :lockUntil, " + lockedAt() + " = :now, " + lockedBy() + " = :lockedBy WHERE " + name() + " = :name AND " + lockUntil() + " <= :now";
    }
​
    public String getExtendStatement() {
        return "UPDATE " + tableName() + " SET " + lockUntil() + " = :lockUntil WHERE " + name() + " = :name AND " + lockedBy() + " = :lockedBy AND " + lockUntil() + " > :now";
    }
​
    public String getUnlockStatement() {
        return "UPDATE " + tableName() + " SET " + lockUntil() + " = :unlockTime WHERE " + name() + " = :name";
    }
​
    String name() {
        return configuration.getColumnNames().getName();
    }
​
    String lockUntil() {
        return configuration.getColumnNames().getLockUntil();
    }
​
    String lockedAt() {
        return configuration.getColumnNames().getLockedAt();
    }
​
    String lockedBy() {
        return configuration.getColumnNames().getLockedBy();
    }
​
    String tableName() {
        return configuration.getTableName();
    }
}
​
Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐