In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)05/31 Report--
怎么实践Spark,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
Spark小试牛刀
随着项目的运营,收集了很多的用户数据。最近业务上想做些社交图谱相关的产品,但因为数据很多、很杂,传统的数据库查询已经满足不了业务的需求。 试着用Spark来做,权当练练手了。
安装Spark
因为有Scala的开发经验,所以就不用官方提供的二进制包了,自编译scala 2.11版本。
下载Spark:http://ftp.cuhk.edu.hk/pub/packages/apache.org/spark/spark-1.5.0/spark-1.5.0.tgz
tar zxf spark-1.5.0.tgzcd spark-1.5.0./dev/change-scala-version.sh 2.11mvn -Pyarn -Phadoop-2.6 -Dscala-2.11 -DskipTests clean package
以上命令完成Spark基于scala 2.11版本的编译。可以运行自带的一个示例程序来验证安装是否成功。
./bin/run-example SparkPi编写Standalone application
使用sbt来构建一个可提交的简单Spark程序,功能是计算每个用户加入的群组,并把结果保存下来。project/Build.scala配置文件如下:
import _root_.sbt.Keys._import _root_.sbt._import sbtassembly.AssemblyKeys._object Build extends Build { override lazy val settings = super.settings :+ { shellPrompt := (s => Project.extract(s).currentProject.id + " > ") } lazy val root = Project("spark-mongodb", file(".")) .settings( scalaVersion := "2.11.7", assemblyJarName in assembly := "spark-mongodb.jar", assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false), libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % verSpark % "scopeProvidedTest, "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.0" excludeAll( ExclusionRule(organization = "javax.servlet"), ExclusionRule(organization = "commons-beanutils"), ExclusionRule(organization = "org.apache.hadoop"))) ) privateval scopeProvidedTest = "provided,test" privateval verSpark = "1.5.0"}
数据存储在MongoDB数据库中,所以我们还需要使用mongo-hadoop连接器来访问MongoDB数据库。
示例程序
示例程序非常的简单,把数据从数据库里全部读出,使用map来把每条记录里用户ID对应加入的群组ID转换成一个Set,再使用 reduceByKey来把相同用户ID的set合并到一起,存入数据库即可。
import com.mongodb.BasicDBObjectimport com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat}import org.apache.hadoop.conf.Configurationimport org.apache.spark.{SparkConf, SparkContext}import org.bson.BSONObjectimport scala.collection.JavaConverters._object QQGroup { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("QQGroup") val sc = new SparkContext(sparkConf) val inputConfig = new Configuration() inputConfig.set("mongo.input.uri", "mongodb://192.168.31.121:27017/db.userGroup") inputConfig.set("mongo.input.fields", """{"userId":1, "groupId":1, "_id":0}""") inputConfig.set("mongo.input.noTimeout", "true") val documentRDD = sc.newAPIHadoopRDD( inputConfig, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject]) val userRDD = documentRDD.map { case (_, doc) => (getValue(doc, "userId"), getValue(doc, "groupId")) }.reduceByKey(_ ++ _) val resultRDD = userRDD.map { case (userId, groupIds) => val o = new BasicDBObject() o.put("groupIds", groupIds.asJava) userId -> o } val outputConfig = new Configuration() outputConfig.set("mongo.output.uri", "mongodb://192.168.31.121:27017/db_result.userGroup") resultRDD.saveAsNewAPIHadoopFile( "file://this-is-completely-unused", classOf[Object], classOf[BSONObject], classOf[MongoOutputFormat[Object, BSONObject]], outputConfig) } def getValue(dbo: BSONObject, key: String) = { val value = dbo.get(key) if (value eq null) "" else value.asInstanceOf[String] }}
MongoDB官方提供了Hadoop连接器,Spark可以使用mongo-hadoop连接器来读、写MongoDB数据库。 主要的输入配置荐有:
mongo.input.uri: MongoDB的连接URI
mongo.input.fields: 指定返回哪些数据,与db.query里的第2个参数功能一样
mongo.input.query: MongoDB的查询参数
相应的MongoDB也提供了一系列的输出参数,如:
mongo.output.uri: MongoDB的连接URI
sc.newAPIHadoopRDD()方法有4个参数,分别为:配置、输入格式化类、待映射数据主键类型、待映射数据类型。
主要的操作代码:
val userRDD = documentRDD.map { case (_, doc) => (getValue(doc, "userId"), Set(getValue(doc, "groupId"))) }.reduceByKey(_ ++ _) val resultRDD = userRDD.map { case (userId, groupIds) => val o = new BasicDBObject() o.put("groupIds", groupIds.asJava) userId -> o }
先使用map方法获取userId和groupId,并把groupId转换为一个Set。
在把数据转换成Tuple2,就是一个KV的形式以后,我们就可以调用一系列的转换方法来对RDD进行操作,这里使用reduceByKey方法来将同一个userId的所以value都合并在一起。这样我们就有了所有用户对应加入的群组 的一个RDD集了。
(RDD上有两种类型的操作。一种是"变换",它只是描述了待进行的操作指令,并不会触发实际的计算;另一种是"动作", 它将触发实际的计算动作,这时候系统才会实际的从数据源读入数据,操作内存,保存数据等)
最后使用resultRDD.saveAsNewAPIHadoopFile()方法来把计算结果存入MongoDB,这里的一个参数:用于指定 HDFS的存储位置并不会使用到,因为mongo-hadoop将会使用mongo.output.uri指定的存储URI连接地址来保存数据。
关于怎么实践Spark问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.