Table of Contents
- Data preparation
- Reading and writing Doris with Spark
  - Setting up the Spark environment
  - Using the Spark Doris Connector
    - Reading and writing data with SQL
    - The DataFrame approach
    - Reading data with RDDs
    - Configuration and field type mappings
- Reading and writing Doris with Flink
  - Setting up the Flink environment
  - Reading and writing with SQL
  - Reading and writing with DataStream
    - Source
    - Sink
  - Common configuration and field types
- DataX doris writer
  - Building
  - Usage
  - Parameter reference
- ODBC external tables
  - Version and data type mappings
  - Installing unixODBC and the MySQL connector
  - Data preparation
  - Node configuration and external table creation
- Doris On ES
  - How it works
  - Usage
    - Creating an ES external table in Doris
    - Columnar scans to speed up queries
    - Probing keyword-type fields
    - Enabling node auto-discovery
    - Configuring HTTPS access
  - Query usage
    - Basic queries
    - The extended esquery(field, QueryDSL)
  - Best practices
    - Prefer date/time type fields
    - Retrieving the ES metadata field _id
Data preparation
First, create the table and insert some data:
CREATE TABLE table1
(
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1", "storage_medium" = "SSD");

insert into table1 values
(1,1,'jim',2),
(2,1,'grace',2),
(3,2,'tom',2),
(4,3,'bush',3),
(5,3,'helen',3);
Reading and writing Doris with Spark
Setting up the Spark environment
Create a new Maven project and edit its pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>SparkOnDoris</artifactId>
<version>1.0</version>
<properties>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.0.0</spark.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- Spark dependencies -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<scope>provided</scope>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>provided</scope>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<scope>provided</scope>
<version>${spark.version}</version>
</dependency>
<!-- Scala library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.10</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<!--spark-doris-connector-->
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-3.1_2.12</artifactId>
<!--<artifactId>spark-doris-connector-2.3_2.11</artifactId>-->
<version>1.0.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- plugin required to compile Scala -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.1</version>
<executions>
<execution>
<id>compile-scala</id>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<!-- bound to Maven's compile phase -->
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- assembly packaging plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
Using the Spark Doris Connector
The Spark Doris Connector lets Spark read data from and write data to Doris. When running in the IDE, enable "Add dependencies with 'provided' scope to classpath" in the run configuration.
Reading and writing data with SQL
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object SQLDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SQLDemo")
      .setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    sparkSession.sql(
      """
        |CREATE TEMPORARY VIEW spark_doris
        |USING doris
        |OPTIONS(
        |  "table.identifier"="test.table1",
        |  "fenodes"="scentos:8030",
        |  "user"="root",
        |  "password"="root"
        |);
      """.stripMargin)
    // read data
    sparkSession.sql("select * from spark_doris").show()
    // write data
    // sparkSession.sql("insert into spark_doris values(99,99,'haha',5)")
  }
}
Write the data first, then read it back. Running in IDEA produces the following (other log output omitted):
+------+--------+--------+---+
|siteid|citycode|username| pv|
+------+--------+--------+---+
| 2| 1| grace| 2|
| 1| 1| jim| 2|
| 3| 2| tom| 2|
| 4| 3| bush| 3|
| 99| 99| haha| 5|
| 5| 3| helen| 3|
+------+--------+--------+---+
The DataFrame approach
Reading data:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object DataFrameDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("DataFrameDemo")
      .setMaster("local[*]") // TODO: comment this out before packaging for cluster execution
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    // read data
    val dorisSparkDF = sparkSession.read.format("doris")
      .option("doris.table.identifier", "test.table1")
      .option("doris.fenodes", "scentos:8030")
      .option("user", "root")
      .option("password", "root")
      .load()
    dorisSparkDF.show()
  }
}
Output:
+------+--------+--------+---+
|siteid|citycode|username| pv|
+------+--------+--------+---+
| 3| 2| tom| 2|
| 4| 3| bush| 3|
| 99| 99| haha| 5|
| 5| 3| helen| 3|
| 2| 1| grace| 2|
| 1| 1| jim| 2|
+------+--------+--------+---+
Writing data:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object DataFrameDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("DataFrameDemo")
      .setMaster("local[*]") // TODO: comment this out before packaging for cluster execution
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    // write data
    import sparkSession.implicits._
    val mockDataDF = List(
      (11, 23, "haha", 8),
      (11, 3, "hehe", 9),
      (11, 3, "heihei", 10)
    ).toDF("siteid", "citycode", "username", "pv")
    mockDataDF.show(5)
    mockDataDF.write.format("doris")
      .option("doris.table.identifier", "test.table1")
      .option("doris.fenodes", "scentos:8030")
      .option("user", "root")
      .option("password", "root")
      // specify the columns to write
      // .option("doris.write.fields", "user")
      .save()
  }
}
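The commented-out doris.write.fields option restricts which DataFrame columns are written to Doris; unlisted Doris columns fall back to their declared defaults. A minimal sketch, assuming the same sparkSession and connection settings as above (the DataFrame name and column list are illustrative, chosen to match table1's schema):

```scala
// Hypothetical partial-column write: list the columns you want Doris to receive.
import sparkSession.implicits._

val partialDF = List((12, 4, "lily", 1)).toDF("siteid", "citycode", "username", "pv")

partialDF.write.format("doris")
  .option("doris.table.identifier", "test.table1")
  .option("doris.fenodes", "scentos:8030")
  .option("user", "root")
  .option("password", "root")
  // illustrative column list; names must exist in both the DataFrame and table1
  .option("doris.write.fields", "siteid,citycode,username,pv")
  .save()
```

Note that because table1 uses the aggregate model with pv declared as SUM, rows written with an already-existing key (siteid, citycode, username) are merged and their pv values summed rather than appended as duplicates.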
After running, query the table in Doris:
mysql> select * from table1;
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 3 | 2 | tom | 2 |
| 11 | 3 | hehe | 9 |
| 11 | 3 | heihei | 10 |
| 11 | 23 | haha | 8 |
| 4 | 3 | bush | 3 |
| 99 | 99 | haha | 5 |
| 1 | 1 | jim | 2 |
| 2 | 1 | grace | 2 |
| 5 | 3 | helen | 3 |
+--------+----------+----------+------+
9 rows in set (0.12 sec)
Reading data with RDDs
import org.apache.spark.{SparkConf, SparkContext}

object RDDDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("RDDDemo")
      .setMaster("local[*]") // TODO: comment this out before packaging for cluster execution
    val sc = new SparkContext(sparkConf)
    import org.apache.doris.spark._
    val dorisSparkRDD = sc.dorisRDD(
      tableIdentifier = Some("test.table1"),
      cfg = Some(Map(
        "doris.fenodes" -> "scentos:8030",
        "doris.request.auth.user" -> "root",
        "doris.request.auth.password" -> "root"
      ))
    )
    dorisSparkRDD.collect().foreach(println)
  }
}
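Since the connector's row type is internal, a hedged sketch of post-processing that avoids assuming the concrete row class (it only relies on the rows printing as value lists, as shown below):

```scala
// Inspect the fetched rows without depending on the connector's row class.
val rows = dorisSparkRDD.collect()
println(s"fetched ${rows.length} rows")
rows.headOption.foreach(r => println(s"first row: $r"))
```

For typed processing you would map each row into a case class or tuple; consult the connector's documentation for the exact element type it exposes.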
Output:
[1, 1, jim, 2]
[5, 3, helen, 3]
[2, 1, grace, 2]
[4, 3, bush, 3]
[99, 99, haha, 5]