
flinkcdc同步mysql
Flink CDC 实现 MySQL 到 MySQL 数据同步项目解析
1. 项目概述
本项目使用 Apache Flink 和 Debezium 实现从 MySQL 源数据库到目标 MySQL 数据库的数据同步。主要功能包括:
- 增量和全量数据同步。
- 支持 DDL(数据定义语言)事件处理。
- 使用 HikariCP 进行数据库连接池管理。
- 处理 MySQL 时间类型转换。
- 项目地址:https://gitee.com/Blainw/flinkcdc-mysql-to-mysql.git
2. 文件结构与功能说明
2.1 MySqlSourceProperties.java
该文件定义了源 MySQL 数据库的配置属性,通过 Spring Boot 的 @ConfigurationProperties
注解读取配置文件中的参数,并提供默认值。
@Component
@ConfigurationProperties(prefix = "source")
@Data
public class MySqlSourceProperties {
private String name;
private String host;
private int port;
private String username;
private String password;
private String databaseList;
private String tableList;
private String startDate;
private String startUpMode="latest";
}
2.2 MySqlDBUtils.java
负责初始化和关闭 HikariCP 数据库连接池,执行 SQL 语句。
public class MySqlDBUtils {
private static HikariDataSource dataSource;
public static synchronized void initConnectionPool() {
// 初始化连接池逻辑
}
public static synchronized void closeConnectionPool() {
// 关闭连接池逻辑
}
public static int executeSql(String sql, List<Object> insertParams, List<Object> updateParams) throws SQLException {
// 执行 SQL 语句逻辑
}
}
2.3 RefCountedReentrantLock.java
实现了一个带有引用计数的可重入锁,用于确保在 DDL 操作期间对表进行锁定,防止并发冲突。
public class RefCountedReentrantLock {
private final ReentrantLock lock;
private final AtomicInteger refCount;
public synchronized void lock() {
lock.lock();
refCount.incrementAndGet();
}
public synchronized void unlock() {
if (refCount.decrementAndGet() == 0) {
lock.unlock();
}
}
public boolean isLocked() {
return refCount.get() > 0;
}
}
2.4 DateTimeConverter.java
实现了 Debezium 的 CustomConverter
接口,用于将 MySQL 的时间类型转换为标准格式。
public class DateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
private ZoneId timestampZoneId = ZoneId.systemDefault();
@Override
public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
// 转换逻辑
}
private String convertDate(Object input) {
// 日期转换逻辑
}
private String convertTime(Object input) {
// 时间转换逻辑
}
private String convertDateTime(Object input) {
// 日期时间转换逻辑
}
private String convertTimestamp(Object input) {
// 时间戳转换逻辑
}
}
2.5 MysqlSink.java
实现了 Flink 的 RichSinkFunction
,负责将捕获到的数据写入目标 MySQL 数据库,并处理 DML 和 DDL 事件。
public class MysqlSink extends RichSinkFunction<String> {
private static ConcurrentHashMap<String, RefCountedReentrantLock> tableLocks = new ConcurrentHashMap<>();
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
private static ExecutorService taskService = Executors.newFixedThreadPool(5);
private static ConcurrentHashMap<String, BlockingQueue<Runnable>> waitingQueues = new ConcurrentHashMap<>();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MySqlDBUtils.initConnectionPool();
}
@Override
public void invoke(String value, Context context) throws Exception {
// 处理不同类型的操作
}
private void handleDmlEvent(JSONObject obj) throws Exception {
// 处理插入、更新操作
}
private void executeDmlEvent(JSONObject afterObj, JSONObject source) throws Exception {
// 构建并执行 SQL 语句
}
private void handleDeleteEvent(JSONObject obj) throws Exception {
// 处理删除操作
}
private void executeDeleteEvent(JSONObject beforeObj, JSONObject source) throws Exception {
// 构建并执行删除 SQL 语句
}
private void handleDdlEvent(String tableName, JSONObject obj) throws Exception {
// 处理 DDL 事件
}
private void addTaskToWaitingQueue(String tableName, Runnable task) {
// 将任务加入等待队列
}
private void processWaitingQueue(String tableName) {
// 处理等待队列中的任务
}
@Override
public void close() throws Exception {
super.close();
MySqlDBUtils.closeConnectionPool();
executorService.shutdown();
}
}
2.6 MySqlListenerStart.java
负责启动 Flink 环境并配置 MySQL CDC 源,监听 MySQL 的 binlog 事件,将其发送到 MysqlSink
进行处理。
@Component
public class MySqlListenerStart {
@Autowired
private MySqlSourceProperties config;
@EventListener
public void start(ContextRefreshedEvent event) throws IOException {
// 配置 Flink 环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config1);
// 构建 MySQL Source
MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
.hostname(config.getHost())
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword())
.serverTimeZone("Asia/Shanghai")
.jdbcProperties(prop)
.debeziumProperties(debeziumProperties)
.includeSchemaChanges(true)
.databaseList(config.getDatabaseList())
.tableList(config.getTableList())
.deserializer(new JsonDebeziumDeserializationSchema())
.splitSize(10000)
.fetchSize(10000);
// 设置同步模式
StartupOptions startupOptions;
if (StringUtils.hasText(config.getStartDate())) {
startupOptions = StartupOptions.timestamp(DateUtil.parseDate(config.getStartDate()).getTime());
} else if ("initial".equals(config.getStartUpMode())) {
startupOptions = StartupOptions.initial();
} else {
startupOptions = StartupOptions.latest();
}
builder.startupOptions(startupOptions);
DataStreamSource<String> mySqlDS = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), config.getName());
mySqlDS.addSink(new MysqlSink());
try {
env.execute("mysqlSynchronization");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
3. 注意事项
3.1 数据库连接池配置
- HikariCP 是一个高性能的 JDBC 连接池,需要根据实际需求调整最大连接数、最小空闲连接数等参数。
- 确保
application.yml
中的数据库配置正确无误,避免因配置错误导致连接失败。
3.2 锁机制
RefCountedReentrantLock
用于防止 DDL 操作时的并发冲突,确保同一张表不会同时被多个线程修改。- 在 DDL 操作完成后,必须解锁并处理等待队列中的任务,以保证后续操作正常进行。
3.3 数据同步模式
startUpMode
参数决定了同步模式:latest
表示增量同步,initial
表示全量同步。- 如果指定了
startDate
,则从指定时间点开始同步。
3.4 异常处理
- 在
invoke
方法中,对于不同类型的事件(如 DML、DDL),应确保异常被捕获并处理,避免程序崩溃。 - 使用
CompletableFuture
异步处理任务时,需注意异常传播问题。
3.5 性能优化
- 根据实际情况调整线程池大小和 SQL 执行批量大小,以提高性能。
- 合理设置 Flink 的 Checkpoint 和 Savepoint 配置,确保数据一致性。
4. 总结
本项目通过 Flink CDC 实现了 MySQL 到 MySQL 的数据同步,涵盖了增量和全量同步、DDL 事件处理、时间类型转换等功能。通过合理的锁机制和异步任务处理,确保了数据同步的稳定性和高效性。希望这篇博客能帮助你更好地理解和实现类似项目。
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 王德明
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果