Processing CSV files with Spark 2 – Part 1

Intro

This is part of our migrating/updating to Spark 2 series.

See all our posts on Spark and Spark 2.

In this post, we are going to analyze a CSV data file with Spark 2, using the newer SparkSession API.

Code repository
Learning Spark @ GitHub


A little background

In Spark v1, CSV support came as a separate package called spark-csv.  It wasn't part of Spark, so we had to pull this package in explicitly (using the --packages option).
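For reference, launching the shell with the external package in Spark 1.x looked roughly like this (the package coordinates and version below are illustrative; match them to your Scala and Spark versions):

# Spark 1.x only: pull in the external spark-csv package
$ spark-shell --packages com.databricks:spark-csv_2.11:1.5.0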

In Spark v2, the CSV package is included with the Spark distribution… hooray!
We will be using the SparkSession API to read CSV.  CSV is such an important format that it gets a first-class API in SparkSession:

spark.read.csv("file.csv")

Also see the spark-csv documentation for the options available when parsing CSV.
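For example, a few commonly used read options (these are standard Spark 2 CSV options; the values shown are just illustrative):

val df = spark.read.
         option("header", "true").     // treat the first line as a header
         option("delimiter", "|").     // field separator; default is ","
         option("quote", "\"").        // quote character
         option("nullValue", "NA").    // string to interpret as null (illustrative)
         csv("file.csv")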

Data: Spark Commit logs

I thought it would be interesting to analyze code contributions to the Spark project using Spark itself 🙂

Here is how to get the data we need.

 # first clone spark git repo
 $ git clone https://github.com/apache/spark

 $ cd spark

 # inspect the logs
 $ git log --pretty=format:"%H|%an|%ae|%cd|%s"

 # save to file
 $ git log --pretty=format:"%H|%an|%ae|%cd|%s" > spark-commits.csv

 # create a small sample
 $ head -n 20 spark-commits.csv > sample.csv

Commit log format

  • SHA
  • committer
  • email
  • date
  • comment
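If you prefer not to rely on the header row, you can also declare an explicit schema for these five fields up front (a sketch; the field names match the header line shown below):

import org.apache.spark.sql.types._

val commitSchema = StructType(Seq(
  StructField("sha", StringType),
  StructField("committer", StringType),
  StructField("email", StringType),
  StructField("date", StringType),
  StructField("comment", StringType)
))

// usage: spark.read.schema(commitSchema).option("delimiter", "|").csv("...")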

Sample data below.
I am using | (pipe) as the delimiter (separator) instead of the default , (comma).

sha|committer|email|date|comment

8f2142cfd2ca632a4afb0cc29cc358edbb21f8d|Dilip Biswal|dbiswal@us.ibm.com|Sat Feb 25 23:56:57 2017 -0800|[SQL] Duplicate test exception in SQLQueryTestSuite due to meta files(.DS_Store) on Mac

89608cf26226e28f331a4695fee89394d0d38ea0|Wenchen Fan|wenchen@databricks.com|Sat Feb 25 23:01:44 2017 -0800|[SPARK-17075][SQL][FOLLOWUP] fix some minor issues and clean up the code

6ab60542e8e803b1d91371a92f4aaef6a64106f6|Joseph K. Bradley|joseph@databricks.com|Sat Feb 25 22:24:08 2017 -0800|[MINOR][ML][DOC] Document default value for GeneralizedLinearRegression.linkPower

 

Question

Find the top-10 committers with the most commits.

Let’s code

Here I am going to show you how we arrive at the solution, rather than just showing you the solution. (I'd rather teach you to fish than give you a fish 🙂 )

Fire up Spark-Shell

# assuming Spark is installed under the ~/apps/spark directory
$  ~/apps/spark/bin/spark-shell

Rest of the code in Spark-Shell

// default read
val commits = spark.read.csv("data/spark-commits/sample.csv")
commits.printSchema

// with header
val commits = spark.read.
              option("header", "true").
              csv("data/spark-commits/sample.csv")
commits.printSchema

// header + custom delimiter
val commits = spark.read.
              option("header", "true").
              option("delimiter", "|").
              csv("data/spark-commits/sample.csv")
commits.printSchema
commits.show(false)
commits.count
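// Note: by default every column is read back as a string.
// If you want typed columns, you can add the standard inferSchema option
// (a sketch; it makes an extra pass over the data to guess the types)
val typed = spark.read.
            option("header", "true").
            option("delimiter", "|").
            option("inferSchema", "true").
            csv("data/spark-commits/sample.csv")
typed.printSchema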

// ==== good to go!
// read the full file
val commits = spark.read.
              option("header", "true").
              option("delimiter", "|").
              csv("data/spark-commits/spark-commits.csv")
commits.count // 19001

// find commits from databricks
val db = commits.filter(commits("email").contains("databricks.com"))
db.count // 4116
db.show

// find top committers
commits.groupBy("email")
commits.groupBy("email").count
commits.groupBy("email").count.show
commits.groupBy("email").count.printSchema
// sort by desc
commits.groupBy("email").count.orderBy(desc("count"))
commits.groupBy("email").count.orderBy(desc("count")).show
// show top-10
commits.groupBy("email").count.orderBy(desc("count")).show(10) 

+--------------------+-----+
|               email|count|
+--------------------+-----+
|matei@eecs.berkel...| 1341|
|  pwendell@gmail.com|  791|
| rxin@databricks.com|  700|
|tathagata.das1565...|  502|
|     rxin@apache.org|  430|
...

 

Results

Not surprisingly, the top contributors happen to be the Spark creators 🙂

  • Matei Zaharia – 1341 commits
  • Patrick Wendell – 791 commits
  • Reynold Xin – 700 commits
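If you'd rather see names than email addresses, the same query works on the committer column (a sketch; note that the same person can commit under several emails, so the counts may differ slightly):

commits.groupBy("committer").count.orderBy(desc("count")).show(10)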

Final Code

Here is the final snippet

val commits = spark.read.
              option("header", "true").
              option("delimiter", "|").
              csv("data/spark-commits/spark-commits.csv")
commits.groupBy("email").count.orderBy(desc("count")).show(10)

Summary

We have shown how to parse CSV files very easily using Spark 2's new SparkSession API.

Written by: Sujee Maniyam

Sujee Maniyam is the co-founder of Elephantscale. He teaches and works on Big Data, AI, and Cloud technologies. Sujee is a published author and a frequent speaker.
