Learning log - PageRank based on Hadoop


A brief introduction to PageRank:

A node's PageRank value is determined by the PageRank values of the nodes that link to it.
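In its standard form (the second implementation below applies it with damping factor d = 0.85), the formula is:

    PR(u) = (1 - d)/N + d * \sum_{v \in B(u)} PR(v) / L(v)

where N is the total number of nodes, B(u) is the set of nodes that link to u, and L(v) is the number of outgoing links of v.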

Part one:

The calculation performed in each MapReduce pass:

In each pass, the mapper computes the score that every node contributes to the nodes it points to; the reducer then groups these contributions by node (the reduce key) and combines them using the PageRank formula.

The triangle symbol (Δ) denotes the difference between two successive iterations: once it falls below a chosen threshold, the computation has converged and every node's pagerank value is final.
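Written out (this is a reading of the driver code further down, not something stated explicitly here), the stopping test is roughly

    \Delta = \sum_i | PR_new(i) - PR_old(i) |

and iteration stops once Δ, possibly averaged over the number of nodes, drops below a small threshold (0.1 in the first implementation below, 0.01 per node in the second).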

The hand-written implementation is given below.

The input data is split into two files:

Input1.txt

A,B,D

B,C

C,A,B

D,B,C

Each row of Input1.txt lists a node followed by the nodes it points to; this file is read in the reducer's setup() to build a HashMap of the graph for later use.
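The reducer itself does not survive intact in the code further down, but a minimal sketch of the setup() described here might look like the following; the class name, field names and cache handling are assumptions, with Input1.txt distributed through job.addCacheArchive() as in the driver below.

    public static class PageReducer extends Reducer<Text, Text, Text, Text> {

        // node name -> comma-separated list of the nodes it points to
        private HashMap<String, String> adjacency = new HashMap<String, String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Read the cached Input1.txt and build the adjacency map.
            URI[] cacheFiles = context.getCacheArchives();
            FileSystem fs = FileSystem.get(context.getConfiguration());
            BufferedReader br = new BufferedReader(
                    new InputStreamReader(fs.open(new Path(cacheFiles[0]))));
            String line;
            while ((line = br.readLine()) != null) {
                // "A,B,D" -> key "A", value "B,D"
                int comma = line.indexOf(',');
                adjacency.put(line.substring(0, comma), line.substring(comma + 1));
            }
            br.close();
        }
    }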

Input2.txt

A,0.25,B,D

B,0.25,C

C,0.25,A,B

D,0.25,B,C

The second field is the node's current pagerank value. This file does not strictly have to be supplied, since it can be generated from the file above: with four nodes, every node starts at 1/4 = 0.25.
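As a worked example of one pass over this graph (plain summation, before any damping factor is applied): each node splits its current value evenly among the nodes it points to, and a node's new value is the sum of what it receives.

    A sends 0.25/2 = 0.125 to B and to D
    B sends 0.25/1 = 0.25  to C
    C sends 0.25/2 = 0.125 to A and to B
    D sends 0.25/2 = 0.125 to B and to C

    new PR(A) = 0.125
    new PR(B) = 0.125 + 0.125 + 0.125 = 0.375
    new PR(C) = 0.25 + 0.125 = 0.375
    new PR(D) = 0.125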

The code of this first implementation:

package bbdt.steiss.pageRank;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class PageRank {

    public static class PageMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text averageValue = new Text();
        private Text node = new Text();

        // For each input line, compute the partial pagerank this node sends out:
        // the node's current value divided by the number of nodes it points to,
        // emitted once for every node it points to.
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String string = value.toString();
            String[] ss = string.split(",");
            int length = ss.length;
            double pageValue = Double.parseDouble(ss[1]);
            double average = pageValue / (length - 2);
            averageValue.set(String.valueOf(average));
            int i = 2;
            while (i < length) {
                node.set(ss[i]);
                context.write(node, averageValue);
                i++;
            }
        }
    }

    // ... (the PageReducer class and the beginning of the driver's main() method,
    //      where flag, result, inputPath, outputPath and cachePath are set up,
    //      are cut off in the original text; the surviving driver loop follows)

            while (result > 0.1) {
                if (flag == 1) {
                    // Skipped on the first pass; afterwards this copies the previous
                    // mapreduce output back to the input path so it becomes the input
                    // of the next mapreduce pass.
                    copyFile();
                    flag = 0;
                }
                Configuration configuration = new Configuration();
                Job job = Job.getInstance(configuration);
                job.setJarByClass(PageRank.class);
                job.setMapperClass(PageMapper.class);
                job.setReducerClass(PageReducer.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Text.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                job.setInputFormatClass(TextInputFormat.class);
                job.setOutputFormatClass(TextOutputFormat.class);
                FileInputFormat.addInputPath(job, inputPath);
                FileOutputFormat.setOutputPath(job, outputPath);
                job.addCacheArchive(cachePath.toUri());
                outputPath.getFileSystem(configuration).delete(outputPath, true);
                job.waitForCompletion(true);
                String outpathString = outputPath.toString() + "/part-r-00000";
                // Compute the difference between the pagerank values of each node
                // in the two files.
                result = fileDo(inputPath, new Path(outpathString));
                flag = 1;
            }
            System.exit(0);
        }

    // Compute the per-node pagerank difference between the two files and return it.
    public static double fileDo(Path inputPath, Path outPath) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in1 = null;
        FSDataInputStream in2 = null;
        in1 = fs.open(inputPath);
        in2 = fs.open(outPath);
        BufferedReader br1 = new BufferedReader(new InputStreamReader(in1));
        BufferedReader br2 = new BufferedReader(new InputStreamReader(in2));
        String s1 = null;
        String s2 = null;
        ArrayList<Double> arrayList1 = new ArrayList<Double>();
        ArrayList<Double> arrayList2 = new ArrayList<Double>();
        while ((s1 = br1.readLine()) != null) {
            String[] ss = s1.split(",");
            arrayList1.add(Double.parseDouble(ss[1]));
        }
        br1.close();
        while ((s2 = br2.readLine()) != null) {
            String[] ss = s2.split(",");
            arrayList2.add(Double.parseDouble(ss[1]));
        }
        double res = 0;
        for (int i = 0;
        // ... (the rest of fileDo(), the copyFile() helper and the start of a second
        //      implementation are cut off in the original; the fragment below is the
        //      end of that second implementation's Node.fromMR() method)
        if (parts.length > 1) {
            node.setAdjacentNodeNames(Arrays.copyOfRange(parts, 1, parts.length));
        }
        return node;
    }
}
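What follows is a second implementation, in package org.robby.mr.pagerank; only the tail of its Node helper class's fromMR() method appears above. A sketch of that Node class, reconstructed from how the Map, Reduce and Main classes below use it (the field names and the tab separator are assumptions), could look like this:

package org.robby.mr.pagerank;

import java.util.Arrays;
import org.apache.commons.lang.StringUtils;

public class Node {

    public static final char fieldSeparator = '\t';

    private double pageRank;
    private String[] adjacentNodeNames;

    public double getPageRank() {
        return pageRank;
    }

    public Node setPageRank(double pageRank) {
        this.pageRank = pageRank;
        return this;
    }

    public String[] getAdjacentNodeNames() {
        return adjacentNodeNames;
    }

    public Node setAdjacentNodeNames(String[] adjacentNodeNames) {
        this.adjacentNodeNames = adjacentNodeNames;
        return this;
    }

    public boolean containsAdjacentNodes() {
        return adjacentNodeNames != null && adjacentNodeNames.length > 0;
    }

    // Serialized form: the pagerank followed by the adjacent node names,
    // tab-separated, e.g. "0.25<TAB>B<TAB>D".
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(pageRank);
        if (adjacentNodeNames != null) {
            for (String name : adjacentNodeNames) {
                sb.append(fieldSeparator).append(name);
            }
        }
        return sb.toString();
    }

    // Parse the value part of a map/reduce record back into a Node.
    public static Node fromMR(String value) {
        String[] parts = StringUtils.splitPreserveAllTokens(value, fieldSeparator);
        Node node = new Node().setPageRank(Double.parseDouble(parts[0]));
        if (parts.length > 1) {
            node.setAdjacentNodeNames(Arrays.copyOfRange(parts, 1, parts.length));
        }
        return node;
    }
}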

2. The implementation of the mapper:

package org.robby.mr.pagerank;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// The map input key and value types here are both Text, so the job's input format
// must be set to KeyValueTextInputFormat in the main function.
public class Map extends Mapper<Text, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // First re-emit the original record, so the reducer can recover each node's
        // outgoing links; key is the node name, value is the rest of the
        // tab-separated line.
        context.write(key, value);

        Node node = Node.fromMR(value.toString());

        if (node.getAdjacentNodeNames() != null && node.getAdjacentNodeNames().length > 0) {
            double outboundPageRank = node.getPageRank() /
                    (double) node.getAdjacentNodeNames().length;
            for (int i = 0; i < node.getAdjacentNodeNames().length; i++) {
                String neighbor = node.getAdjacentNodeNames()[i];
                outKey.set(neighbor);
                Node adjacentNode = new Node().setPageRank(outboundPageRank);
                outValue.set(adjacentNode.toString());
                System.out.println("  output -> K[" + outKey + "], V[" + outValue + "]");
                // Emit the pagerank contribution this node sends to the neighbor.
                context.write(outKey, outValue);
            }
        }
    }
}

Output data example (for the record of node A with pagerank 0.25, pointing to B and D):

A   0.25    B   D
B   0.125
D   0.125

Note:

KeyValueTextInputFormat produces (Text, Text) records: it takes each line of text and splits it into key and value at the first tab.

TextInputFormat produces (LongWritable, Text) records, with the line's byte offset as the key and the line's content as the value.
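For example, given the tab-separated line

    A   0.25    B   D

KeyValueTextInputFormat hands the mapper the key "A" and the value "0.25<TAB>B<TAB>D" (split at the first tab), while TextInputFormat would hand it the byte offset of the line as the key and the whole line as the value.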

3. The implementation of the reduce method:

package org.robby.mr.pagerank;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Reduce extends Reducer<Text, Text, Text, Text> {

    public static final double CONVERGENCE_SCALING_FACTOR = 1000.0;
    public static final double DAMPING_FACTOR = 0.85;
    public static String CONF_NUM_NODES_GRAPH = "pagerank.numnodes";

    private int numberOfNodesInGraph;

    public static enum Counter {
        CONV_DELTAS
    }

    // setup() runs when the reducer is initialized; it reads the total number of
    // nodes from the job configuration.
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        numberOfNodesInGraph = context.getConfiguration().getInt(CONF_NUM_NODES_GRAPH, 0);
    }

    private Text outValue = new Text();

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        System.out.println("input -> K[" + key + "]");

        double summedPageRanks = 0;
        Node originalNode = new Node();

        for (Text textValue : values) {
            System.out.println("  input -> V[" + textValue + "]");
            Node node = Node.fromMR(textValue.toString());
            if (node.containsAdjacentNodes()) {
                // This is the original record for the node (it carries the adjacency list).
                originalNode = node;
            } else {
                // This is a contribution; add it to the node's pagerank sum.
                summedPageRanks += node.getPageRank();
            }
        }

        double dampingFactor = ((1.0 - DAMPING_FACTOR) / (double) numberOfNodesInGraph);
        double newPageRank = dampingFactor + (DAMPING_FACTOR * summedPageRanks);

        // Difference between the old and new pagerank of this node.
        double delta = originalNode.getPageRank() - newPageRank;

        // Update the original node object with the new pagerank.
        originalNode.setPageRank(newPageRank);
        outValue.set(originalNode.toString());
        System.out.println("  output -> K[" + key + "], V[" + outValue + "]");

        // Write the updated node record.
        context.write(key, outValue);

        int scaledDelta = Math.abs((int) (delta * CONVERGENCE_SCALING_FACTOR));
        System.out.println("  delta = " + scaledDelta);

        // Counters: MapReduce has many built-in counters; custom counters are set and
        // read through an enum constant, and increment() adds to the stored value.
        context.getCounter(Counter.CONV_DELTAS).increment(scaledDelta);
    }
}
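A rough worked example with the four-node graph above (numberOfNodesInGraph = 4): if node B receives the contributions 0.125 + 0.125 + 0.125 = 0.375 in the first iteration, then

    dampingFactor = (1.0 - 0.85) / 4         = 0.0375
    newPageRank   = 0.0375 + 0.85 * 0.375    = 0.35625
    delta         = 0.25 - 0.35625           = -0.10625
    scaledDelta   = |(int)(-0.10625 * 1000)| = 106

so 106 is added to the CONV_DELTAS counter for this node.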

4. The implementation of the main function:

package org.robby.mr.pagerank;

import org.apache.commons.io.*;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.*;
import java.util.*;

public final class Main {

    public static void main(String... args) throws Exception {
        // args[0] is the path of the input file, args[1] the output directory.
        String inputFile = args[0];
        String outputDir = args[1];
        iterate(inputFile, outputDir);
    }

    public static void iterate(String input, String output) throws Exception {
        // Because this runs on Hadoop (hadoop jar ...), the Configuration automatically
        // picks up the cluster's HDFS settings, so the paths below resolve on HDFS and
        // FileSystem acts as the HDFS file-manipulation class.
        Configuration conf = new Configuration();
        Path outputPath = new Path(output);
        outputPath.getFileSystem(conf).delete(outputPath, true);
        outputPath.getFileSystem(conf).mkdirs(outputPath);

        // Set up the input file.
        Path inputPath = new Path(outputPath, "input.txt");

        // Create the file and return the number of nodes.
        int numNodes = createInputFile(new Path(input), inputPath);

        int iter = 1;
        double desiredConvergence = 0.01;

        while (true) {
            // Each iteration's output goes to outputPath/<iteration number>.
            Path jobOutputPath = new Path(outputPath, String.valueOf(iter));

            System.out.println("======================================");
            System.out.println("=  Iteration:    " + iter);
            System.out.println("=  Input path:   " + inputPath);
            System.out.println("=  Output path:  " + jobOutputPath);
            System.out.println("======================================");

            // Run the mapreduce pass for this iteration.
            if (calcPageRank(inputPath, jobOutputPath, numNodes) < desiredConvergence) {
                System.out.println("Convergence is below " + desiredConvergence + ", we're done");
                break;
            }
            inputPath = jobOutputPath;
            iter++;
        }
    }

    // Copies the contents of the input file, with the initial pagerank value added, to targetFile.
    public static int createInputFile(Path file, Path targetFile) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = file.getFileSystem(conf);

        int numNodes = getNumNodes(file);
        double initialPageRank = 1.0 / (double) numNodes;

        // fs.create() creates a file for the given path and returns an output stream.
        OutputStream os = fs.create(targetFile);
        // Line iterator over the input file.
        LineIterator iter = IOUtils.lineIterator(fs.open(file), "UTF8");

        while (iter.hasNext()) {
            String line = iter.nextLine();
            // Split each line into the node name and its adjacent nodes.
            String[] parts = StringUtils.split(line);

            // Build the node object with the initial pagerank and adjacency list.
            Node node = new Node()
                    .setPageRank(initialPageRank)
                    .setAdjacentNodeNames(Arrays.copyOfRange(parts, 1, parts.length));
            IOUtils.write(parts[0] + '\t' + node.toString() + '\n', os);
        }
        os.close();
        return numNodes;
    }

    // Get the number of nodes, i.e. the number of lines in the file.
    public static int getNumNodes(Path file) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = file.getFileSystem(conf);
        return IOUtils.readLines(fs.open(file), "UTF8").size();
    }

    // Run one mapreduce pass and return the convergence value.
    public static double calcPageRank(Path inputPath, Path outputPath, int numNodes) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt(Reduce.CONF_NUM_NODES_GRAPH, numNodes);

        Job job = Job.getInstance(conf);
        job.setJarByClass(Main.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // Both the input key and value are text, so use KeyValueTextInputFormat, which
        // splits each line into key and value at the first separator (a tab).
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // Map output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        if (!job.waitForCompletion(true)) {
            throw new Exception("Job failed");
        }

        long summedConvergence = job.getCounters()
                .findCounter(Reduce.Counter.CONV_DELTAS).getValue();
        double convergence =
                ((double) summedConvergence / Reduce.CONVERGENCE_SCALING_FACTOR) / (double) numNodes;

        System.out.println("======================================");
        System.out.println("=  Num nodes:           " + numNodes);
        System.out.println("=  Summed convergence:  " + summedConvergence);
        System.out.println("=  Convergence:         " + convergence);
        System.out.println("======================================");

        return convergence;
    }
}
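A typical way to run it (the jar name and HDFS paths here are only placeholders) would be:

    hadoop jar pagerank.jar org.robby.mr.pagerank.Main /user/hadoop/graph.txt /user/hadoop/pagerank-out

where graph.txt lists one node per line followed by the nodes it points to, separated by whitespace, e.g.:

    A B D
    B C
    C A B
    D B C

Each iteration's result is then written under the output directory as pagerank-out/1, pagerank-out/2, and so on.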

Note:

The file-stream handling uses methods of the IOUtils class from org.apache.commons.io.

There is also Arrays.copyOfRange, which returns a new array containing the specified range of the original array.
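For example, Arrays.copyOfRange(new String[]{"A", "B", "D"}, 1, 3) returns {"B", "D"}: the range starts at index 1 (inclusive) and ends at index 3 (exclusive).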

OutputStream os = fs.create(targetFile);
// Line iterator over the input file.
LineIterator iter = IOUtils.lineIterator(fs.open(file), "UTF8");
while (iter.hasNext()) {
    String line = iter.nextLine();
    String[] parts = StringUtils.split(line);
    Node node = new Node()
            .setPageRank(initialPageRank)
            .setAdjacentNodeNames(Arrays.copyOfRange(parts, 1, parts.length));
    IOUtils.write(parts[0] + '\t' + node.toString() + '\n', os);
}

The readLines method returns a list of Strings, one entry per line of the file:

IOUtils.readLines(fs.open(file), "UTF8").size()

The key-value pairs written by TextOutputFormat can be of any type; on output, toString() is called automatically to turn each object into its string form.
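For example, if the reducer writes the key Text("A") together with a Node whose toString() yields "0.25<TAB>B<TAB>D" (an assumed format, matching the sample records above), TextOutputFormat writes the line "A<TAB>0.25<TAB>B<TAB>D", separating key and value with a tab by default.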

StringUtils can also split a string into an array while preserving empty tokens:

String[] parts = StringUtils.splitPreserveAllTokens(value, fieldSeparator);
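For example, StringUtils.split("0.25::B", ':') returns {"0.25", "B"}, whereas StringUtils.splitPreserveAllTokens("0.25::B", ':') returns {"0.25", "", "B"}: empty tokens between adjacent separators are kept instead of dropped.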
