Wednesday, November 2, 2016

Spark Aggregate function with examples...

Aggregate function takes three parameters (arguments).
1st parameter is the seed value which is (0,1) in most cases
2nd parameter is the computation reduce function
3rd parameter is the combine reduce function

The aggregate function allows the user to apply two different reduce functions to the RDD at the same time.
The first reduce function is applied within each partition to reduce the data within each partition into a single result.
The second reduce function is used to combine the results from all the partitions from the first reduce function.

The ability to have two separate reduce functions for intra partition versus across partition reducing adds a lot of flexibility. For example the first reduce function can be the max function and the second one can be the sum function. The user also specifies an initial value. Here are some important facts.
  • The initial value (1st parameter seed value) is applied at both levels of reduce. So both at the intra partition reduction and across partition reduction.
  • Both reduce functions have to be commutative and associative.
  • Do not assume any execution order for either partition computations or combining partitions.
Lets start with simple examples. Note: changing the argument values to the combOp will gives you drastic results as the 2nd reduce function is combining the results from the seqOp (Sequential Operation) results from the individual partitions. 


From the above:
seqOp ==> is the first reduce function which occurs in across all the nodes
combOp ==> is the second reduce function which combines the result from seqOp

seqOp and combOp are being called with "aggregate" function with 2 parameters for the "collData" collection as collData.aggregate ( (0,0).
First argument 0 is being passed to seqOp lambda function with "x[0] + y", and the second argument is passed to "x[1] + y".
combOp is just doing the sum of the results from seqOp from all the nodes.

So, from the collData list, for every element (1,2,3,4,5) argument x[0] and x[1] is used to apply to the equation. In this example we are passing 0 for both arguments, it will just add 0 to each element and gives the results as 15 which is (0+1) + (0+2) + (0+3) + (0+4) + (0+5).

Lets change the aggregate arguments to 0, 1 and see the behavior.



As you can see the result is changed to 15,2 20. Why?
Because, we changed the second argument value to 1 and results in 20 like showed below:
x[0] + y will be iterated as:
(0 + 1) + (0 + 2) + (0 + 3) + (0 + 4) + (0 + 5)
= 1 + 2 + 3 + 4 + 5
= 15
x[1] + y will be iterated as:
(1 + 1) + (1 + 2) + (1 + 3) + (1 + 4) + (1 + 5)
=2 + 3 + 4 + 5 + 6
= 20

Lets changes the arguments to 2,3 and see the results:









Here is how the iterations occurred to get the results 25, 30
(2 + 1) + (2 + 2) + (2 + 3) + (2 + 4) + (2 + 5)
= 3 + 4 + 5 + 6 + 7
25
x[1] + y will be iterated as:
(3 + 1) + (3 + 2) + (3 + 3) + (3 + 4) + (3 + 5)
=4 + 5 + 6 + 7 + 8
30

Here is the another example with sum and multiplication of the elements:









From the above, notice that the second arguments to the lambda function is the multiplication (*) instead of summation (+).
What we are doing in the above example is, getting the sum of all the elements and multiplication of all the elements in the collection at the same time.
Here is how the iteration with the arguments passed to "aggregate" function:
seqOP lambda (x[0] + y, x[1] * y) is iterated thru all the elements in the collData with (0, 1)

-------------------- x[0] + y -------------------, -------------------- x[0] * y ------------------
( (0 + 1) + (0 + 2) (0 + 3) (0 + 4) (0 + 5) , (1 * 1) * (1 * 2) * (1 * 3) * (1 * 4) * (1 * 5) )
= (( 1 + 2 + 3 + 4 + 5), (1 * 2 * 3 * 4 * 5))
= (15, 120)

Lets change the argument values to aggregate and see the result:








This time we passed arguments (2, 3) to aggregate.
Here is how the iteration with the arguments (2, 3):
seqOP lambda (x[0] + y, x[1] * y) is iterated thru all the elements in the collData with (0, 1)

-------------------- x[0] + y -------------------, -------------------- x[0] * y ------------------
( (2 + 1) + (2 + 2) (2 + 3) (2 + 4) (2 + 5) , (3 * 1) * (3 * 2) * (3 * 3) * (3 * 4) * (3 * 5) )
= (( 3 + 4 + 5 + 6 + 7), (3 * 6 * 9 * 12 * 15))
= (25, 29160)

No comments:

Post a Comment