// Hard-coded runtime arguments for the local PageRank run.
// Array literal replaces the verbose new-Array-plus-index-mutation pattern.
val args = Array(
  "D:\\Users\\spark\\SparkTest\\input\\link.txt", // args(0): edge list, one "source target" pair per line
  "2",                                            // args(1): number of PageRank iterations
  "D:\\Users\\spark\\SparkTest\\input\\node.txt"  // args(2): initial node scores, "node score" per line
)
// Spark driver setup: single-threaded local PageRank job.
val sparkConf = new SparkConf().setAppName("PageRank").setMaster("local")
val iters = if (args.length > 1) args(1).toInt else 10 // iteration count from args(1), default 10
val ctx = new SparkContext(sparkConf)

// Adjacency lists: each input line is "source target"; duplicates are removed,
// edges are grouped per source, and the RDD is cached because it is re-joined
// on every iteration below.
val lines = ctx.textFile(args(0), 1)
val links = lines
  .map { line =>
    val fields = line.split("\\s+")
    (fields(0), fields(1))
  }
  .distinct()
  .groupByKey()
  .cache()

//var ranks = links.mapValues(v => 1.0) // the init node score
// Initial scores are read from the node file ("node score" per line) instead
// of starting every node at a uniform 1.0.
val nodes = ctx.textFile(args(2), 1)
var ranks = nodes.map { line =>
  val fields = line.split("\\s+")
  (fields(0), fields(1).toDouble)
}

// Power iteration: each node splits its current rank evenly across its
// out-links, then contributions arriving at the same node are summed.
for (_ <- 1 to iters) {
  val contribs = links.join(ranks).values.flatMap { case (targets, rank) =>
    val outDegree = targets.size
    targets.map(target => (target, rank / outDegree))
  }
  //ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
  // NOTE(review): the damped PageRank update above is commented out; the
  // active update is a plain sum (0.0 + 1.0 * x leaves the value unchanged).
  ranks = contribs.reduceByKey(_ + _).mapValues(0.0 + 1.0 * _)
}
// Materialize the final scores on the driver and print one line per node.
val output = ranks.collect()
for ((node, score) <- output) {
  println(node + " has rank: " + score + ".")
}
// Hard-coded runtime arguments for the local PageRank run.
// Array literal replaces the verbose new-Array-plus-index-mutation pattern.
val args = Array(
  "D:\\Users\\spark\\SparkTest\\input\\link.txt", // args(0): edge list, one "source target" pair per line
  "2",                                            // args(1): number of PageRank iterations
  "D:\\Users\\spark\\SparkTest\\input\\node.txt"  // args(2): initial node scores, "node score" per line
)
// Spark driver setup: single-threaded local PageRank job (join-based variant).
val sparkConf = new SparkConf().setAppName("PageRank").setMaster("local")
val iters = if (args.length > 1) args(1).toInt else 10 // iteration count from args(1), default 10
val ctx = new SparkContext(sparkConf)

// Edge list: (source, target) pairs, one edge per input line.
// Unlike the grouped variant, edges are kept flat here.
val lines = ctx.textFile(args(0), 1)
val links = lines.map { line =>
  val fields = line.split("\\s+")
  (fields(0), fields(1))
}

// Out-degree of every source node, derived from the flat edge list.
val cnt = links.groupByKey.map { case (src, targets) => (src, targets.size) }

//var ranks = links.mapValues(v => 1.0) // the init node score
// Initial scores come from the node file ("node score" per line).
val nodes = ctx.textFile(args(2), 1)
var ranks = nodes.map { line =>
  val fields = line.split("\\s+")
  (fields(0), fields(1).toDouble)
}

// NOTE(review): this update keys the rank join by the TARGET node, so each
// source accumulates rank(target) / outDegree(source) — the reverse of the
// usual PageRank contribution direction. Preserved as written.
for (_ <- 1 to iters) {
  ranks = links
    .join(cnt)                                           // (src, (dst, deg))
    .map { case (src, (dst, deg)) => (dst, (src, deg)) } // re-key by target
    .join(ranks)                                         // (dst, ((src, deg), rank))
    .map { case (_, ((src, deg), rank)) => (src, rank / deg) }
    .reduceByKey(_ + _)
}
// Materialize the final scores on the driver and print one line per node.
val output = ranks.collect()
for ((node, score) <- output) {
  println(node + " has rank: " + score + ".")
}