资讯详情

Spark连接MongoDB指南(基于python)

(本文翻译官网内容)

英文原文网站:

https://docs.mongodb.com/spark-connector/current/python-api/

为什么使用MongoDB Spark:

http://www.mongoing.com/tj/mongodb_shanghai_spark

详见以下例子的源码introduction.py:

https://github.com/mongodb/mongo-spark/blob/master/examples/src/test/python/introduction.py

目录

1 前提

2 入门指南

2.1 Python Spark Shell

2.2 创建一个SparkSession对象

3 教程

3.1 写入MongoDB

3.2 读取MongoDB

3.3 聚合

3.4 过滤器和SQL

1 前提

MongoDB和Apache Spark基本操作知识。MongoDB文档 和Spark文档

运行MongoDB(2.6及以上版本)

Spark 2.1.x

Scala 2.11.x

2 入门指南

2.1 Python Spark Shell

本指南使用pysparkshell,但代码也适用于独立Python应用程序。

当使用pysparkshell当时,您可以设置:

–packages下载选项MongoDB Spark Connector包。可用以下程序包:

mongo-spark-connector_2.11 (与Scala 2.11.x结合使用)

–conf配置选项MongoDB Spark Connector。配置这些设置SparkConf对象。

注:当通过SparkConf在设置连接器配置时,必须为这些设置添加适当的前缀。细节和其他MongoDB Spark Connector选项参见Configuration Options。

以下示例用命令运行pysparkshell:

./bin/pyspark –conf“spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred” \

–conf“spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection” \

–packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0

spark.mongodb.output.uri用于设置存储和输出数据MongoDB服务器地址(127.0.0.1)连接数据库( test)和集合(myCollection),默认连接27017端口。

spark.mongodb.input.uri用于设置需要读取的数据所在的MongoDB服务器地址(127.0.0.1)连接数据库( test)和集合(myCollection),以及阅读的优先级。

上述数据库和集合将用于本指南。

2.2 创建一个SparkSession对象

注:当运行pyspark默认会得到一个名字spark的SparkSession对象Python在应用程序中,你需要清楚地创建你SparkSession对象,如下所示。

假如你在运行pyspark时设置spark.mongodb.input.uri和 spark.mongodb.output.uri,SparkSession默认使用对象。假如你想在那里pyspark 创造自己的SparkSession对象,你可以用SparkSession.builder并设置不同的配置项。

from pyspark.sql import SparkSession

my_spark = SparkSession \

.builder \

.appName(“myApp”) \

.config(“spark.mongodb.input.uri”,”mongodb://127.0.0.1/test.coll”) \

.config(“spark.mongodb.output.uri”,”mongodb://127.0.0.1/test.coll”) \

.getOrCreate()

你可以用一个SparkSession对象来向MongoDB写入数据,读取 MongoDB数据,创建DataFrames(数据框架)、执行 SQL操作。

3 教程

3.1 写入MongoDB

在创建一个DataFrame在(数据框架)之前,首先要创建一个SparkSession对象,然后使用对象 createDataFrame()函数。在以下示例中,createDataFrame()函数中有多个包含名称和年龄的元组,以及由列名组成的数组。

people = spark.createDataFrame([(“Bilbo Baggins”, 50), (“Gandalf”, 1000), (“Thorin”, 195), (“Balin”, 178), (“Kili”, 77), (“Dwalin”, 169), (“Oin”, 167), (“Gloin”, 158), (“Fili”, 82), (“Bombur”, None)], [“name”, “age”])

想把这个名字叫我想people”的DataFrame 写入在spark.mongodb.output.uri设置在选项中MongoDB可以使用数据库和集合write 函数:

people.write.format(“com.mongodb.spark.sql.DefaultSource”).mode(“append”).save()

以上操作将与您连接pyspark shell将数据写入时spark.mongodb.output.uri设置在选项中 MongoDB数据库和集合。

想要查看DataFrame可以使用内容show()

people.show()

在pyspark shell此步骤打印以下内容:

————- —-

| name| age|

————- —-

|Bilbo Baggins| 50|

| Gandalf|1000|

| Thorin| 195|

| Balin| 178|

| Kili| 77|

| Dwalin| 169|

| Oin| 167|

| Gloin| 158|

| Fili| 82|

| Bombur|null|

————- —-

用printSchema()函数可以打印出来DataFrame的架构:

people.printSchema()

在pyspark shell此步骤打印以下内容:

root

|– _id: struct (nullable = true)

| |– oid: string (nullable = true)

|– age: long (nullable = true)

|– name: string (nullable = true)

若需写入数据MongoDB另一个集合可以结合使用.option()和 .write()函数。

想把数据写成名字people数据库中的名称是contacts 在集合中,输出URI选项中设置people.contacts。

people.write.format(“com.mongodb.spark.sql.DefaultSource”).mode(“append”).option(“database”,”people”).option(“collection”, “contacts”).save()

3.2 读取MongoDB

你可以创建一个Spark DataFrame来保存从MongoDB集合(为在 SparkSession中用spark.mongodb.input.uri在选项中设置的集合)中读取的数据。

一个假设包含以下内容document名为(文档) fuit”的集合:

{ “_id” : 1, “type” : “apple”, “qty” : 5 }

{ “_id” : 2, “type” : “orange”, “qty” : 10 }

{ “_id” : 3, “type” : “banana”, “qty” : 15 }

在pyspark shell中,用spark.read()将集合赋值给 DataFrame

df = spark.read.format(“com.mongodb.spark.sql.DefaultSource”).load()

可以用spark抽取记录来推断出集合的架构:

df.printSchema()

以上操作会产生如下的shell输出:

root

|– _id: double (nullable = true)

|– qty: double (nullable = true)

|– type: string (nullable = true)

如果你想从另一个MongoDB集合中读取数据,可以在读取数据到DataFrame时使用 .option函数。

想要读取一个名为“people”的数据库中名为“contacts ”的集合里的数据,要在输入URI选项中设置people.contacts。

df = spark.read.format(“com.mongodb.spark.sql.DefaultSource”).option(“uri”,

“mongodb://127.0.0.1/people.contacts”).load()

3.3 聚合

使用MongoDB的聚合管道 可以实现将MongoDB中的数据读取到Spark中时应用过滤规则、执行聚合操作。

假设有一个包含了以下document(文档)的名为“ fruit”的集合:

{ “_id” : 1, “type” : “apple”, “qty” : 5 }

{ “_id” : 2, “type” : “orange”, “qty” : 10 }

{ “_id” : 3, “type” : “banana”, “qty” : 15 }

在pyspark shell中,向spark.read()里增加 option()函数,就可以在创建DataFrame时设置一个聚合管道来使用。

pipeline = “{‘$match’: {‘type’: ‘apple’}}”

df=spark.read.format(“com.mongodb.spark.sql.DefaultSource”).option(“pipeline”, pipeline).load()

df.show()

在pyspark shell中,这一步操作打印出以下内容:

+—+—+—–+

|_id|qty| type|

+—+—+—–+

|1.0|5.0|apple|

+—+—+—–+

3.4 过滤器和SQL

过滤器(Filters)

注:当将过滤器与DataFrame或Python API结合使用时,Mongo连接器的底层代码会创建一个聚合管道,用来在数据送往Spark之前就在MongoDB中过滤数据。

用filter()来读取MongoDB集合中的一个子集。

假设有一个包含了以下document(文档)的名为“ fruit”的集合:

{ “_id” : 1, “type” : “apple”, “qty” : 5 }

{ “_id” : 2, “type” : “orange”, “qty” : 10 }

{ “_id” : 3, “type” : “banana”, “qty” : 15 }

首先,创建一个dataframe来连接默认的 MongoDB数据源:

df = spark.read.format(“com.mongodb.spark.sql.DefaultSource”).load()

以下示例只包含qty字段大于等于10的记录

df.filter(df[‘qty’] >= 10).show()

这一步操作打印出以下结果:

+—+—-+——+

|_id| qty|  type|

+—+—-+——+

|2.0|10.0|orange|

|3.0|15.0|banana|

+—+—-+——+

SQL

在运行SQL查询DataFrame之前, 你需要注册一个临时表。

以下示例注册了一个名为“temp”的临时表,然后用SQL 来查询type字段含有字母e的记录:

df.createOrReplaceTempView(“temp”)

some_fruit = spark.sql(“SELECT type, qty FROM temp WHERE type LIKE ‘%e%'”)

some_fruit.show()

在pyspark shell中,这步操作打印出如下结果:

+——+—-+

|  type| qty|

+——+—-+

| apple| 5.0|

|orange|10.0|

+——+—-+

“阅读原文”查看原汁原味的英文文档以及相关链接~

标签: 2431tj62连接器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台