Spark如何实现PageRank

Spark如何实现PageRank,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

包河网站建设公司成都创新互联公司,包河网站设计制作,有大型网站制作公司丰富经验。已为包河成百上千提供企业网站建设服务。企业网站搭建\成都外贸网站建设要多少钱,请找那个售后服务好的包河做网站的公司定做!

PageRank算法简介
PageRank是执行多次连接的一个迭代算法,因此它是RDD分区操作的一个很好的用例。算法会维护两个数据集:一个由(pageID,linkList)的元素组成,包含每个页面的相邻页面的列表;另一个由(pageID,rank)元素组成,包含每个页面的当前排序值。它按如下步骤进行计算。

  1. 将每个页面的排序值初始化为1.0。

  2. 在每次迭代中,对页面p,向其每个相邻页面(有直接链接的页面)发送一个值为rank(p)/numNeighbors(p)的贡献值。

  3. 将每个页面的排序值设为0.15 + 0.85 * contributionsReceived。

        最后两个步骤会重复几个循环,在此过程中,算法会逐渐收敛于每个页面的实际PageRank值。在实际操作中,收敛通常需要大约10轮迭代。

模拟数据
假设一个由4个页面组成的小团体:A,B,C和D。相邻页面如下所示:
A:B C 
B:A C 
C:A B D 
D:C 

object SparkPageRank {

 def showWarning() {
   System.err.println(
     """WARN: This is a naive implementation of PageRank and is given as an example!
       |Please use the PageRank implementation found in org.apache.spark.graphx.lib.PageRank
       |for more conventional use.
     """.stripMargin)
 }

 def main(args: Array[String]) {
   if (args.length < 1) {
     System.err.println("Usage: SparkPageRank ")
     System.exit(1)
   }

   showWarning()

   val spark = SparkSession
     .builder
     .appName("SparkPageRank")
     .getOrCreate()

   val iters = if (args.length > 1) args(1).toInt else 10
   val lines = spark.read.textFile(args(0)).rdd
   val links = lines.map{ s =>
     val parts = s.split("\\s+")
     (parts(0), parts(1))
   }.distinct().groupByKey().cache()
   var ranks = links.mapValues(v => 1.0)

   for (i <- 1 to iters) {
     val contribs = links.join(ranks).values.flatMap{ case (urls, rank) =>
       val size = urls.size
       urls.map(url => (url, rank / size))
     }
     ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
   }

   val output = ranks.collect()
   output.foreach(tup => println(s"${tup._1} has rank:  ${tup._2} ."))

   spark.stop()
 }
}

看完上述内容,你们掌握Spark如何实现PageRank的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注创新互联行业资讯频道,感谢各位的阅读!


本文名称:Spark如何实现PageRank
文章来源:http://pcwzsj.com/article/ihsgjc.html