Kafka Connect 是一种用于在 Apache Kafka? 与其他数据系统的工具。它可以快速定义大型数据集的移入和移出 Kafka 连接器变得简单。Kafka Connect 可以从应用程序服务器收集指标到整个数据库或 Kafka 在主题中,数据可用于低延迟流处理。导出连接器可以将数据从出 Kafka 主题传输到二级索引(如 Elasticsearch)或批处理系统(如 Hadoop)离线分析。
一、Kafka主要概念
- 高级抽象连接器通过管理任务协调数据流
- 任务-如何复制数据 Kafka 或从 Kafka 实现复制数据
- 工人- 执行连接器和任务的运行过程
- 转换器-用于在场 Connect 在发送或接收数据的系统之间转换数据的代码
- 转换- 从连接器产生或发送到连接器的每个消息的简单逻辑变化
- 死信队列– Connect 如何处理连接器错误
1.1 连接器
Kafka Connect 中间的连接器定义了数据应该复制到哪里和从哪里。阿连接器实例是管理卡夫卡与其他系统之间数据复制的逻辑工作。连接器实现或使用的所有类都在连接器插件中定义。连接器实例和连接器插件可以称为连接器,但引用的上下文应始终清楚(例如,安装连接器是指插件,检查连接器的状态) 指连接器实例)。
我们鼓励用户使用现有的连接器。然而,新的连接器插件可以从零开始编写。一般来说,希望编写新连接器插件的开发人员遵循以下工作流程。开发人员指南提供了更多信息。
1.2 任务
任务是 Connect 数据模型的主要参与者。协调一组实际复制数据的任务。单个操作通过允许连接器分解为多个任务,Kafka Connect 内置支持并行性和可扩展数据复制,配置很少。这些任务中没有存储状态。存储在任务状态 Kafka 在特殊主题中config.storage.topic,status.storage.topic并由相关连接器管理。因此,可以随时启动、停止或重新启动任务,以提供弹性、可扩展的数据管道。
通过 Connect 源任务传入 Kafka 数据的高级表示。请注意,存储内部偏移量 Kafka 或者磁盘,而不是存储在任务本身。
1.3任务再平衡
当连接器首次提交给集群时,工作人员将重新平衡集群中的全套连接器及其任务,使每个工作人员的工作量大致相同。当连接器增加或减少所需的任务数量时,或当连接器的配置发生变化时,也会使用相同的重新平衡过程。当工作人员失败时,任务将在活动人员之间重新平衡。由于任务失败被视为例外,任务失败不会触发重新平衡。因此,失败的任务不会由框架自动重新启动,而该通过REST API重新启动。
任务故障转移示例显示了工作器故障时如何重新平衡任务。
1.4 工人
工作的逻辑单元必须安排连接器和任务。Kafka Connect 将这些过程称为员工,有两种类型的员工:独立和分布式。
1.4.1 独立工作者
独立模式是最简单的模式,其中一个进程负责执行所有连接器和任务。
因为它是一个单一的过程,所以它需要最少的配置。独立模式很容易进入、开发和一些只有一个过程的有意义的情况,比如从主机收集日志。然而,由于只有一个过程,其功能更有限:可扩展性仅限于单个过程,除了您添加到任何监控的单个过程外,没有容错性。
1.4.2 分布式工作者
分布式模式为 Kafka Connect 提供可扩展性和自动容错性。在分布式模式下,您可以以同样的方式启动许多工作过程group.id,它们将自动协调所有可用人员之间的连接器和任务的执行。如果您添加员工、关闭员工或员工意外失败,其他员工将检测到这一点,并自动协调连接器和任务在更新的可用员工集之间重新分配。请注意与消费者组重新平衡的相似性。幕后,连接人员正在利用消费者群体进行协调和重新平衡。
所有工作人员相同group.id将在同一连接集群中。例如,如果 worker-agroup.id=connect-cluster-a和 worker-b 具有相同的group.id,则 worker-a 和 worker-b 形成一个名字 的集群connect-cluster-a。
三节点 Kafka Connect 分布式集群。活动人员之间自动平衡连接器(监控源或接收器系统的更改需要重新配置任务)和任务(复制连接器数据的子集)。任务之间的工作分工由每个任务分配的分区表示。
二、Connector 单机版部署连接JDBC
2.1 下载kafka-connect-jdbc
下载安装JDBC SOURCE CONNECTOR 网址:https://www.confluent.io/hub/ 解压到kafka在下载目录中plugins文件夹下,解压后有文件
2.2 整理文件
将etc复制文件夹中的两个文件kafka在下载目录中config文件夹下,两者分别重命名为connect-mysql-sink.properties” 和“connect-mysql-source.properties”
将lib文件中的kafka-connect-jdbc-10.2.0.jar的文件在kafka在下载目录中libs在文件中新建一个connector将此文件放入文件并将其放入lib下的所有文件复制到kafka的libs并将在目录下jdbc的连接驱动jar一份包复制到kafka的libs文件中一份
2.2.1修改connect-mysql-sink.properties里面的配置
name=test-sink connector.class=io.confluent.connect.jdbc.JdbcSinkConnector tasks.max=1 topics=orders connection.url=jdbc:mysql://localhost:3306/kafkatest?user=root&password=root auto.create=true pk.mode = record_value pk.fields = kafkacol table.name.format=kafkatable security.protocol=SSL ssl.truststore.location=/tmp/kafka.client.truststore.jks
2.2.2 修改connect-mysql-source.properties里面的配置
name=tst-source-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/kafkatest?user=root&password=root
mode=incrementing
incrementing.column.name=id
topic.prefix=test-sqlite-jdbc-
2.2.3 修改connect-standalone.properties配置
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
#配置jdbc在kafka配置目录中的位置
plugin.path=E:\kafka_2.12-2.4.0\libs\connector
2.2、测试connect-jdbc
2.2.1 分别启动zkserver和各个kafka
2.2.2 启动kafka-connect
bin\windows\connect-standalone.bat config/connect-standalone.properties config\connect-mysql-source.properties
2.2.3 测试
在被连接的数据库的表中插入一条数据,观察kafka是否有新的主题生成,并且查看里面的数据