Doris Study Notes: Integration with Other Systems

Table of Contents

  • Data Preparation
  • Reading and Writing Doris with Spark
    • Preparing the Spark Environment
    • Using the Spark Doris Connector
      • Reading and Writing Data with SQL
      • The DataFrame Approach
      • Reading Data with RDDs
      • Configuration and Field Type Mappings
  • Reading and Writing Doris with Flink
    • Preparing the Flink Environment
    • Reading and Writing with SQL
    • Reading and Writing with DataStream
      • Source
      • Sink
    • Common Configuration and Field Types
  • DataX Doris Writer
    • Building
    • Usage
    • Parameter Reference
  • ODBC External Tables
    • Version and Data Type Mappings
    • Installing unixODBC and the MySQL Connector
    • Data Preparation
    • Node Configuration and External Table Creation
  • Doris On ES
    • How It Works
    • Usage
      • Creating an ES External Table in Doris
      • Speeding Up Queries with Columnar Scans
      • Probing keyword-Type Fields
      • Enabling Node Auto-Discovery
      • Configuring HTTPS Access
      • Query Usage
        • Basic Queries
        • The Extended esquery(field, QueryDSL)
    • Best Practices
      • Recommendations for Time-Type Fields
      • Retrieving the ES Metadata Field _id

Data Preparation

First, prepare a table and some data. Note that table1 uses the Aggregate model: rows that share the same (siteid, citycode, username) key are merged, with pv accumulated by SUM.

CREATE TABLE table1 (
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
) AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1", "storage_medium" = "SSD");

insert into table1 values
(1,1,'jim',2),
(2,1,'grace',2),
(3,2,'tom',2),
(4,3,'bush',3),
(5,3,'helen',3);

Reading and Writing Doris with Spark

Preparing the Spark Environment

Create a new Maven project and edit its pom.xml as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>SparkOnDoris</artifactId>
    <version>1.0</version>

    <properties>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.0.0</spark.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <scope>provided</scope>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <scope>provided</scope>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <scope>provided</scope>
            <version>${spark.version}</version>
        </dependency>
        <!-- Scala library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.10</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
        <!--spark-doris-connector-->
        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>spark-doris-connector-3.1_2.12</artifactId>
            <!--<artifactId>spark-doris-connector-2.3_2.11</artifactId>-->
            <version>1.0.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Plugin for compiling Scala -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.1</version>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <!-- Bind to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Assembly packaging plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <archive>
                        <manifest>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Using the Spark Doris Connector

The Spark Doris Connector lets Spark read data from and write data to Doris. Because the Spark dependencies above use the provided scope, enable "Add dependencies with 'provided' scope to classpath" in the IDEA run configuration before running locally.

Reading and Writing Data with SQL

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SQLDemo {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("SQLDemo")
      .setMaster("local[*]")
    val sparkSession =
      SparkSession.builder().config(sparkConf).getOrCreate()
    sparkSession.sql(
      """
        |CREATE TEMPORARY VIEW spark_doris
        |USING doris
        |OPTIONS(
        |  "table.identifier"="test.table1",
        |  "fenodes"="scentos:8030",
        |  "user"="root",
        |  "password"="root"
        |);
        |""".stripMargin)
    // Read data
    sparkSession.sql("select * from spark_doris").show()
    // Write data
    // sparkSession.sql("insert into spark_doris values(99,99,'haha',5)")
  }
}

Run the write first, then the read. In IDEA the result looks like this (other log output omitted):

+------+--------+--------+---+
|siteid|citycode|username| pv|
+------+--------+--------+---+
|     2|       1|   grace|  2|
|     1|       1|     jim|  2|
|     3|       2|     tom|  2|
|     4|       3|    bush|  3|
|    99|      99|    haha|  5|
|     5|       3|   helen|  3|
+------+--------+--------+---+
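
The temporary view behaves like any other Spark SQL table, so arbitrary queries work against it. As a small illustrative sketch, summing pv per citycode (the scan is served by Doris, while the aggregation below is computed by Spark):

// Illustrative query against the same spark_doris temporary view:
// sum pv per citycode (the aggregation itself runs in Spark).
sparkSession.sql(
  """
    |select citycode, sum(pv) as total_pv
    |from spark_doris
    |group by citycode
    |""".stripMargin).show()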

The DataFrame Approach

Reading data:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DataFrameDemo {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("DataFrameDemo")
      .setMaster("local[*]") // TODO comment out when packaging for cluster submission
    val sparkSession =
      SparkSession.builder().config(sparkConf).getOrCreate()
    // Read data
    val dorisSparkDF = sparkSession.read.format("doris")
      .option("doris.table.identifier", "test.table1")
      .option("doris.fenodes", "scentos:8030")
      .option("user", "root")
      .option("password", "root")
      .load()
    dorisSparkDF.show()
  }
}

Output:

+------+--------+--------+---+
|siteid|citycode|username| pv|
+------+--------+--------+---+
|     3|       2|     tom|  2|
|     4|       3|    bush|  3|
|    99|      99|    haha|  5|
|     5|       3|   helen|  3|
|     2|       1|   grace|  2|
|     1|       1|     jim|  2|
+------+--------+--------+---+
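
To avoid shipping the whole table back to Spark, the connector also accepts a doris.filter.query option whose expression is passed through to Doris and evaluated there. A minimal sketch reusing the connection options above (treat the exact expression syntax as illustrative):

// Sketch: filter rows on the Doris side via doris.filter.query,
// instead of loading the full table and filtering in Spark.
val filteredDF = sparkSession.read.format("doris")
  .option("doris.table.identifier", "test.table1")
  .option("doris.fenodes", "scentos:8030")
  .option("user", "root")
  .option("password", "root")
  .option("doris.filter.query", "citycode = 3")
  .load()
filteredDF.show()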

Writing data:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DataFrameDemo {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("DataFrameDemo")
      .setMaster("local[*]") // TODO comment out when packaging for cluster submission
    val sparkSession =
      SparkSession.builder().config(sparkConf).getOrCreate()

    // Write data
    import sparkSession.implicits._
    val mockDataDF = List(
      (11, 23, "haha", 8),
      (11, 3, "hehe", 9),
      (11, 3, "heihei", 10)
    ).toDF("siteid", "citycode", "username", "pv")
    mockDataDF.show(5)
    mockDataDF.write.format("doris")
      .option("doris.table.identifier", "test.table1")
      .option("doris.fenodes", "scentos:8030")
      .option("user", "root")
      .option("password", "root")
      // Specify which fields to write:
      // .option("doris.write.fields", "user")
      .save()
  }
}

After running, check the table in Doris:

mysql> select * from table1;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      3 |        2 | tom      |    2 |
|     11 |        3 | hehe     |    9 |
|     11 |        3 | heihei   |   10 |
|     11 |       23 | haha     |    8 |
|      4 |        3 | bush     |    3 |
|     99 |       99 | haha     |    5 |
|      1 |        1 | jim      |    2 |
|      2 |        1 | grace    |    2 |
|      5 |        3 | helen    |    3 |
+--------+----------+----------+------+
9 rows in set (0.12 sec)
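
The commented-out doris.write.fields option above limits which columns are written. A hedged sketch, assuming the omitted pv column falls back to its column default ('0' in the DDL), and using made-up rows for illustration:

// Sketch: write only the three key columns; pv is left out of
// doris.write.fields and should take its column default ('0').
import sparkSession.implicits._
val partialDF = List(
  (21, 5, "alice"),
  (22, 5, "bob")
).toDF("siteid", "citycode", "username")
partialDF.write.format("doris")
  .option("doris.table.identifier", "test.table1")
  .option("doris.fenodes", "scentos:8030")
  .option("user", "root")
  .option("password", "root")
  .option("doris.write.fields", "siteid,citycode,username")
  .save()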

Reading Data with RDDs

Note that the connector's RDD API currently supports reading only; use the SQL or DataFrame approaches to write.

import org.apache.spark.{SparkConf, SparkContext}

object RDDDemo {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("RDDDemo")
      .setMaster("local[*]") // TODO comment out when packaging for cluster submission
    val sc = new SparkContext(sparkConf)
    import org.apache.doris.spark._
    val dorisSparkRDD = sc.dorisRDD(
      tableIdentifier = Some("test.table1"),
      cfg = Some(Map(
        "doris.fenodes" -> "scentos:8030",
        "doris.request.auth.user" -> "root",
        "doris.request.auth.password" -> "root"
      ))
    )
    dorisSparkRDD.collect().foreach(println)
  }
}

Output:

[1, 1, jim, 2]
[5, 3, helen, 3]
[2, 1, grace, 2]
[4, 3, bush, 3]
[99, 99, haha, 5]
[11, 23, haha, 8]
[11, 3, hehe, 9]
[11, 3, heihei, 10]
[3, 2, tom, 2]