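1. Lab Environment
The following environment must be prepared in advance as VMware virtual machines:
1.1 Versions of Maxwell and its surrounding ecosystem components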
Software | Version | Notes |
---|---|---|
CentOS | 7.6 | - |
CDH | Cloudera Express 5.16.2 (JDK 1.8) | - |
Kafka | 0.11.0 (Cloudera Kafka 3.0.0, integrated with CDH) | - |
Maxwell | 1.34.1 (JDK 11, installed separately) | Install directory: /opt/module/maxwell. Add JAVA_HOME at the top of ${MAXWELL_HOME}/bin/maxwell, e.g. JAVA_HOME=/opt/module/jdk-11.0.7 |
Create a symbolic link:
ln -s /opt/module/maxwell-1.34.1 /opt/module/maxwell
Create the new log directory:
mkdir -p /opt/module/maxwell/logs
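The three manual steps above (setting JAVA_HOME in bin/maxwell, creating the symlink, creating the log directory) can be wrapped in one idempotent helper, so re-running it is harmless. This is a minimal sketch. The function name `prepare_maxwell_home` and its arguments are hypothetical, and it assumes GNU sed and that line 1 of bin/maxwell is the shebang.

```shell
#!/bin/bash
# Hypothetical helper (not part of Maxwell): prepare an install for this setup.
#   $1 = versioned install dir, e.g. /opt/module/maxwell-1.34.1
#   $2 = stable link path,      e.g. /opt/module/maxwell
#   $3 = JDK 11 home,           e.g. /opt/module/jdk-11.0.7
prepare_maxwell_home() {
    local install_dir="$1" link_path="$2" java_home="$3"
    local launcher="${install_dir}/bin/maxwell"

    [ -f "$launcher" ] || { echo "missing launcher: $launcher" >&2; return 1; }

    # Add JAVA_HOME right after the shebang (GNU sed "1a"), only once.
    if ! grep -q '^JAVA_HOME=' "$launcher"; then
        sed -i "1a JAVA_HOME=${java_home}" "$launcher"
    fi

    # (Re)point the stable link at the versioned directory; -n replaces an
    # existing link instead of creating a new link inside its target.
    ln -sfn "$install_dir" "$link_path"

    # Log directory used by run-gmall_new_kafka.sh
    mkdir -p "${link_path}/logs"
}
```

Usage: `prepare_maxwell_home /opt/module/maxwell-1.34.1 /opt/module/maxwell /opt/module/jdk-11.0.7`.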
1.2 Database information (data sources)
No. | Database name | Database IP | Database port | Notes |
---|---|---|---|---|
1 | maxwell | 111.111.111.111 | 3306 | Maxwell metadata database, binlog: master.000001 |
2 | gmall | 111.111.111.112 | 3306 | Business database (online mall) |
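Maxwell reads row-based binlogs. Its quickstart asks the source server to have binary logging enabled, `binlog_format=ROW` and a non-zero `server_id`, so checking the gmall server first is worthwhile. Here is a small sketch. The function name `check_binlog_settings` is hypothetical. It parses the tab-separated output of `mysql -N -B` rather than connecting itself.

```shell
#!/bin/bash
# Hypothetical helper: reads "Variable_name<TAB>Value" lines on stdin and checks
# log_bin=ON, binlog_format=ROW and server_id != 0. Returns 1 on any failure.
check_binlog_settings() {
    local name value rc=0 log_bin="" fmt="" sid=""
    while IFS=$'\t' read -r name value; do
        case "$name" in
            log_bin)       log_bin="$value" ;;
            binlog_format) fmt="$value" ;;
            server_id)     sid="$value" ;;
        esac
    done
    [ "$log_bin" = "ON" ] || { echo "log_bin is '${log_bin}', expected ON" >&2; rc=1; }
    [ "${fmt^^}" = "ROW" ] || { echo "binlog_format is '${fmt}', expected ROW" >&2; rc=1; }
    [ -n "$sid" ] && [ "$sid" != "0" ] || { echo "server_id must be non-zero" >&2; rc=1; }
    return "$rc"
}
# Example against the business database:
# mysql -N -B -h 111.111.111.112 -P 3306 -u ROOT -p \
#   -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','server_id')" \
#   | check_binlog_settings
```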
1.3 Kafka information (data sink)
MQ No. | Broker list |
---|---|
1 | dn3:9092,dn4:9092,dn5:9092 |
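Before pointing Maxwell at this broker list, you can run a quick TCP reachability check on each host:port. The sketch below is an assumption-laden convenience: `probe_brokers` is a hypothetical name, and it relies on bash's `/dev/tcp` and coreutils `timeout`. An open port does not prove a healthy Kafka broker is answering.

```shell
#!/bin/bash
# Hypothetical helper: TCP-connect to each host:port in a comma-separated list.
#   $1 = broker list, $2 = per-broker timeout in seconds (default 3)
# Prints "OK <broker>" or "FAIL <broker>"; returns 1 if any probe fails.
probe_brokers() {
    local list="$1" timeout_s="${2:-3}" broker host port failed=0
    local IFS=","
    for broker in $list; do
        host="${broker%:*}"
        port="${broker##*:}"
        if timeout "$timeout_s" bash -c "exec 3<>/dev/tcp/${host}/${port}" 2>/dev/null; then
            echo "OK   ${broker}"
        else
            echo "FAIL ${broker}"
            failed=1
        fi
    done
    return "$failed"
}
# Example: probe_brokers "dn3:9092,dn4:9092,dn5:9092"
```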
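2. Objective
Goal: from the e-commerce business database gmall, capture changes to a specified set of tables and sync them to a designated topic (topic_db) on the Kafka cluster, as shown in part 1 of the figure.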
3. Maxwell Configuration
3.1 Directory structure
[hdfs@dn5 module]$ tree -L 2 /opt/module/maxwell
maxwell
├── bin
│   ├── maxwell
│   ├── maxwell-benchmark
│   ├── maxwell-bootstrap
│   └── maxwell-docker
├── config.md
├── config.properties.example
├── kinesis-producer-library.properties.example
├── lib
│   └── ****.jar
├── LICENSE
├── log4j2.xml
├── maxwelllog
│   └── gmall_new_kafka.log
├── quickstart.md
├── README_2.md
├── README.md
├── run-gmall_new_kafka.sh
└── my_custom_config
    └── gmall_rtdw_test.properties
3.2 Maxwell configuration file (my_custom_config/gmall_rtdw_test.properties)
log_level=info
# Maxwell id info
client_id=gmall_client_v2022u0101
replica_server_id=1234
# Maxwell database info
host=111.111.111.111
user=USER
password=PASSWORD
port=3306
# Kafka configuration
producer=kafka
kafka.bootstrap.servers=dn3:9092,dn4:9092,dn5:9092
kafka_topic=topic_db
kafka_partition_hash=murmur3
producer_partition_by=primary_key
# Biz database(s) info
replication_host=111.111.111.112
replication_user=ROOT
replication_password=PASSWORD
replication_port=3306
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
exclude_dbs=*
include_dbs=gmall
include_tables=activity_info,base_category1,base_category2,base_category3,base_province,base_region,cart_info,order_info,order_detal,payment_info,sku_info,spu_info,user_info
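Maxwell 1.34.1 warns at startup (see the log in section 4) that exclude/include options are deprecated in favour of a single `filter` option. Here is a minimal sketch that builds such a value from a database name and the same comma-separated table list. `build_maxwell_filter` is a hypothetical helper. It qualifies each table with the database name, so its output is tighter than the `*.table` form Maxwell derives on its own in that log.

```shell
#!/bin/bash
# Hypothetical helper: "exclude everything, then include db.table" filter string.
#   $1 = database name, $2 = comma-separated table list
build_maxwell_filter() {
    local db="$1" tables="$2" tbl filter="exclude: *.*"
    local IFS=","
    for tbl in $tables; do
        filter+=", include: ${db}.${tbl}"
    done
    printf '%s\n' "$filter"
}
# Example:
# echo "filter=$(build_maxwell_filter gmall activity_info,base_region,user_info)"
# filter=exclude: *.*, include: gmall.activity_info, include: gmall.base_region, include: gmall.user_info
```

The resulting `filter=` line would then replace the exclude_dbs/include_dbs/include_tables lines in the properties file.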
3.3 Create the Maxwell startup script
run-gmall_new_kafka.sh
#!/bin/bash
/opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/my_custom_config/gmall_rtdw_test.properties >> /opt/module/maxwell/logs/gmall_new_kafka.log 2>&1
Alternatively, register it as a systemd service:
maxwell_gmall_rtdw_test_etl.service
[Unit]
Description=maxwell gmall_rtdw_test
Wants=network-online.target
After=network-online.target
[Service]
Type=simple
ExecStart=/opt/module/maxwell/run-gmall_new_kafka.sh
WorkingDirectory=/opt/module/maxwell/
StandardOutput=inherit
StandardError=inherit
Restart=always
RestartSec=20
User=hdfs
StartLimitInterval=0
[Install]
WantedBy=multi-user.target
3.4 Service management
Use the following commands to manage the systemd service:
# Enable/disable start on boot
systemctl enable maxwell_gmall_rtdw_test_etl.service
systemctl is-enabled maxwell_gmall_rtdw_test_etl.service
systemctl disable maxwell_gmall_rtdw_test_etl.service
# Check status
systemctl status maxwell_gmall_rtdw_test_etl.service
journalctl -xef
# Start / stop / restart
systemctl start maxwell_gmall_rtdw_test_etl.service
systemctl stop maxwell_gmall_rtdw_test_etl.service
systemctl restart maxwell_gmall_rtdw_test_etl.service
3.5 Check the process
Via jps:
[hdfs@dn5 maxwell-1.34.1]$ jps -ml | grep maxwell
11223 com.zendesk.maxwell.Maxwell --config /opt/module/maxwell/my_custom_config/gmall_rtdw_test.properties
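The jps line above can also be checked from a script. One use is a pre-start guard, because running two instances with the same config file (and therefore the same client_id) is generally undesirable. This is a sketch. `maxwell_pid_for_config` is a hypothetical name, and it expects `jps -ml` output on stdin in the format shown above.

```shell
#!/bin/bash
# Hypothetical helper: print the PID of the Maxwell JVM started with the given
# --config file, reading `jps -ml` output on stdin; return 1 if none is found.
maxwell_pid_for_config() {
    local config="$1" pid main rest
    while read -r pid main rest; do
        if [ "$main" = "com.zendesk.maxwell.Maxwell" ] &&
           [[ " $rest " == *" --config ${config} "* ]]; then
            echo "$pid"
            return 0
        fi
    done
    return 1
}
# Example:
# jps -ml | maxwell_pid_for_config /opt/module/maxwell/my_custom_config/gmall_rtdw_test.properties
```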
Via systemd:
[root@dn5 system]# systemctl status maxwell_gmall_rtdw_test_etl.service
● maxwell_gmall_rtdw_test_etl.service - maxwell gmall_new_kafka
Loaded: loaded (/etc/systemd/system/maxwell_gmall_rtdw_test_etl.service; enabled; vendor preset: disabled)
Active: active (running) since Wed 2022-01-01 12:12:12 CST; 1 months 15 days ago
Main PID: 11223 (run-gmall_new_kafka.sh)
CGroup: /system.slice/maxwell_gmall_rtdw_test_etl.service
├─22569 /bin/bash /opt/module/maxwell/run-gmall_new_kafka.sh
└─22570 /opt/module/jdk-11.0.7/bin/java -Xmx27g -Xms15g -Dfile.encoding=UTF-8 -Dlog4j.shutdownCallbackRegistry=com.djdch.log4j.StaticShutdownCallbackRegist...
Jan 1 12:12:12 dn5 systemd[1]: Started maxwell gmall_new_kafka.
4. Check the Maxwell startup log
more /opt/module/maxwell/logs/gmall_new_kafka.log
[hdfs@dn5 maxwelllog]$ more gmall_new_kafka.log
2022-07-12 10:28:25,091 [main] WARN Filter - using exclude/include/includeColumns is deprecated. Please update your configuration to use:
2022-07-12 10:28:25,095 [main] WARN Filter - filter = "exclude: *.*, include: gmall.*, exclude: *.*, include: *.activity_info, include: *.base_category1, include: *.base_category2, include: *.base_category3, include: *.base_province, include: *.base_region, include: *.cart_info, include: *.order_info, include: *.order_detal, include: *.payment_info, include: *.sku_info, include: *.spu_info, include: *.user_info"
2022-07-12 10:28:25,596 [main] INFO Maxwell - Starting Maxwell. maxMemory: 8392802304 bufferMemoryUsage: 0.25
2022-07-12 10:28:26,098 [main] INFO ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [dn3:9092, dn4:9092, dn5:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2022-07-12 10:28:26,146 [main] INFO AppInfoParser - Kafka version : 1.0.0
2022-07-12 10:28:26,146 [main] INFO AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
2022-07-12 10:28:26,179 [main] INFO Maxwell - Maxwell v1.34.1 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[master.001138:256152843], lastHeartbeat=0]
2022-07-12 10:28:26,684 [main] INFO AbstractSchemaStore - Maxwell is capturing initial schema
2022-07-12 10:28:36,057 [main] INFO BinlogConnectorReplicator - Setting initial binlog pos to: master.001138:256152843
2022-07-12 10:28:36,078 [blc-172.20.105.58:3306] INFO BinaryLogClient - Connected to 111.111.111.112:3306 at master.000001/256152843 (sid:1333, cid:63998820)
2022-07-12 10:28:36,078 [blc-172.20.105.58:3306] INFO BinlogConnectorReplicator - Binlog connected.
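When scripting a health check, the key line is "Binlog connected.". This sketch treats the log as healthy only if that line appears and no ERROR line follows the most recent connection. The function name `maxwell_log_ok` and this rule are assumptions for illustration. The level is matched as " ERROR " based on the log format shown above.

```shell
#!/bin/bash
# Hypothetical helper: succeed only if "Binlog connected." appears in the log
# and no " ERROR " line was written after the most recent connection.
maxwell_log_ok() {
    local log="$1"
    awk '
        / ERROR /            { last_err = NR }
        /Binlog connected\./ { last_conn = NR }
        END {
            printf "last connect line: %d, last ERROR line: %d\n", last_conn, last_err
            exit !(last_conn > 0 && last_conn > last_err)
        }' "$log"
}
# Example: maxwell_log_ok /opt/module/maxwell/logs/gmall_new_kafka.log
```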
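5. Results
Trigger binlog generation as follows.
5.1 Generating one or more binlog events
Trigger binlog events according to your business logic, for example with this template: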
update ${DB}.${TABLE_NAME} t set t.create_time=DATE_ADD(t.create_time,INTERVAL 1 SECOND)
where t.${BIZ_DATE}>='2022-01-15 00:00:00';
For example, change the description of a specific SPU:
update gmall.spu_info t
set t.description = '小米10-test-maxwell'
where t.id=1;
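To apply the template above to several tables, you can render it per table. The sketch below only prints SQL, and you pipe it to the mysql client yourself. Keep in mind that the statement really shifts create_time by one second. `render_touch_sql` and its argument order are hypothetical.

```shell
#!/bin/bash
# Hypothetical helper: render the 5.1 UPDATE template for one table.
#   $1 = database, $2 = table, $3 = business-date column, $4 = lower bound
render_touch_sql() {
    local db="$1" table="$2" biz_date_col="$3" since="$4"
    printf "update %s.%s t set t.create_time=DATE_ADD(t.create_time,INTERVAL 1 SECOND)\nwhere t.%s>='%s';\n" \
        "$db" "$table" "$biz_date_col" "$since"
}
# Example (prints one statement per table):
# for t in order_info payment_info; do
#     render_touch_sql gmall "$t" create_time '2022-01-15 00:00:00'
# done
```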
Use a Kafka consumer to view the MySQL binlog data parsed by Maxwell:
[hdfs@dn5 ~]$ kafka-console-consumer --zookeeper dn3:2181 --topic topic_db
{
"database": "gmall",
"table": "spu_info",
"type": "update",
"ts": 1657598220,
"xid": 1338044537,
"commit": true,
"data": {
"id": 1,
"spu_name": "小米10",
"description": "小米10-test-maxwell",
"category3_id": 61,
"tm_id": 1
},
"old": {
"description": "小米10"
}
}
5.2 Full-table binlog generation (bootstrap)
[hdfs@dn5 maxwell]$ bin/maxwell-bootstrap --database gmall --table base_trademark --config ./my_custom_config/gmall_rtdw_test.properties
connecting to jdbc:mysql://111.111.111.111:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&useSSL=false
Bootstrap a whole batch of tables in one go: batch_upsert_table_process.sh
#!/bin/bash
# Maxwell install directory (see section 1.1); override via the environment if needed
MAXWELL_HOME="${MAXWELL_HOME:-/opt/module/maxwell}"
include_tables="activity_info,activity_rule,activity_sku,base_category1,base_category2,base_category3,base_province,base_region,base_trademark,coupon_info,coupon_range,coupon_use,financial_sku_cost,sku_info,spu_info,user_info"
function get_element_by_split_comma(){
  # Split on commas; IFS is local, so the caller's IFS is left untouched
  local IFS=","
  local tbl
  for tbl in ${include_tables}; do
    echo "Now handling table >>>>>>>>>>>>>>>>>>>>> ${tbl}"
    "${MAXWELL_HOME}/bin/maxwell-bootstrap" --database gmall --table "${tbl}" \
      --config "${MAXWELL_HOME}/my_custom_config/gmall_rtdw_test.properties"
  done
}
# Call the function from within the script
get_element_by_split_comma
Use a Kafka consumer to view the MySQL binlog data parsed by Maxwell:
[hdfs@dn5 ~]$ kafka-console-consumer --zookeeper dn3:2181 --topic topic_db
{"database":"gmall","table":"base_trademark","type":"bootstrap-start","ts":1657615954,"data":{}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":1,"tm_name":"三星","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":2,"tm_name":"苹果","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":3,"tm_name":"华为","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":4,"tm_name":"TCL","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":5,"tm_name":"小米","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":6,"tm_name":"长粒香","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":7,"tm_name":"金沙河","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":8,"tm_name":"索芙特","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":9,"tm_name":"CAREMiLLE","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":10,"tm_name":"欧莱雅","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":11,"tm_name":"香奈儿","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-complete","ts":1657615954,"data":{}}
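As shown, the type field takes three values: bootstrap-start, bootstrap-insert and bootstrap-complete. The data attribute is empty for bootstrap-start and bootstrap-complete.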
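To confirm this from the consumer output without reading every row, you can tally events per type. This sketch uses grep/sort/uniq rather than a JSON parser. It assumes compact JSON with one object per line, as above, and that no data column is itself named type. `count_event_types` is a hypothetical name.

```shell
#!/bin/bash
# Hypothetical helper: read Maxwell JSON events (one per line) on stdin and
# print "<type> <count>" for each distinct "type" value, sorted by type.
count_event_types() {
    grep -o '"type":"[a-z-]*"' | cut -d'"' -f4 | sort | uniq -c | awk '{ print $2, $1 }'
}
# Example:
# kafka-console-consumer --zookeeper dn3:2181 --topic topic_db --max-messages 13 | count_event_types
```

For the base_trademark bootstrap above, this would be expected to report one bootstrap-start, eleven bootstrap-insert and one bootstrap-complete event.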