资讯详情

Blink SQL之创建日志服务SLS源表

创建日志服务SLS源表

仅适用于Blink 1.4.5及以上版本。

日志服务是什么?

日志服务SLS是针对日志类数据的一站式服务,对于日志服务而言,数据格式类似JSON,示例如下。

{ 
             "a": 1000,     "b": 1234,     "c": "li" } 

日志服务本身就是流数据存储,Blink它可以作为流式数据的输入。

语法示例

日志服务源表DDL示例如下(代码中的sls代表日志服务)。

create table sls_stream(   a INT,   b INT,   c VARCHAR ) with (   type ='sls',     endPoint ='http://cn-hangzhou-share.log.aliyuncs.com',   accessId ='<yourAccessId>',   accessKey ='<yourAccessKey>',   startTime = '2017-07-05 00:00:00',   project ='<yourProjectName>',   logStore ='<yourLogStoreName>',   consumerGroup ='<yourConsumerGroupName>' ); 

WITH参数

参数 说明 是否必选 备注
type 源表类型 固定值为sls。
endPoint 消费端点信息 服务入口
accessId AccessKey ID
accessKey AccessKey Secret
project 读取的SLS项目名称
logStore Project下的具体的LogStore名称
startTime 日志消费的开始时间
consumerGroup 消费组名 您可以定制消费者组名(没有固定格式)。
heartBeatIntervalMills 消费者端心跳间隔 默认值为10000,单位为毫秒。
maxRetryTimes 读取最大尝试次数 默认值为5。
batchGetSize 单次读取logGroup的条数 默认值为100。
columnErrorDebug 调试开关是否打开 默认值为false,不要打开。若选择打开,则打印分析异常的日志。
startupMode 启动消费模式 取值如下::每个Shard消费从指定时间开始。:每个Shard从最早的位置开始消费。:每个Shard消费从最新位置开始。:每个Shard优先从服务端保存Checkpoint必须指定开始消费consumerGroup。消费模式如下: 如果从Flink State成功恢复状态,从Flink状态中的Checkpoint开始消费。 如果从Flink State中恢复状态失败: 如果consumerGroup中存在Checkpoint,则尝试从consumerGroup的Checkpoint开始消费。 如果consumerGroup中不存在Checkpoint: 指了startTime:从startTime开始消费。 未指定startTime:每个shard从最早位置开始消费。

  • 1.6.0及以下版本,在指定consumerGroup的Shards数目时,可能会影响读取性能。

  • SLS暂不支持MAP类型的数据。

  • SLS对于不存在字段会置为Null。

  • 字段顺序支持无序(建议字段顺序和表中定义一致)。

  • 输入数据源为JSON形式时,注意定义分隔符,并且需要采用内置函数JSON_VALUE分析,否则就会解析失败,报错如下。

    2017-12-25 15:24:43,467 WARN [Topology-0 (1/1)] com.alibaba.blink.streaming.connectors.common.source.parse.DefaultSourceCollector - Field missing **error**, table column number: 3, data column number: 3, data filed number: 1, data: [{"lg_order_code":"LP00000005","activity_code":"TEST_CODE1","occur_time":"2017-12-10 00:00:01"}]

  • 设置不能超过1000,否则会报错。

  • 指明的是logGroup获取的数量。如果单条logItem的大小和都很大,有可能会导致频繁的垃圾回收(Garbage Collection),这种情况下该参数应调小。

类型映射

建议您使用该对应关系进行DDL声明。

日志服务字段类型 实时计算Blink版字段类型
STRING VARCHAR

属性字段

目前Flink SQL默认支持3个SLS属性字段的获取,也支持其它自定义字段的写入。

字段名 注释说明
__source__ 消息源
__topic__ 消息主题
__timestamp__ 日志时间

代码示例

create table sls_input(
  a int, 
  b int,
  c varchar
) with (
  type ='sls',
  endPoint ='http://cn-hangzhou-share.log.aliyuncs.com',
  accessId ='<yourAccessI>',
  accessKey ='<yourAccessKey>',
  startTime = '2017-07-05 00:00:00',
  project ='ali-cloud-streamtest',
  logStore ='stream-test',
  consumerGroup ='consumerGroupTest1'
);

create table print_output(
 a int,
 b int,
 c varchar 
) with (
  type='print'
);

INSERT INTO print_output
SELECT 
  a, b, c
from sls_input;

常见问题

  1. Q:为什么Job的整体延迟增加,或有窗口聚合的Job无输出?

    A:如果一个Partition没有新数据写入,会导致上述情况,只需要把并发数调整为读写的Partition数量即可。

  2. Q:如何设置并发数?

    A:并发数建议等于Partition数量,否则当两个Partition读取速度差异较大时,理论上在追数据场景,存在数据被过滤掉和数据延迟的风险。

  3. Q:Flink Job延迟增大应该如何排查?

    A:SLS源表可能会发生Shard分裂,分裂后的Shard index会不连续,导致Flink延迟增大。如果发现Flink Job延迟增大,请查看SLS源是否发生分裂。

标签: sls103无线温湿度变送器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

 锐单商城 - 一站式电子元器件采购平台  

 深圳锐单电子有限公司