兴迈手把手教你 | MongoDB Spark Connector 实战指南


Why Spark with MongoDB?

  1. 高性能,官方号称 100x faster,因为可以全内存运行,性能提升肯定是很明显的
  2. 简单易用,支持 Java、Python、Scala、SQL 等多种语言,使得构建分析应用非常简单
  3. 统一构建 ,支持多种数据源,通过 Spark RDD 屏蔽底层数据差异,同一个分析应用可运行于不同的数据源;
  4. 应用场景广泛,能同时支持批处理以及流式处理

MongoDB Spark Connector 为官方推出,用于适配 Spark 操作 MongoDB 数据;本文以 Python 为例,介绍 MongoDB Spark Connector 的使用,帮助你基于 MongoDB 构建第一个分析应用。

准备 MongoDB 环境

安装 MongoDB 参考 Install MongoDB Community Edition on Linux

mkdir mongodata
mongod --dbpath mongodata --port 9555

准备 Spark python 环境

参考 PySpark - Quick Guide

下载 Spark

cd /home/mongo-spark
wget http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
tar zxvf spark-2.4.4-bin-hadoop2.7.tgz

设置 Spark 环境变量

export SPARK_HOME=/home/mongo-spark/spark-2.4.4-bin-hadoop2.7
export PATH=$PATH:/home/mongo-spark/spark-2.4.4-bin-hadoop2.7/bin
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH=$SPARK_HOME/python:$PATH

运行 Spark RDD 示例

# count.py
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
 ["scala", 
 "java", 
 "hadoop", 
 "spark", 
 "akka",
 "spark vs hadoop", 
 "pyspark",
 "pyspark and spark"]
)
counts = words.count()
$SPARK_HOME/bin/spark-submit count.py
Number of elements in RDD → 8 

如果上述程序运行成功,说明 Spark python 环境准备成功,还可以测试 Spark 的其他 RDD 操作,比如 collector、filter、map、reduce、join 等,更多示例参考 PySpark - Quick Guide

Spark 操作 MongoDB 数据

参考 Spark Connector Python Guide

准备测试数据 test.coll01 插入3条测试数据,test.coll02 未空

mongo --port 9555
> db.coll01.find()
{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }
> db.coll02.find()

准备操作脚本,将输入集合的数据按条件进行过滤,写到输出集合

# mongo-spark-test.py
from pyspark.sql import SparkSession
# Create Spark Session
spark = SparkSession \
 .builder \
 .appName("myApp") \
 .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:9555/test.coll01") \
 .config("spark.mongodb.output.uri", "mongodb://127.0.0.1:9555/test.coll") \
 .getOrCreate()
# Read from MongoDB
df = spark.read.format("mongo").load()
df.show()
# Filter and Write
df.filter(df['qty'] >= 10).write.format("mongo").mode("append").save() 
# Use SQL 
# df.createOrReplaceTempView("temp")
# some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")
# some_fruit.show()

运行脚本

$SPARK_HOME/bin/spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 mongo-spark-test.py
mongo --port 9555
> db.coll02.find()
{ "_id" : 2, "qty" : 10, "type" : "orange" }
{ "_id" : 3, "qty" : 15, "type" : "banana" }

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

热门产品

php编程基础教程.pptx|php编程培训,php,编程,基础,教程,pptx
php编程基础教程.pptx

历史上的今天:04月30日

热门专题

国家开放大学|国家开放大学报名,国家开放大学报考,国家开放大学,什么是国家开放大学,国家开放大学学历,国家开放大学学费,国家开放大学报名条件,国家开放大学报名时间,国家开放大学学历,国家开放大学专业
国家开放大学
天麻的功效与作用吃法|天麻的功效与作用,天麻的功效与作用吃法,天麻炖什么治头痛最好,天麻的功效与作用禁忌,天麻多少钱一斤,天麻的功效与作用吃法及禁忌,天麻怎么吃效果最好,天麻粉的功效与作用,天麻怎么吃
天麻的功效与作用吃法
易捷尔单招|易捷尔单招,易捷尔单招培训,易捷尔单招报名,易捷尔单招考试,易捷尔单招培训学校,易捷尔单招分数
易捷尔单招
一年制中专|中专学历,中专是什么学历,中专是什么,中专有什么专业,中专升大专,一年制中专
一年制中专
开放大学|开放大学报名,开放大学报考,开放大学,什么是开放大学,开放大学学历,开放大学学费,开放大学报名条件,开放大学报名时间,开放大学学历,开放大学专业
开放大学
大理科技管理学校|大理科技管理中等职业技术学校,大理市科技管理中等职业技术学校
大理科技管理学校
APP开发|app开发_app开发公司_app软件开发_专业app开发_云南app开发公司_app定制_原生app开发定制
APP开发
大理科技管理学校|大理科技管理学校,大理科技,大理科技中等职业技术学校,大理科技管理中等职业技术学校,大理科技学校
大理科技管理学校

微信小程序

微信扫一扫体验

立即
投稿

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部