What are the basics of GraphX

2025-03-29 Update From: SLTechnology News&Howtos

This article explains the basics of Spark GraphX: what the framework is, its core classes and their members, and worked examples of the most common graph operations.

Spark GraphX is a distributed graph processing framework. Built on the Spark platform, it provides a simple, easy-to-use, and rich interface for graph computation and graph mining, which makes distributed graph processing much more convenient. Because GraphX sits on top of Spark, it is distributed by nature. Distributed (parallel) graph processing splits a graph into many subgraphs and then computes over those subgraphs separately; the computation can proceed iteratively, in stages.

By the time GraphX was designed, vertex-cut partitioning and the GAS (Gather-Apply-Scatter) model were both mature, and GraphX refined them in its design and implementation to strike a good balance between functionality and performance. Like every Spark submodule, GraphX has a core abstraction: the Resilient Distributed Property Graph, a directed multigraph with attributes on both vertices and edges. It extends the Spark RDD abstraction and offers two views, Table and Graph, over a single physical copy of the data. Each view has its own operators, which gives both flexible operations and efficient execution.

GraphX basics

Class members

In GraphX, the base class of a graph is Graph, which contains two RDDs: an edge RDD and a vertex RDD. You can build a graph from a given edge RDD and vertex RDD. Once the graph is built, you can access its edges and vertices through the edges and vertices members. VD and ED denote the user-defined vertex and edge attribute types, and the graph itself is the parameterized generic type Graph[VD, ED]. Every graph in GraphX has vertex and edge attributes. An Edge stores the VertexId values of its endpoints, not references to the vertex objects. The graph is distributed across the cluster and does not live in a single JVM, so the two vertices of one edge may sit on different cluster nodes.

Vertex: (VertexId, VD)

abstract class VertexRDD[VD] extends RDD[(VertexId, VD)]

Abstract value members

innerJoin leftJoin mapValues

Concrete value members

collect count distinct filter foreach groupBy isEmpty persist map reduce sortBy toString

Edge: Edge(VertexId, VertexId, ED)

class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)

abstract class EdgeRDD[ED] extends RDD[Edge[ED]]
class EdgeTriplet[VD, ED] extends Edge[ED]

Value members

attr srcId srcAttr dstId dstAttr

Abstract value members

innerJoin mapValues reverse

Concrete value members

++ aggregate cache collect count distinct filter foreach groupBy isEmpty map persist reduce sortBy toString

Graph: Graph[VD, ED]

abstract class Graph[VD, ED] extends Serializable
class GraphOps[VD, ED] extends Serializable

Value members

collectEdges collectNeighborIds collectNeighbors degrees filter inDegrees joinVertices numEdges numVertices outDegrees pageRank personalizedPageRank pickRandomVertex pregel triangleCount

Abstract value members

cache edges mapEdges mapTriplets mapVertices mask outerJoinVertices persist reverse subgraph triplets vertices

Concrete value members

aggregateMessages mapEdges mapTriplets

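GraphX examples

Imports

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel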

Building a graph

There are many ways to construct a property graph from raw files or RDDs. The most common is to use the Graph object. The following code builds a property graph from a vertex RDD and an edge RDD.

// Assume the SparkContext sc has already been constructed
// Create the vertex RDD
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create the edge RDD
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(5L, 0L, "colleague")))
// Define a default user in case an edge refers to a user that does not exist
val defaultUser = ("John Doe", "Missing")
// Build the graph
val graph = Graph(users, relationships, defaultUser)
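The role of the default user can be illustrated without a cluster. The following is a minimal sketch in plain Scala, where a Map stands in for the vertex RDD and a Seq for the edge RDD; the object DefaultVertexSketch and the helper withDefaults are hypothetical names for illustration, not part of GraphX, and the sketch only models the visible behaviour (vertex 0 appears in an edge but not in the vertex list, so it receives the default attribute).

```scala
object DefaultVertexSketch {
  type VertexId = Long
  final case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)

  // Hypothetical helper: returns the vertex table after filling in a default
  // attribute for every vertex ID that appears only in the edge list.
  def withDefaults[VD, ED](vertices: Map[VertexId, VD],
                           edges: Seq[Edge[ED]],
                           default: VD): Map[VertexId, VD] = {
    val referenced = edges.flatMap(e => Seq(e.srcId, e.dstId)).toSet
    val missing = referenced -- vertices.keySet
    vertices ++ missing.map(id => (id, default))
  }

  val users: Map[VertexId, (String, String)] = Map(
    (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
    (5L, ("franklin", "prof")), (2L, ("istoica", "prof")))
  val relationships = Seq(
    Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"),
    Edge(5L, 7L, "pi"), Edge(5L, 0L, "colleague"))

  // Vertex 0 is referenced by the last edge only, so it gets the default user.
  val filled: Map[VertexId, (String, String)] =
    withDefaults(users, relationships, ("John Doe", "Missing"))
}
```

Here DefaultVertexSketch.filled holds five vertices: the four declared users plus vertex 0 with the attribute ("John Doe", "Missing").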

Caching

// Cache the graph. By default, graphs cached in memory are evicted with an LRU policy when memory is tight
graph.cache()
graph.persist(StorageLevel.MEMORY_ONLY)
graph.unpersistVertices(true)

Vertices, edges, and triplets

The following code uses the Edge case class. An edge has a srcId and a dstId, which hold the IDs of the source and destination vertices. The Edge class also has an attr member that stores the edge attribute. A graph can be deconstructed into its vertices and edges with the graph.vertices and graph.edges members. graph.vertices returns a VertexRDD[(String, String)], which extends RDD[(VertexId, (String, String))], so we can deconstruct each tuple with a Scala case expression. graph.edges returns an EdgeRDD containing Edge[String] objects, which we can also match with the case class's type constructor.

In addition to the vertex and edge views of the property graph, GraphX exposes a triplet view. It logically joins the vertex and edge attributes into an RDD[EdgeTriplet[VD, ED]] of EdgeTriplet instances. The EdgeTriplet class extends Edge and adds the srcAttr and dstAttr members, which hold the attributes of the source and destination vertices. We can use the triplet view to render a collection of strings that describe the relationships between users.

// Find the users whose position is "postdoc"
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.collect
// Count the edges whose source vertex ID is greater than the destination vertex ID
graph.edges.filter(e => e.srcId > e.dstId).count
graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
// Use the triplet view to describe the relationships
val facts: RDD[String] =
  graph.triplets.map(triplet =>
    triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))
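What the triplet view contains can be sketched in plain Scala as a join of each edge with the attributes of its two endpoints. This is only a model of the semantics under simple assumptions: a Map stands in for the vertex RDD, the Triplet class and the triplets helper are hypothetical names, and the edge to the missing user 0 is left out so that every lookup succeeds.

```scala
object TripletSketch {
  type VertexId = Long
  final case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)
  // Hypothetical stand-in for EdgeTriplet: an edge plus both endpoint attributes.
  final case class Triplet[VD, ED](srcId: VertexId, srcAttr: VD,
                                   dstId: VertexId, dstAttr: VD, attr: ED)

  // Join each edge with the attributes of its source and destination vertices.
  def triplets[VD, ED](vertices: Map[VertexId, VD],
                       edges: Seq[Edge[ED]]): Seq[Triplet[VD, ED]] =
    edges.map(e =>
      Triplet(e.srcId, vertices(e.srcId), e.dstId, vertices(e.dstId), e.attr))

  val users: Map[VertexId, (String, String)] = Map(
    (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
    (5L, ("franklin", "prof")), (2L, ("istoica", "prof")))
  val relationships = Seq(
    Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
    Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"))

  // One sentence per edge, built the same way as in the GraphX snippet.
  val facts: Seq[String] = triplets(users, relationships)
    .map(t => t.srcAttr._1 + " is the " + t.attr + " of " + t.dstAttr._1)
}
```

The first two entries of TripletSketch.facts are "rxin is the collab of jgonzal" and "franklin is the advisor of rxin".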

Degree, in-degree, out-degree

Just as RDDs have basic operations such as map, filter, and reduceByKey, property graphs have a set of basic operators that take user-defined functions and produce new graphs with transformed attributes and structure. The core operators, which have optimized implementations, are defined in Graph. Convenience operators that are expressed as compositions of the core operators are defined in GraphOps. Thanks to Scala implicit conversions, the operators defined in GraphOps are automatically available as members of Graph. For example, we can compute the degree, in-degree, and out-degree of each vertex (all defined in GraphOps) as follows. The core operators are kept separate from GraphOps so that different graph representations can be supported in the future: each representation must implement the core operators and can then reuse the many useful operators defined in GraphOps.

val degrees: VertexRDD[Int] = graph.degrees
degrees.collect().foreach(println)
val inDegrees: VertexRDD[Int] = graph.inDegrees
inDegrees.collect().foreach(println)
val outDegrees: VertexRDD[Int] = graph.outDegrees
outDegrees.collect().foreach(println)
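To see which numbers these three calls should produce for the example graph, the degrees can be counted by hand. The sketch below uses plain Scala collections instead of RDDs; DegreeSketch and countIds are hypothetical names, and the edge list is the one from the graph-building example. As in the sketch, a vertex with no matching edges simply has no entry in the result.

```scala
object DegreeSketch {
  type VertexId = Long
  // (source, destination) pairs for the edges of the example graph
  val edges: Seq[(VertexId, VertexId)] =
    Seq((3L, 7L), (5L, 3L), (2L, 5L), (5L, 7L), (5L, 0L))

  // Count how often each ID occurs in a list of vertex IDs.
  def countIds(ids: Seq[VertexId]): Map[VertexId, Int] =
    ids.groupBy(identity).map { case (id, occurrences) => (id, occurrences.size) }

  // Out-degree counts sources, in-degree counts destinations, degree counts both.
  val outDegrees: Map[VertexId, Int] = countIds(edges.map(_._1))
  val inDegrees: Map[VertexId, Int] = countIds(edges.map(_._2))
  val degrees: Map[VertexId, Int] = countIds(edges.flatMap(e => Seq(e._1, e._2)))
}
```

For vertex 5 (franklin) this gives an out-degree of 3, an in-degree of 1, and a degree of 4.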

Attribute operations: modify the attributes of vertices and edges

Each attribute operation produces a new graph whose vertex or edge attributes have been modified by a user-defined map function. The map operations derive the new graph from the original one without changing its structure. An important property of these operations is that the resulting graph can reuse the structural indices of the original graph. The first two snippets below are logically equivalent, but the first is not a graph operation: it does not preserve the structural indices, so it does not benefit from GraphX's optimizations. These operations are often used to initialize a graph for a particular computation or to drop attributes that are not needed. For example, given a graph whose vertex attribute is the out-degree, we initialize it for PageRank.

// Map operation
// Transform the vertex attributes by appending a suffix to each field
// RDD operation: rebuilds a new graph, does not preserve the structural indices, and is not optimized by the system
val newVertices = graph.vertices.map { case (id, attr) => (id, (attr._1 + "-1", attr._2 + "-2")) }
val newGraph2 = Graph(newVertices, graph.edges)
// Graph map operation: optimized by the system
val newGraph3 = graph.mapVertices((id, attr) => (attr._1 + "-1", attr._2 + "-2"))
// Build a new graph whose vertex attribute is the out-degree
val inputGraph: Graph[Int, String] =
  graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Based on that graph, build a new graph with edges and vertices initialized for PageRank
val outputGraph: Graph[Double, Double] =
  inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)

// Custom type
// Build a new graph whose vertex type VD is User, converting from graph
case class User(name: String, pos: String, inDeg: Int, outDeg: Int)
val initialUserGraph: Graph[User, String] =
  graph.mapVertices { case (id, (name, pos)) => User(name, pos, 0, 0) }

// Join operation
// Join initialUserGraph with its inDegrees and outDegrees (RDDs) and fill in the inDeg and outDeg values
val userGraph = initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees) {
  case (id, u, inDegOpt) => User(u.name, u.pos, inDegOpt.getOrElse(0), u.outDeg)
}.outerJoinVertices(initialUserGraph.outDegrees) {
  case (id, u, outDegOpt) => User(u.name, u.pos, u.inDeg, outDegOpt.getOrElse(0))
}
userGraph.vertices.collect.foreach(v =>
  println(s"${v._2.name} inDeg: ${v._2.inDeg} outDeg: ${v._2.outDeg}"))
// Print the users whose in-degree equals their out-degree
userGraph.vertices.filter { case (id, u) => u.inDeg == u.outDeg }
  .collect.foreach { case (id, property) => println(property.name) }
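The reason the join code calls getOrElse(0) is that an outer join keeps every vertex and hands the update function an Option, which is empty when the other table has no entry for that vertex. A minimal plain-Scala sketch of that behaviour follows; OuterJoinSketch and outerJoin are hypothetical names, Maps stand in for the graph's vertices and the degree RDD, and the data is a two-user excerpt of the example.

```scala
object OuterJoinSketch {
  type VertexId = Long
  final case class User(name: String, pos: String, inDeg: Int, outDeg: Int)

  // Hypothetical stand-in for outerJoinVertices: every vertex is kept, and the
  // update function receives Some(value) when the other table has an entry
  // for the vertex and None otherwise.
  def outerJoin[VD, U, VD2](vertices: Map[VertexId, VD], other: Map[VertexId, U])(
      f: (VertexId, VD, Option[U]) => VD2): Map[VertexId, VD2] =
    vertices.map { case (id, attr) => (id, f(id, attr, other.get(id))) }

  val users: Map[VertexId, User] = Map(
    (3L, User("rxin", "student", 0, 0)),
    (2L, User("istoica", "prof", 0, 0)))
  // Vertex 2 has no incoming edge, so it has no entry in the in-degree table.
  val inDegrees: Map[VertexId, Int] = Map((3L, 1))

  val joined: Map[VertexId, User] =
    outerJoin(users, inDegrees)((id, u, inDegOpt) => u.copy(inDeg = inDegOpt.getOrElse(0)))
}
```

Both users survive the join: rxin ends up with inDeg 1, while istoica keeps inDeg 0 because the lookup returned None.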

Structural operations

// Subgraph
// Build the subgraph made of the vertices that satisfy the predicate
val subGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
subGraph.vertices.collect().foreach(println(_))
subGraph.triplets.map(triplet =>
  triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
  .collect().foreach(println(_))

// Reverse of graph
// Returns a new graph with every edge direction reversed. Vertex and edge attributes and the number of edges stay unchanged, so it can be implemented efficiently without moving or copying data
val rGraph = graph.reverse

// Mask
// Builds a new graph restricted to the vertices and edges that are also present in the input graph
val ccGraph = graph.connectedComponents()
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
val validCCGraph = ccGraph.mask(validGraph)
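The effect of subgraph and reverse on the graph's structure can be modelled on plain collections. In the sketch below, SimpleGraph is a hypothetical case class (not a GraphX type) holding a vertex Map and an edge Seq; it assumes the usual reading of a vertex predicate, namely that an edge survives only when both of its endpoints do.

```scala
object StructuralSketch {
  type VertexId = Long
  final case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)

  // Hypothetical in-memory graph used only to illustrate the semantics.
  final case class SimpleGraph[VD, ED](vertices: Map[VertexId, VD], edges: Seq[Edge[ED]]) {
    // Keep vertices that pass vpred, then keep edges whose endpoints both survive.
    def subgraph(vpred: (VertexId, VD) => Boolean): SimpleGraph[VD, ED] = {
      val kept = vertices.filter { case (id, attr) => vpred(id, attr) }
      SimpleGraph(kept, edges.filter(e => kept.contains(e.srcId) && kept.contains(e.dstId)))
    }
    // Swap source and destination on every edge; attributes are untouched.
    def reverse: SimpleGraph[VD, ED] =
      SimpleGraph(vertices, edges.map(e => Edge(e.dstId, e.srcId, e.attr)))
  }

  val graph = SimpleGraph(
    Map[VertexId, (String, String)](
      (5L, ("franklin", "prof")), (7L, ("jgonzal", "postdoc")),
      (0L, ("John Doe", "Missing"))),
    Seq(Edge(5L, 7L, "pi"), Edge(5L, 0L, "colleague")))

  // Dropping the "Missing" vertex also drops the edge that points to it.
  val valid = graph.subgraph((id, attr) => attr._2 != "Missing")
  val reversed = graph.reverse
}
```

After the subgraph call only the edge from 5 to 7 remains, while reverse keeps both edges and flips their direction.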

Aggregation operations

// Assumes graph is a Graph[Double, Int] whose vertex attribute is an age, such as the random graph built with GraphGenerators at the end of this article
// mapReduceTriplets is the Spark 1.x API; Spark 2.0 and later use aggregateMessages instead
// Compute, for each user, the number of followers who are older and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
  // Map function
  triplet => {
    if (triplet.srcAttr > triplet.dstAttr) {
      Iterator((triplet.dstId, (1, triplet.srcAttr)))
    } else {
      Iterator.empty
    }
  },
  // Reduce function
  (a, b) => (a._1 + b._1, a._2 + b._2))
// Compute the average age of the older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues((id, value) => value match {
    case (count, totalAge) => totalAge / count
  })
avgAgeOfOlderFollowers.collect.foreach(println(_))
// Define a reduce function that picks the vertex with the larger degree
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
println(s"maxInDegree: $maxInDegree")
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
println(s"maxOutDegree: $maxOutDegree")
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
println(s"maxDegrees: $maxDegrees")
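The map and reduce functions of the older-followers computation can be traced on a tiny hand-made graph. The following plain-Scala sketch is only a model of the two steps: AggregateSketch is a hypothetical name, the ages and follow edges are invented for illustration, and ordinary collections replace the distributed message passing.

```scala
object AggregateSketch {
  type VertexId = Long
  // vertex ID -> age, and (follower, followed) edges; all values are made up
  val ages: Map[VertexId, Double] = Map((1L, 30.0), (2L, 40.0), (3L, 50.0), (4L, 20.0))
  val follows: Seq[(VertexId, VertexId)] = Seq((2L, 1L), (3L, 1L), (4L, 1L), (1L, 2L))

  // Map step: an edge sends (1, follower age) to its destination
  // when the follower is older than the followed user.
  val messages: Seq[(VertexId, (Int, Double))] = follows.flatMap { case (src, dst) =>
    if (ages(src) > ages(dst)) Seq((dst, (1, ages(src)))) else Seq.empty
  }

  // Reduce step: sum the messages that arrive at the same vertex.
  val olderFollowers: Map[VertexId, (Int, Double)] =
    messages.groupBy(_._1).map { case (id, msgs) =>
      (id, msgs.map(_._2).reduce((a, b) => (a._1 + b._1, a._2 + b._2)))
    }

  // Average age = total age / count, per vertex that received a message.
  val avgAgeOfOlderFollowers: Map[VertexId, Double] =
    olderFollowers.map { case (id, (count, totalAge)) => (id, totalAge / count) }
}
```

User 1 (age 30) has two older followers, aged 40 and 50, so the average is 45.0; user 2 receives no message and therefore has no entry in the result.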

Neighbor aggregation

// Collecting neighbors is quite expensive: it duplicates information and needs a lot of communication, so where possible express the same computation directly with mapReduceTriplets
val neighborIds: VertexRDD[Array[VertexId]] = graph.collectNeighborIds(EdgeDirection.Out)
val neighbors: VertexRDD[Array[(VertexId, Double)]] = graph.collectNeighbors(EdgeDirection.Out)

Pregel API

// Pregel API: compute single-source shortest paths
// Build a random graph
val numVertices = 100
val numEParts = 2
val mu = 4.0
val sigma = 1.3
val graph2 = GraphGenerators.logNormalGraph(sc, numVertices, numEParts, mu, sigma)
  .mapEdges(e => e.attr.toDouble)
// Define the source vertex
val sourceId: VertexId = 42
// Initialize every vertex: the source vertex gets 0.0, all other vertices get infinity
val initialGraph = graph2.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)
// pregel has two parameter lists. The first holds the initial message, the maximum number of iterations, and the edge direction along which messages are sent. The second holds the user-defined functions that receive messages, compute messages, and merge messages
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  // Vertex program
  (id, dist, newDist) => math.min(dist, newDist),
  // Send messages
  triplet => {
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  // Merge messages
  (a, b) => math.min(a, b))
println(sssp.vertices.collect.mkString("\n"))
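The three user-defined functions are easier to follow when the supersteps are run by hand on a small graph. The sketch below is a simplified, single-machine model of the loop in plain Scala, not GraphX's implementation: PregelSketch and shortestPaths are hypothetical names, the edge weights are invented, and every edge is re-checked in each round rather than only those of vertices that received a message.

```scala
object PregelSketch {
  type VertexId = Long
  final case class Edge(srcId: VertexId, dstId: VertexId, attr: Double)

  def shortestPaths(vertexIds: Set[VertexId], edges: Seq[Edge],
                    sourceId: VertexId): Map[VertexId, Double] = {
    // Initial state: 0.0 at the source, infinity everywhere else.
    var dist: Map[VertexId, Double] = vertexIds.map { id =>
      (id, if (id == sourceId) 0.0 else Double.PositiveInfinity)
    }.toMap
    var active = true
    while (active) {
      // Send: an edge offers its destination a distance only if it is shorter.
      val sent = edges.collect {
        case e if dist(e.srcId) + e.attr < dist(e.dstId) =>
          (e.dstId, dist(e.srcId) + e.attr)
      }
      // Merge: keep the smallest message per destination vertex.
      val merged = sent.groupBy(_._1).map { case (id, msgs) => (id, msgs.map(_._2).min) }
      // Vertex program: take the minimum of the old and the received distance.
      dist = dist ++ merged.map { case (id, d) => (id, math.min(dist(id), d)) }
      // Stop once a superstep sends no messages.
      active = merged.nonEmpty
    }
    dist
  }

  val edges = Seq(Edge(1L, 2L, 4.0), Edge(1L, 3L, 1.0), Edge(3L, 2L, 2.0), Edge(2L, 4L, 5.0))
  // Vertex 5 has no edges and therefore stays at infinity.
  val result: Map[VertexId, Double] =
    shortestPaths(Set(1L, 2L, 3L, 4L, 5L), edges, sourceId = 1L)
}
```

Starting from vertex 1, the loop settles on distance 3.0 for vertex 2 (via vertex 3 rather than the direct edge of weight 4.0) and 8.0 for vertex 4, and it stops in the first round that produces no messages.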

Main graph algorithms

// PageRank
val pageRankGraph = graph2.pageRank(0.001)
pageRankGraph.vertices.sortBy(_._2, false).saveAsTextFile("/user/hadoop/data/temp/graph/graph.pr")
pageRankGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)

// Connected Components
val connectedComponentsGraph = graph2.connectedComponents()
connectedComponentsGraph.vertices.sortBy(_._2, false).saveAsTextFile("/user/hadoop/data/temp/graph/graph.cc")
connectedComponentsGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)

// TriangleCount
// One of the main uses of TriangleCount is community detection. It requires sourceId to be less than destId, so the graph is loaded with canonical orientation
val graph3 = GraphLoader.edgeListFile(sc, path, true)
val triangleCountGraph = graph3.triangleCount()
triangleCountGraph.vertices.sortBy(_._2, false).saveAsTextFile("/user/hadoop/data/temp/graph/graph.tc")
triangleCountGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)
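Of these algorithms, connected components is the simplest to reproduce by hand: each vertex ends up labelled with the lowest vertex ID in its component. The sketch below shows one way to arrive at such labels on plain Scala collections by repeatedly propagating the smaller label along each edge. ComponentsSketch and its connectedComponents function are hypothetical illustrations on invented data, not the GraphX implementation.

```scala
object ComponentsSketch {
  type VertexId = Long

  // Repeatedly give both endpoints of each edge the smaller of their two labels
  // until no label changes; each vertex then carries the lowest ID in its component.
  def connectedComponents(vertexIds: Set[VertexId],
                          edges: Seq[(VertexId, VertexId)]): Map[VertexId, VertexId] = {
    // Every vertex starts out labelled with its own ID.
    var label: Map[VertexId, VertexId] = vertexIds.map(id => (id, id)).toMap
    var changed = true
    while (changed) {
      changed = false
      for ((a, b) <- edges) {
        val low = math.min(label(a), label(b))
        if (label(a) != low || label(b) != low) {
          label = label + ((a, low)) + ((b, low))
          changed = true
        }
      }
    }
    label
  }

  // Three components: {1, 2, 3}, {4, 5}, and the isolated vertex 6.
  val labels: Map[VertexId, VertexId] = connectedComponents(
    Set(1L, 2L, 3L, 4L, 5L, 6L), Seq((2L, 1L), (3L, 2L), (5L, 4L)))
}
```

Vertices 1, 2, and 3 all receive label 1, vertices 4 and 5 receive label 4, and the isolated vertex 6 keeps its own ID.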

Other operations

// Load a graph from an edge list file
var path = "/user/hadoop/data/temp/graph/graph.txt"
var minEdgePartitions = 1
// Set to true to orient every edge so that sourceId < destId
var canonicalOrientation = false
val graph2 = GraphLoader.edgeListFile(sc, path, canonicalOrientation, minEdgePartitions,
  StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY)
// Build a random graph with GraphGenerators
val numVertices = 100
val numEParts = 2
val mu = 4.0
val sigma = 1.3
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices, numEParts, mu, sigma)
    .mapVertices((id, _) => id.toDouble)
graph.triplets.collect.foreach(triplet =>
  println(triplet.srcId + "-" + triplet.srcAttr + "-" + triplet.attr + "-" + triplet.dstId + "-" + triplet.dstAttr))
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] =
  sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)

That covers the basics of GraphX. The best way to consolidate them is to try the examples in practice.
