(本文翻译官网内容)
英文原文网站:
https://docs.mongodb.com/spark-connector/current/python-api/
为什么使用MongoDB Spark:
http://www.mongoing.com/tj/mongodb_shanghai_spark
详见以下例子的源码introduction.py:
https://github.com/mongodb/mongo-spark/blob/master/examples/src/test/python/introduction.py
目录
1 前提
2 入门指南
2.1 Python Spark Shell
2.2 创建一个SparkSession对象
3 教程
3.1 写入MongoDB
3.2 读取MongoDB
3.3 聚合
3.4 过滤器和SQL
1 前提
MongoDB和Apache Spark基本操作知识。MongoDB文档 和Spark文档
运行MongoDB(2.6及以上版本)
Spark 2.1.x
Scala 2.11.x
2 入门指南
2.1 Python Spark Shell
本指南使用pysparkshell,但代码也适用于独立Python应用程序。
当使用pysparkshell当时,您可以设置:
–packages下载选项MongoDB Spark Connector包。可用以下程序包:
mongo-spark-connector_2.11 (与Scala 2.11.x结合使用)
–conf配置选项MongoDB Spark Connector。配置这些设置SparkConf对象。
注:当通过SparkConf在设置连接器配置时,必须为这些设置添加适当的前缀。细节和其他MongoDB Spark Connector选项参见Configuration Options。
以下示例用命令运行pysparkshell:
./bin/pyspark –conf“spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred” \
–conf“spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection” \
–packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0
spark.mongodb.output.uri用于设置存储和输出数据MongoDB服务器地址(127.0.0.1)连接数据库( test)和集合(myCollection),默认连接27017端口。
spark.mongodb.input.uri用于设置需要读取的数据所在的MongoDB服务器地址(127.0.0.1)连接数据库( test)和集合(myCollection),以及阅读的优先级。
上述数据库和集合将用于本指南。
2.2 创建一个SparkSession对象
注:当运行pyspark默认会得到一个名字spark的SparkSession对象Python在应用程序中,你需要清楚地创建你SparkSession对象,如下所示。
假如你在运行pyspark时设置spark.mongodb.input.uri和 spark.mongodb.output.uri,SparkSession默认使用对象。假如你想在那里pyspark 创造自己的SparkSession对象,你可以用SparkSession.builder并设置不同的配置项。
from pyspark.sql import SparkSession
my_spark = SparkSession \
.builder \
.appName(“myApp”) \
.config(“spark.mongodb.input.uri”,”mongodb://127.0.0.1/test.coll”) \
.config(“spark.mongodb.output.uri”,”mongodb://127.0.0.1/test.coll”) \
.getOrCreate()
你可以用一个SparkSession对象来向MongoDB写入数据,读取 MongoDB数据,创建DataFrames(数据框架)、执行 SQL操作。
3 教程
3.1 写入MongoDB
在创建一个DataFrame在(数据框架)之前,首先要创建一个SparkSession对象,然后使用对象 createDataFrame()函数。在以下示例中,createDataFrame()函数中有多个包含名称和年龄的元组,以及由列名组成的数组。
people = spark.createDataFrame([(“Bilbo Baggins”, 50), (“Gandalf”, 1000), (“Thorin”, 195), (“Balin”, 178), (“Kili”, 77), (“Dwalin”, 169), (“Oin”, 167), (“Gloin”, 158), (“Fili”, 82), (“Bombur”, None)], [“name”, “age”])
想把这个名字叫我想people”的DataFrame 写入在spark.mongodb.output.uri设置在选项中MongoDB可以使用数据库和集合write 函数:
people.write.format(“com.mongodb.spark.sql.DefaultSource”).mode(“append”).save()
以上操作将与您连接pyspark shell将数据写入时spark.mongodb.output.uri设置在选项中 MongoDB数据库和集合。
想要查看DataFrame可以使用内容show()
people.show()
在pyspark shell此步骤打印以下内容:
————- —-
| name| age|
————- —-
|Bilbo Baggins| 50|
| Gandalf|1000|
| Thorin| 195|
| Balin| 178|
| Kili| 77|
| Dwalin| 169|
| Oin| 167|
| Gloin| 158|
| Fili| 82|
| Bombur|null|
————- —-
用printSchema()函数可以打印出来DataFrame的架构:
people.printSchema()
在pyspark shell此步骤打印以下内容:
root
|– _id: struct (nullable = true)
| |– oid: string (nullable = true)
|– age: long (nullable = true)
|– name: string (nullable = true)
若需写入数据MongoDB另一个集合可以结合使用.option()和 .write()函数。
想把数据写成名字people数据库中的名称是contacts 在集合中,输出URI选项中设置people.contacts。
people.write.format(“com.mongodb.spark.sql.DefaultSource”).mode(“append”).option(“database”,”people”).option(“collection”, “contacts”).save()
3.2 读取MongoDB
你可以创建一个Spark DataFrame来保存从MongoDB集合(为在 SparkSession中用spark.mongodb.input.uri在选项中设置的集合)中读取的数据。
一个假设包含以下内容document名为(文档) fuit”的集合:
{ “_id” : 1, “type” : “apple”, “qty” : 5 }
{ “_id” : 2, “type” : “orange”, “qty” : 10 }
{ “_id” : 3, “type” : “banana”, “qty” : 15 }
在pyspark shell中,用spark.read()将集合赋值给 DataFrame
df = spark.read.format(“com.mongodb.spark.sql.DefaultSource”).load()
可以用spark抽取记录来推断出集合的架构:
df.printSchema()
以上操作会产生如下的shell输出:
root
|– _id: double (nullable = true)
|– qty: double (nullable = true)
|– type: string (nullable = true)
如果你想从另一个MongoDB集合中读取数据,可以在读取数据到DataFrame时使用 .option函数。
想要读取一个名为“people”的数据库中名为“contacts ”的集合里的数据,要在输入URI选项中设置people.contacts。
df = spark.read.format(“com.mongodb.spark.sql.DefaultSource”).option(“uri”,
“mongodb://127.0.0.1/people.contacts”).load()
3.3 聚合
使用MongoDB的聚合管道 可以实现将MongoDB中的数据读取到Spark中时应用过滤规则、执行聚合操作。
假设有一个包含了以下document(文档)的名为“ fruit”的集合:
{ “_id” : 1, “type” : “apple”, “qty” : 5 }
{ “_id” : 2, “type” : “orange”, “qty” : 10 }
{ “_id” : 3, “type” : “banana”, “qty” : 15 }
在pyspark shell中,向spark.read()里增加 option()函数,就可以在创建DataFrame时设置一个聚合管道来使用。
pipeline = “{‘$match’: {‘type’: ‘apple’}}”
df=spark.read.format(“com.mongodb.spark.sql.DefaultSource”).option(“pipeline”, pipeline).load()
df.show()
在pyspark shell中,这一步操作打印出以下内容:
+—+—+—–+
|_id|qty| type|
+—+—+—–+
|1.0|5.0|apple|
+—+—+—–+
3.4 过滤器和SQL
过滤器(Filters)
注:当将过滤器与DataFrame或Python API结合使用时,Mongo连接器的底层代码会创建一个聚合管道,用来在数据送往Spark之前就在MongoDB中过滤数据。
用filter()来读取MongoDB集合中的一个子集。
假设有一个包含了以下document(文档)的名为“ fruit”的集合:
{ “_id” : 1, “type” : “apple”, “qty” : 5 }
{ “_id” : 2, “type” : “orange”, “qty” : 10 }
{ “_id” : 3, “type” : “banana”, “qty” : 15 }
首先,创建一个dataframe来连接默认的 MongoDB数据源:
df = spark.read.format(“com.mongodb.spark.sql.DefaultSource”).load()
以下示例只包含qty字段大于等于10的记录
df.filter(df[‘qty’] >= 10).show()
这一步操作打印出以下结果:
+—+—-+——+
|_id| qty| type|
+—+—-+——+
|2.0|10.0|orange|
|3.0|15.0|banana|
+—+—-+——+
SQL
在运行SQL查询DataFrame之前, 你需要注册一个临时表。
以下示例注册了一个名为“temp”的临时表,然后用SQL 来查询type字段含有字母e的记录:
df.createOrReplaceTempView(“temp”)
some_fruit = spark.sql(“SELECT type, qty FROM temp WHERE type LIKE ‘%e%'”)
some_fruit.show()
在pyspark shell中,这步操作打印出如下结果:
+——+—-+
| type| qty|
+——+—-+
| apple| 5.0|
|orange|10.0|
+——+—-+
“阅读原文”查看原汁原味的英文文档以及相关链接~