Flink CDC 实现 MySQL 到 MySQL 数据同步项目解析

1. 项目概述

本项目使用 Apache Flink 和 Debezium 实现从 MySQL 源数据库到目标 MySQL 数据库的数据同步。主要功能包括:

2. 文件结构与功能说明

2.1 MySqlSourceProperties.java

该文件定义了源 MySQL 数据库的配置属性,通过 Spring Boot 的 @ConfigurationProperties 注解读取配置文件中的参数,并提供默认值。

@Component
@ConfigurationProperties(prefix = "source")
@Data
public class MySqlSourceProperties {
    private String name;
    private String host;
    private int port;
    private String username;
    private String password;
    private String databaseList;
    private String tableList;
    private String startDate;
    private String startUpMode="latest";
}

2.2 MySqlDBUtils.java

负责初始化和关闭 HikariCP 数据库连接池,执行 SQL 语句。

public class MySqlDBUtils {

    private static HikariDataSource dataSource;

    public static synchronized void initConnectionPool() {
        // 初始化连接池逻辑
    }

    public static synchronized void closeConnectionPool() {
        // 关闭连接池逻辑
    }

    public static int executeSql(String sql, List<Object> insertParams, List<Object> updateParams) throws SQLException {
        // 执行 SQL 语句逻辑
    }
}

2.3 RefCountedReentrantLock.java

实现了一个带有引用计数的可重入锁,用于确保在 DDL 操作期间对表进行锁定,防止并发冲突。

public class RefCountedReentrantLock {
    private final ReentrantLock lock;
    private final AtomicInteger refCount;

    public synchronized void lock() {
        lock.lock();
        refCount.incrementAndGet();
    }

    public synchronized void unlock() {
        if (refCount.decrementAndGet() == 0) {
            lock.unlock();
        }
    }

    public boolean isLocked() {
        return refCount.get() > 0;
    }
}

2.4 DateTimeConverter.java

实现了 Debezium 的 CustomConverter 接口,用于将 MySQL 的时间类型转换为标准格式。

public class DateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private ZoneId timestampZoneId = ZoneId.systemDefault();

    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        // 转换逻辑
    }

    private String convertDate(Object input) {
        // 日期转换逻辑
    }

    private String convertTime(Object input) {
        // 时间转换逻辑
    }

    private String convertDateTime(Object input) {
        // 日期时间转换逻辑
    }

    private String convertTimestamp(Object input) {
        // 时间戳转换逻辑
    }
}

2.5 MysqlSink.java

实现了 Flink 的 RichSinkFunction,负责将捕获到的数据写入目标 MySQL 数据库,并处理 DML 和 DDL 事件。

public class MysqlSink extends RichSinkFunction<String> {
    private static ConcurrentHashMap<String, RefCountedReentrantLock> tableLocks = new ConcurrentHashMap<>();
    private static ExecutorService executorService = Executors.newFixedThreadPool(10);
    private static ExecutorService taskService = Executors.newFixedThreadPool(5);
    private static ConcurrentHashMap<String, BlockingQueue<Runnable>> waitingQueues = new ConcurrentHashMap<>();

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        MySqlDBUtils.initConnectionPool();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // 处理不同类型的操作
    }

    private void handleDmlEvent(JSONObject obj) throws Exception {
        // 处理插入、更新操作
    }

    private void executeDmlEvent(JSONObject afterObj, JSONObject source) throws Exception {
        // 构建并执行 SQL 语句
    }

    private void handleDeleteEvent(JSONObject obj) throws Exception {
        // 处理删除操作
    }

    private void executeDeleteEvent(JSONObject beforeObj, JSONObject source) throws Exception {
        // 构建并执行删除 SQL 语句
    }

    private void handleDdlEvent(String tableName, JSONObject obj) throws Exception {
        // 处理 DDL 事件
    }

    private void addTaskToWaitingQueue(String tableName, Runnable task) {
        // 将任务加入等待队列
    }

    private void processWaitingQueue(String tableName) {
        // 处理等待队列中的任务
    }

    @Override
    public void close() throws Exception {
        super.close();
        MySqlDBUtils.closeConnectionPool();
        executorService.shutdown();
    }
}

2.6 MySqlListenerStart.java

负责启动 Flink 环境并配置 MySQL CDC 源,监听 MySQL 的 binlog 事件,将其发送到 MysqlSink 进行处理。

@Component
public class MySqlListenerStart {

    @Autowired
    private MySqlSourceProperties config;

    @EventListener
    public void start(ContextRefreshedEvent event) throws IOException {
        // 配置 Flink 环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config1);

        // 构建 MySQL Source
        MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
                .hostname(config.getHost())
                .port(config.getPort())
                .username(config.getUsername())
                .password(config.getPassword())
                .serverTimeZone("Asia/Shanghai")
                .jdbcProperties(prop)
                .debeziumProperties(debeziumProperties)
                .includeSchemaChanges(true)
                .databaseList(config.getDatabaseList())
                .tableList(config.getTableList())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .splitSize(10000)
                .fetchSize(10000);

        // 设置同步模式
        StartupOptions startupOptions;
        if (StringUtils.hasText(config.getStartDate())) {
            startupOptions = StartupOptions.timestamp(DateUtil.parseDate(config.getStartDate()).getTime());
        } else if ("initial".equals(config.getStartUpMode())) {
            startupOptions = StartupOptions.initial();
        } else {
            startupOptions = StartupOptions.latest();
        }
        builder.startupOptions(startupOptions);

        DataStreamSource<String> mySqlDS = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), config.getName());
        mySqlDS.addSink(new MysqlSink());

        try {
            env.execute("mysqlSynchronization");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

3. 注意事项

3.1 数据库连接池配置

  • HikariCP 是一个高性能的 JDBC 连接池,需要根据实际需求调整最大连接数、最小空闲连接数等参数。
  • 确保 application.yml 中的数据库配置正确无误,避免因配置错误导致连接失败。

3.2 锁机制

  • RefCountedReentrantLock 用于防止 DDL 操作时的并发冲突,确保同一张表不会同时被多个线程修改。
  • 在 DDL 操作完成后,必须解锁并处理等待队列中的任务,以保证后续操作正常进行。

3.3 数据同步模式

  • startUpMode 参数决定了同步模式:latest 表示增量同步,initial 表示全量同步。
  • 如果指定了 startDate,则从指定时间点开始同步。

3.4 异常处理

  • invoke 方法中,对于不同类型的事件(如 DML、DDL),应确保异常被捕获并处理,避免程序崩溃。
  • 使用 CompletableFuture 异步处理任务时,需注意异常传播问题。

3.5 性能优化

  • 根据实际情况调整线程池大小和 SQL 执行批量大小,以提高性能。
  • 合理设置 Flink 的 Checkpoint 和 Savepoint 配置,确保数据一致性。

4. 总结

本项目通过 Flink CDC 实现了 MySQL 到 MySQL 的数据同步,涵盖了增量和全量同步、DDL 事件处理、时间类型转换等功能。通过合理的锁机制和异步任务处理,确保了数据同步的稳定性和高效性。希望这篇博客能帮助你更好地理解和实现类似项目。