25.Flink监控 25.1.什么是Metrics 25.2.Metrics分类 25.2.1.Metric Types 25.2.2.代码 25.2.3.操作 26.Flink性能优化 26.1.复用对象 26.2.数据倾斜 26.3.异步IO 26.4.合理调整并行度 27.Flink内存管理 28.Spark VS Flink 28.1.应用场景 28.2.API 28.3.核心角色/流程原理 28.3.1.spark 28.3.2.Flink 28.4.时间机制 28.5.容错机制 28.6.窗口 28.7.整合Kafka 28.8.其他的 28.9.单独补充:流式计算实现原理 28.10.单独补充:背压/反压 28.10.1.back pressure
25.Flink监控
https://blog.lovedata.net/8156c1e1.html
25.1.什么是Metrics
由于集群运行后难以发现内部实际情况,运行缓慢或快,是否异常,开发人员无法实时检查所有情况Task日志,比如作业大或者作业多,该怎么办?这个时候,Metrics帮助开发人员了解操作的现状。
Flink提供的Metrics可以在Flink通过这些指标,开发人员可以更好地了解操作或集群的状态。
25.2.Metrics分类
25.2.1.Metric Types
1.常用的如Counter, 写过mapreduce作业的开发人员就应该很熟悉Counter, 其实意思是一样的,就是累积一个计数器,就是一直向上添加多个数据和多兆数据的过程。 2.Gauga,Gauge是最简单的Metrics,它反映了一个值。例如,这取决于现在。Java heap每次可以实时暴露内存使用多少内存。Gauge, Gauge当前值为heap使用的量。 3.Meter, Meter指单位时间内吞吐量和事件数量的统计。它相当于一个速率,即事件次数除以使用时间。 4.Histogram,Histogram比较复杂,不常用,Histogram用于统计一些数据的分布,如Quantile、Mean、StdDev、Max、Min等。
Metric在Flink内部有多层结构Group它不是一个扁平化的结构,Metric Group Metric Name是Metrics唯一的标识。
25.2.2.代码
package day6; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.streaming.api.datastream.DataStream; import org.apach.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/** * @author tuzuoquan * @date 2022/6/22 9:26 */
public class MetricsDemo {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<String> lines = env.socketTextStream("node1", 9999);
//TODO 2.transformation
SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] arr = value.split(" ");
for (String word : arr) {
out.collect(word);
}
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words
.map(new RichMapFunction<String, Tuple2<String, Integer>>() {
//用来记录map处理了多少个单词
Counter myCounter;
//对Counter进行初始化
@Override
public void open(Configuration parameters) throws Exception {
myCounter = getRuntimeContext().getMetricGroup().addGroup("myGroup").counter("myCounter");
}
//处理单词,将单词记为(单词,1)
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
myCounter.inc();//计数器+1
return Tuple2.of(value, 1);
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);
//TODO 3.sink
result.print();
//TODO 4.execute
env.execute();
}
}
运行命令:
// /export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
// /export/server/flink/bin/flink run --class cn.xxx.metrics.MetricsDemo /root/metrics.jar
// 查看WebUI
25.2.3.操作
1.打包 2.提交到Yarn上运行 3.查看监控指标 4.也可以通过浏览器f12的找到url发送请求获取监控信息 5.也可以通过代码发送请求获取监控信息
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
public class MetricsTest {
public static void main(String[] args) {
//String result = sendGet("http://node1:8088/proxy/application_1609508087977_0010/jobs/558a5a3016661f1d732228330ebfaad5/vertices/cbc357ccb763df2852fee8c4fc7d55f2/metrics?get=0.Map.myGroup.myCounter");
String result = sendGet("http://node1:8088/proxy/application_1609508087977_0010/jobs/558a5a3016661f1d732228330ebfaad5");
System.out.println(result);
}
public static String sendGet(String url) {
String result = "";
BufferedReader in = null;
try {
String urlNameString = url;
URL realUrl = new URL(urlNameString);
URLConnection connection = realUrl.openConnection();
// 设置通用的请求属性
connection.setRequestProperty("accept", "*/*");
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
// 建立实际的连接
connection.connect();
in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
String line;
while ((line = in.readLine()) != null) {
result += line;
}
} catch (Exception e) {
System.out.println("发送GET请求出现异常!" + e);
e.printStackTrace();
}
// 使用finally块来关闭输入流
finally {
try {
if (in != null) {
in.close();
}
} catch (Exception e2) {
e2.printStackTrace();
}
}
return result;
}
}
26.Flink性能优化
26.1.复用对象
stream.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
@Override
public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
long changesCount = ...
// A new Tuple instance is created on every execution
collector.collect(new Tuple2<>(userName, changesCount));
}
}
上面的代码可以优化为下面的代码:
可以避免Tuple2的重复创建
stream.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
// Create an instance that we will reuse on every call
private Tuple2<String, Long> result = new Tuple<>();
@Override
public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
long changesCount = ...
// Set fields on an existing object instead of creating a new one
result.f0 = userName;
// Auto-boxing!! A new Long value may be created
result.f1 = changesCount;
// Reuse the same Tuple2 object
collector.collect(result);
}
}
26.2.数据倾斜
rebalance
自定义分区器
key+随机前后缀
26.3.异步IO
26.4.合理调整并行度
数据过滤之后可以减少并行度
数据合并之后再处理之前可以增加并行度
大量小文件写入到HDFS可以减少并行度 1.ds.writeAsText(“data/output/result1”).setParallelism(1); 2.env.setParallelism(1); 3.提交任务时webUI或命令行参数 flink run -p 10 4.配置文件flink-conf.yaml parallelism.default: 1
更多的优化在后面的项目中结合业务来讲解
27.Flink内存管理
1.减少full gc时间:因为所有常用数据都在Memory Manager里,这部分内存的生命周期是伴随TaskManager管理的而不会被GC回收。其他的常用数据对象都是用户定义的数据对象,这部分会快速的被GC回收。 2.减少OOM:所有的运行的内存应用都从池化的内存中获取,而且运行时的算法可以在内存不足的时候将数据写到堆外内存。 3.节约空间:由于Flink自定序列化/反序列化的方法,所有的对象都以二进制的形式存储,降低消耗。 4.高效的二进制操作和缓存友好:二进制数据以定义好的格式存储,可以高效地比较与操作。另外,该二进制形式可以把相关的值,以及hash值,键值和指针等相邻地放进内存中。这使得数据结构可以对CPU高效缓存更友好,可以从CPU的L1/L2/L3缓存获取性能的提升,也就是Flink的数据存储二进制格式符合CPU缓存的标准,非常方便被CPU的L1/L2/L3各级别缓存利用,比内存还要快!
28.Spark VS Flink
28.1.应用场景
Spark:主要用作离线批处理 , 对延迟要求不高的实时处理(微批) ,DataFrame和DataSetAPI 也支持 “流批一体” Flink:主要用作实时处理 ,注意Flink1.12开始支持真正的流批一体
28.2.API
Spark : RDD(不推荐) /DSteam(不推荐)/DataFrame和DataSet Flink : DataSet(1.12软弃用) 和 DataStream /Table&SQL(快速发展中)
28.3.核心角色/流程原理
28.3.1.spark
28.3.2.Flink
28.4.时间机制
Spark: SparkStreaming只支持处理时间 StructuredStreaming开始支持事件时间 Flink: 直接支持事件时间 / 处理时间 /摄入时间
28.5.容错机制
Spark : 缓存/持久化 +Checkpoint(应用级别) StructuredStreaming中的Checkpoint也开始借鉴Flink使用Chandy-Lamport algorithm分布式快照算法
Flink: State + Checkpoint(Operator级别) + 自动重启策略 + Savepoint
28.6.窗口
Spark中的支持基于时间/数量的滑动/滚动 要求windowDuration和slideDuration必须是batchDuration的倍数
Flink中的窗口机制更加灵活/功能更多
支持基于时间/数量的滑动/滚动 和 会话窗口
28.7.整合Kafka
SparkStreaming整合Kafka: 支持offset自动维护/手动维护 , 支持动态分区检测 无需配置 Flink整合Kafka: 支持offset自动维护/手动维护(一般自动由Checkpoint维护即可) , 支持动态分区检测 需要配置
//会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测
props.setProperty("flink.partition-discovery.interval-millis","5000");
28.8.其他的
源码编程语言
Flink的高级功能 : Flink CEP可以实现 实时风控…
28.9.单独补充:流式计算实现原理
Spark : SparkStreaming: 微批 StructuredStreaming: 微批(连续处理在实验中)
Flink : 是真真正正的流式处理, 只不过对于低延迟和高吞吐做了平衡 早期就确定了后续的方向:基于事件的流式数据处理框架! env.setBufferTimeout - 默认100ms taskmanager.memory.segment-size - 默认32KB
28.10.单独补充:背压/反压
28.10.1.back pressure
Spark: PIDRateEsimator ,PID算法实现一个速率评估器(统计DAG调度时间,任务处理时间,数据条数等, 得出一个消息处理最大速率, 进而调整根据offset从kafka消费消息的速率)。
Flink: 基于credit – based 流控机制,在应用层模拟 TCP 的流控机制(上游发送数据给下游之前会先进行通信,告诉下游要发送的blockSize, 那么下游就可以准备相应的buffer来接收, 如果准备ok则返回一个credit凭证,上游收到凭证就发送数据, 如果没有准备ok,则不返回credit,上游等待下一次通信返回credit) 阻塞占比在 web 上划分了三个等级: OK: 0 <= Ratio <= 0.10,表示状态良好; LOW: 0.10 < Ratio <= 0.5,表示有待观察; HIGH: 0.5 < Ratio <= 1,表示要处理了(增加并行度/subTask/检查是否有数据倾斜/增加内存…)。
例如,0.01,代表着100次中有一次阻塞在内部调用