Flink
一、Flink简介
Flink它是一种分布式大数据处理引擎,的分布式大数据处理引擎。Spark-Streaming延迟更低。
二、Flink集群搭建
JobManager:即Master TaskManager:即Worker 安装Flink,Flink有自带的stand-alone模式, 设置参数:master为linux01;worker为linux02和linux03 每个worker可用槽数为2 启动命令:bin/start-cluster.sh web端口:8081
任务提交方式:
第一种方法:通过web页面进行提交任务,同时指定参数
第二种方法:使用命令提交
/opt/apps/flinkflink-1.13.2/bin/flink run -m linxu01:8081 -p 2 -c com.doit.WorkCount_Java /opt/flink-java-1.0-SNAPSHOT.jar linux01 9111 (--hostname node-1.51doit.cn --port 8888) 参数说明: -m指定主机名后面的端口为JobManager的REST而非RPC的端口,RPC通信端口是6123 -p 指定是平行度 -c 指定main方法的全类名 指定jar包位置 args参数
三、编程入门
四、DataStream算子
4.1 Flink Source
在Flink中,Source主要负责数据读取,分为单并行和多并行,算子多并行
4.1.1 基于File的数据源
readTextFile
(1)使用TextInputFormat方法要求文本文件,并将结果视为String返回。调用readTextFile方法, 如果只输入读取数据的路径,则该方法创建DataStream读取有限的数据流数据, job就退出了。 (2)并行度跟job并行保持一致,默认是cpu逻辑核数。 (3)并行度可以设置 DataStreamSource<String> lines = env.readTextFile("E:\\work2\\mrdata\\data\\date.txt"); // DataStreamSource<String> lines = env.readTextFile("E:\\work2\\mrdata\\data\\date.txt").setParallelism(4); (4)readTextFile底部调用的是readFile String path = "/Users/xing/Desktop/a.txt"; TextInputFormat format = new TextInputFormat(new Path(path)); DataStreamSource<String> lines = env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000); 连续读取文件设置在这里,每隔一秒就读一次,但是有一个缺陷,每次都是从零开始读取,不会记录偏移量
4.1.2 基于集合的数据源
fromCollection fromElements fromParallelCollection generateSequence
* 将集合变成DataStream * * 集合类的Source都是有限数据流,即将集合内的数据读完后,就停止了 * * fromCollection 创建的DataStream并行必须是1 * fromElements 创建的DataStream并行也是必须的1 *fromParallelCollection 创建的DataStream并行度可以是多个 *generateSequence 创建的DataStream并行度可以是多个 List<String> list = Arrays.asList("spark hadoop", "flink spark", "spark", "hive flume hadopp"); DataStreamSource<String> lines = env.fromCollection(list); DataStreamSource<String> lines env.fromElements("spark", "hadoop", "flink", "hive");
4.1.3 基于Socket的数据源
socketTextStream
从Socket中读取信息,元素可以用分隔符分开。即从一个端口中读取数据。
socket的并行度必须为1,所以只有一个消费者读数据
需要现在linux中使用 nc -lk 9111 开启一个9111的端口
DataStreamSource<String> lines = env.socketTextStream("linux01", 9111);
4.1.4 基于kafka的数据源
addSource
* 使用FlinkKafkaSource从Kafka中读取数据
*
*并行度为 逻辑核数
*
* FlinkKafkaSource创建的DataStream的多并行的,可以有多个消费者从Kafka中读取数据
*前提要开启kafka 和zookeeper 可以创建多个消费者
Properties properties = new Properties();
properties
.setProperty("bootstrap.servers","linux01:9092,linux02:9092,linux03:9092");
properties.setProperty("group.id", "test11");
properties.setProperty("auto.offset.reset","earliest");
DataStream<String> lines =env
.addSource(new FlinkKafkaConsumer<>("test1", new SimpleStringSchema(), properties));
4.1.5 自定义Source
1、实现SourceFunction接口 :单并行
单并行的Source:并行度为1
实现SourceFunction接口,重写run方法 和cancel方法
/** * run方法是Source对应的Task启动后,会调用该方法,用来产生数据 * 如果是一个【有限】的数据流,run方法中的逻辑执行完后,Source就停止了,整个job也停止了 * 如果是一个【无限】的数据流,run方法中会有while循环,不停的产生数据 */
例1:自定义一个单并行的source,有限数据流
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<String> lines = env.addSource(new mySource());
int parallelism = lines.getParallelism();
System.out.println("source的并行度为"+parallelism);
lines.print();
env.execute();
}
//自定义source
private static class mySource implements SourceFunction<String> {
/** * run方法是Source对应的Task启动后,会调用该方法,用来产生数据 * 如果是一个【有限】的数据流,run方法中的逻辑执行完后,Source就停止了,整个job也停止了 * 如果是一个【无限】的数据流,run方法中会有while循环,不停的产生数据 */
@Override
public void run(SourceContext<String> ctx) throws Exception {
List<String> words = Arrays.asList("spark", "hadoop", "flink", "hive");
for (String word : words) {
ctx.collect(word);
}
}
@Override
public void cancel() {
}
}
例2:自定义一个单并行的source,无限数据流
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<Integer> lines = env.addSource(new mySource());
int parallelism = lines.getParallelism();
System.out.println("source的并行度为"+parallelism);
lines.print();
env.execute();
}
//自定义source
private static class mySource implements SourceFunction<Integer> {
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
int i =0 ;
while(true){
ctx.collect(i);
i++;
Thread.sleep(1000);
}
}
@Override
public void cancel() {
}
}
2、实现ParallelSourceFunction接口 :多并行
* 实现了ParallelSourceFunction接口的Source,创建的DataStream就是多并行
*
* 自定义一个parallel sources (多并行的Source)
*
* 这个例子是一个【无限】的数据流,run方法中会不停的产生数据(while循环)
*/
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
}
//自定义多并行的source
private static class MyParallelSource implements ParallelSourceFunction<Integer> {
private boolean flag = true;
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
int i = 0;
while(flag){
ctx.collect(i);
i++;
Thread.sleep(1000);
}
}
@Override
public void cancel() {
flag = false;
}
}
3、继承RichParallelSourceFunction :多并行,而且有生命周期方法
自定义多并行的Source,继承RichParallelSourceFunction 便能拥有生命周期方法(open run cancel close)
在subtask中方法的执行顺序(生命周期方法):
open(一次) -> run(一次) -> cancel(一次)-> close(一次)
该并行度为8,所以open run cancel close 方法都会调用8次 而在自定义MyParallelSource的构造方法只会被调用一次,因为是在Driver端创建
*/
例子:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<Integer> lines = env.addSource(new MyParallelSource());
int parallelism2 = lines.getParallelism();
System.out.println("自定义的实现SourceFunction的Source并行度为:" + parallelism2);
lines.print();
env.execute();
}
//多并行的Source,继承了RichParallelSourceFunction
// 即调用完Source后得到的DataStream中对应的数据类型
private static class MyParallelSource extends RichParallelSourceFunction<Integer> {
public MyParallelSource() {
System.out.println("constructor invoke ~~~~~~~~~~~~");
}
@Override
public void open(Configuration parameters) throws Exception {
//getRuntimeContext().getIndexOfThisSubtask() 获取当前subTask的Index
System.out.println("subtask:" + getRuntimeContext().getIndexOfThisSubtask() + " @@@@ open 方法被调用了");
}
@Override
public void close() throws Exception {
System.out.println("subtask:" + getRuntimeContext().getIndexOfThisSubtask() + " &&&& close 方法被调用了");
}
private boolean flag = true;
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
System.out.println("subtask:" + getRuntimeContext().getIndexOfThisSubtask() + " Run方法被调用了~~~~~~~~~~~~~");
int i = 0;
while (flag) {
ctx.collect(i);
i += 1;
Thread.sleep(2000);
}
}
@Override
public void cancel() {
System.out.println("subtask:" + getRuntimeContext().getIndexOfThisSubtask() + " cancel方法被调用了!!!!!!!!!!!!");
flag = false;
}
}
4.1.6 总结
file的文件源,无法做到实时,相当于是批处理,相当于是做离线
基于集合的数据源,一般只是用来做测试
基于socket的数据源,一般也是用来做测试,因为读取数据的并行度必须为1,影响效率
基于kafka的数据源一般用于生产环境中,因为多个并行度,而且可以有多个生产者
有时候需要自定义Source数据源,可以定义单并行,多并行,以及多并行带生命周期方法
4.2 Sink 算子
sink算子 相当于spark中的action算子
4.2.1 Print
* 将数据以标准数据打印,
* PrintSink是多并行的
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//整个job的并行度
int parallelism = env.getParallelism();
System.out.println("当前job的执行环境默认的并行度为:" + parallelism);
//DataStreamSource是DataStream的子类
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
lines.print("doit ");
//启动并执行
env.execute();
}
4.2.2 writeAsText
WriteText 将数据写到文件中 该sink方法已经过时,了解即可
public static void main(String[] args) throws Exception{
//创建DataStream,必须调用StreamExecutitionEnvriroment的方法
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//整个job的并行度
int parallelism = env.getParallelism();
System.out.println("当前job的执行环境默认的并行度为:" + parallelism);
//DataStreamSource是DataStream的子类
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
lines.writeAsText("/Users/xing/Desktop/out", FileSystem.WriteMode.OVERWRITE);
//启动并执行
env.execute();
}
4.2.3 自定义Sink
继承RichSinkFunction 调用的时候使用addSink
/** 自定义sink方法:实现跟print一样的功能 定义一个类,继承RichSinkFunction 重写invoke方法 invoke是来一条数据就执行一次 open和close生命周期方法是有几个并行度就执行几次,每个task各执行一次 调用该sink方法的时候是使用:addSink */
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092");
properties.setProperty("group.id", "test11");
properties.setProperty("auto.offset.reset","earliest");
DataStream<String> lines = env.addSource(new FlinkKafkaConsumer<>("test1", new SimpleStringSchema(), properties));
//lines.print() print底层使用的就是addSink
//sink可以指定前缀名,可以指定并行度
lines.addSink(new mySink()).name("我的sink").setParallelism(3);
env.execute("test");
}
//自定义printSink
private static class mySink extends RichSinkFunction<String> {
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("open method invoked ~~~~~~~");
}
@Override
public void close() throws Exception {
System.out.println("close method invoked ########");
}
@Override
public void invoke(String value, Context context) throws Exception {
System.out.println("invoke method invoked $$$$$$$$$$");
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
System.out.println((indexOfThisSubtask+1) + ">" + value);
}
}
4.3 Transformation 算子
4.3.1 map
/* map :将数据一条一条的处理,处理一条返回一条 多并行 底层使用的是transform 使用lambda表达式,需要注意两个问题: *泛型是否会自动推导: 里面的泛型有时候不能自动推导,所以要进行类型指定 当参数中还有类型的时候,比如( line, out) ->,而out是一个collect类型,其中有泛 型,此时不指定类型就会有泛型丢失, 就需要指定数据类型(String line, Collector<Tuple2<String, Integer>> out) -> *是否需要使用returns 不需要使用returns指定返回结果的数据类型的场景: ---->输入与输出的数据类型一致,输入一行返回一行 需要使用returns指定返回结果的数据类型的场景: 1)如果输入的数据类型和输出的数据类型不一致 2)如果输入一行,返回多行,比如flatMap */
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<String> lines = env.socketTextStream("Linux01", 8888);
//做映射
//调用完map算子后生成的DataStream是单并行的还是多并行的?答案:多并行的
SingleOutputStreamOperator<String> upperDataStream = lines.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
return s.toUpperCase();
}
}).setParallelism(2);
//使用lambda表达式的方式
// SingleOutputStreamOperator<String> upperDataStream = lines.map(s -> s.toUpperCase()).setParallelism(2);
/* 使用lambda