Spark GraphX 中的 Pregel 函数学习笔记

Pregel简介

Pregel是Google的图算法引擎,用于分布式图计算,主要用于图遍历(BFS)、最短路径(SSSP)、PageRank计算等等。Pregel的名字来历很有意思。是为了纪念欧拉的七桥问题,七座桥就位于名为Pregel这条河上。

Pregel是BSP模型,就是“计算”-“通信”-“同步”的模式,

  • 输入输出为有向图
  • 分成超步
  • 以节点为中心计算,超步内每个节点执行自己的任务,执行节点的顺序不确定
  • 两个超步之间是通信阶段

在Pregel中,以节点为中心计算。Step 0时每节点都是活动状态,每个节点主动“给停止投票”进入不活动状态。如果接收到消息,则激活。没有活动节点和消息时,整个算法结束。

GraphX中的Pregel实现

Spark中的GraphX模块复现了Pregel引擎。 Spark Pregel api中写到:

Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage computation over edges, enables the message sending computation to read both vertex attributes, and constrains messages to the graph structure. These changes allow for substantially more efficient distributed execution while also exposing greater flexibility for graph-based computation.

也就是说,GraphX中的实现可以使用EdgeTriplet模块对某条边的两边的节点属性都进行读写操作,这一点使得其比起原来的引擎更加方便易用。

Pregel函数的调用格式为:

def apply[VD, ED, A](graph: Graph[VD, ED], initialMsg: A, 
		maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either
	)(
		vprog: (VertexId, VD, A)  VD, sendMsg: (EdgeTriplet[VD, ED])  Iterator[(VertexId, A)], 
		mergeMsg: (A, A)  A
	)(
		implicit arg0: ClassTag[VD], arg1: ClassTag[ED], arg2: ClassTag[A]
	): Graph[VD, ED]

On the first iteration all vertices receive the initialMsg and on subsequent iterations if a vertex does not receive a message then the vertex-program is not invoked.

在第一次迭代的时候,所有的顶点都会接收到initialMsg消息,在次轮迭代的时候,如果顶点没有接收到消息,verteProgram就不会被调用。

This function iterates until there are no remaining messages, or for maxIterations iterations.

这个函数会一直运行直到没有信息被发送,或者达到了最大循环次数才会停止。

函数参数解析:

  • VD:输入graph的顶点的数据类型
  • ED:输入graph的边的数据类型
  • APregel message的数据类型
  • graph:输入的图,可以进行一些初始化操作
  • initialMsg:在第一次迭代的时候顶点收到的消息
  • maxIterations:迭代的最大次数
  • activeDirection:接收消息的方向,默认为双向即EdgeDirection.Either
  • vprog:用户定义的顶点程序,运行在每一个顶点中,负责接收进来的信息和计算新的顶点值。在第一次迭代的时候,所有的顶点程序将会被默认的defaultMessage调用,在次轮迭代中,顶点程序只有接收到message才会被调用
  • sendMsg:用户提供的函数,用于定义发送的信息
  • mergeMsg:用户提供定义的函数,将两个类型为Amessage合并为一个类型为Amessage(当某个顶点接收到多个消息的时候的处理函数)

下面的官方example是使用Pregel对PageRank算法的实现。

val pagerankGraph: Graph[Double, Double] = graph
  // Associate the degree with each vertex
  .outerJoinVertices(graph.outDegrees) {
    (vid, vdata, deg) => deg.getOrElse(0)
  }
  // Set the weight on the edges based on the degree
  .mapTriplets(e => 1.0 / e.srcAttr)
  // Set the vertex attributes to the initial pagerank values
  .mapVertices((id, attr) => 1.0)
def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
  resetProb + (1.0 - resetProb) * msgSum
def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
  Iterator((edge.dstId, edge.srcAttr * edge.attr))
def messageCombiner(a: Double, b: Double): Double = a + b
val initialMessage = 0.0
// Execute Pregel for a fixed number of iterations.
Pregel(pagerankGraph, initialMessage, numIter)(
  vertexProgram, sendMessage, messageCombiner)

还有个小插曲:在工程中使用这个函数时,出现了Task not serializable的提示,而之前没有加这个函数功能的时候并没有这个问题,在spark操作中的类都已经是可序列化的类了。通过查看错误日志,可以看到有

Caused by: java.io.NotSerializableException: xxxxxxx class

这样的提示。解决办法很简单,就是在后面涉及到的相关类的声明后面加上extends Serializable即可。

分析原因,应该是这个算法可能需要将更多的内容发送至节点上面,所以相比一般的Map, Reduce, Filter等操作,之前不需要序列化的一些类也需要Serialzable了,当然只是推测,具体原因还不是很清楚。