资讯详情

MySQL 的 Debezium 连接器-中文版

MySQL 的 Debezium 连接器

MySQL 二进制日志(binlog),它按提交到数据库的顺序记录所有操作。这包括表模式和表中数据的变化。MySQL 使用 binlog 复制和恢复。

Debezium MySQL 连接器读取 binlog,为行级INSERT、、UPDATEDELETE操作生成变更事件,并将变更事件发送到 Kafka 主题 Kafka 主题。

由于 MySQL 通常设置在指定时间段后清除 binlog,因此 MySQL 连接器将初始执行您的每个数据库一致快照。MySQL 从创建快照的位置读取连接器 binlog。

与此连接器兼容的连接器 MySQL 请参考数据库版本的信息Debezium 版本概述。

连接器的工作原理

支持连接器 MySQL 拓扑概述对规划您的应用程序非常有用。优化配置和操作 Debezium MySQL 连接器了解如何跟踪表结构、公共模式变更、执行快照和确定 Kafka 主题名称会很有帮助。

Debezium MySQL 连接器还没有 MariaDB 来自社区的多份报告显示,该连接器已成功用于数据库。计划在未来进行 Debezium 提供正确的版本 MariaDB 官方支持。

支持的 MySQL 拓扑

Debezium MySQL 支持以下连接器 MySQL 拓扑:

历史主题的结构

当数据库客户端查询数据库时,客户端使用当前的数据库模式。然而,数据库架构可以随时更改,这意味着连接器必须能够识别每次插入、更新或删除记录的架构。此外,连接器不仅可以使用当前的模式,因为连接器可能正在处理在更改表模式之前记录的相对较旧的事件。

为确保架构变更后的变更正确处理,MySQL 在 binlog 它不仅包括数据的行级变更,还包括应用于数据库 DDL 语句。读取连接器时 binlog 并遇到这些 DDL 在句子中,它会分析并更新每个表模式的内存表示。每次插入、更新或删除操作时,连接器使用此模式来识别表结构,并产生适当的变更。历史上的数据库历史中 Kafka 在主题中,连接器记录所有内容 DDL 句子和每一个 DDL 语句出现在 binlog 中的位置。

当连接器在崩溃或优雅停止后重新启动时,连接器将从特定位置读取,即从特定时间点读取 binlog。连接器读取数据库历史 Kafka 主题和分析一切 DDL 在连接器启动的二进制日志中的点之前,重建此时存在的表结构。

该数据库的历史主题仅供连接器使用。连接器可以选择将模式变更事件发送到消费者应用程序的不同主题。

当 MySQL 框架变更工具应用于连接器捕获gh-ost或表中的变化pt-online-schema-change在迁移过程中创建辅助表。需要配置连接器来捕获这些帮助表的更改。如果消费者不需要生成帮助表的记录,他们可以通过单个信息转换过滤掉它们。

查看接收 Debezium 事件记录主题的默认名称。

改变主题的架构

您可以配置 Debezium MySQL 连接器以生成模式更改事件,描述应用于数据库捕获的表的模式更改。连接器将架构变更写入名称 的 Kafka 主题*<serverName>*,其中是连接器配置属性*serverName*中指定的逻辑服务器名称。database.server.name将连接器发送到架构改变主题的消息包含有效负载,而(可选)还包含改变事件消息的架构。

事件消息的有效负载包括以下元素:

  • ddl

    提供导致架构变更的 SQL CREATEALTER或语句。DROP

  • databaseName

    应用 DDL 语句数据库的名称。databaseName作为信息键。

  • pos

    语句出现在 binlog 中的位置。

  • tableChanges

    整个表架构的结构化表示在架构改变后。tableChanges字段包含一个数组,包含表中每列的条目。由于结构化表示 JSON 或 Avro 数据呈现格式,消费者可以轻松读取消息,而无需先通过 DDL 处理分析器。

对于捕获模式的表,连接器不仅将模式变更的历史记录存储在模式变更的主题中,而且还存储在内部数据库的历史记录主题中。内部数据库的历史主题仅供连接器使用,不适合直接使用消费者应用程序。确保只使用来自架构变更主题的信息。
永远不要划分数据库的历史主题。为了使数据库的历史主题正确运行,它必须保持连接器对事件记录的一致顺序。为确保主题不会在分区之间拆分,请使用以下方法之一设置主题的分区计数:如果您手动创建数据库的历史主题,请将分区计数指定为1.如果您使用 Apache Kafka 如果代理商自动创建数据库的历史主题,它将创建主题。Kafkanum.partitions配置选项的值设置为1.
将主题发送给其模式变更的信息格式处于孵化状态,如有变更,将不另行通知。

例子:发送到 MySQL 连接器架构改变主题的消息

显示了以下示例 JSON 典型的格式架构改变了新闻。该消息包含表模式的逻辑表示。

{ 
           "shema": { 
        
  ...
  },
  "payload": { 
        
        "source": { 
          // (1)
        "version": "1.9.5.Final",
        "connector": "mysql",
        "name": "dbserver1",
        "ts_ms": 0,
        "snapshot": "false",
        "db": "inventory",
        "sequence": null,
        "table": "customers",
        "server_id": 0,
        "gtid": null,
        "file": "mysql-bin.000003",
        "pos": 219,
        "row": 0,
        "thread": null,
        "query": null
    },
    "databaseName": "inventory", // (2)
    "schemaName": null,
    "ddl": "ALTER TABLE customers ADD COLUMN middle_name VARCHAR(2000)", // (3)
    "tableChanges": [ // (4)
        { 
        
        "type": "ALTER", // (5)
        "id": "\"inventory\".\"customers\"",  // (6)
        "table": { 
         // (7)
            "defaultCharsetName": "latin1",
            "primaryKeyColumnNames": [  // (8)
                "id"
            ],
            "columns": [ // (9)
                { 
        
                "name": "id",
                "jdbcType": 4,
                "nativeType": null,
                "typeName": "INT",
                "typeExpression": "INT",
                "charsetName": null,
                "length": 11,
                "scale": null,
                "position": 1,
                "optional": false,
                "autoIncremented": true,
                "generated": true
            },
            { 
        
                "name": "first_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "latin1",
                "length": 255,
                "scale": null,
                "position": 2,
                "optional": false,
                "autoIncremented": false,
                "generated": false
            },                        { 
        
                "name": "last_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "latin1",
                "length": 255,
                "scale": null,
                "position": 3,
                "optional": false,
                "autoIncremented": false,
                "generated": false
            },
            { 
        
                "name": "email",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "latin1",
                "length": 255,
                "scale": null,
                "position": 4,
                "optional": false,
                "autoIncremented": false,
                "generated": false
            },
            { 
        
                "name": "middle_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "latin1",
                "length": 2000,
                "scale": null,
                "position": 5,
                "optional": true,
                "autoIncremented": false,
                "generated": false
            }
          ]
        }
      }
    ]
  },
  "payload": { 
        
    "databaseName": "inventory",
    "ddl": "CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512), weight FLOAT ); ALTER TABLE products AUTO_INCREMENT = 101;",
    "source" : { 
        
      "version": "1.9.5.Final",
      "name": "mysql-server-1",
      "server_id": 0,
      "ts_ms": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": null,
      "table": null,
      "query": null
    }
  }
}
物品 字段名称 描述
1 source source字段的结构与连接器写入特定于表的主题的标准数据更改事件完全相同。此字段可用于关联不同主题的事件。
2 databaseName schemaName 标识包含更改的数据库和架构。该databaseName字段的值用作记录的消息键。
3 ddl 此字段包含负责架构更改的 DDL。该ddl字段可以包含多个 DDL 语句。每个语句都适用于数据库中的databaseName字段。多个 DDL 语句按照它们应用于数据库的顺序出现。 客户端可以提交多个适用于多个数据库的 DDL 语句。如果 MySQL 以原子方式应用它们,则连接器按顺序获取 DDL 语句,按数据库对它们进行分组,并为每个组创建一个模式更改事件。如果 MySQL 单独应用它们,连接器会为每个语句创建一个单独的模式更改事件。
4 tableChanges 包含由 DDL 命令生成的架构更改的一个或多个项目的数组。
5 type 描述变化的种类。该值为以下之一:CREATE表已创建。ALTER表已修改。DROP表已删除。
6 id 创建、更改或删除的表的完整标识符。在表重命名的情况下,此标识符是表名的串联。*<old>*,*<new>*
7 table 表示应用更改后的表元数据。
8 primaryKeyColumnNames 组成表的主键的列的列表。
9 columns 已更改表中每一列的元数据。

另请参阅:模式历史主题。

快照

首次启动 Debezium MySQL 连接器时,它会执行数据库的初始一致快照。以下流程描述了连接器如何创建此快照。此流程适用于默认快照模式,即initial. 有关其他快照模式的信息,请参阅MySQL 连接器snapshot.mode配置属性。

行动
1 获取阻止其他数据库客户端写入的全局读锁。 快照本身不会阻止其他客户端应用可能会干扰连接器尝试读取 binlog 位置和表模式的 DDL。连接器在读取 binlog 位置时保持全局读锁,并如后面的步骤所述释放锁。
2 启动具有可重复读取语义的事务,以确保事务中的所有后续读取都针对一致的快照完成。
3 读取当前的 binlog 位置。
4 读取连接器配置为捕获更改的数据库和表的架构。
5 释放全局读锁。其他数据库客户端现在可以写入数据库。
6 如果适用,将 DDL 更改写入架构更改主题,包括所有必要DROP…CREATE…DDL 语句。
7 扫描数据库表。对于每一行,连接器将CREATE事件发送到相关的特定于表的 Kafka 主题。
8 提交事务。
9 在连接器偏移中记录完成的快照。
  • 连接器重新启动

    如果连接器在执行初始快照时发生故障、停止或重新平衡,则在连接器重新启动后,它会执行新的快照。在初始快照完成后,Debezium MySQL 连接器从 binlog 中的相同位置重新启动,因此它不会错过任何更新。如果连接器停止的时间足够长,MySQL 可能会清除旧的二进制日志文件,连接器的位置就会丢失。如果位置丢失,连接器将恢复为其起始位置的*初始快照。*有关对 Debezium MySQL 连接器进行故障排除的更多提示,请参阅出现问题时的行为。

  • 不允许全局读锁

    某些环境不允许全局读锁。如果 Debezium MySQL 连接器检测到不允许全局读锁,则连接器使用表级锁代替并使用此方法执行快照。这要求 Debezium 连接器的数据库用户具有LOCK TABLES权限。表 3. 使用表级锁执行初始快照的工作流程步行动1获取表级锁。2启动具有可重复读取语义的事务,以确保事务中的所有后续读取都针对一致的快照完成。3读取和过滤数据库和表的名称。4读取当前的 binlog 位置。5读取连接器配置为捕获更改的数据库和表的架构。6如果适用,将 DDL 更改写入架构更改主题,包括所有必要DROP…CREATE…DDL 语句。7扫描数据库表。对于每一行,连接器将CREATE事件发送到相关的特定于表的 Kafka 主题。8提交事务。9释放表级锁。10在连接器偏移中记录完成的快照。

即席快照

默认情况下,连接器仅在首次启动后才运行初始快照操作。在这个初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来更改事件数据仅通过流式处理进入。

但是,在某些情况下,连接器在初始快照期间获得的数据可能会变得陈旧、丢失或不完整。为了提供一种重新捕获表数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能会导致执行临时快照:

  • 修改连接器配置以捕获一组不同的表。
  • Kafka 主题被删除,必须重建。
  • 由于配置错误或其他问题而发生数据损坏。

您可以通过启动所谓的ad-hoc 快照为之前捕获快照的表重新运行快照。即席快照需要使用信令表。您可以通过向 Debezium 信号表发送信号请求来启动临时快照。

当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了以前存在的主题,如果启用了自动主题创建,Debezium 可以自动创建主题。

即席快照信号指定要包含在快照中的表。快照可以捕获数据库的全部内容,或仅捕获数据库中表的子集。

execute-snapshot您可以通过向信令表发送消息来指定要捕获的表。将execute-snapshot信号的类型设置为incremental,并提供要包含在快照中的表的名称,如下表所述:

场地 默认 价值
type incremental 指定要运行的快照类型。 设置类型是可选的。目前,您只能请求incremental快照。
data-collections 不适用 一个数组,其中包含要生成快照的表的完全限定名称。名称的格式与配置选项 的格式相同。signal.data.collection

触发临时快照

execute-snapshot您可以通过将具有信号类型的条目添加到信令表来启动临时快照。连接器处理完消息后,将开始快照操作。快照进程读取第一个和最后一个主键值,并将这些值用作每个表的起点和终点。根据表中的条目数和配置的块大小,Debezium 将表划分为块,并继续对每个块进行快照,一次一个。

目前,execute-snapshot操作类型仅触发增量快照。有关详细信息,请参阅增量快照。

增量快照

为了提供管理快照的灵活性,Debezium 包含一个补充快照机制,称为增量快照。增量快照依靠 Debezium 机制向 Debezium 连接器发送信号。增量快照基于DDD-3设计文档。

在增量快照中,Debezium 不是像在初始快照中那样一次捕获数据库的完整状态,而是在一系列可配置的块中分阶段捕获每个表。您可以指定您希望快照捕获的表和每个块的大小。块大小决定了快照在数据库上的每次提取操作期间收集的行数。增量快照的默认块大小为 1 KB。

随着增量快照的进行,Debezium 使用水印来跟踪其进度,维护它捕获的每个表行的记录。与标准初始快照过程相比,这种分阶段捕获数据的方法具有以下优势:

  • 您可以在流式数据捕获的同时运行增量快照,而不是将流式传输推迟到快照完成。连接器在整个快照过程中继续从更改日志中捕获近乎实时的事件,并且两个操作都不会阻塞另一个操作。
  • 如果增量快照的进度中断,您可以恢复它而不会丢失任何数据。进程恢复后,快照从它停止的点开始,而不是从头重新捕获表。
  • 您可以随时按需运行增量快照,并根据需要重复该过程以适应数据库更新。例如,您可以在修改连接器配置以将表添加到其table.include.list属性后重新运行快照。

增量快照过程

当您运行增量快照时,Debezium 按主键对每个表进行排序,然后根据配置的块大小将表拆分为块。逐块工作,然后捕获块中的每个表行。对于它捕获的每一行,快照都会发出一个READ事件。该事件表示块的快照开始时行的值。

随着快照的进行,其他进程可能会继续访问数据库,可能会修改表记录。为反映此类更改,INSERTUPDATEDELETE操作将照常提交到事务日志。同样,正在进行的 Debezium 流式处理继续检测这些更改事件并将相应的更改事件记录发送到 Kafka。

Debezium 如何解决具有相同主键的记录之间的冲突

在某些情况下,流式处理发出的UPDATEDELETE事件被乱序接收。也就是说,流式处理可能会在快照捕获包含该行的READ事件的块之前发出一个修改表行的事件。当快照最终为该行发出相应的READ事件时,它的值已经被取代。为了确保以正确的逻辑顺序处理乱序到达的增量快照事件,Debezium 采用了一种缓冲方案来解决冲突。只有在解决了快照事件和流事件之间的冲突后,Debezium 才会向 Kafka 发出事件记录。

快照窗口

为了帮助解决延迟到达事件和修改同一表行的流事件之间的冲突READ,Debezium 采用了所谓的快照窗口。快照窗口划分了增量快照捕获指定表块数据的时间间隔。在一个块的快照窗口打开之前,Debezium 遵循其通常的行为并从事务日志直接向下游发送事件到目标 Kafka 主题。但是从特定块的快照打开的那一刻起,直到它关闭,Debezium 执行重复数据删除步骤以解决具有相同主键的事件之间的冲突。

对于每个数据集合,Debezium 发出两种类型的事件,并将它们的记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为READ操作发出。同时,随着用户不断更新数据集合中的记录,事务日志也更新以反映每次提交,Debezium 会针对每次更改发出UPDATE或操作。DELETE

当快照窗口打开时,Debezium 开始处理快照块,它将快照记录传递到内存缓冲区。在快照窗口期间,READ缓冲区中事件的主键与传入流事件的主键进行比较。如果未找到匹配项,则将流式事件记录直接发送到 Kafka。如果 Debezium 检测到匹配,它会丢弃缓冲的READ事件,并将流式记录写入目标主题,因为流式事件在逻辑上取代了静态快照事件。块的快照窗口关闭后,缓冲区仅包含READ不存在相关事务日志事件的事件。Debezium 将这些剩余READ事件发送到表的 Kafka 主题。

连接器对每个快照块重复该过程。

触发增量快照

目前,启动增量快照的唯一方法是将临时快照信号发送到源数据库上的信令表。INSERT您将信号作为 SQL查询提交给表。Debezium 检测到信号表中的变化后,它会读取信号,并运行请求的快照操作。

您提交的查询指定要包含在快照中的表,并且可以选择指定快照操作的类型。目前,快照操作的唯一有效选项是默认值incremental.

要指定要包含在快照中的表,请提供一个data-collections列出这些表的数组,例如, {"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

增量快照信号的data-collections数组没有默认值。如果data-collections数组为空,Debezium 检测到不需要任何操作并且不执行快照。

如果要包含在快照中的表.的名称在数据库、模式或表的名称中包含点 (),则要将表添加到data-collections数组中,您必须用双引号对名称的每个部分进行转义. 例如,要包含存在于**public**架构中且名称为 的表**My.Table**,请使用以下格式:**"public"."My.Table"**.

先决条件

  • 信令已启用。
    • 源数据库上存在信令数据集合,连接器配置为捕获它。
    • 信令数据集合在signal.data.collection属性中指定。

程序

  1. 发送 SQL 查询以将临时增量快照请求添加到信令表:

    INSERT INTO _<signalTable>_ (id, type, data) VALUES (_'<id>'_, _'<snapshotType>'_, '{"data-collections": ["_<tableName>_","_<tableName>_"],"type":"_<snapshotType>_"}');
    

    例如,

    INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.table1", "schema2.table2"],"type":"incremental"}');
    

    id命令中的、type和参数的值data对应于信令表的字段。

    下表描述了这些参数:

    价值 描述
    myschema.debezium_signal 指定源数据库上的信令表的完全限定名称
    ad-hoc-1 id参数指定一个任意字符串,该字符串被分配为id信号请求的标识符。 使用此字符串将日志消息标识到信令表中的条目。Debezium 不使用此字符串。相反,在快照期间,Debezium 会生成自己的id字符串作为水印信号。
    execute-snapshot 指定type参数指定信号要触发的操作。
    data-collections 信号字段的必需组件,data它指定要包含在快照中的表名数组。 该数组按表的完全限定名称列出表,使用的格式与您在signal.data.collection配置属性中指定连接器信号表的名称时使用的格式相同。
    incremental 信号字段的可选type组件data,指定要运行的快照操作的种类。 目前,唯一有效的选项是默认值incremental. 在您提交给信令表的 SQL 查询中指定一个type值是可选的。 如果您未指定值,则连接器将运行增量快照。

以下示例显示了连接器捕获的增量快照事件的 JSON。

示例:增量快照事件消息

{ 
        
    "before":null,
    "after": { 
        
        "pk":"1",
        "value":"New data"
    },
    "source": { 
        
        ...
        "snapshot":"incremental" 
    },
    "op":"r", 
    "ts_ms":"1620393591654",
    "transaction":null
}
物品 字段名称 描述
1 snapshot 指定要运行的快照操作的类型。 目前,唯一有效的选项是默认值incremental. 在您提交给信令表的 SQL 查询中指定一个type值是可选的。 如果您未指定值,则连接器将运行增量快照。
2 op 指定事件类型。 快照事件的值为r,表示READ操作。

只读增量快照

MySQL 连接器允许使用与数据库的只读连接来运行增量快照。要运行具有只读访问权限的增量快照,连接器使用已执行的全局事务 ID (GTID) 设置为高水位线和低水位线。通过将二进制日志 (binlog) 事件的 GTID 或服务器的心跳与低水位线和高水位线进行比较来更新块窗口的状态。

要切换到只读实现,请将read.only属性的值设置为true

先决条件

  • 启用 MySQL GTID。
  • 如果连接器从多线程副本(即,值replica_parallel_workers大于的副本0)读取,则必须设置以下选项之一:
    • replica_preserve_commit_order=ON
    • slave_preserve_commit_order=ON

即席只读增量快照

当 MySQL 连接为只读时,信令表机制还可以通过向signal.kafka.topic属性中指定的 Kafka 主题发送消息来运行快照。

Kafka 消息的键必须与database.server.name连接器配置选项的值匹配。

该值是一个带有typedata字段的 JSON 对象。

信号类型是execute-snapshotdata字段必须有以下字段:

场地 默认 价值
type incremental 要执行的快照的类型。目前仅incremental支持。 有关详细信息,请参阅下一节。
data-collections 不适用 要快照的表的限定名称数组。名称的格式与signal.data.collection配置选项 的格式相同。

执行快照 Kafka 消息的示例:

键 = `test_connector`

值 = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

快照事件的操作类型

READMySQL 连接器将快照事件作为操作发出("op" : "r")。如果您希望连接器将快照事件作为CREATE( c) 事件发出,请配置 DebeziumReadToInsertEvent单消息转换 (SMT) 以修改事件类型。

以下示例显示了如何配置 SMT:

示例:使用ReadToInsertEventSMT 更改快照事件的类型

转换=快照插入,...
transforms.snapshotasinsert.type=io.debezium.connector.mysql.transforms.ReadToInsertEvent

主题名称

默认情况下,MySQL 连接器将表中发生的所有 、 和 操作的更改事件写入INSERT特定UPDATEDELETE该表的单个 Apache Kafka 主题。

连接器使用以下约定来命名更改事件主题:

serverName.databaseName.tableName

假设fulfillment是服务器名称,inventory是数据库名称,并且数据库包含名为orderscustomers和的表products。Debezium MySQL 连接器向三个 Kafka 主题发出事件,每个主题对应一个数据库中的表:

履行.库存.订单
履行.库存.客户
履行.库存.产品

以下列表提供了默认名称组件的定义:

  • 服务器名称

    database.server.name由连接器配置属性指定的服务器的逻辑名称。

  • 模式名称

    发生操作的模式的名称。

  • tableName

    发生操作的表的名称。

连接器应用类似的命名约定来标记其内部数据库历史主题、模式更改主题和事务元数据主题。

如果默认主题名称不符合您的要求,您可以配置自定义主题名称。要配置自定义主题名称,请在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅主题路由。

交易元数据

Debezium 可以生成表示事务边界和丰富数据更改事件消息的事件。

Debezium 接收交易元数据的时间限制Debezium 仅注册和接收部署连接器后发生的事务的元数据。部署连接器之前发生的事务的元数据不可用。

BEGINDebezium 为每个事务中的和END分隔符生成事务边界事件。事务边界事件包含以下字段:

  • status

    BEGINEND

  • id

    唯一交易标识符的字符串表示形式。

  • event_count(用于END活动)

    事务发出的事件总数。

  • data_collections(用于END活动)

    一对data_collectionevent_count元素的数组。表示连接器针对源自数据集合的更改发出的事件数。

例子

{ 
        
  "status": "BEGIN",
  "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
  "event_count": null,
  "data_collections": null
}

{ 
        
  "status": "END",
  "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
  "event_count": 2,
  "data_collections": [
    { 
        
      "data_collection": "s1.a",
      "event_count": 1
    },
    { 
        
      "data_collection": "s2.a",
      "event_count": 1
    }
  ]
}

除非通过transaction.topic选项覆盖,否则连接器会向主题发出事务事件。**.transaction

更改数据事件丰富

启用事务元数据后,数据消息Envelope会增加一个新transaction字段。此字段以字段组合的形式提供有关每个事件的信息:

  • id- 唯一交易标识符的字符串表示
  • total_order- 事件在事务产生的所有事件中的绝对位置
  • data_collection_order- 事件在事务发出的所有事件中的每个数据收集位置

以下是消息的示例:

{ 
        
  "before": null,
  "after": { 
        
    "pk": "2",
    "aa": "1"
  },
  "source": { 
        
...
  },
  "op": "c",
  
        标签: pk1接近传感器sc1204

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台