资讯详情

Flink

Flink

一、Flink简介

Flink它是一种分布式大数据处理引擎,的分布式大数据处理引擎。Spark-Streaming延迟更低。 

二、Flink集群搭建

JobManager:即Master TaskManager:即Worker 安装Flink,Flink有自带的stand-alone模式, 设置参数:master为linux01;worker为linux02和linux03 每个worker可用槽数为2  启动命令:bin/start-cluster.sh web端口:8081  

任务提交方式:

第一种方法:通过web页面进行提交任务,同时指定参数

第二种方法:使用命令提交

/opt/apps/flinkflink-1.13.2/bin/flink run  -m linxu01:8081  -p 2  -c com.doit.WorkCount_Java  /opt/flink-java-1.0-SNAPSHOT.jar   linux01 9111 (--hostname node-1.51doit.cn --port 8888) 参数说明: -m指定主机名后面的端口为JobManager的REST而非RPC的端口,RPC通信端口是6123 -p 指定是平行度 -c 指定main方法的全类名 指定jar包位置  args参数 

三、编程入门

四、DataStream算子

4.1 Flink Source

在Flink中,Source主要负责数据读取,分为单并行和多并行,算子多并行

4.1.1 基于File的数据源

readTextFile

(1)使用TextInputFormat方法要求文本文件,并将结果视为String返回。调用readTextFile方法,      如果只输入读取数据的路径,则该方法创建DataStream读取有限的数据流数据,        job就退出了。  (2)并行度跟job并行保持一致,默认是cpu逻辑核数。  (3)并行度可以设置       DataStreamSource<String> lines =                                  env.readTextFile("E:\\work2\\mrdata\\data\\date.txt");     // DataStreamSource<String> lines = env.readTextFile("E:\\work2\\mrdata\\data\\date.txt").setParallelism(4);  (4)readTextFile底部调用的是readFile  String path = "/Users/xing/Desktop/a.txt";     TextInputFormat format = new TextInputFormat(new Path(path));     DataStreamSource<String> lines =      env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000);     连续读取文件设置在这里,每隔一秒就读一次,但是有一个缺陷,每次都是从零开始读取,不会记录偏移量 

4.1.2 基于集合的数据源

fromCollection fromElements fromParallelCollection generateSequence

 * 将集合变成DataStream  *  * 集合类的Source都是有限数据流,即将集合内的数据读完后,就停止了  *  * fromCollection 创建的DataStream并行必须是1    * fromElements 创建的DataStream并行也是必须的1    *fromParallelCollection 创建的DataStream并行度可以是多个    *generateSequence 创建的DataStream并行度可以是多个   List<String> list =  Arrays.asList("spark hadoop", "flink spark", "spark", "hive flume hadopp"); DataStreamSource<String> lines = env.fromCollection(list);   DataStreamSource<String> lines  env.fromElements("spark", "hadoop", "flink", "hive");

4.1.3 基于Socket的数据源

socketTextStream

从Socket中读取信息,元素可以用分隔符分开。即从一个端口中读取数据。

socket的并行度必须为1,所以只有一个消费者读数据

需要现在linux中使用  nc -lk 9111 开启一个9111的端口

DataStreamSource<String> lines = env.socketTextStream("linux01", 9111);

4.1.4 基于kafka的数据源

addSource

 * 使用FlinkKafkaSource从Kafka中读取数据
 *
 *并行度为 逻辑核数 
 *
 * FlinkKafkaSource创建的DataStream的多并行的,可以有多个消费者从Kafka中读取数据
 
 *前提要开启kafka 和zookeeper  可以创建多个消费者
 
   Properties properties = new Properties();
   properties
   .setProperty("bootstrap.servers","linux01:9092,linux02:9092,linux03:9092");

   properties.setProperty("group.id", "test11");
   properties.setProperty("auto.offset.reset","earliest");

   DataStream<String> lines =env
   .addSource(new FlinkKafkaConsumer<>("test1", new SimpleStringSchema(), properties));

4.1.5 自定义Source

1、实现SourceFunction接口 :单并行

单并行的Source:并行度为1
实现SourceFunction接口,重写run方法 和cancel方法
/** * run方法是Source对应的Task启动后,会调用该方法,用来产生数据 * 如果是一个【有限】的数据流,run方法中的逻辑执行完后,Source就停止了,整个job也停止了 * 如果是一个【无限】的数据流,run方法中会有while循环,不停的产生数据 */


例1:自定义一个单并行的source,有限数据流
public static void main(String[] args) throws Exception { 
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> lines = env.addSource(new mySource());

        int parallelism = lines.getParallelism();
        System.out.println("source的并行度为"+parallelism);
        lines.print();
        env.execute();


    }


    //自定义source
    private static class mySource implements SourceFunction<String> { 
        
   /** * run方法是Source对应的Task启动后,会调用该方法,用来产生数据 * 如果是一个【有限】的数据流,run方法中的逻辑执行完后,Source就停止了,整个job也停止了 * 如果是一个【无限】的数据流,run方法中会有while循环,不停的产生数据 */
        @Override
        public void run(SourceContext<String> ctx) throws Exception { 
        
            List<String> words = Arrays.asList("spark", "hadoop", "flink", "hive");
            for (String word : words) { 
        
                ctx.collect(word);
            }
        }

        @Override
        public void cancel() { 
        

        }
    }
    


例2:自定义一个单并行的source,无限数据流
 public static void main(String[] args) throws Exception { 
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<Integer> lines = env.addSource(new mySource());

        int parallelism = lines.getParallelism();

        System.out.println("source的并行度为"+parallelism);
        lines.print();

        env.execute();


    }


    //自定义source
    private static class mySource implements SourceFunction<Integer> { 
        
      
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception { 
        
            int i =0 ;
            while(true){ 
        
                ctx.collect(i);
                i++;
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() { 
        

        }
    }

2、实现ParallelSourceFunction接口 :多并行

 * 实现了ParallelSourceFunction接口的Source,创建的DataStream就是多并行
 *
 * 自定义一个parallel sources (多并行的Source)
 *
 * 这个例子是一个【无限】的数据流,run方法中会不停的产生数据(while循环)
*/

public static void main(String[] args) { 
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    }

    //自定义多并行的source
    private static class MyParallelSource implements ParallelSourceFunction<Integer> { 
        

        private boolean flag = true;
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception { 
        
            int i = 0;
            while(flag){ 
        
                ctx.collect(i);
                i++;
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() { 
        
            flag = false;
        }
    }

3、继承RichParallelSourceFunction :多并行,而且有生命周期方法

自定义多并行的Source,继承RichParallelSourceFunction 便能拥有生命周期方法(open  run  cancel close)

在subtask中方法的执行顺序(生命周期方法):
open(一次) -> run(一次) -> cancel(一次)-> close(一次)
  
该并行度为8,所以open run cancel close 方法都会调用8次  而在自定义MyParallelSource的构造方法只会被调用一次,因为是在Driver端创建

*/
例子:
 public static void main(String[] args) throws Exception { 
        

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<Integer> lines = env.addSource(new MyParallelSource());

        int parallelism2 = lines.getParallelism();
        System.out.println("自定义的实现SourceFunction的Source并行度为:" + parallelism2);


        lines.print();

        env.execute();
    }

    //多并行的Source,继承了RichParallelSourceFunction
    // 即调用完Source后得到的DataStream中对应的数据类型
    private static class MyParallelSource extends RichParallelSourceFunction<Integer> { 
        

        public MyParallelSource() { 
        
            System.out.println("constructor invoke ~~~~~~~~~~~~");
        }

        @Override
        public void open(Configuration parameters) throws Exception { 
        
            //getRuntimeContext().getIndexOfThisSubtask() 获取当前subTask的Index
            System.out.println("subtask:" + getRuntimeContext().getIndexOfThisSubtask() + " @@@@ open 方法被调用了");

        }

        @Override
        public void close() throws Exception { 
        
            System.out.println("subtask:" + getRuntimeContext().getIndexOfThisSubtask() + " &&&& close 方法被调用了");

        }

        private boolean flag = true;

        @Override
        public void run(SourceContext<Integer> ctx) throws Exception { 
        
            System.out.println("subtask:" + getRuntimeContext().getIndexOfThisSubtask() + " Run方法被调用了~~~~~~~~~~~~~");
            int i = 0;
            while (flag) { 
        
                ctx.collect(i);
                i += 1;
                Thread.sleep(2000);
            }
        }


        @Override
        public void cancel() { 
        
            System.out.println("subtask:" + getRuntimeContext().getIndexOfThisSubtask() + " cancel方法被调用了!!!!!!!!!!!!");
            flag = false;
        }
    }

4.1.6 总结

file的文件源,无法做到实时,相当于是批处理,相当于是做离线

基于集合的数据源,一般只是用来做测试

基于socket的数据源,一般也是用来做测试,因为读取数据的并行度必须为1,影响效率

基于kafka的数据源一般用于生产环境中,因为多个并行度,而且可以有多个生产者

有时候需要自定义Source数据源,可以定义单并行,多并行,以及多并行带生命周期方法

4.2 Sink 算子

sink算子 相当于spark中的action算子

4.2.1 Print

 * 将数据以标准数据打印,
 * PrintSink是多并行的



    public static void main(String[] args) throws Exception{ 
        

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //整个job的并行度
        int parallelism = env.getParallelism();
        System.out.println("当前job的执行环境默认的并行度为:" + parallelism);

        //DataStreamSource是DataStream的子类
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        lines.print("doit ");

        //启动并执行
        env.execute();
    }

4.2.2 writeAsText

  WriteText 将数据写到文件中 该sink方法已经过时,了解即可

public static void main(String[] args) throws Exception{ 
        

        //创建DataStream,必须调用StreamExecutitionEnvriroment的方法
        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());


        //整个job的并行度
        int parallelism = env.getParallelism();
        System.out.println("当前job的执行环境默认的并行度为:" + parallelism);

        //DataStreamSource是DataStream的子类
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        lines.writeAsText("/Users/xing/Desktop/out", FileSystem.WriteMode.OVERWRITE);


        //启动并执行
        env.execute();
    }

4.2.3 自定义Sink

继承RichSinkFunction 调用的时候使用addSink

 /** 自定义sink方法:实现跟print一样的功能 定义一个类,继承RichSinkFunction 重写invoke方法 invoke是来一条数据就执行一次 open和close生命周期方法是有几个并行度就执行几次,每个task各执行一次 调用该sink方法的时候是使用:addSink */

public static void main(String[] args) throws Exception { 
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092");
        properties.setProperty("group.id", "test11");
        properties.setProperty("auto.offset.reset","earliest");

        DataStream<String> lines = env.addSource(new FlinkKafkaConsumer<>("test1", new SimpleStringSchema(), properties));


        //lines.print() print底层使用的就是addSink
        //sink可以指定前缀名,可以指定并行度
        lines.addSink(new mySink()).name("我的sink").setParallelism(3);

        env.execute("test");

    }

    //自定义printSink
    private static class mySink extends RichSinkFunction<String> { 
        
        @Override
        public void open(Configuration parameters) throws Exception { 
        
            System.out.println("open method invoked ~~~~~~~");
        }

        @Override
        public void close() throws Exception { 
        
            System.out.println("close method invoked ########");
        }

        @Override
        public void invoke(String value, Context context) throws Exception { 
        
            System.out.println("invoke method invoked $$$$$$$$$$");
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println((indexOfThisSubtask+1) + ">" + value);
        }
    }

4.3 Transformation 算子

4.3.1 map

/* map :将数据一条一条的处理,处理一条返回一条 多并行 底层使用的是transform 使用lambda表达式,需要注意两个问题: *泛型是否会自动推导: 里面的泛型有时候不能自动推导,所以要进行类型指定 当参数中还有类型的时候,比如( line, out) ->,而out是一个collect类型,其中有泛 型,此时不指定类型就会有泛型丢失, 就需要指定数据类型(String line, Collector<Tuple2<String, Integer>> out) -> *是否需要使用returns 不需要使用returns指定返回结果的数据类型的场景: ---->输入与输出的数据类型一致,输入一行返回一行 需要使用returns指定返回结果的数据类型的场景: 1)如果输入的数据类型和输出的数据类型不一致 2)如果输入一行,返回多行,比如flatMap */
 public static void main(String[] args) throws Exception { 
        

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<String> lines = env.socketTextStream("Linux01", 8888);

        //做映射
        //调用完map算子后生成的DataStream是单并行的还是多并行的?答案:多并行的
        SingleOutputStreamOperator<String> upperDataStream = lines.map(new MapFunction<String, String>() { 
        
            @Override
            public String map(String s) throws Exception { 
        
                return s.toUpperCase();
            }
        }).setParallelism(2);


//使用lambda表达式的方式
// SingleOutputStreamOperator<String> upperDataStream = lines.map(s -> s.toUpperCase()).setParallelism(2);



/* 使用lambda

标签: cn槽型连接器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台