Spark Scala - Perform data aggregation over the last or next n seconds time window



While performing statistical aggregations, we often need to aggregate, for each row, over the next n seconds of data starting from that row.




The Spark Window API provides a handy rangeBetween method that makes exactly this kind of aggregation easy.




For example:

// Required imports (in spark-shell, `spark` and `sc` are already available)
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

// Sample data with timestamp
val customers = sc.parallelize(List(
  ("Alice", "2016-05-01 00:00:00", 10, 4),
  ("Alice", "2016-05-01 00:00:01", 20, 2),
  ("Alice", "2016-05-01 00:00:02", 30, 4),
  ("Alice", "2016-05-01 00:00:02", 40, 6),
  ("Alice", "2016-05-01 00:00:03", 50, 1),
  ("Alice", "2016-05-01 00:00:03", 60, 4),
  ("Alice", "2016-05-01 00:00:04", 70, 2),
  ("Alice", "2016-05-01 00:00:05", 80, 4),
  ("Bob",   "2016-05-01 00:00:03", 25, 6),
  ("Bob",   "2016-05-01 00:00:04", 29, 7),
  ("Bob",   "2016-05-01 00:00:05", 27, 10)
)).toDF("name", "time", "amountSpent", "NumItems")


Spark console output:

scala> customers.show
+-----+-------------------+-----------+--------+
| name| time|amountSpent|NumItems|
+-----+-------------------+-----------+--------+
|Alice|2016-05-01 00:00:00| 10| 4|
|Alice|2016-05-01 00:00:01| 20| 2|
|Alice|2016-05-01 00:00:02| 30| 4|
|Alice|2016-05-01 00:00:02| 40| 6|
|Alice|2016-05-01 00:00:03| 50| 1|
|Alice|2016-05-01 00:00:03| 60| 4|
|Alice|2016-05-01 00:00:04| 70| 2|
|Alice|2016-05-01 00:00:05| 80| 4|
| Bob|2016-05-01 00:00:03| 25| 6|
| Bob|2016-05-01 00:00:04| 29| 7|
| Bob|2016-05-01 00:00:05| 27| 10|
+-----+-------------------+-----------+--------+

// Convert the timestamp to epoch seconds, because the orderBy column of a range-based Window needs to be numeric
val tdf = customers.withColumn("epoch", unix_timestamp($"time"))
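As a side note, the same epoch column can also be derived by casting; this is just an illustrative alternative (tdfAlt is a hypothetical name), not part of the original example:

// Alternative: cast the time string to a timestamp and then to a long (epoch seconds)
val tdfAlt = customers.withColumn("epoch", $"time".cast("timestamp").cast("long"))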


// Window frame from the current row to the next five seconds of data (inclusive);
// the range offsets are in the same units as the orderBy column, i.e. seconds here
val wSpec1 = Window.partitionBy("name").orderBy("epoch").rangeBetween(Window.currentRow, 5)


// Collect the list of amountSpent values from the current row through the next 5 seconds
tdf.withColumn("sumSpent",collect_list(customers("amountSpent")).over(wSpec1)).show()
Output (using show(false) to avoid truncation):
scala> tdf.withColumn("sumSpent",collect_list(customers("amountSpent")).over(wSpec1)).show(false)
+-----+-------------------+-----------+--------+----------+--------------------------------+
|name |time |amountSpent|NumItems|epoch |sumSpent |
+-----+-------------------+-----------+--------+----------+--------------------------------+
|Bob |2016-05-01 00:00:03|25 |6 |1462024803|[25, 29, 27] |
|Bob |2016-05-01 00:00:04|29 |7 |1462024804|[29, 27] |
|Bob |2016-05-01 00:00:05|27 |10 |1462024805|[27] |
|Alice|2016-05-01 00:00:00|10 |4 |1462024800|[10, 20, 30, 40, 50, 60, 70, 80]|
|Alice|2016-05-01 00:00:01|20 |2 |1462024801|[20, 30, 40, 50, 60, 70, 80] |
|Alice|2016-05-01 00:00:02|30 |4 |1462024802|[30, 40, 50, 60, 70, 80] |
|Alice|2016-05-01 00:00:02|40 |6 |1462024802|[30, 40, 50, 60, 70, 80] |
|Alice|2016-05-01 00:00:03|50 |1 |1462024803|[50, 60, 70, 80] |
|Alice|2016-05-01 00:00:03|60 |4 |1462024803|[50, 60, 70, 80] |
|Alice|2016-05-01 00:00:04|70 |2 |1462024804|[70, 80] |
|Alice|2016-05-01 00:00:05|80 |4 |1462024805|[80] |
+-----+-------------------+-----------+--------+----------+--------------------------------+
// Same as above, but the sum of amountSpent over the window
tdf.withColumn("sumSpent",sum(customers("amountSpent")).over(wSpec1)).show()

