How to optimise loading partitioned JSON data in Spark?

In this tutorial we will explore ways to optimise loading partitioned JSON data in Spark.

I have used the SF Bay Area Bike Share dataset, which you can find here. The original data (status.csv) has gone through a few transformations. The result looks like:

Partitioned JSON data

Loading from partitioned JSON files


We will load the data filtered by station and month:

val df1 = spark.read
	.json("file:///data/bike-data-big/partitioned_status.json")
	.filter("station_id = 10 and (month in ('2013-08', '2013-09'))")

Even though the code above does not contain any action yet, Spark starts three jobs that took a few minutes to complete (on a local setup with 8 cores and 32 GB of RAM):

Slow JSON loading
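
A common source of those up-front jobs is JSON schema inference: spark.read.json scans the files to derive a schema before any action runs. One way to avoid that cost (a sketch only, with a hypothetical schema for this dataset; the full post may take a different route) is to supply the schema explicitly:

import org.apache.spark.sql.types._

// Hypothetical schema; adjust field names and types to match the actual files.
val statusSchema = StructType(Seq(
  StructField("station_id", IntegerType),
  StructField("bikes_available", IntegerType),
  StructField("docks_available", IntegerType),
  StructField("time", StringType),
  StructField("month", StringType)
))

val df1 = spark.read
  .schema(statusSchema)  // skips the schema-inference scan
  .json("file:///data/bike-data-big/partitioned_status.json")
  .filter("station_id = 10 and (month in ('2013-08', '2013-09'))")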

[Read More]

How to add row numbers to a Spark DataFrame?

In this tutorial, we will explore a couple of ways to add a consecutive row number to a DataFrame.

For example, let this be our DataFrame (taken from the Spark: The Definitive Guide GitHub repo):

val df = spark.read.option("header", "true").csv(".../data/flight-data/csv/*")
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

After adding a column containing the row number, the result should look like:

[Read More]
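
As a rough sketch of one such approach (the full post may cover others, such as an RDD zipWithIndex), a window-based row_number() yields consecutive numbers:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Any deterministic ordering works; without partitionBy, the window uses a
// single partition, so this does not scale to very large DataFrames.
val w = Window.orderBy("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME")

val numbered = df.withColumn("row_num", row_number().over(w))
numbered.show(5)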

Spark DataFrame - two ways to count the number of rows per partition

Sometimes we need to compute the number of rows in each partition. There are two ways to do this:

  • The first way is to use DataFrame.mapPartitions().

  • The second way (the faster of the two, according to my observations) is to use the spark_partition_id() function followed by a grouped count aggregation, previewed in the sketch right after this list.
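
For a quick preview of the second approach on a toy DataFrame (a sketch only; the real data is loaded just below):

import org.apache.spark.sql.functions.spark_partition_id

// 1,000 rows spread over 8 partitions, just to illustrate the query shape.
val toy = spark.range(0, 1000, 1, 8).toDF("id")

toy.groupBy(spark_partition_id().alias("partition_id"))
   .count()
   .show()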

Let’s first load the data into a DataFrame

I have used the SF Bay Area Bike Share dataset, which can be found here.

Scala:

val df = spark.read.csv("file:///.../status.csv")

Method 1 - using mapPartitions()

Scala:

[Read More]
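
The code itself is in the full post; as a rough sketch of how a mapPartitions-based count might look (tagging each partition with its id via TaskContext is an assumption on my part, and the original may do it differently):

import org.apache.spark.TaskContext
import spark.implicits._

// Map every partition to a single (partition id, row count) pair.
val perPartitionCounts = df.mapPartitions { rows =>
  Iterator((TaskContext.getPartitionId(), rows.size))
}.toDF("partition_id", "row_count")

perPartitionCounts.show()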