Goal:
How to understand the PageRank algorithm in Scala on Spark. This article explains each step using sample data.
Env:
Spark 1.5.2
Scala 2.10.4
Algorithm:
The PageRank algorithm outputs a probability distribution used to represent the likelihood that a person randomly clicking on links will arrive at any particular page.
1. Initialize each page's rank to 1.0.
2. On each iteration, have page p send a contribution of rank(p)/numNeighbors(p) to its neighbors (the pages it has links to).
3. Set each page’s rank to 0.15 + 0.85 * contributionsReceived.
The last two steps repeat for several iterations, during which the algorithm will converge to the correct PageRank value for each page. In practice, it's typical to run about 10 iterations.
Sample Data:
If we represent each page and its outbound links as a pair of type (String, List[String]), then the data is:
("MapR",List("Baidu","Blogger")), ("Baidu", List("MapR"), ("Blogger",List("Google","Baidu")), ("Google", List("MapR"))
Solution:
1. Initialize each page’s rank to 1.0.
import org.apache.spark.HashPartitioner

val links = sc.parallelize(List(("MapR", List("Baidu","Blogger")), ("Baidu", List("MapR")), ("Blogger", List("Google","Baidu")), ("Google", List("MapR")))).partitionBy(new HashPartitioner(4)).persist()
var ranks = links.mapValues(v => 1.0)
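At this point every page carries the same initial rank of 1.0, which a quick check confirms (collect gives no ordering guarantee, so the order below is illustrative):
ranks.collect()
// Array((MapR,1.0), (Baidu,1.0), (Blogger,1.0), (Google,1.0))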
2. On each iteration, have page p send a contribution of rank(p)/numNeighbors(p) to its neighbors (the pages it has links to).
val contributions = links.join(ranks).flatMap { case (url, (links, rank)) => links.map(dest => (dest, rank / links.size)) }
These contributions match the hand calculations, as the Scala shell shows:
scala> contributions.collect
res26: Array[(String, Double)] = Array((MapR,1.0), (Baidu,0.5), (Blogger,0.5), (Google,0.5), (Baidu,0.5), (MapR,1.0))
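For example, MapR has rank 1.0 and two outbound links, so it sends 1.0/2 = 0.5 to each of Baidu and Blogger; Baidu and Google each have a single outbound link, so each sends its full rank of 1.0 to MapR; Blogger sends 1.0/2 = 0.5 to each of Google and Baidu.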
3. Set each page’s rank to 0.15 + 0.85 * contributionsReceived.
ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85 * v)
After the 1st iteration, the current PageRank value for each page is:
scala> ranks.collect
res27: Array[(String, Double)] = Array((Google,0.575), (MapR,1.8499999999999999), (Blogger,0.575), (Baidu,1.0))
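To verify by hand: MapR receives 1.0 from Baidu and 1.0 from Google, so its new rank is 0.15 + 0.85 * 2.0 = 1.85 (printed as 1.8499999999999999 due to floating-point rounding). Blogger and Google each receive a single 0.5 contribution, giving 0.15 + 0.85 * 0.5 = 0.575, and Baidu receives 0.5 + 0.5 = 1.0, giving 0.15 + 0.85 * 1.0 = 1.0.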
Then the last two steps repeat for several iterations.
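Wrapping the last two steps in a loop gives the full iteration (a minimal sketch using the 10 iterations suggested above; the complete program also appears in the comments below):
for (i <- 1 to 10) {
  val contributions = links.join(ranks).flatMap { case (url, (links, rank)) => links.map(dest => (dest, rank / links.size)) }
  ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85 * v)
}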
Note: For basic Scala transformation syntax and examples, please refer to this Scala cheat sheet.
Thanks very much.
Comments:
Thanks for the nice example.
I have a doubt: instead of using links.size, how can we get the neighbor values, like 0.5 and 0.5 for MapR, 1.0 for Baidu, 0.5 for Blogger, and 1.0 for Google? Please help.
Thanks for the nice explanation.
How do I replace the list with the values assigned in another RDD?
Our list is (Google,List(MapR)), (MapR,List(Baidu,Blogger)), ...; instead of this I want their values to be attached,
like (Google,1.0), (MapR,0.5), (MapR,0.5), ...
I do not quite understand the ask. Could you share:
1. What is the input RDD?
2. What is the output RDD you want?
No sir, we have an RDD like this: (Google,List(MapR)), (MapR,List(Baidu,Blogger)), (Blogger,List(Google,Baidu)), (Baidu,List(MapR)).
Instead of setting the value 1.0 for every vertex, set it to the per-neighbor-link value, like Google has 1.0 and MapR has 0.5 (2 neighbors),
Blogger also 0.5, and Baidu is 1.0; I got this from your code.
I want the RDD like this:
(Google,1.0,1.0), (MapR,1.0,0.5,0.5), (Blogger,0.5,1.0,0.5)
That is vertex name, own probability, then the neighbors' probabilities: Google has only one neighbor and its probability is also 1.0,
while MapR has probability 1.0 and its neighbors' probabilities are 0.5 and 0.5.
Please help.
Why is reduceByKey((x, y) => x + y) required? Page ids are unique. Sorry, I am new to Scala and trying to learn.
That is to add the values of the same key together.
For example, (MapR,1.0) + (MapR,1.0) ===> (MapR,2.0)
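To see this in isolation, here is a tiny standalone sketch (the pairs RDD below is made up for illustration):
// Two contributions arrive for the same key; reduceByKey merges them by summing
val pairs = sc.parallelize(Seq(("MapR", 1.0), ("MapR", 1.0), ("Baidu", 0.5)))
pairs.reduceByKey((x, y) => x + y).collect()
// Array((MapR,2.0), (Baidu,0.5)); ordering may vary
After one iteration, several pages contribute to the same destination page, so duplicate keys are the normal case here.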
Small effort to provide the complete code:
import org.apache.spark.HashPartitioner
val links = sc.parallelize(List(("MapR",List("Baidu","Blogger")),("Baidu", List("MapR")),("Blogger",List("Google","Baidu")),("Google", List("MapR")))).partitionBy(new HashPartitioner(4)).persist()
var ranks = links.mapValues(v => 1.0) // Initialized
for (i <- 1 to 10) {
  val contributions = links.join(ranks).flatMap { case (url, (links, rank)) => links.map(dest => (dest, rank / links.size)) }
  ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)
}
ranks.collect
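To print the final ranks highest first, one optional addition (sortBy on the pair RDD, sorting by the rank value in descending order):
ranks.sortBy(_._2, ascending = false).collect().foreach(println)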