Flink 1.14 Learning Notes: Receiving Kafka Messages and Saving Structured Data to a Database via JDBC
Preparation
Key dependency versions
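The examples rely on the Flink 1.14 Table API plus the Kafka and JDBC connectors. A possible set of Maven coordinates (assuming a Scala 2.12 build of Flink 1.14.4; adjust the version to your cluster, and add the matching flink-table-api-java-bridge_2.12 artifact for the Java examples):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.14.4</version>
</dependency>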
Driver packages (use whichever applies)
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.4.0</version>
    <scope>runtime</scope>
</dependency>
Other dependencies (not required; purely personal preference)
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.3</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.24</version>
    <scope>provided</scope>
</dependency>
1. Receiving Kafka Message Content (Java)
In Flink 1.14 the FlinkKafkaConsumer class is deprecated, so KafkaSource is used here to receive the Kafka message stream.
Test: receive the message content, parse it, and print it
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import lombok.Data;
import lombok.SneakyThrows;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class KafkaReceiveTest {

    // Test class (message payload)
    @Data
    private static class Person {
        private String name;
        private int age;
        private char gender;
    }

    @SneakyThrows
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(1), CheckpointingMode.EXACTLY_ONCE);

        KafkaSource<Person> source = KafkaSource.<Person>builder()
                .setBootstrapServers("192.168.11.160:9092,192.168.11.161:9092,192.168.11.162:9092") // Kafka brokers
                .setTopics("ly_test")           // topic
                .setGroupId("KafkaReceiveTest") // consumer group
                // Offsets: when no committed offset exists, start consuming from the earliest
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                // Custom deserialization of the message payload
                .setValueOnlyDeserializer(new AbstractDeserializationSchema<Person>() {
                    @Override
                    public Person deserialize(byte[] message) {
                        return JSONUtil.toBean(StrUtil.str(message, StandardCharsets.UTF_8), Person.class);
                    }
                })
                .build();

        DataStreamSource<Person> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "person");
        kafkaSource.print();
        env.execute();
    }
}
Result
Once the program is running, send some messages to the topic (five test messages were sent here); the console prints output like the following:
5> KafkaReceiveTest.Person(name=Bmtjom, age=15, gender=男)
5> KafkaReceiveTest.Person(name=72r3gp, age=15, gender=女)
5> KafkaReceiveTest.Person(name=Cf4qa3, age=16, gender=男)
5> KafkaReceiveTest.Person(name=Wwqbpw, age=15, gender=男)
5> KafkaReceiveTest.Person(name=So5skv, age=15, gender=男)
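For reference, each test message is a JSON string whose fields match the Person class; for example, the first output line above corresponds to a message like:

{"name": "Bmtjom", "age": 15, "gender": "男"}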
2. Writing Data to MySQL (Scala)
Prepare the test table
create table if not exists mysql_person
(
name varchar(6),
age tinyint,
gender char
);
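Note that the gender column declared as char defaults to char(1), and the test data uses the characters 男/女, so the table needs a UTF-8 character set; for example:

create table if not exists mysql_person
(
    name   varchar(6),
    age    tinyint,
    gender char
) default charset = utf8mb4;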
Write test data into the MySQL database
import cn.hutool.core.text.CharSequenceUtil
import cn.hutool.core.util.RandomUtil
import org.apache.flink.table.api.{
DataTypes, EnvironmentSettings, FieldExpression, Schema, TableDescriptor, TableEnvironment}
import org.apache.flink.types.Row
import scala.collection.JavaConverters.seqAsJavaListConverter
object MysqlWriteTest {
//Test data structure
case class Person(name: String, age: Int, gender: String)
def main(args: Array[String]): Unit = {
//Use batch mode
val environmentSettings = EnvironmentSettings.newInstance().inBatchMode().build()
//Use the Table API
val tableEnv = TableEnvironment.create(environmentSettings)
//Create test data
val rows = (1 to 5)
//Build 5 data objects
.map(_ =>
//Fill each one with random content
Person(
CharSequenceUtil.upperFirst(RandomUtil.randomString(6)),
RandomUtil.randomInt(15, 17),
RandomUtil.randomString("男女", 1)))
//Convert to Row
.map(p => Row.of(p.name, Int.box(p.age), p.gender))
//Convert to a Java List
.asJava
//Turn the test data into a table
val dataTable = tableEnv.fromValues(rows).as("name", "age", "gender")
//Register the JDBC target table
val targetTableName = "person"
tableEnv.createTemporaryTable(targetTableName, TableDescriptor.forConnector("jdbc")
.schema(Schema.newBuilder()
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.column("gender", DataTypes.CHAR(1))
.build())
.option("url", "jdbc:mysql://127.0.0.1:3306/copy")
.option("table-name", "mysql_person") //目标表名称
.option("username", "root")
.option("password", "123456")
.build())
//Insert the data into the target table
dataTable.select($"*").executeInsert(targetTableName)
//Read back and print the target table
tableEnv.from(targetTableName).select($"*").execute().print()
}
}
Result
After the program runs, five rows of test data are created and written to the target table; the console prints output like the following:
+--------------------------------+-------------+--------------------------------+
| name | age | gender |
+--------------------------------+-------------+--------------------------------+
| Jku9ch | 15 | 女 |
| 1z4a3m | 16 | 女 |
| 7k5yf5 | 15 | 女 |
| 2taztn | 15 | 女 |
| 75a2y4 | 15 | 男 |
+--------------------------------+-------------+--------------------------------+
5 rows in set
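The written rows can also be spot-checked directly in the MySQL client, independently of Flink:

select * from mysql_person;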
3. Writing Data to PostgreSQL (Java)
Prepare the test table
create table if not exists postgresql_person
(
name varchar(6),
age int,
gender char
);
Write test data into the PostgreSQL database
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.table.api.*;
import org.apache.flink.types.Row;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.flink.table.api.Expressions.$;
public class PostgresqlWriteTest {
//Test data structure
@Data
@AllArgsConstructor
private static class Person {
private String name;
private int age;
private String gender;
}
public static void main(String[] args) {
//Use batch mode
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
//Use the Table API
TableEnvironment tableEnv = TableEnvironment.create(settings);
//Create test data
List<Row> rows = IntStream.rangeClosed(1, 5)
.mapToObj(v -> {
//Build 5 data objects with random content
return new Person(
StrUtil.upperFirst(RandomUtil.randomString(6)),
RandomUtil.randomInt(15, 17),
RandomUtil.randomString("男女", 1));
})
.map(p -> Row.of(p.name, p.age, p.gender))
.collect(Collectors.toList());
//Turn the test data into a table
Table dataTable = tableEnv.fromValues(rows).as("name", "age", "gender");
//Register the JDBC target table
String targetTableName = "person";
tableEnv.createTemporaryTable(targetTableName, TableDescriptor.forConnector("jdbc")
.schema(Schema.newBuilder()
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.column("gender", DataTypes.CHAR(1))
.build())
.option("url", "jdbc:postgresql://192.168.3.190:5432/y")
.option("table-name", "postgresql_person") //目标表名称
.option("username", "postgres")
.option("password", "123456")
.build());
//Insert the data into the target table
dataTable.select($("*")).executeInsert(targetTableName);
//Read back and print the target table
tableEnv.from(targetTableName).select($("*")).execute().print();
}
}
Result
After the program runs, five rows of test data are created and written to the target table; the console prints output like the following:
+--------------------------------+-------------+--------------------------------+
| name | age | gender |
+--------------------------------+-------------+--------------------------------+
| Xfsg1q | 16 | 女 |
| 0z5ilr | 16 | 男 |
| Eutala | 15 | 女 |
| F75phz | 15 | 男 |
| 2xhqd8 | 15 | 男 |
+--------------------------------+-------------+--------------------------------+
4. Receiving the Kafka Message Stream and Writing It to MySQL (Scala)
Test code
import cn.hutool.core.util.StrUtil
import cn.hutool.json.JSONUtil
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, FieldExpression, Schema, Table, TableDescriptor}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.kafka.clients.consumer.OffsetResetStrategy

import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

object KafkaToMysqlTest {

  //Test data structure (the case class carries its own schema, so none needs to be declared separately)
  private case class Person(name: String, age: Int, gender: String)

  def main(args: Array[String]): Unit = {
    //Environment configuration
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //Checkpoint every 1s, exactly-once
    env.enableCheckpointing(TimeUnit.SECONDS.toMillis(1), CheckpointingMode.EXACTLY_ONCE)
    //Table API
    val tableEnv = StreamTableEnvironment.create(env)

    //Configure Kafka
    val source = KafkaSource.builder[Person]()
      .setBootstrapServers("192.168.11.160:9092,192.168.11.161:9092,192.168.11.162:9092")
      .setTopics("ly_test")
      .setGroupId("KafkaToMysqlTest")
      .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
      //Custom deserialization of the message payload
      .setValueOnlyDeserializer(new AbstractDeserializationSchema[Person]() {
        override def deserialize(message: Array[Byte]): Person = {
          val json = JSONUtil.parse(StrUtil.str(message, StandardCharsets.UTF_8))
          Person(
            json.getByPath("name", classOf[String]),
            json.getByPath("age", classOf[Integer]),
            json.getByPath("gender", classOf[String]))
        }
      })
      .build()
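The original post ends here, before the stream is actually written out. The remaining steps mirror section 2: turn the Kafka stream into a table and insert it into the JDBC table. A minimal sketch of that missing tail, assuming the same mysql_person table and connection options as in section 2:

    //--- Continuation sketch (not part of the original post) ---
    //Turn the Kafka stream into a table; the case class provides the schema
    val kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "person")
    val dataTable: Table = tableEnv.fromDataStream(kafkaStream)

    //Register the JDBC target table, reusing the options from section 2
    val targetTableName = "person"
    tableEnv.createTemporaryTable(targetTableName, TableDescriptor.forConnector("jdbc")
      .schema(Schema.newBuilder()
        .column("name", DataTypes.STRING())
        .column("age", DataTypes.INT())
        .column("gender", DataTypes.CHAR(1))
        .build())
      .option("url", "jdbc:mysql://127.0.0.1:3306/copy")
      .option("table-name", "mysql_person") //target table name
      .option("username", "root")
      .option("password", "123456")
      .build())

    //Continuously insert the streamed rows into the target table (executeInsert submits the job)
    dataTable.executeInsert(targetTableName)
  }
}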