This article shares the source code for computing the N-degree relationship nodes of specified vertices with Spark GraphX. Let's go directly to the code:
package horizon.graphx.util

import java.security.InvalidParameterException

import horizon.graphx.util.CollectionUtil.CollectionHelper
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

/**
 * Created by yepei.ye on 2017/1/19.
 * Description: computes the N-degree neighbors of the specified vertices in a graph,
 * outputting, for each of these vertices, the path length to each reachable vertex id.
 */
object GraphNdegUtil {
  val maxNDegVerticesCount = 10000
  val maxDegree = 1000

  /**
   * Computes the N-degree relationships of the chosen vertices.
   *
   * @param edges         the edges of the graph, as (srcId, dstId) tuples
   * @param choosedVertex the vertices whose N-degree neighbors are wanted
   * @param degree        the maximum number of hops
   * @tparam ED edge attribute type
   * @return for each chosen vertex, a map from hop count to the set of vertex ids at that distance
   */
  def aggNdegreedVertices[ED: ClassTag](edges: RDD[(VertexId, VertexId)], choosedVertex: RDD[VertexId], degree: Int): VertexRDD[Map[Int, Set[VertexId]]] = {
    val simpleGraph = Graph.fromEdgeTuples(edges, 0, Option(PartitionStrategy.EdgePartition2D),
      StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)
    aggNdegreedVertices(simpleGraph, choosedVertex, degree)
  }

  def aggNdegreedVerticesWithAttr[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], choosedVertex: RDD[VertexId], degree: Int,
                                                              sendFilter: (VD, VD) => Boolean = (_: VD, _: VD) => true): VertexRDD[Map[Int, Set[VD]]] = {
    val ndegs: VertexRDD[Map[Int, Set[VertexId]]] = aggNdegreedVertices(graph, choosedVertex, degree, sendFilter)
    val flated: RDD[Ver[VD]] = ndegs.flatMap(e => e._2.flatMap(t => t._2.map(s => Ver(e._1, s, t._1, null.asInstanceOf[VD]))))
      .persist(StorageLevel.MEMORY_AND_DISK_SER)
    val matched: RDD[Ver[VD]] = flated.map(e => (e.id, e)).join(graph.vertices)
      .map(e => e._2._1.copy(attr = e._2._2)).persist(StorageLevel.MEMORY_AND_DISK_SER)
    flated.unpersist(blocking = false)
    ndegs.unpersist(blocking = false)
    val grouped: RDD[(VertexId, Map[Int, Set[VD]])] = matched.map(e => (e.source, ArrayBuffer(e))).reduceByKey(_ ++= _)
      .map(e => (e._1, e._2.map(t => (t.degree, Set(t.attr))).reduceByKey(_ ++ _).toMap))
    matched.unpersist(blocking = false)
    VertexRDD(grouped)
  }

  def aggNdegreedVertices[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], choosedVertex: RDD[VertexId], degree: Int,
                                                      sendFilter: (VD, VD) => Boolean = (_: VD, _: VD) => true): VertexRDD[Map[Int, Set[VertexId]]] = {
    if (degree < 1) {
      throw new InvalidParameterException("Invalid degree parameter: " + degree)
    }
    val initVertex = choosedVertex.map(e => (e, true)).persist(StorageLevel.MEMORY_AND_DISK_SER)
    var g: Graph[DegVertex[VD], Int] = graph.outerJoinVertices(graph.degrees)((_, old, deg) => (deg.getOrElse(0), old))
      .subgraph(vpred = (_, a) => a._1 <= maxDegree) // drop very-high-degree vertices
      .outerJoinVertices(initVertex)((id, old, hasReceivedMsg) => {
        DegVertex(old._2, hasReceivedMsg.getOrElse(false), ArrayBuffer((id, 0))) // mark the vertices that start sending messages
      }).mapEdges(_ => 0).cache() // simplify the edge attribute

    choosedVertex.unpersist(blocking = false)

    var i = 0
    var prevG: Graph[DegVertex[VD], Int] = null
    var newVertexRdd: VertexRDD[ArrayBuffer[(VertexId, Int)]] = null
    while (i < degree + 1) {
      prevG = g
      // send the (i+1)-th round of messages
      newVertexRdd = prevG.aggregateMessages[ArrayBuffer[(VertexId, Int)]](sendMsg(_, sendFilter),
        (a, b) => reduceVertexIds(a ++ b)).persist(StorageLevel.MEMORY_AND_DISK_SER)
      g = g.outerJoinVertices(newVertexRdd)((vid, old, msg) =>
        if (msg.isDefined) updateVertexByMsg(vid, old, msg.get) else old.copy(init = false)).cache()
      prevG.unpersistVertices(blocking = false)
      prevG.edges.unpersist(blocking = false)
      newVertexRdd.unpersist(blocking = false)
      i += 1
    }
    newVertexRdd.unpersist(blocking = false)
    val maped = g.vertices.join(initVertex).mapValues(e => sortResult(e._1)).persist(StorageLevel.MEMORY_AND_DISK_SER)
    initVertex.unpersist()
    g.unpersist(blocking = false)
    VertexRDD(maped)
  }

  private case class Ver[VD: ClassTag](source: VertexId, id: VertexId, degree: Int, attr: VD = null.asInstanceOf[VD])

  private def updateVertexByMsg[VD: ClassTag](vertexId: VertexId, oldAttr: DegVertex[VD], msg: ArrayBuffer[(VertexId, Int)]): DegVertex[VD] = {
    val addOne = msg.map(e => (e._1, e._2 + 1))
    val newMsg = reduceVertexIds(oldAttr.degVertices ++ addOne)
    oldAttr.copy(init = msg.nonEmpty, degVertices = newMsg)
  }

  private def sortResult[VD: ClassTag](degs: DegVertex[VD]): Map[Int, Set[VertexId]] =
    degs.degVertices.map(e => (e._2, Set(e._1))).reduceByKey(_ ++ _).toMap

  case class DegVertex[VD: ClassTag](var attr: VD, init: Boolean = false, degVertices: ArrayBuffer[(VertexId, Int)])

  case class VertexDegInfo[VD: ClassTag](var attr: VD, init: Boolean = false, degVertices: ArrayBuffer[(VertexId, Int)])

  private def sendMsg[VD: ClassTag](e: EdgeContext[DegVertex[VD], Int, ArrayBuffer[(VertexId, Int)]],
                                    sendFilter: (VD, VD) => Boolean): Unit = {
    try {
      val src = e.srcAttr
      val dst = e.dstAttr
      // a message is sent only if one endpoint is in the ready state
      if (src.degVertices.size < maxNDegVerticesCount && (src.init || dst.init)
        && dst.degVertices.size < maxNDegVerticesCount && !isAttrSame(src, dst)) {
        if (sendFilter(src.attr, dst.attr)) {
          e.sendToDst(reduceVertexIds(src.degVertices))
        }
        if (sendFilter(dst.attr, src.attr)) {
          e.sendToSrc(reduceVertexIds(dst.degVertices))
        }
      }
    } catch {
      case ex: Exception =>
        println(s"========== error found: exception: ${ex.getMessage}," +
          s" edgeTriplet: (srcId: ${e.srcId}, srcAttr: (${e.srcAttr.attr}, ${e.srcAttr.init}, ${e.srcAttr.degVertices.size})," +
          s" dstId: ${e.dstId}, dstAttr: (${e.dstAttr.attr}, ${e.dstAttr.init}, ${e.dstAttr.degVertices.size}), attr: ${e.attr})")
        ex.printStackTrace()
        throw ex
    }
  }

  private def reduceVertexIds(ids: ArrayBuffer[(VertexId, Int)]): ArrayBuffer[(VertexId, Int)] =
    ArrayBuffer() ++= ids.reduceByKey(Math.min)

  private def isAttrSame[VD: ClassTag](a: DegVertex[VD], b: DegVertex[VD]): Boolean =
    a.init == b.init && allKeysAreSame(a.degVertices, b.degVertices)

  private def allKeysAreSame(a: ArrayBuffer[(VertexId, Int)], b: ArrayBuffer[(VertexId, Int)]): Boolean = {
    val aKeys = a.map(e => e._1).toSet
    val bKeys = b.map(e => e._1).toSet
    if (aKeys.size != bKeys.size || aKeys.isEmpty) return false
    aKeys.diff(bKeys).isEmpty && bKeys.diff(aKeys).isEmpty
  }
}
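For orientation, here is a minimal driver showing how the utility is meant to be called. This is only a sketch under stated assumptions: the SparkContext setup, the sample edges, and the demo object name are illustrative and not part of the original code.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.VertexId
import org.apache.spark.rdd.RDD

import horizon.graphx.util.GraphNdegUtil

object GraphNdegUtilDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("NdegDemo").setMaster("local[*]"))
    // A tiny chain graph 1 - 2 - 3 - 4; the algorithm treats it as undirected,
    // since sendMsg forwards messages along edges in both directions.
    val edges: RDD[(VertexId, VertexId)] = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 4L)))
    val chosen: RDD[VertexId] = sc.parallelize(Seq(1L))
    // Collect the neighbors of vertex 1 that are at most 2 hops away.
    val result = GraphNdegUtil.aggNdegreedVertices(edges, chosen, 2)
    // Each entry maps a chosen vertex id to a Map(hopCount -> vertex ids at that distance).
    result.collect().foreach { case (vid, hops) => println(s"$vid -> $hops") }
    sc.stop()
  }
}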
The sortResult method calls a reduceByKey method on collections of type Traversable[(K, V)]. This method is not part of the standard library; it comes from a self-written implicit helper class that must be imported wherever it is used. The code is as follows:
package horizon.graphx.util

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

/**
 * Created by yepei.ye on 2016/12/21.
 * Description: adds reduceByKey-style methods to local pair collections.
 */
object CollectionUtil {

  /**
   * Adds reduceByKey-related methods to collections of type Traversable[(K, V)].
   *
   * @param collection the wrapped pair collection
   * @tparam K key type
   * @tparam V value type
   */
  implicit class CollectionHelper[K, V](collection: Traversable[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V]) {
    def reduceByKey(f: (V, V) => V): Traversable[(K, V)] =
      collection.groupBy(_._1).map { case (_: K, values: Traversable[(K, V)]) =>
        values.reduce((a, b) => (a._1, f(a._2, b._2)))
      }

    /**
     * Like reduceByKey, but also returns the collection of elements dropped by the reduce.
     *
     * @param f the reduce function
     * @return a pair of (reduced collection, elements dropped during the reduce)
     */
    def reduceByKeyWithReduced(f: (V, V) => V)(implicit kt: ClassTag[K], vt: ClassTag[V]): (Traversable[(K, V)], Traversable[(K, V)]) = {
      val reduced: ArrayBuffer[(K, V)] = ArrayBuffer()
      val newSeq = collection.groupBy(_._1).map { case (_: K, values: Traversable[(K, V)]) =>
        values.reduce((a, b) => {
          val newValue: V = f(a._2, b._2)
          val reducedValue: V = if (newValue == a._2) b._2 else a._2
          val reducedPair: (K, V) = (a._1, reducedValue)
          reduced += reducedPair
          (a._1, newValue)
        })
      }
      (newSeq, reduced.toTraversable)
    }
  }
}
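To see the implicit enrichment in action outside Spark, here is a small self-contained sketch; the sample data and the demo object name are made up for illustration.

import horizon.graphx.util.CollectionUtil.CollectionHelper

object CollectionHelperDemo {
  def main(args: Array[String]): Unit = {
    val pairs = Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4))

    // Behaves like RDD.reduceByKey, but on a plain local collection:
    // values with the same key are folded together with the supplied function.
    val summed = pairs.reduceByKey(_ + _)
    println(summed) // contains ("a", 4) and ("b", 6), in unspecified order

    // The WithReduced variant also returns the elements dropped during the reduce.
    val (kept, dropped) = pairs.reduceByKeyWithReduced(Math.max)
    println(kept)    // the maximum value for each key
    println(dropped) // the values that lost each comparison
  }
}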
Summary
The above is the complete source code for computing the N-degree relationship nodes of specified vertices with Spark GraphX. I hope it is helpful. Interested readers can also refer to related articles such as "Seven Common Hadoop and Spark Project Cases" and "Spark Broadcast Variables and Accumulator Usage Code Samples". If you have any questions, please leave a message and the editor will reply to you promptly.