1. Introduction to Flink Connectors
Flink connectors cover two sides: sources and sinks. Built-in sources can read from files, directories, and sockets, as well as from collections and iterators; built-in sinks can write to files, standard output (stdout), standard error (stderr), and sockets.
Official documentation
Flink also supports additional connectors for interacting with third-party systems. The following systems are currently supported:
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Google PubSub (source/sink)
- JDBC (sink)
The most commonly used are Kafka, Elasticsearch, HDFS, and JDBC.
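The built-in sources and sinks need no extra connector dependency. A minimal sketch using only built-ins (the class name BuiltInConnectorExample is illustrative, not from the original text):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Minimal job using only built-in connectors:
// a collection source (fromElements) and the stdout sink (print).
public class BuiltInConnectorExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("GET", "POST", "PUT")  // built-in collection source
           .map(String::toLowerCase)
           .print();                            // built-in stdout sink
        env.execute("built-in connectors demo");
    }
}
```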
2. Reading and Writing with the JDBC Connector
- Purpose: write collection data into the database.

  Code: JdbcConnectorApplication implementation class:
```java
public class JdbcConnectorApplication {
    public static void main(String[] args) throws Exception {
        // Configure the log file
        System.setProperty("log4j.configurationFile", "log4j2.xml");
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Create the collection data
        List<String> arrs = new ArrayList<String>();
        arrs.add("10.10.20.101\t1601297294548\tPOST\taddOrder");
        arrs.add("10.10.20.102\t1601297296549\tGET\tgetOrder");
        // 3. Read the collection data and write it to the database
        env.fromCollection(arrs).addSink(JdbcSink.sink(
                // SQL statement, one placeholder per column
                "insert into t_access_log (ip, time, type, api) values (?, ?, ?, ?)",
                (ps, value) -> {
                    System.out.println("receive ==> " + value);
                    // Parse the record
                    String[] arrValue = String.valueOf(value).split("\t");
                    for (int i = 0; i < arrValue.length; i++) {
                        // Fill the placeholders
                        ps.setString(i + 1, arrValue[i]);
                    }
                },
                // JDBC connection options
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://192.168.19.150:3306/flink?useSSL=false")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("654321")
                        .build()));
        // 4. Execute the job
        env.execute("job");
    }
}
```
Table DDL:
```sql
DROP TABLE IF EXISTS `t_access_log`;
CREATE TABLE `t_access_log` (
  `id`   bigint(20)   NOT NULL AUTO_INCREMENT COMMENT 'Primary key ID',
  `ip`   varchar(32)  NOT NULL COMMENT 'IP address',
  `time` varchar(255) NULL DEFAULT NULL COMMENT 'Access time',
  `type` varchar(32)  NOT NULL COMMENT 'Request type',
  `api`  varchar(32)  NOT NULL COMMENT 'API path',
  PRIMARY KEY (`id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1;
```
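JdbcSink flushes with default batching; the four-argument JdbcSink.sink overload also accepts a JdbcExecutionOptions for explicit batch tuning. A sketch under the same table and credentials as above (the class name JdbcBatchSinkExample and the chosen batch values are illustrative):

```java
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

// Same insert as the example above, with explicit batch tuning via JdbcExecutionOptions.
public class JdbcBatchSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromCollection(Arrays.asList(
                "10.10.20.101\t1601297294548\tPOST\taddOrder",
                "10.10.20.102\t1601297296549\tGET\tgetOrder"))
           .addSink(JdbcSink.sink(
                "insert into t_access_log (ip, time, type, api) values (?, ?, ?, ?)",
                (ps, value) -> {
                    // Split the tab-separated record and fill one placeholder per field
                    String[] fields = value.split("\t");
                    for (int i = 0; i < fields.length; i++) {
                        ps.setString(i + 1, fields[i]);
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(100)        // flush after 100 buffered records
                        .withBatchIntervalMs(2000) // or every 2 seconds, whichever first
                        .withMaxRetries(3)         // retry a failed batch up to 3 times
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://192.168.19.150:3306/flink?useSSL=false")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("654321")
                        .build()));
        env.execute("jdbc batch sink");
    }
}
```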
- Custom sink

  Purpose: read socket data and write it to the database as a stream.

  Code: CustomSinkApplication implementation class:
```java
public class CustomSinkApplication {
    public static void main(String[] args) throws Exception {
        // Configure the log file
        System.setProperty("log4j.configurationFile", "log4j2.xml");
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the socket source
        DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n");
        // 3. Transform the stream
        SingleOutputStreamOperator<AccessLog> outputStream = socketStr.map(new MapFunction<String, AccessLog>() {
            @Override
            public AccessLog map(String value) throws Exception {
                System.out.println(value);
                // Split the record on the tab separator
                String[] arrValue = value.split("\t");
                // Assemble the fields into an object
                AccessLog log = new AccessLog();
                log.setNum(1);
                for (int i = 0; i < arrValue.length; i++) {
                    if (i == 0) {
                        log.setIp(arrValue[i]);
                    } else if (i == 1) {
                        log.setTime(arrValue[i]);
                    } else if (i == 2) {
                        log.setType(arrValue[i]);
                    } else if (i == 3) {
                        log.setApi(arrValue[i]);
                    }
                }
                return log;
            }
        });
        // 4. Attach the custom sink
        outputStream.addSink(new MySQLSinkFunction());
        // 5. Execute the job
        env.execute("job");
    }
}
```
AccessLog:
```java
@Data
public class AccessLog {
    private String ip;
    private String time;
    private String type;
    private String api;
    private Integer num;
}
```
Test data:
10.10.20.11	1603166893313	GET	getOrder
10.10.20.12	1603166893314	POST	addOrder
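The MySQLSinkFunction attached in step 4 is not shown in the text. A minimal sketch, assuming it extends RichSinkFunction so the JDBC connection is opened once per subtask (the class body here is hypothetical; URL and credentials match the earlier example):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Hypothetical sketch of the MySQLSinkFunction referenced by CustomSinkApplication.
public class MySQLSinkFunction extends RichSinkFunction<AccessLog> {

    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One connection per parallel subtask
        connection = DriverManager.getConnection(
                "jdbc:mysql://192.168.19.150:3306/flink?useSSL=false", "root", "654321");
        statement = connection.prepareStatement(
                "insert into t_access_log (ip, time, type, api) values (?, ?, ?, ?)");
    }

    @Override
    public void invoke(AccessLog log, Context context) throws Exception {
        // Write one row per incoming record
        statement.setString(1, log.getIp());
        statement.setString(2, log.getTime());
        statement.setString(3, log.getType());
        statement.setString(4, log.getApi());
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    }
}
```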
- Custom source

  Purpose: read data from the database and print the results.

  Code: CustomSourceApplication implementation class:
```java
public class CustomSourceApplication {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Attach the custom MySQL source
        DataStreamSource<AccessLog> dataStream = env.addSource(new MySQLSourceFunction());
        // 3. Print the results with parallelism 1
        dataStream.print().setParallelism(1);
        // 4. Execute the job
        env.execute("custom jdbc source.");
    }
}
```
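The MySQLSourceFunction used above is likewise not shown. A minimal sketch, assuming a one-shot query of t_access_log that emits one AccessLog per row (the class body is hypothetical; URL and credentials match the sink example):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Hypothetical sketch of the MySQLSourceFunction referenced by CustomSourceApplication.
public class MySQLSourceFunction extends RichSourceFunction<AccessLog> {

    private transient Connection connection;
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(
                "jdbc:mysql://192.168.19.150:3306/flink?useSSL=false", "root", "654321");
    }

    @Override
    public void run(SourceContext<AccessLog> ctx) throws Exception {
        // Query the table once and emit each row downstream
        try (PreparedStatement ps = connection.prepareStatement(
                     "select ip, time, type, api from t_access_log");
             ResultSet rs = ps.executeQuery()) {
            while (running && rs.next()) {
                AccessLog log = new AccessLog();
                log.setIp(rs.getString("ip"));
                log.setTime(rs.getString("time"));
                log.setType(rs.getString("type"));
                log.setApi(rs.getString("api"));
                ctx.collect(log);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        if (connection != null) connection.close();
    }
}
```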
3. Reading and Writing with the HDFS Connector
- Purpose: write data received over the socket into HDFS files.

  Code: HdfsSinkApplication implementation class:
```java
public class HdfsSinkApplication {
    public static void main(String[] args) throws Exception {
        // Configure the log file
        System.setProperty("log4j.configurationFile", "log4j2.xml");
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the socket source
        DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n");
        // 3. Configure the bucketing sink
        BucketingSink<String> sink = new BucketingSink<String>("d:/tmp/hdfs");
        sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm"));
        sink.setWriter(new StringWriter())
            .setBatchSize(5 * 1024)                    // roll when a file reaches 5 KB
            .setBatchRolloverInterval(5 * 1000)        // roll to a new file every 5 seconds
            .setInactiveBucketCheckInterval(30 * 1000) // check for inactive buckets every 30 seconds
            .setInactiveBucketThreshold(60 * 1000);    // roll buckets inactive for 60 seconds
        socketStr.addSink(sink).setParallelism(1);
        // 4. Execute the job
        env.execute("job");
    }
}
```
POM dependencies:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>1.11.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.8.1</version>
</dependency>
```
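Note that BucketingSink is deprecated in Flink 1.11 (and removed in later releases); StreamingFileSink is the recommended replacement. A sketch with a rolling policy mirroring the settings above (the class name StreamingFileSinkExample is illustrative):

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

// Same socket-to-file pipeline, built on StreamingFileSink instead of BucketingSink.
public class StreamingFileSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n");
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("d:/tmp/hdfs"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(5 * 1000)    // roll every 5 seconds
                        .withInactivityInterval(60 * 1000) // roll after 60 s without writes
                        .withMaxPartSize(5 * 1024)         // roll when a part reaches 5 KB
                        .build())
                .build();
        socketStr.addSink(sink).setParallelism(1);
        env.execute("streaming file sink");
    }
}
```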
Simulated data source, SocketSourceApplication implementation class:
```java
public class SocketSourceApplication {

    /** Server port */
    private int port;

    public SocketSourceApplication(int port) {
        this.port = port;
    }

    /** IP addresses to sample from */
    private static String[] accessIps = new String[]{"10.10.20.101", "10.10.20.102", "10.10.20.103"};

    /** Request types to sample from */
    private static String[] accessTypes = new String[]{"GET", "POST", "PUT"};

    /** API names to sample from */
    private static String[] accessApis = new String[]{"addOrder", "getAccount", "getOrder"};

    /**
     * Start the Netty server.
     */
    public void runServer() throws Exception {
        // 1. Create the Netty service
        // 2. Boss group: accepts incoming connections
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 3. Worker group: handles accepted connections
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 4. NIO server bootstrap
            ServerBootstrap sbs = new ServerBootstrap();
            // 5. Configure the bootstrap
            sbs.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // TCP backlog: upper bound on pending (half-open) connections
                // (tcp_max_syn_backlog defaults to 128 on CentOS 6.5)
                .option(ChannelOption.SO_BACKLOG, 128)
                // keep connections alive
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // log according to the configured level
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // Register handlers on the pipeline
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // String decoder
                        pipeline.addLast("decode", new StringDecoder());
                        // String encoder
                        pipeline.addLast("encode", new StringEncoder());
                        // Handle incoming requests
                        pipeline.addLast(new NettyServerHandler());
                    }
                });
            System.err.println("-------server started------");
            // 6. Generate an access-log record every second and broadcast it to all clients
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            String accessLog = getAccessLog();
                            System.out.println("broadcast (" + NettyServerHandler.channelList.size() + ") ==> " + accessLog);
                            if (NettyServerHandler.channelList.size() > 0) {
                                for (Channel channel : NettyServerHandler.channelList) {
                                    channel.writeAndFlush(accessLog);
                                }
                            }
                            Thread.sleep(1000);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            // 7. Bind and start the Netty server
            ChannelFuture cf = sbs.bind(port).sync();
            cf.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Build a random tab-separated access-log record.
     */
    private String getAccessLog() {
        StringBuilder strBuilder = new StringBuilder();
        strBuilder.append(accessIps[new Random().nextInt(accessIps.length)]).append("\t")
                .append(System.currentTimeMillis()).append("\t")
                .append(accessTypes[new Random().nextInt(accessTypes.length)]).append("\t")
                .append(accessApis[new Random().nextInt(accessApis.length)]).append("\t\n");
        return strBuilder.toString();
    }

    /**
     * Entry point: start the Netty server on port 9911.
     */
    public static void main(String[] args) throws Exception {
        new SocketSourceApplication(9911).runServer();
    }
}
```
NettyServerHandler implementation class:
```java
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    // Channels of connected clients
    public static List<Channel> channelList = new ArrayList<>();

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Server---connection established: " + ctx);
        super.channelActive(ctx);
        // Record the newly established channel
        channelList.add(ctx.channel());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Server---message received: " + msg);
    }

    // The original text is truncated here; a conventional completion
    // drops the channel and closes the connection on error.
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        channelList.remove(ctx.channel());
        ctx.close();
    }
}
```