Apache Spark: Aggregate method explained

Hi,

In this post I’ll discuss the aggregate method in Spark. It was something I found quite confusing to grasp when I was first learning Spark, so I thought this would be helpful for a lot of people learning it. I am using Scala for this post. If you are using Spark in another language, the explanation of aggregate still applies; only the syntax might be a bit different.

   def aggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

From the signature of the aggregate method, we can see that it takes a zeroValue as its first argument, and seqOp and combOp as the other two. Here T is the type of the elements in your RDD and U is the type of the result.

How Aggregate Works

First, let’s understand how the aggregate method works. It takes a zeroValue as its first argument. This should have the type of the final value you expect. Think of it as a value that will be combined with the results of applying the operation to your RDD. So if you are summing your values, you should set zeroValue to 0, and for a product it should be 1 (in other words, the identity element of the operation).
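For instance, here is a minimal sketch of both cases (assuming sc is an existing SparkContext; the RDD here is just for illustration):

//sum: zeroValue is 0, the identity element for addition
val sum = sc.parallelize(1 to 4).aggregate(0)((acc, n) => acc + n, (a, b) => a + b)       //10
//product: zeroValue is 1, the identity element for multiplication
val product = sc.parallelize(1 to 4).aggregate(1)((acc, n) => acc * n, (a, b) => a * b)   //24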

The next parameter it takes is seqOp. It takes the values from your RDD and combines them with an accumulator of the same type as zeroValue (think of it as the value holding the current result), producing a result of that same type. So the first argument of seqOp has the same type as zeroValue. The function is first applied to zeroValue and the first value read from the RDD; each subsequent call then combines the previously computed value with the next value from the RDD.

The third parameter is combOp. This function takes two arguments of the same type as zeroValue, and it is applied to merge the per-partition values that were produced by seqOp.
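To see both functions in action, here is a rough trace of summing a small RDD split over two partitions (again assuming sc is a SparkContext; Spark does not guarantee this exact merge order):

val rdd = sc.parallelize(List(1, 2, 3, 4), numSlices = 2)
//partition 0 holds (1, 2): seqOp(seqOp(0, 1), 2) = 3
//partition 1 holds (3, 4): seqOp(seqOp(0, 3), 4) = 7
//the driver then merges:   combOp(combOp(0, 3), 7) = 10
val total = rdd.aggregate(0)((acc, n) => acc + n, (a, b) => a + b)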

Example

The following code counts the number of cities listed for each country.

import org.apache.spark.{SparkConf, SparkContext}

//create the SparkContext and an RDD of (country, city) pairs
val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test_app"))
val countries = sc.parallelize(List(("Nepal", "Kathmandu"), ("Nepal", "Pokhara"), ("USA", "NY"), ("USA", "Los Angeles"), ("China", "Beijing"), ("France", "Paris"), ("Nepal", "Kusma"), ("India", "Delhi")))

//define seqOp: add the current pair's country to the running count
def seqOpFun(runningAccumulator: Map[String, Int], currentPairToProcess: (String, String)): Map[String, Int] = {
  runningAccumulator.updated(currentPairToProcess._1, 1 + runningAccumulator.getOrElse(currentPairToProcess._1, 0))
}

//define combOp: merge two per-partition maps, summing counts for countries present in both
def comOpFun(acc1: Map[String, Int], acc2: Map[String, Int]): Map[String, Int] = {
  acc1 ++ acc2.map { case (countryName, noCities) => countryName -> (noCities + acc1.getOrElse(countryName, 0)) }
}

//get the result
val res = countries.aggregate(Map.empty[String, Int])(seqOpFun, comOpFun)

As we can see in the example, zeroValue is an empty Map[String, Int], which is also the type of the result. The String part holds the name of the country and the Int part the number of its cities.

seqOpFun takes two parameters: runningAccumulator, which holds the current result of the operation, and currentPairToProcess, the element from the RDD passed to the function. It checks whether the current country already exists in the map. If it does, its city count is increased by 1; otherwise it is inserted with a count of 1. This function is applied within each partition.
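For instance, the first two Nepal pairs in a partition would be processed like this (calls shown with their results):

seqOpFun(Map.empty, ("Nepal", "Kathmandu"))       //Map(Nepal -> 1)
seqOpFun(Map("Nepal" -> 1), ("Nepal", "Pokhara")) //Map(Nepal -> 2)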

comOpFun takes the maps returned by the partitions and merges them pairwise to give the final value.
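For instance, merging two hypothetical per-partition maps:

comOpFun(Map("Nepal" -> 2, "USA" -> 1), Map("Nepal" -> 1, "India" -> 1))
//Map(Nepal -> 3, USA -> 1, India -> 1)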

In summary, aggregate applies seqOp to the elements within each partition to produce a per-partition result, and all those per-partition results are then combined by combOp to give the final result.
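Running the example above, res holds one count per country (the ordering of a Scala Map is not guaranteed):

println(res)
//Map(Nepal -> 3, USA -> 2, China -> 1, France -> 1, India -> 1)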

I hope this post was helpful to you. Please comment if you have any queries/suggestions.

I’ll see you in the next one.

Cheers 🙂