Background
In the Flink SQL client, an attempt to insert data from the Kafka-backed virtual table ods_base_province into the MySQL table base_province failed with the following error:
Flink SQL> INSERT INTO base_province SELECT * FROM ods_base_province;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
kafka
print
upsert-kafka
Shutting down the session...
done.
[bigdata_admin@dn5 flink_sql_rtdw-demo]$
Analysis
In this Flink setup, the likely causes are:
- the Flink JDBC connector is missing
- the MySQL JDBC driver jar is missing
Check whether ${FLINK_HOME}/lib contains jars with names like the following:
- flink-connector-jdbc_2.x-1.y.z.jar
- mysql-connector-java-5.1.38.jar (example version only)
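A quick way to run this check is a small shell loop over the expected jar name prefixes. This is a minimal sketch: the fallback path /opt/flink is an assumption, so adjust it to your installation, and "missing" is the expected output wherever the jars have not been added yet.

```shell
# Sketch: check ${FLINK_HOME}/lib for the connector jars this post needs.
# /opt/flink is an assumed fallback path, not a Flink requirement.
FLINK_LIB="${FLINK_HOME:-/opt/flink}/lib"
status="ok"
for jar in flink-connector-jdbc mysql-connector-java; do
  if ls "$FLINK_LIB"/${jar}*.jar >/dev/null 2>&1; then
    echo "found:   ${jar}"
  else
    echo "missing: ${jar}"
    status="missing"
  fi
done
```

If either line reports "missing", the ValidationException above is expected, and the fix below applies.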
Fix
Manually add the missing dependencies. Since Hive and Kafka are also used in this setup, their connector jars are downloaded at the same time:
cd ${FLINK_HOME}/lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.13.6/flink-connector-jdbc_2.11-1.13.6.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/1.13.6/flink-connector-hive_2.11-1.13.6.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.6/flink-sql-connector-kafka_2.11-1.13.6.jar
The jars after downloading (restart the Flink cluster and SQL client session so the new jars are loaded):
[bigdata_admin@dn5 lib]$ ll
total 210872
-rw-rw-r-- 1 bigdata_admin bigdata_admin   7758727 Feb  4 17:48 flink-connector-hive_2.11-1.13.6.jar
-rw-rw-r-- 1 bigdata_admin bigdata_admin    249570 Feb  4 17:48 flink-connector-jdbc_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin     92314 Feb  4 17:11 flink-csv-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 115425612 Feb  4 17:15 flink-dist_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin    148127 Feb  4 17:11 flink-json-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin   7709740 May  7  2021 flink-shaded-zookeeper-3.4.14.jar
-rw-rw-r-- 1 bigdata_admin bigdata_admin   3674190 Feb  4 17:59 flink-sql-connector-kafka_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin  36455408 Feb  4 17:14 flink-table_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin  41077430 Feb  4 17:14 flink-table-blink_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin    208006 Jan 13 19:06 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin    301872 Jan  7 18:07 log4j-api-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin   1790452 Jan  7 18:07 log4j-core-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin     24279 Jan  7 18:07 log4j-slf4j-impl-2.17.1.jar
-rw-rw-r-- 1 bigdata_admin bigdata_admin    983911 Dec  2  2015 mysql-connector-java-5.1.38.jar
Further Notes
The walkthrough above uses the Flink SQL client to read from a Kafka source and write to a MySQL sink. If you instead program against the TableEnvironment API in Scala, add the connector to pom.xml and set its scope to provided, as shown below, so it does not clash with the jars already present on the server:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.13.6</version>
    <scope>provided</scope>
</dependency>
Maven dependency scope values:
1. test: available only during testing; not used for compilation or packaging.
2. compile: available at compile time and bundled into the package.
3. provided: available during compilation and testing, but excluded from the final war/jar. Example: servlet-api, which the Tomcat server already supplies; bundling it again would cause a conflict.
4. runtime: needed at run time but not at compile time.
The default scope is compile.
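To confirm that provided scope actually took effect, you can check that the connector did not get bundled into the job artifact after `mvn package`. This is a hedged sketch: `target/your-job.jar` is a hypothetical path standing in for your build output.

```shell
# Sketch: verify a provided-scope dependency was NOT packaged into the job jar.
# JOB_JAR is hypothetical; point it at your actual `mvn package` output.
JOB_JAR="${JOB_JAR:-target/your-job.jar}"
if unzip -l "$JOB_JAR" 2>/dev/null | grep -qi 'connector-jdbc'; then
  result="bundled"    # scope did not take effect; expect jar conflicts on the server
else
  result="excluded"   # the expected outcome with <scope>provided</scope>
fi
echo "flink-connector-jdbc: $result"
```

"excluded" is what you want here, since the same connector already sits in ${FLINK_HOME}/lib on the server.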