资讯详情

25.Flink监控\什么是Metrics\Metrics分类\Flink性能优化的方法\合理调整并行度\合理调整并行度\Flink内存...

25.Flink监控 25.1.什么是Metrics 25.2.Metrics分类 25.2.1.Metric Types 25.2.2.代码 25.2.3.操作 26.Flink性能优化 26.1.复用对象 26.2.数据倾斜 26.3.异步IO 26.4.合理调整并行度 27.Flink内存管理 28.Spark VS Flink 28.1.应用场景 28.2.API 28.3.核心角色/流程原理 28.3.1.spark 28.3.2.Flink 28.4.时间机制 28.5.容错机制 28.6.窗口 28.7.整合Kafka 28.8.其他的 28.9.单独补充:流式计算实现原理 28.10.单独补充:背压/反压 28.10.1.back pressure

25.Flink监控

https://blog.lovedata.net/8156c1e1.html

25.1.什么是Metrics

由于集群运行后难以发现内部实际情况,运行缓慢或快,是否异常,开发人员无法实时检查所有情况Task日志,比如作业大或者作业多,该怎么办?这个时候,Metrics帮助开发人员了解操作的现状。

Flink提供的Metrics可以在Flink通过这些指标,开发人员可以更好地了解操作或集群的状态。 在这里插入图片描述

25.2.Metrics分类

25.2.1.Metric Types

1.常用的如Counter, 写过mapreduce作业的开发人员就应该很熟悉Counter, 其实意思是一样的,就是累积一个计数器,就是一直向上添加多个数据和多兆数据的过程。 2.Gauga,Gauge是最简单的Metrics,它反映了一个值。例如,这取决于现在。Java heap每次可以实时暴露内存使用多少内存。Gauge, Gauge当前值为heap使用的量。 3.Meter, Meter指单位时间内吞吐量和事件数量的统计。它相当于一个速率,即事件次数除以使用时间。 4.Histogram,Histogram比较复杂,不常用,Histogram用于统计一些数据的分布,如Quantile、Mean、StdDev、Max、Min等。

Metric在Flink内部有多层结构Group它不是一个扁平化的结构,Metric Group Metric Name是Metrics唯一的标识。

25.2.2.代码

package day6;  import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.streaming.api.datastream.DataStream; import org.apach.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/** * @author tuzuoquan * @date 2022/6/22 9:26 */
public class MetricsDemo { 
        

    public static void main(String[] args) throws Exception { 
        
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<String> lines = env.socketTextStream("node1", 9999);

        //TODO 2.transformation
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() { 
        
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception { 
        
                String[] arr = value.split(" ");
                for (String word : arr) { 
        
                    out.collect(word);
                }
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words
                .map(new RichMapFunction<String, Tuple2<String, Integer>>() { 
        
                    //用来记录map处理了多少个单词
                    Counter myCounter;

                    //对Counter进行初始化
                    @Override
                    public void open(Configuration parameters) throws Exception { 
        
                        myCounter = getRuntimeContext().getMetricGroup().addGroup("myGroup").counter("myCounter");
                    }
                    //处理单词,将单词记为(单词,1)
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception { 
        
                        myCounter.inc();//计数器+1
                        return Tuple2.of(value, 1);
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);

        //TODO 3.sink
        result.print();

        //TODO 4.execute
        env.execute();
    }

}

运行命令:

// /export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
// /export/server/flink/bin/flink run --class cn.xxx.metrics.MetricsDemo /root/metrics.jar
// 查看WebUI

25.2.3.操作

1.打包 2.提交到Yarn上运行 3.查看监控指标 4.也可以通过浏览器f12的找到url发送请求获取监控信息 5.也可以通过代码发送请求获取监控信息

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;

public class MetricsTest { 
        
    public static void main(String[] args) { 
        
        //String result = sendGet("http://node1:8088/proxy/application_1609508087977_0010/jobs/558a5a3016661f1d732228330ebfaad5/vertices/cbc357ccb763df2852fee8c4fc7d55f2/metrics?get=0.Map.myGroup.myCounter");
        String result = sendGet("http://node1:8088/proxy/application_1609508087977_0010/jobs/558a5a3016661f1d732228330ebfaad5");

        System.out.println(result);
    }

    public static String sendGet(String url) { 
        
        String result = "";
        BufferedReader in = null;
        try { 
        
            String urlNameString = url;
            URL realUrl = new URL(urlNameString);
            URLConnection connection = realUrl.openConnection();
            // 设置通用的请求属性
            connection.setRequestProperty("accept", "*/*");
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
            // 建立实际的连接
            connection.connect();
            in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
            String line;
            while ((line = in.readLine()) != null) { 
        
                result += line;
            }
        } catch (Exception e) { 
        
            System.out.println("发送GET请求出现异常!" + e);
            e.printStackTrace();
        }
        // 使用finally块来关闭输入流
        finally { 
        
            try { 
        
                if (in != null) { 
        
                    in.close();
                }
            } catch (Exception e2) { 
        
                e2.printStackTrace();
            }
        }
        return result;
    }

}

26.Flink性能优化

26.1.复用对象

stream.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() { 
        
        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception { 
        
            long changesCount = ...
            // A new Tuple instance is created on every execution
            collector.collect(new Tuple2<>(userName, changesCount));
        }
    }

上面的代码可以优化为下面的代码:

可以避免Tuple2的重复创建

stream.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() { 
        
    // Create an instance that we will reuse on every call
    private Tuple2<String, Long> result = new Tuple<>();
    @Override
    public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception { 
        
        long changesCount = ...
        // Set fields on an existing object instead of creating a new one
        result.f0 = userName;
        // Auto-boxing!! A new Long value may be created
        result.f1 = changesCount;
        // Reuse the same Tuple2 object
        collector.collect(result);
    }
}

26.2.数据倾斜

rebalance

自定义分区器

key+随机前后缀

26.3.异步IO

26.4.合理调整并行度

数据过滤之后可以减少并行度

数据合并之后再处理之前可以增加并行度

大量小文件写入到HDFS可以减少并行度 1.ds.writeAsText(“data/output/result1”).setParallelism(1); 2.env.setParallelism(1); 3.提交任务时webUI或命令行参数 flink run -p 10 4.配置文件flink-conf.yaml parallelism.default: 1

更多的优化在后面的项目中结合业务来讲解

27.Flink内存管理

1.减少full gc时间:因为所有常用数据都在Memory Manager里,这部分内存的生命周期是伴随TaskManager管理的而不会被GC回收。其他的常用数据对象都是用户定义的数据对象,这部分会快速的被GC回收。 2.减少OOM:所有的运行的内存应用都从池化的内存中获取,而且运行时的算法可以在内存不足的时候将数据写到堆外内存。 3.节约空间:由于Flink自定序列化/反序列化的方法,所有的对象都以二进制的形式存储,降低消耗。 4.高效的二进制操作和缓存友好:二进制数据以定义好的格式存储,可以高效地比较与操作。另外,该二进制形式可以把相关的值,以及hash值,键值和指针等相邻地放进内存中。这使得数据结构可以对CPU高效缓存更友好,可以从CPU的L1/L2/L3缓存获取性能的提升,也就是Flink的数据存储二进制格式符合CPU缓存的标准,非常方便被CPU的L1/L2/L3各级别缓存利用,比内存还要快!

28.Spark VS Flink

28.1.应用场景

Spark:主要用作离线批处理 , 对延迟要求不高的实时处理(微批) ,DataFrame和DataSetAPI 也支持 “流批一体” Flink:主要用作实时处理 ,注意Flink1.12开始支持真正的流批一体

28.2.API

Spark : RDD(不推荐) /DSteam(不推荐)/DataFrame和DataSet Flink : DataSet(1.12软弃用) 和 DataStream /Table&SQL(快速发展中)

28.3.核心角色/流程原理

28.3.1.spark

28.3.2.Flink

28.4.时间机制

Spark: SparkStreaming只支持处理时间 StructuredStreaming开始支持事件时间 Flink: 直接支持事件时间 / 处理时间 /摄入时间

28.5.容错机制

Spark : 缓存/持久化 +Checkpoint(应用级别) StructuredStreaming中的Checkpoint也开始借鉴Flink使用Chandy-Lamport algorithm分布式快照算法

Flink: State + Checkpoint(Operator级别) + 自动重启策略 + Savepoint

28.6.窗口

Spark中的支持基于时间/数量的滑动/滚动 要求windowDuration和slideDuration必须是batchDuration的倍数

Flink中的窗口机制更加灵活/功能更多

支持基于时间/数量的滑动/滚动 和 会话窗口

28.7.整合Kafka

SparkStreaming整合Kafka: 支持offset自动维护/手动维护 , 支持动态分区检测 无需配置 Flink整合Kafka: 支持offset自动维护/手动维护(一般自动由Checkpoint维护即可) , 支持动态分区检测 需要配置

//会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测
props.setProperty("flink.partition-discovery.interval-millis","5000");

28.8.其他的

源码编程语言

Flink的高级功能 : Flink CEP可以实现 实时风控…

28.9.单独补充:流式计算实现原理

Spark : SparkStreaming: 微批 StructuredStreaming: 微批(连续处理在实验中)

Flink : 是真真正正的流式处理, 只不过对于低延迟和高吞吐做了平衡 早期就确定了后续的方向:基于事件的流式数据处理框架! env.setBufferTimeout - 默认100ms taskmanager.memory.segment-size - 默认32KB

28.10.单独补充:背压/反压

28.10.1.back pressure

Spark: PIDRateEsimator ,PID算法实现一个速率评估器(统计DAG调度时间,任务处理时间,数据条数等, 得出一个消息处理最大速率, 进而调整根据offset从kafka消费消息的速率)。

Flink: 基于credit – based 流控机制,在应用层模拟 TCP 的流控机制(上游发送数据给下游之前会先进行通信,告诉下游要发送的blockSize, 那么下游就可以准备相应的buffer来接收, 如果准备ok则返回一个credit凭证,上游收到凭证就发送数据, 如果没有准备ok,则不返回credit,上游等待下一次通信返回credit) 阻塞占比在 web 上划分了三个等级: OK: 0 <= Ratio <= 0.10,表示状态良好; LOW: 0.10 < Ratio <= 0.5,表示有待观察; HIGH: 0.5 < Ratio <= 1,表示要处理了(增加并行度/subTask/检查是否有数据倾斜/增加内存…)。

例如,0.01,代表着100次中有一次阻塞在内部调用

标签: sh8c15连接器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台