Flink 提供了专门操作 RabbitMQ 的连接器,使用起来比较方便,只需配置连接信息即可快速实现数据的读取与输出。但目前只支持 Queue(队列)模式,如果需要使用交换机(Exchange)模式,仍然需要自定义 RabbitMQ 数据源的读取与数据输出。
必要依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-rabbitmq_2.12</artifactId> <version>1.12.2</version> </dependency>
代码示例
package com.leilei; import com.alibaba.fastjson.JSON; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import java.nio.charset.StandardCharsets;
/**
 * Example Flink job built on the Flink RabbitMQ connector: it reads string messages from
 * one RabbitMQ queue, prints them, and republishes them to a second queue.
 *
 * @author lei
 * @version 1.0
 * @date 2021/3/14 15:27
 */
public class FlinkConnectorsRabbitMq {

    /** Name of the queue the job consumes from. */
    private static final String SOURCE_QUEUE = "vehicle-location";

    /** Name of the queue the job publishes to. */
    private static final String SINK_QUEUE = "over-speeding-alarm";

    /**
     * Builds the pipeline (RabbitMQ source -> print -> RabbitMQ sink) and runs it.
     *
     * @param args command-line arguments; not used
     * @throws IllegalStateException if job execution fails; the original failure is kept as
     *     the cause so that the process does not exit as if the job had succeeded
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        final RMQConnectionConfig rabbitConfig = buildConnectionConfig();

        // Load data through the RabbitMQ connector.
        // NOTE(review): the third argument (usesCorrelationId) is true, but checkpointing is
        // never enabled on this environment. Per the connector documentation, correlation-id
        // de-duplication applies together with checkpointing -- confirm this is intended.
        DataStream<String> stream = env
                .addSource(new RMQSource<>(
                        rabbitConfig,
                        SOURCE_QUEUE,
                        true,
                        new SimpleStringSchema(StandardCharsets.UTF_8)))
                .setParallelism(1);

        stream.print();

        // Write the same records back out through the RabbitMQ connector.
        stream.addSink(new RMQSink<>(
                rabbitConfig,
                SINK_QUEUE,
                new SimpleStringSchema(StandardCharsets.UTF_8)));

        try {
            env.execute();
        } catch (Exception e) {
            // Propagate instead of only printing the stack trace: returning normally here
            // would let the JVM exit with status 0 even though the job failed.
            throw new IllegalStateException("Flink RabbitMQ job execution failed", e);
        }
    }

    /**
     * Creates the RabbitMQ connection settings shared by the source and the sink.
     *
     * <p>Host, user name and password are the placeholder values from the example and must
     * be replaced with real connection details before running.
     *
     * @return the connection configuration
     */
    private static RMQConnectionConfig buildConnectionConfig() {
        return new RMQConnectionConfig.Builder()
                .setHost("xxx")
                .setUserName("admin")
                .setPassword("xx")
                .setPort(5672)
                .setVirtualHost("/")
                .build();
    }

    /**
     * Plain data holder; accessors and constructors are generated by Lombok.
     *
     * <p>It is not referenced by {@link #main(String[])} in this file. Presumably it models
     * the payload of the vehicle-location messages -- TODO confirm field meanings and units
     * against the message producer.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Location {
        private String id;
        private String licensePlate;
        private String plateColor;
        private Integer speed;
        private Integer limitSpeed;
        private Long deviceTime;
        private String zone;
    }
}