How to Implement Maximal Clique Mining on a Graph with Spark GraphX

Updated 2025-01-28. Source: SLTechnology News&Howtos (shulou)

This article explains how to mine maximal cliques in a graph with Spark GraphX. The approach, configuration, sample data, and full code are summarized below.

Spark GraphX does not provide a maximal clique mining algorithm.

The existing maximal clique algorithms are all serial (single-machine) algorithms based on the Bron-Kerbosch algorithm.

# ideas: #

Spark GraphX provides a connected components algorithm. Connected components and maximal cliques are both concepts on undirected graphs, and every maximal clique lies entirely inside a single connected component.

So the plan is to use Spark GraphX to find the connected components, and then run a serial maximal clique algorithm on each component independently (pseudo-parallelization).
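The splitting step can be illustrated without Spark. The following is a minimal sketch, assuming a plain in-memory edge list and a union-find structure in place of GraphX's `connectedComponents()`; the class and method names (`ComponentGrouping`, `groupEdgesByComponent`) are hypothetical and do not appear in the original job. It only shows that once edges are grouped by component, each group can be handed to a serial clique finder on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: groups undirected edges by connected component.
final class ComponentGrouping {

    // Union-find lookup with path compression over a parent map.
    static long find(Map<Long, Long> parent, long v) {
        parent.putIfAbsent(v, v);
        long root = v;
        while (parent.get(root) != root) {
            root = parent.get(root);
        }
        // Point every vertex on the walked path directly at the root.
        long cur = v;
        while (parent.get(cur) != root) {
            long next = parent.get(cur);
            parent.put(cur, root);
            cur = next;
        }
        return root;
    }

    // Returns componentId -> edges, where componentId is the smallest vertex id
    // in the component (the same labelling convention GraphX uses).
    static Map<Long, List<long[]>> groupEdgesByComponent(List<long[]> edges) {
        Map<Long, Long> parent = new HashMap<>();
        for (long[] e : edges) {
            long a = find(parent, e[0]);
            long b = find(parent, e[1]);
            if (a != b) {
                parent.put(Math.max(a, b), Math.min(a, b)); // smaller id stays root
            }
        }
        Map<Long, List<long[]>> byComponent = new TreeMap<>();
        for (long[] e : edges) {
            byComponent.computeIfAbsent(find(parent, e[0]), k -> new ArrayList<>()).add(e);
        }
        return byComponent;
    }

    public static void main(String[] args) {
        List<long[]> edges = Arrays.asList(
                new long[]{0, 1}, new long[]{1, 2}, new long[]{0, 2},
                new long[]{10, 11}, new long[]{11, 12});
        // Each entry is an independent unit of work for a serial clique finder.
        groupEdgesByComponent(edges).forEach((id, es) ->
                System.out.println("component " + id + ": " + es.size() + " edges"));
    }
}
```

The parallelism obtained this way is only as good as the component sizes: one giant component still ends up on a single worker.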

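For strongly connected data, a connected component can be very large, and the serial maximal clique algorithm will still take a long time on it. Pruning is used here to reduce the amount of data: a vertex with fewer than count-1 neighbors cannot belong to a clique of count vertices, so its edges can be dropped, and the step is repeated because each removal lowers other degrees. For large graphs, however, the room for optimization is limited.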
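The pruning idea can be sketched on an in-memory edge set. This is an illustration under assumptions, not the job's code: the names `DegreePruning`, `prune`, `edge`, and `sampleEdges` are hypothetical, and the loop stops at a fixed point or after `maxIter` passes, whereas the Spark job stops early once a pass keeps at least `percent` of the previous pass's vertices.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: iterative degree-based pruning before clique search.
final class DegreePruning {

    // Normalises an undirected edge so that (a, b) and (b, a) compare equal.
    static List<Long> edge(long a, long b) {
        return Arrays.asList(Math.min(a, b), Math.max(a, b));
    }

    // Drops every edge that touches a vertex with degree < minSize - 1, and
    // repeats, since each removal can push further vertices under the threshold.
    static Set<List<Long>> prune(Set<List<Long>> edges, int minSize, int maxIter) {
        Set<List<Long>> current = new HashSet<>(edges);
        for (int iter = 0; iter < maxIter; iter++) {
            Map<Long, Integer> degree = new HashMap<>();
            for (List<Long> e : current) {
                degree.merge(e.get(0), 1, Integer::sum);
                degree.merge(e.get(1), 1, Integer::sum);
            }
            Set<List<Long>> next = new HashSet<>();
            for (List<Long> e : current) {
                if (degree.get(e.get(0)) >= minSize - 1 && degree.get(e.get(1)) >= minSize - 1) {
                    next.add(e);
                }
            }
            if (next.size() == current.size()) {
                break; // nothing was removed in this pass
            }
            current = next;
        }
        return current;
    }

    // The article's sample graph as 12 distinct undirected edges.
    static Set<List<Long>> sampleEdges() {
        long[][] pairs = {{0, 1}, {0, 2}, {0, 3}, {1, 2}, {2, 3}, {2, 6},
                          {3, 4}, {3, 5}, {4, 5}, {4, 6}, {5, 6}, {6, 7}};
        Set<List<Long>> edges = new HashSet<>();
        for (long[] p : pairs) {
            edges.add(edge(p[0], p[1]));
        }
        return edges;
    }

    public static void main(String[] args) {
        System.out.println(prune(sampleEdges(), 3, 50).size() + " edges survive for cliques of size >= 3");
    }
}
```

On the sample graph with a threshold of 3 vertices, only the edge to vertex 7 is removed, which matches the remark that pruning helps little when the graph is dense.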

A truly parallel maximal clique algorithm is still something to look forward to.

# configuration file: #

    graph_data_path=hdfs://localhost/graph_data
    out_path=hdfs://localhost/clique
    ck_path=hdfs://localhost/checkpoint
    # maximum number of pruning passes
    numIter=50
    # clique size threshold (number of vertices in a clique)
    count=3
    # maximal clique algorithm, 1: self-implemented, 2: jgrapht
    algorithm=2
    # number of vertices left after a pruning pass, as a percentage of the previous pass;
    # if 90% of the data is still left after a pass, pruning is no longer efficient
    percent=90
    spark.master=local
    spark.app.name=graph
    spark.serializer=org.apache.spark.serializer.KryoSerializer
    spark.yarn.executor.memoryOverhead=20480
    spark.yarn.driver.memoryOverhead=20480
    spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
    spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
    spark.driver.maxResultSize=10g
    spark.default.parallelism=60
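How such a file is consumed can be sketched as follows. This is a minimal illustration that assumes the settings are parsed with `java.util.Properties` and that every key starting with `spark` is forwarded to the Spark configuration, as the job's `main` method does; the class `ConfigSketch` and its methods are hypothetical names. Note that the `.properties` format has no inline comments, so any explanation has to sit on its own `#` line, otherwise `Integer.parseInt` would fail on the value.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper: separates job settings from spark.* settings.
final class ConfigSketch {

    // Parses properties text; a real job would load it from the classpath instead.
    static Properties parse(String text) {
        Properties prop = new Properties();
        try {
            prop.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return prop;
    }

    // Collects the keys that would be passed on to SparkConf.set(key, value).
    static Map<String, String> sparkSettings(Properties prop) {
        Map<String, String> out = new TreeMap<>();
        for (String key : prop.stringPropertyNames()) {
            if (key.startsWith("spark")) {
                out.put(key, prop.getProperty(key));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Properties prop = parse("# clique size threshold\ncount=3\nspark.master=local\nspark.app.name=graph\n");
        int count = Integer.parseInt(prop.getProperty("count"));
        System.out.println("count=" + count + ", spark settings=" + sparkSettings(prop));
    }
}
```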

# sample data: #

    {"src": "0", "dst": "1"}
    {"src": "0", "dst": "2"}
    {"src": "0", "dst": "3"}
    {"src": "1", "dst": "0"}
    {"src": "2", "dst": "1"}
    {"src": "3", "dst": "5"}
    {"src": "4", "dst": "6"}
    {"src": "5", "dst": "4"}
    {"src": "6", "dst": "5"}
    {"src": "3", "dst": "2"}
    {"src": "2", "dst": "3"}
    {"src": "6", "dst": "4"}
    {"src": "3", "dst": "4"}
    {"src": "4", "dst": "3"}
    {"src": "2", "dst": "6"}
    {"src": "6", "dst": "2"}
    {"src": "6", "dst": "7"}
    {"src": "7", "dst": "6"}

# sample diagram: #

# output: #

    0,1,2
    0,2,3
    3,4,5
    4,5,6
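The listed output can be checked by hand against the sample data. The sketch below is a brute-force verifier, assuming the sample edges are treated as undirected; `CliqueCheck` and its methods are hypothetical names used only for this check. It confirms that a vertex set is a clique and that no further vertex can be added to it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: brute-force check of the sample output.
final class CliqueCheck {

    // Undirected adjacency built from the article's sample data.
    static Map<String, Set<String>> sampleAdjacency() {
        String[][] edges = {{"0", "1"}, {"0", "2"}, {"0", "3"}, {"1", "2"}, {"2", "3"}, {"2", "6"},
                            {"3", "4"}, {"3", "5"}, {"4", "5"}, {"4", "6"}, {"5", "6"}, {"6", "7"}};
        Map<String, Set<String>> adj = new HashMap<>();
        for (String[] e : edges) {
            adj.computeIfAbsent(e[0], k -> new HashSet<>()).add(e[1]);
            adj.computeIfAbsent(e[1], k -> new HashSet<>()).add(e[0]);
        }
        return adj;
    }

    // True when every pair of distinct vertices in vs is connected.
    static boolean isClique(Map<String, Set<String>> adj, Collection<String> vs) {
        for (String a : vs) {
            for (String b : vs) {
                if (!a.equals(b) && !adj.getOrDefault(a, Collections.emptySet()).contains(b)) {
                    return false;
                }
            }
        }
        return true;
    }

    // True when vs is a clique and adding any other vertex breaks it.
    static boolean isMaximalClique(Map<String, Set<String>> adj, Collection<String> vs) {
        if (!isClique(adj, vs)) {
            return false;
        }
        for (String extra : adj.keySet()) {
            if (vs.contains(extra)) {
                continue;
            }
            List<String> bigger = new ArrayList<>(vs);
            bigger.add(extra);
            if (isClique(adj, bigger)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> adj = sampleAdjacency();
        for (String line : Arrays.asList("0,1,2", "0,2,3", "3,4,5", "4,5,6")) {
            System.out.println(line + " maximal clique: " + isMaximalClique(adj, Arrays.asList(line.split(","))));
        }
    }
}
```

The pairs 2,6 and 6,7 are also maximal cliques of the sample graph, but they have only two vertices and are therefore filtered out by count=3.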

# Code implementation: #

    import java.util
    import java.util.Properties

    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.{SparkConf, SparkContext}
    import org.jgrapht.alg.BronKerboschCliqueFinder
    import org.jgrapht.graph.{DefaultEdge, SimpleGraph}

    import scala.collection.JavaConverters._
    import scala.collection.mutable

    object ApplicationTitan {
      def main(args: Array[String]): Unit = {
        val prop = new Properties()
        prop.load(getClass.getResourceAsStream("/config.properties"))

        val graph_data_path = prop.getProperty("graph_data_path")
        val out_path = prop.getProperty("out_path")
        val ck_path = prop.getProperty("ck_path")
        val count = Integer.parseInt(prop.getProperty("count"))
        val numIter = Integer.parseInt(prop.getProperty("numIter"))
        val algorithm = Integer.parseInt(prop.getProperty("algorithm"))
        val percent = Integer.parseInt(prop.getProperty("percent"))
        val conf = new SparkConf()

        // Remove the output of a previous run; saveAsTextFile fails if the path exists.
        try {
          Runtime.getRuntime.exec("hdfs dfs -rm -r " + out_path)
          // Runtime.getRuntime.exec("cmd.exe /C rd /s /q " + out_path)
        } catch {
          case ex: Exception =>
            ex.printStackTrace(System.out)
        }

        // Forward every spark.* entry of the properties file to SparkConf.
        prop.stringPropertyNames().asScala.foreach(s => {
          if (s.startsWith("spark")) {
            conf.set(s, prop.getProperty(s))
          }
        })
        conf.registerKryoClasses(Array(getClass))
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")
        sc.setCheckpointDir(ck_path)
        val sqlc = new SQLContext(sc)

        try {
          val e_df = sqlc.read
            // .json(graph_data_path)
            .parquet(graph_data_path)

          // Normalise each edge to (smaller id, larger id) and drop duplicates,
          // which turns the directed input into an undirected edge set.
          // .rdd is added so the same code yields an RDD on Spark 1.x and 2.x.
          var e_rdd = e_df.rdd
            .mapPartitions(it => {
              it.map({
                case Row(dst: String, src: String) =>
                  val src_long = src.toLong
                  val dst_long = dst.toLong
                  if (src_long < dst_long) (src_long, dst_long) else (dst_long, src_long)
              })
            }).distinct()
          e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

          var bc: Broadcast[Set[Long]] = null
          var iter = 0
          var bc_size = 0
          // Pruning: keep only vertices with degree >= count - 1.
          // The loop header was damaged in this copy; the bound below assumes
          // at most numIter passes.
          while (iter < numIter) {
            val temp = e_rdd
              .flatMap(x => List((x._1, 1), (x._2, 1)))
              .reduceByKey((x, y) => x + y)
              .filter(x => x._2 >= count - 1)
              .mapPartitions(it => it.map(x => x._1))
            val bc_value = temp.collect().toSet
            bc = sc.broadcast(bc_value)
            e_rdd = e_rdd.filter(x => bc.value.contains(x._1) && bc.value.contains(x._2))
            e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
            iter += 1
            // Stop early when a pass keeps at least `percent` % of the previous vertices.
            if (bc_size != 0 && bc_value.size >= bc_size * percent / 100) {
              println("total iter : " + iter)
              iter = Int.MaxValue
            }
            bc_size = bc_value.size
          }

          // Build the graph
          val edge: RDD[Edge[Long]] = e_rdd.mapPartitions(it => it.map(x => Edge(x._1, x._2)))
          val graph = Graph.fromEdges(edge, 0, StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)

          // Connected components: (vertexId, componentId)
          val cc = graph.connectedComponents().vertices
          cc.persist(StorageLevel.MEMORY_AND_DISK_SER)

          cc.join(e_rdd)
            // Key each edge by a random digit + component id to spread large components,
            // aggregate, then strip the digit and merge the partial lists per component.
            .mapPartitions(it => it.map(x => ((math.random * 10).toInt.toString.concat(x._2._1.toString), (x._1, x._2._2))))
            .aggregateByKey(List[(Long, Long)]())((list, v) => list :+ v, (list1, list2) => list1 ::: list2)
            .mapPartitions(it => it.map(x => (x._1.substring(1), x._2)))
            .aggregateByKey(List[(Long, Long)]())((list1, list2) => list1 ::: list2, (list3, list4) => list3 ::: list4)
            .filter(x => x._2.size >= count - 1)
            .flatMap(x => {
              if (algorithm == 1)
                find(x, count)
              else
                find2(x, count)
            })
            // One output line per clique: comma-separated vertex ids.
            .mapPartitions(it => it.map(set => set.asScala.mkString(",")))
            // .coalesce(1)
            .saveAsTextFile(out_path)
        } catch {
          case ex: Exception =>
            ex.printStackTrace(System.out)
        }
        sc.stop()
      }

      // Self-implemented algorithm (CliqueFinder)
      def find(x: (String, List[(Long, Long)]), count: Int): mutable.Set[util.Set[String]] = {
        println(x._1 + "|s|" + x._2.size)
        println("BKCliqueFinder---" + x._1 + "-" + System.currentTimeMillis())
        val neighbors = new util.HashMap[String, util.Set[String]]
        val finder = new CliqueFinder(neighbors, count)
        // Fill the adjacency map in both directions for every edge of the component.
        x._2.foreach(r => {
          val v1 = r._1.toString
          val v2 = r._2.toString
          if (neighbors.containsKey(v1)) {
            neighbors.get(v1).add(v2)
          } else {
            val temp = new util.HashSet[String]()
            temp.add(v2)
            neighbors.put(v1, temp)
          }
          if (neighbors.containsKey(v2)) {
            neighbors.get(v2).add(v1)
          } else {
            val temp = new util.HashSet[String]()
            temp.add(v1)
            neighbors.put(v2, temp)
          }
        })
        println("BKCliqueFinder---" + x._1 + "-" + System.currentTimeMillis())
        finder.findMaxCliques().asScala
      }

      // jgrapht
      def find2(x: (String, List[(Long, Long)]), count: Int): Set[util.Set[String]] = {
        println(x._1 + "|s|" + x._2.size)
        println("BKCliqueFinder---" + x._1 + "-" + System.currentTimeMillis())
        val to_clique = new SimpleGraph[String, DefaultEdge](classOf[DefaultEdge])
        x._2.foreach(r => {
          val v1 = r._1.toString
          val v2 = r._2.toString
          to_clique.addVertex(v1)
          to_clique.addVertex(v2)
          to_clique.addEdge(v1, v2)
        })
        val finder = new BronKerboschCliqueFinder(to_clique)
        val list = finder.getAllMaximalCliques.asScala
        // Keep only cliques with at least `count` vertices.
        var result = Set[util.Set[String]]()
        list.foreach(x => {
          if (x.size() >= count)
            result = result + x
        })
        println("BKCliqueFinder---" + x._1 + "-" + System.currentTimeMillis())
        result
      }
    }

# Self-implemented maximal clique algorithm: #

    import java.util.*;

    /**
     * @author mopspecial@gmail.com
     * @date 2017-7-31
     */
    public class CliqueFinder {
        private Map<String, Set<String>> neighbors;
        private Set<String> nodes;
        private Set<Set<String>> maxCliques = new HashSet<>();
        private Integer minSize;

        public CliqueFinder(Map<String, Set<String>> neighbors, Integer minSize) {
            this.neighbors = neighbors;
            // keySet() is a live view, so vertices added to the map later are still seen.
            this.nodes = neighbors.keySet();
            this.minSize = minSize;
        }

        // Outer level: Bron-Kerbosch over the vertices in degeneracy order.
        private void bk3(Set<String> clique, List<String> candidates, List<String> excluded) {
            if (candidates.isEmpty() && excluded.isEmpty()) {
                if (!clique.isEmpty() && clique.size() >= minSize) {
                    maxCliques.add(clique);
                }
                return;
            }

            for (String s : degeneracy_order(candidates)) {
                List<String> new_candidates = new ArrayList<>(candidates);
                new_candidates.retainAll(neighbors.get(s));

                List<String> new_excluded = new ArrayList<>(excluded);
                new_excluded.retainAll(neighbors.get(s));

                Set<String> nextClique = new HashSet<>(clique);
                nextClique.add(s);
                bk2(nextClique, new_candidates, new_excluded);
                candidates.remove(s);
                excluded.add(s);
            }
        }

        // Inner level: Bron-Kerbosch with pivoting.
        private void bk2(Set<String> clique, List<String> candidates, List<String> excluded) {
            if (candidates.isEmpty() && excluded.isEmpty()) {
                if (!clique.isEmpty() && clique.size() >= minSize) {
                    maxCliques.add(clique);
                }
                return;
            }
            String pivot = pick_random(candidates);
            if (pivot == null) {
                pivot = pick_random(excluded);
            }
            // Only candidates that are not neighbors of the pivot need to be expanded.
            List<String> tempc = new ArrayList<>(candidates);
            tempc.removeAll(neighbors.get(pivot));

            for (String s : tempc) {
                List<String> new_candidates = new ArrayList<>(candidates);
                new_candidates.retainAll(neighbors.get(s));

                List<String> new_excluded = new ArrayList<>(excluded);
                new_excluded.retainAll(neighbors.get(s));

                Set<String> nextClique = new HashSet<>(clique);
                nextClique.add(s);
                bk2(nextClique, new_candidates, new_excluded);
                candidates.remove(s);
                excluded.add(s);
            }
        }

        // Repeatedly takes the vertex of minimum remaining degree.
        private List<String> degeneracy_order(List<String> innerNodes) {
            List<String> result = new ArrayList<>();
            Map<String, Integer> deg = new HashMap<>();
            for (String node : innerNodes) {
                deg.put(node, neighbors.get(node).size());
            }
            while (!deg.isEmpty()) {
                Integer min = Collections.min(deg.values());
                String minKey = null;
                for (String key : deg.keySet()) {
                    if (deg.get(key).equals(min)) {
                        minKey = key;
                        break;
                    }
                }
                result.add(minKey);
                deg.remove(minKey);
                for (String k : neighbors.get(minKey)) {
                    if (deg.containsKey(k)) {
                        deg.put(k, deg.get(k) - 1);
                    }
                }
            }
            return result;
        }

        // Despite the name, this simply returns the first element (or null if empty).
        private String pick_random(List<String> random) {
            if (random != null && !random.isEmpty()) {
                return random.get(0);
            } else {
                return null;
            }
        }

        public Set<Set<String>> findMaxCliques() {
            this.bk3(new HashSet<>(), new ArrayList<>(nodes), new ArrayList<>());
            return maxCliques;
        }

        public static void main(String[] args) {
            Map<String, Set<String>> neighbors = new HashMap<>();
            neighbors.put("0", new HashSet<>(Arrays.asList("1", "2", "3")));
            neighbors.put("1", new HashSet<>(Arrays.asList("0", "2")));
            neighbors.put("2", new HashSet<>(Arrays.asList("0", "1", "3", "6")));
            neighbors.put("3", new HashSet<>(Arrays.asList("0", "2", "4", "5")));
            neighbors.put("4", new HashSet<>(Arrays.asList("3", "5", "6")));
            neighbors.put("5", new HashSet<>(Arrays.asList("3", "4", "6")));
            // "7" added so the adjacency is symmetric with the entry for vertex 7
            neighbors.put("6", new HashSet<>(Arrays.asList("2", "4", "5", "7")));
            neighbors.put("7", new HashSet<>(Arrays.asList("6")));
            CliqueFinder finder = new CliqueFinder(neighbors, 3);
            finder.bk3(new HashSet<>(), new ArrayList<>(neighbors.keySet()), new ArrayList<>());
            System.out.println(finder.maxCliques);
        }
    }
