Practical Apache Spark in 10 minutes. Part 3 – DataFrames and SQL

Spark SQL is the part of the Apache Spark big data framework designed for processing structured and semi-structured data. It provides a DataFrame API that simplifies and accelerates data manipulation. A DataFrame is a special type of object, conceptually similar to a table in a relational database. It represents a distributed collection of data organized into named columns. DataFrames can be created from external sources, retrieved with a query from a database, or converted from an RDD; the inverse conversion is also possible. This abstraction is designed for sampling, filtering, aggregating, and visualizing data.

In this blog post, we’ll show you how to load a DataFrame and perform basic operations on it with both the API and SQL. We’ll also go through converting a DataFrame to an RDD and back.

For this exercise, we will work with the MovieLens small dataset. Download the files and put them into the previously created your_spark_folder/example/ directory.

Comma-Separated Values (CSV) File

Start by running pyspark. Go to your Spark folder and run:

./bin/pyspark

Then, let’s load some data into a DataFrame. First, specify a few options for the loader: set the delimiter to a semicolon and header to True, so that the column names are read from the first line of the file:

data = spark.read.format("csv") \
    .option("delimiter", ";") \
    .option("header", True) \
    .load("example/movielens.csv")

To take a quick look at the data, we use the show() method. For instance, we can display the first five rows:

data.show(5)
+-------+--------------------+--------------------+------+------+----------+
|movieId|               title|              genres|userId|rating| timestamp|
+-------+--------------------+--------------------+------+------+----------+
|      1|    Toy Story (1995)|Adventure|Animati...|     7|   3.0| 851866703|
|      2|      Jumanji (1995)|Adventure|Childre...|    15|   2.0|1134521380|
|      3|Grumpier Old Men ...|      Comedy|Romance|     5|   4.0|1163374957|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|    19|   3.0| 855192868|
|      5|Father of the Bri...|              Comedy|    15|   4.5|1093070098|
+-------+--------------------+--------------------+------+------+----------+

Next, we can manipulate the data using either the API or regular SQL queries. Let’s select the title and rating columns using the DataFrame API:

data.select("title", "rating").show(2)
+----------------+------+
|           title|rating|
+----------------+------+
|Toy Story (1995)|   3.0|
|  Jumanji (1995)|   2.0|
+----------------+------+

You can also filter the DataFrame based on some condition. Say, we want to choose movies with ratings lower than 3.0. To do this, run the following:

data.filter(data['rating'] < 3.0).show(3)
+-------+--------------------+--------------------+------+------+----------+
|movieId|               title|              genres|userId|rating| timestamp|
+-------+--------------------+--------------------+------+------+----------+
|      2|      Jumanji (1995)|Adventure|Childre...|    15|   2.0|1134521380|
|     11|American Presiden...|Comedy|Drama|Romance|    15|   2.5|1093028381|
|     14|        Nixon (1995)|               Drama|    15|   2.5|1166586286|
+-------+--------------------+--------------------+------+------+----------+

Another useful operation to perform on a DataFrame is grouping by some field. Let’s group our DataFrame by rating, check counts for each rating, and finally order the resulting counts by rating.

data.groupBy(data['rating']).count().orderBy('rating').show()
+------+-----+
|rating|count|
+------+-----+
|   0.5|  227|
|   1.0|  574|
|   1.5|  298|
|   2.0|  831|
|   2.5|  611|
|   3.0| 1790|
|   3.5| 1141|
|   4.0| 2156|
|   4.5|  554|
|   5.0|  884|
+------+-----+

You can also calculate basic statistics for a DataFrame using the describe() method. It reports the count, mean, standard deviation, min, and max. You may pass specific columns or call it without arguments to cover the whole DataFrame. In our case, only the rating column is suitable for statistics calculation.

data.describe("rating").show()
+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|              9066|
|   mean| 3.223527465254798|
| stddev|1.1572344494786881|
|    min|               0.5|
|    max|               5.0|
+-------+------------------+

Of course, there are many other possibilities for working with DataFrames; for further reference, consider exploring the API documentation.

Now, we will use SQL to query the data. To begin with, we need to register the DataFrame as a temporary view with the following command:

data.createOrReplaceTempView("movielens")

Let’s apply the same filter as before – we’ll select only movies with ratings lower than 3 using SQL:

spark.sql("select * from movielens where rating < 3").show(3)
+-------+--------------------+--------------------+------+------+----------+
|movieId|               title|              genres|userId|rating| timestamp|
+-------+--------------------+--------------------+------+------+----------+
|      2|      Jumanji (1995)|Adventure|Childre...|    15|   2.0|1134521380|
|     11|American Presiden...|Comedy|Drama|Romance|    15|   2.5|1093028381|
|     14|        Nixon (1995)|               Drama|    15|   2.5|1166586286|
+-------+--------------------+--------------------+------+------+----------+

Now, let’s create a DataFrame from an RDD. We’ll use the same dataset, but this time we will load it as a text file (without a header). For simplicity, we want to keep only three columns. So, load the data into an RDD, split each line by semicolon, and select the first three entries of each row:

rdd = sc.textFile("example/movielens.txt") \
    .map(lambda line: line.split(";")) \
    .map(lambda splits: (int(splits[0]), splits[1], splits[2]))

Then, take a look at the contents of rdd:

for elem in rdd.take(5): print(elem)
(1, 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy')
(2, 'Jumanji (1995)', 'Adventure|Children|Fantasy')
(3, 'Grumpier Old Men (1995)', 'Comedy|Romance')
(4, 'Waiting to Exhale (1995)', 'Comedy|Drama|Romance')
(5, 'Father of the Bride Part II (1995)', 'Comedy')

Next, we import the dependencies, create fields with specific types, and build the schema itself from them.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

id_field = StructField("id", IntegerType(), True)
title_field = StructField("title", StringType(), True)
genres_field = StructField("genres", StringType(), True)

schema = StructType([id_field, title_field, genres_field])

Finally, we construct the DataFrame:

movielens = spark.createDataFrame(rdd, schema)
movielens.show(3)
+---+--------------------+--------------------+
| id|               title|              genres|
+---+--------------------+--------------------+
|  1|    Toy Story (1995)|Adventure|Animati...|
|  2|      Jumanji (1995)|Adventure|Childre...|
|  3|Grumpier Old Men ...|      Comedy|Romance|
+---+--------------------+--------------------+

If you need to convert a DataFrame to an RDD, simply use .rdd on the DataFrame. This way you’ll get an RDD of Row objects.

movielensRDD = movielens.rdd
for row in movielensRDD.take(2): print(row)
Row(id=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy')
Row(id=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy')

JSON Files

In the previous examples, we’ve been loading data from text files, but datasets are also often saved in JSON format. Spark provides a simple way to work with JSON files.

movies = spark.read.json("example/movielens.json")

Let’s perform a query on it:

movies.createOrReplaceTempView("movies")
nice_movies = spark.sql("select * from movies where rating > 4.9")
nice_movies.show(5)
+--------------------+-------+------+----------+--------------------+------+
|              genres|movieId|rating| timestamp|               title|userId|
+--------------------+-------+------+----------+--------------------+------+
|       Drama|Romance|     17|   5.0| 835355681|Sense and Sensibi...|     2|
|       Drama|Romance|     28|   5.0| 854714394|   Persuasion (1995)|    67|
|         Crime|Drama|     30|   5.0| 848161285|Shanghai Triad (Y...|    86|
|Mystery|Sci-Fi|Th...|     32|   5.0|1154465405|Twelve Monkeys (a...|     8|
|      Children|Drama|     34|   5.0| 949919556|         Babe (1995)|     4|
+--------------------+-------+------+----------+--------------------+------+

And finally, we can save the result of the query in JSON format:

nice_movies.write.json("nice_movies")

Spark writes the result as a directory of part files, which you will find in the your_spark_folder/nice_movies folder.

Parquet Files

Apache Parquet is a popular column-oriented storage format supported by a wide variety of data processing systems. It is often used with tools in the Hadoop ecosystem and supports all of the data types in Spark SQL. Spark SQL provides methods to read from and write to Parquet files.

Let’s save our first DataFrame as a Parquet file:

data.write.parquet('movielens.parquet')

After that, we are able to read this file:

parquet = spark.read.parquet('movielens.parquet')

Now we can work with it like a regular DataFrame. Let’s register it as a view and select the movies rated between 2 and 5 (BETWEEN is inclusive on both ends), which leaves out only the lowest-rated ones:

parquet.createOrReplaceTempView('parquetlens')
just_movies = spark.sql("select title, rating from parquetlens where rating between 2 and 5")
just_movies.show(5)
+--------------------+------+
|               title|rating|
+--------------------+------+
|    Toy Story (1995)|   3.0|
|      Jumanji (1995)|   2.0|
|Grumpier Old Men ...|   4.0|
|Waiting to Exhale...|   3.0|
|Father of the Bri...|   4.5|
+--------------------+------+

Conclusions

Today, we’ve briefly discussed how to create DataFrames from CSV, JSON, and Parquet files in Spark SQL. You should now be familiar with some basic manipulations, and there are many other capabilities for you to explore. We hope the article was helpful. Study every day and improve yourself. See you soon!

http://www.datasciencecentral.com/xn/detail/6448529:BlogPost:744581