Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
-
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。 -
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 -
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。 -
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。 -
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文详细的介绍了maxwell的部署、2个示例以及在Flink 中通过maxwell将数据变化信息同步到Kafka中,然后通过Flink SQL client进行读取。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,还依赖maxwell、kafka和flink的环境。
一、maxwell Format
1、maxwell介绍
Maxwell是一个CDC(Changelog Data Capture)工具,可以将MySQL中的数据变化实时流式传输到Kafka、Kinesis和其他流式连接器中。Maxwell为变更日志提供了统一的格式模式,并支持使用JSON序列化消息。
Flink支持将Maxwell JSON消息解释为INSERT/UPDATE/DELETE Flink SQL系统中的消息。在许多情况下,这对于利用此功能非常有用,例如
- 将增量数据从数据库同步到其他系统
- 审核日志
- 数据库上的实时物化视图
- 数据库表的临时连接更改历史等等。
Flink还支持将Flink SQL中的INSERT/UPDATE/DELETE消息编码为Maxwell JSON消息,并发送到Kafka等外部系统。但是,截至Flink 1.17版本,Flink无法将UPDATE_BEFORE和UPDATE_AFTER合并为一条UPDATE消息。因此,Flink将UPDATE_BEFORE和UDPATE_AFTER编码为DELETE和INSERT Maxwell消息。
2、binlog设置及验证
设置binlog需要监控的数据库,本示例使用的数据库是mysql5.7
1)、配置
本示例设置的参数参考下面的配置
[root@server4 ~]# cat /etc/my.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
[mysqld]
......
log-bin=mysql-bin # log-bin的名称,可以是任意名称
binlog-format=row # 推荐该参数,其他的参数视情况而定,比如mixed、statement
server_id=1 # mysql集群环境中不要重复
binlog_do_db=test # test是mysql的数据库名称,如果监控多个数据库,可以添加多个binlog_do_db即可,例如下面示例
# binlog_do_db=test2
# binlog_do_db=test3
.....
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
-
STATEMENT模式(SBR)
每一条会修改数据的sql语句会记录到binlog中。优点是并不需要记录每一条sql语句和每一行的数据变化,减少了binlog日志量,节约IO,提高性能。缺点是在某些情况下会导致master-slave中的数据不一致(如sleep()函数, last_insert_id(),以及user-defined functions(udf)等会出现问题) -
ROW模式(RBR)
不记录每条sql语句的上下文信息,仅需记录哪条数据被修改了,修改成什么样了。而且不会出现某些特定情况下的存储过程、或function、或trigger的调用和触发无法被正确复制的问题。缺点是会产生大量的日志,尤其是alter table的时候会让日志暴涨。 -
MIXED模式(MBR)
以上两种模式的混合使用,一般的复制使用STATEMENT模式保存binlog,对于STATEMENT模式无法复制的操作使用ROW模式保存binlog,MySQL会根据执行的SQL语句选择日志保存方式。
2)、重启mysql
保存配置后重启mysql
service mysqld restart
- 1
3)、验证
重启后,可以通过2个简单的方法验证是否设置成功。
mysql默认的安装目录:cd /var/lib/mysql
[root@server4 ~]# cd /var/lib/mysql
[root@server4 mysql]# ll
......
-rw-r----- 1 mysql mysql 154 1月 10 2022 mysql-bin.000001
-rw-r----- 1 mysql mysql 1197 1月 16 12:21 mysql-bin.index
.....
- 1
- 2
- 3
- 4
- 5
- 6
查看mysql-bin.000001文件是否生成,且其大小为154字节。mysql-bin.000001是mysql重启的次数,重启2次则为mysql-bin.000002
在test数据库中创建或添加数据,mysql-bin.000001的大小是否增加
以上情况满足,则说明binlog配置正常
3、部署
1)、下载
去其官网:https://maxwells-daemon.io/quickstart/下载需要的版本。
本示例使用的是:maxwell-1.29.2.tar.gz 注意其不同版本对jdk的要求,最新版本要求jdk11.
2)、解压
解压的目录/usr/local/bigdata/maxwell-1.29.2
tar -zvxf maxwell-1.29.2.tar.gz -C /usr/local/bigdata
[alanchan@server3 maxwell-1.29.2]$ ll
总用量 108
drwxr-xr-x 2 alanchan root 4096 1月 16 05:45 bin
-rw-r--r-- 1 alanchan root 25133 1月 24 2021 config.md
-rw-r--r-- 1 alanchan root 11970 1月 24 2021 config.properties.example
-rw-r--r-- 1 alanchan root 10259 4月 22 2020 kinesis-producer-library.properties.example
drwxr-xr-x 3 alanchan root 12288 1月 27 2021 lib
-rw-r--r-- 1 alanchan root 548 4月 22 2020 LICENSE
-rw-r--r-- 1 alanchan root 470 1月 24 2021 log4j2.xml
-rw-r--r-- 1 alanchan root 3328 1月 27 2021 quickstart.md
-rw-r--r-- 1 alanchan root 1429 1月 27 2021 README.md
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
3)、创建元数据库
该步骤需要创建一个mysql数据库,用以保存maxwell的元数据,至于访问这个数据库的用户名和密码则视情况而定,下面的内容是其官方上的操作,也就是创建用户、授权。
本文的示例中使用的是root用户,创建的数据库名称为maxwell。
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
mysql> CREATE USER 'maxwell'@'localhost' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'localhost';
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
4)、启动方式
其提供了2种启动方式,即通过命令行参数的形式和通过配置文件的形式,下面是给出的示例
- 命令行参数形式,输出到控制台
不需要做任何配置即可直接使用,
maxwell --user='root' --password='123456' --host='192.168.10.44' --producer=stdout
# user 和 password 是连接mysql元数据库的账号和密码
# host是被监控的mysql的ip
# producer是maxwell的输出类型,比如stdout、kafka等
- 1
- 2
- 3
- 4
- 配置文件方式,输出到控制台
maxwell --config ../config.properties
# config.properties文件修改内容如下,其他的保持不变,也可以根据自己的需要修改
producer=stdout
# mysql login info
host=192.168.10.44
user=root
password=123456
- 1
- 2
- 3
- 4
- 5
- 6
- 7
4、示例1:maxwell CDC 输出至控制台
1)、启动maxwell
部署完成后,不需要做任何的改动即可执行下面的命令
maxwell --user='root' --password='123456' --host='192.168.10.44' --producer=stdout
- 1
2)、操作mysql监控的数据库,观察其控制台输出
在mysql中增加、修改和删除数据后,maxwell控制台的输出内容如下
[alanchan@server3 bin]$ maxwell --user='root' --password='123456' --host='192.168.10.44' --producer=stdout
Using kafka version: 1.0.0
{"database":"cdctest","table":"userscoressink","type":"update","ts":1705653290,"xid":20392,"commit":true,"data":{"name":"alanchanchn","scores":109.0},"old":{"scores":199.0}}
{"database":"cdctest","table":"userscoressink","type":"insert","ts":1705653456,"xid":20935,"commit":true,"data":{"name":"alan1","scores":5.0}}
{"database":"cdctest","table":"userscoressink","type":"update","ts":1705653461,"xid":20951,"commit":true,"data":{"name":"alan1","scores":109.0},"old":{"scores":5.0}}
{"database":"cdctest","table":"userscoressink","type":"delete","ts":1705653465,"xid":20967,"commit":true,"data":{"name":"alan1","scores":109.0}}
- 1
- 2
- 3
- 4
- 5
- 6
5、示例2:maxwell CDC 输出至kafka
1)、启动maxwell
部署完成后,不需要做任何的改动即可执行下面的命令
maxwell --user='root' --password='rootroot' --host='192.168.10.44' --producer=kafka --kafka.bootstrap.servers=server1:9092,server2:9092,server3:9092 --kafka_topic=alan_maxwell_to_kafka_topic
[alanchan@server3 bin]$ maxwell --user='root' --password='rootroot' --host='192.168.10.44' --producer=kafka --kafka.bootstrap.servers=server1:9092,server2:9092,server3:9092 --kafka_topic=alan_maxwell_to_kafka_topic
Using kafka version: 1.0.0
- 1
- 2
- 3
- 4
2)、通过命令行打开kafka消费者
kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_maxwell_to_kafka_topic --from-beginning
- 1
3)、操作mysql监控的数据库,观察其控制台输出
在mysql中增加、修改和删除数据后,maxwell控制台的输出内容如下
[alanchan@server1 bin]$ cd ../../kafka_2.12-3.0.0/bin/
[alanchan@server1 bin]$ kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_maxwell_to_kafka_topic --from-beginning
{"database":"cdctest","table":"userscoressink","type":"insert","ts":1705654206,"xid":22158,"commit":true,"data":{"name":"test","scores":100.0}}
{"database":"cdctest","table":"userscoressink","type":"update","ts":1705654220,"xid":22196,"commit":true,"data":{"name":"test","scores":200.0},"old":{"scores":100.0}}
{"database":"cdctest","table":"userscoressink","type":"delete","ts":1705654224,"xid":22210,"commit":true,"data":{"name":"test","scores":200.0}}
- 1
- 2
- 3
- 4
- 5
二、Flink 与 maxwell 实践
为了使用maxwell格式,使用构建自动化工具(如Maven或SBT)的项目和带有SQL JAR包的SQLClient都需要以下依赖项。
1、maven依赖
该依赖在flink自建工程中已经包含。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.17.1</version>
</dependency>
- 1
- 2
- 3
- 4
- 5
有关如何部署 maxwell 以将变更日志同步到消息队列,请参阅上文的具体事例或想了解更多的信息参考maxwell 文档。
2、Flink sql client 建表示例
maxwell 为变更日志提供了统一的格式,下面是一个从 MySQL 库 userscoressink表中捕获更新操作的简单示例:
{
"database": "cdctest",
"table": "userscoressink",
"type": "update",
"ts": 1705654220,
"xid": 22196,
"commit": true,
"data": {
"name": "test",
"scores": 200.0
},
"old": {
"scores": 100.0
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
MySQL userscoressink表有2列(name,scores)。上面的 JSON 消息是 userscoressink表上的一个更新事件,表示数据上scores字段值从100变更成为200。
消息已经同步到了一个 Kafka 主题:alan_maxwell_to_kafka_topic,那么就可以使用以下DDL来从这个主题消费消息并解析变更事件。
具体启动maxwell参考本文的第一部分的kafka示例,其他不再赘述。下面的部分仅仅是演示maxwell环境都正常后,在Flink SQL client中的操作。
-- 元数据与 MySQL "userscoressink" 表完全相同
CREATE TABLE userscoressink (
name STRING,
scores FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'alan_maxwell_to_kafka_topic',
'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'maxwell-json' -- 使用 maxwell-json 格式
);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
将 Kafka 主题注册成 Flink 表之后,就可以将 maxwell消息用作变更日志源。
-- 验证,在mysql中新增、修改和删除数据,观察flink sql client 的数据变化
Flink SQL> show tables;
Empty set
Flink SQL> CREATE TABLE userscoressink (
> name STRING,
> scores FLOAT
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'alan_maxwell_to_kafka_topic',
> 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'maxwell-json' -- 使用 maxwell-json 格式
> );
[INFO] Execute statement succeed.
Flink SQL> select * from userscoressink;
+----+--------------------------------+--------------------------------+
| op | name | scores |
+----+--------------------------------+--------------------------------+
| +I | test | 100.0 |
| -U | test | 100.0 |
| +U | test | 200.0 |
| -D | test | 200.0 |
Query terminated, received a total of 4 rows
-- 关于MySQL "userscoressink" 表的实时物化视图
-- 按name分组,对scores进行求和
Flink SQL> select name ,sum(scores) sum_scores from userscoressink group by name;
+----+--------------------------------+--------------------------------+
| op | name | sum_scores |
+----+--------------------------------+--------------------------------+
| +I | test | 100.0 |
| -D | test | 100.0 |
| +I | test | 200.0 |
| -D | test | 200.0 |
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
3、Available Metadata
以下格式元数据可以在表定义中公开为只读(VIRTUAL)列。
只有当相应的连接器转发格式元数据时,格式元数据字段才可用。
截至Flink 1.17版本,只有Kafka连接器能够公开其值格式的元数据字段。
以下示例显示了如何访问Kafka中的Maxwell元数据字段:
CREATE TABLE userscoressink2(
origin_database STRING METADATA FROM 'value.database' VIRTUAL,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
name STRING,
scores FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'alan_maxwell_to_kafka_topic',
'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'maxwell-json'
);
# 操作步骤如下
Flink SQL> CREATE TABLE userscoressink2(
> origin_database STRING METADATA FROM 'value.database' VIRTUAL,
> origin_table STRING METADATA FROM 'value.table' VIRTUAL,
> origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
> origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
> name STRING,
> scores FLOAT
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'alan_maxwell_to_kafka_topic',
> 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'maxwell-json'
> );
[INFO] Execute statement succeed.
Flink SQL> select * from userscoressink2;
+----+----------------+---------------+---------------------------+-------------------------+-----+-------+
| op |origin_database | origin_table |origin_primary_key_columns | origin_ts |name |scores |
+----+----------------+---------------+---------------------------+-------------------------+-----+-------+
| +I | cdctest |userscoressink | <NULL> | 2024-01-19 16:50:06.000 |test | 100.0 |
| -U | cdctest |userscoressink | <NULL> | 2024-01-19 16:50:20.000 |test | 100.0 |
| +U | cdctest |userscoressink | <NULL> | 2024-01-19 16:50:20.000 |test | 200.0 |
| -D | cdctest |userscoressink | <NULL> | 2024-01-19 16:50:24.000 |test | 200.0 |
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
4、Format 参数
5、重要事项:重复的变更事件
Maxwell应用程序允许每次更改事件只投递一次。在这种情况下,Flink在消费Maxwell生产的事件时效果非常好。如果Maxwell应用程序至少在一次交付中工作,它可能会向Kafka交付重复的更改事件,Flink将获得重复的事件。这可能会导致Flink查询得到错误的结果或意外的异常。因此,在这种情况下,建议将作业配置表.exec.source.cdc-events-duplicate设置为true,并在源上定义PRIMARY KEY。框架将生成一个额外的有状态运算符,并使用主键来消除更改事件的重复,并生成一个规范化的更改日志流。
6、数据类型映射
目前,maxwell Format 使用 JSON Format 进行序列化和反序列化。 有关数据类型映射的更多详细信息,请参阅 JSON Format 文档。
以上,本文详细的介绍了maxwell的部署、2个示例以及在Flink 中通过maxwell将数据变化信息同步到Kafka中,然后通过Flink SQL client进行读取。