Spark SQL Partitions


In Apache Spark, the shuffle is one of the costliest operations. Parallelising it effectively is key to good Spark job performance.

Shuffle Partitions in Spark SQL

Shuffle partitions are the partitions of a Spark DataFrame that is created by a grouping or join operation. The number of partitions in such a DataFrame differs from the number of partitions of the original DataFrame.

For example, the snippet sketched below reads a small sales file and prints the number of partitions of the resulting DataFrame. It prints 2, indicating there are two partitions in the DataFrame.
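
A sketch of what that snippet might look like (the CSV path, the header option, and the use of an existing SparkSession named spark are assumptions, not the blog's exact code):

```scala
// Read a small CSV file into a DataFrame; the path is hypothetical.
val df = spark.read
  .option("header", "true")
  .csv("src/main/resources/sales.csv")

// Number of partitions backing the DataFrame
println(df.rdd.partitions.length)   // 2 for this small file
```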

Now, when we run a groupBy operation on the same DataFrame, the number of partitions changes (see the sketch below).
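
A sketch of the grouped version (the grouping column customerId is a placeholder for illustration):

```scala
// The aggregation triggers a shuffle, so the result picks up
// spark.sql.shuffle.partitions partitions.
val grouped = df.groupBy("customerId").count()

println(grouped.rdd.partitions.length)   // 200 with the default setting
```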

The above code prints 200: the 2 partitions have become 200.

This is because the parameter spark.sql.shuffle.partitions, which controls the number of shuffle partitions, is set to 200 by default.

Challenges with Default Shuffle Partitions

The number of shuffle partitions in Spark is static; it doesn't change with the data size. This leads to the issues below:

  • For smaller data, 200 is overkill and often leads to slower processing because of task-scheduling overhead.

  • For large data, 200 is too few and doesn't make effective use of all the resources in the cluster.

To overcome these issues, we need to control the number of shuffle partitions dynamically.

Dynamically Setting the Shuffle Partitions

Spark allows changing Spark SQL configuration at runtime using the conf method on the SparkSession. With this method, we can set a wide variety of configurations dynamically.

So if we need to reduce the number of shuffle partitions for a given dataset, we can do so with the code below.
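
A sketch of doing that through spark.conf, reusing the DataFrame and placeholder column from the earlier snippets:

```scala
// Lower the shuffle partition count before running the aggregation
spark.conf.set("spark.sql.shuffle.partitions", "100")

val grouped = df.groupBy("customerId").count()
println(grouped.rdd.partitions.length)   // 100
```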

The above code prints 100, showing how to set the number of shuffle partitions dynamically.

The exact logic for choosing the number of shuffle partitions depends on your own analysis. As a rule of thumb, you can set it to 1.5 to 2 times the number of initial partitions.

Source: http://blog.madhukaraphatak.com/dynamic-spark-shuffle-partitions

Spark Partitions

Spark is an engine for parallel processing of data on a cluster. Parallelism in Apache Spark allows developers to perform tasks on hundreds of machines in a cluster in parallel and independently. All thanks to the basic concept in Apache Spark — RDD.

Under the hood, these RDDs are stored in partitions on different cluster nodes. A partition is basically a logical chunk of a large distributed data set. It provides the possibility to distribute the work across the cluster, divide the task into smaller parts, and reduce the memory requirements of each node. The partition is the main unit of parallelism in Apache Spark.

Let's talk partitions



Note right away that spark partitions ≠ hive partitions. They are both chunks of data, but Spark splits data in order to process it in parallel in memory. Hive partition is in the storage, in the disk, in persistence. From now on, by a partition, I'll be referring to a Spark partition.


Once the user has submitted the job to the cluster, each partition is sent to a specific executor for further processing. An executor processes one partition at a time, so the size and number of partitions transferred to an executor are directly proportional to the time it takes to complete them. The more partitions there are, the more work is distributed across executors; with fewer partitions, the work is done in larger pieces (and often faster).

You use partitions even if you are unaware of their existence, but you can manipulate them to speed up data processing.

Let's see what's going on under the hood.
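
A minimal sketch of the kind of inspection the author describes (the DataFrame contents and column names are invented for illustration):

```scala
// A small throwaway DataFrame; values and column names are made up.
val df = spark.createDataFrame(Seq.tabulate(20)(i => (i, s"row_$i"))).toDF("id", "value")

// Number of partitions Spark picked (4 on a 4-core local master)
println(df.rdd.getNumPartitions)

// glom() gathers each partition into an array so we can see how rows are spread
df.rdd.glom().map(_.length).collect().foreach(println)
```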

Looking at the partition structure, we can see that our data is divided into four partitions (because my laptop has 4 cores and Spark created 4 executors in local standalone mode), and if we apply transformations to this DataFrame, the work on each partition will be done in a separate thread (in my case, on each individual processor core).

Why the hell would you even think about that?

The most important reason is performance. By having all the data needed to calculate on a single node, we reduce the overhead on the shuffle (the need for serialization and network traffic).

The second reason is the cost reduction — better utilization of the cluster will help to reduce idle resources.

To solve these problems effectively, we need a mechanism to manage partitions.

Repartitioning

The first way to manage partitions is the repartition() operation.

Repartitioning is the operation of reducing or increasing the number of partitions into which the data in the cluster is split. This process involves a full shuffle; consequently, repartitioning is an expensive operation. In a typical scenario, most of the data has to be serialized, moved, and deserialized.

Example:
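
A sketch of that step, continuing with the df from the previous snippet:

```scala
// Full shuffle into 8 partitions
val df8 = df.repartition(8)
println(df8.rdd.getNumPartitions)                        // 8

// Row counts per partition show the data was redistributed
df8.rdd.glom().map(_.length).collect().foreach(println)
```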

We see that the number of partitions has increased to 8 and the data has been redistributed across them.

In addition to specifying the number of partitions directly, you can pass in the name of the column by which you want to partition the data.

Example:
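
A sketch of repartitioning by a column (the column name country is an assumption for illustration):

```scala
import org.apache.spark.sql.functions.col

// Hash-partition by a column; the partition count falls back to
// spark.sql.shuffle.partitions, which is 200 by default.
val byCountry = df.repartition(col("country"))
println(byCountry.rdd.getNumPartitions)   // 200
```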

We see that the number of partitions has become 200 and many of these partitions are completely empty. We will discuss this point a little further in the article.

There are lots of cases when this operation will not be very useful and this method is only worth using when you clearly understand how it can speed up your Spark jobs.

Coalesce

The second way to manage partitions is coalesce().

This operation reduces the number of partitions and avoids a full shuffle. The executor can safely leave data on a minimum number of partitions, moving data only from redundant nodes. Therefore, it is better to use coalesce than repartition if you need to reduce the number of partitions.

However, you should understand that this can drastically reduce the parallelism of your data processing: coalesce is often pushed further up the chain of transformations and can leave you with fewer nodes doing the processing than you would like. To avoid this, you can pass shuffle = true (on the RDD coalesce API). This adds a shuffle step, but it also means that the reshuffled partitions will use the full cluster resources where possible.

In my toy example, you can walk through the remaining partitions and see which data has been moved and which has stayed in place (see the sketch below).
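
A sketch of that walk-through, continuing from the df8 snippet above:

```scala
// Merge down to 2 partitions without a full shuffle
val coalesced = df8.coalesce(2)
println(coalesced.rdd.getNumPartitions)   // 2

// Inspect each remaining partition to see which rows moved and which stayed
coalesced.rdd.glom().collect().zipWithIndex.foreach { case (rows, i) =>
  println(s"partition $i holds ${rows.length} rows")
}
```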

Layers of partitions

I would like to highlight the topic of Spark partitions at different layers of data processing. In this post, I will tell you about only one of them; the next post will be a follow-up with practical tips.

There are three stages at the physical level where the number of partitions is important. They are input, shuffle, and output.

Each of them is tweaked and managed differently. For example, at input and output you can control the size of a partition, but at the output you can also control the number of files or the number of tasks by doing coalesce or repartition. With shuffle, you can control the amount of data that moves around the network.

Partitioning on input stage

Let's start by determining the number of partitions based on the size of the input dataset.

Spark can handle input pretty well as long as your data is splittable. There is a clear match between how data is laid out in storage such as HDFS or Cassandra and how Spark splits the data while reading.

Let us imagine that our input dataset is about 30 GB (~30,000 MB) of an uncompressed text file on HDFS, which distributes it across 10 nodes.

When Spark reads a file from HDFS, it creates a single partition for each input split. The input split is set by the Hadoop InputFormat used to read the file. If you have a 30 GB uncompressed text file stored on HDFS, then with the default HDFS block size (128 MB) and the default spark.sql.files.maxPartitionBytes (128 MB) it would be stored in 240 blocks, which means that the DataFrame you read from this file would have 240 partitions.

This is equal to the Spark default parallelism (spark.default.parallelism) value.

If your data is not splittable, Spark will use the default number of partitions. When a job starts, the number of partitions is equal to the total number of cores on all executor nodes.
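
A quick way to see both numbers on your own cluster (the HDFS path is hypothetical):

```scala
// Partition count of a freshly read DataFrame follows the input splits
val input = spark.read.text("hdfs:///data/big-30gb.txt")
println(input.rdd.getNumPartitions)              // ~240 for 30 GB with 128 MB splits

// Total cores available to the job
println(spark.sparkContext.defaultParallelism)
```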

Shuffle partitioning


The most painful spot of any Spark pipeline is the wide transformations that require information from other partitions and trigger a shuffle. Unfortunately, you can't get rid of such transformations entirely, but you can reduce the shuffle's impact on performance.

Shuffle partitions are the partitions used when data is shuffled for wide transformations. By default, the number of shuffle partitions is set to 200. It does not matter whether your data is small or large, or whether your cluster has 20 executors; it is still 200. Yes, that is the mysterious number we saw in the repartition section.

So the parameter that controls the parallelism resulting from a shuffle is spark.sql.shuffle.partitions. The default of 200 comes from real-world experience where it was found to be a reasonable default, but in practice that value is rarely ideal.

When dealing with small amounts of data, you should usually reduce the number of shuffle partitions; otherwise you will end up with many partitions, each holding a small number of entries, which underutilizes the executors and increases the time it takes to transfer data over the network from executor to executor.

Disk spill

On the other hand, when you have too much data and too few partitions, fewer tasks are processed in the executors, but the load on each individual executor increases, which often leads to memory errors. Also, if you increase the size of a partition beyond the memory available in the executor, you will get disk spills. Spills are just about the slowest thing you can do: during a disk spill, Spark places part of its RAM onto disk when the data does not fit in memory, allowing the job to run on data of any size. Even though it won't break your pipeline, it makes it very inefficient because of the additional disk I/O overhead and increased garbage collection.

Therefore spark.sql.shuffle.partitions is one of the most frequently configured parameters when working with Spark.

Output partitioning

Saving data partitioned on a well-chosen condition can significantly speed up reading and retrieving the necessary data in future processing pipelines.

First, in some cases it is possible to use partition pruning after partition discovery of a DataSource; it limits the number of files and partitions that Spark reads when querying. In some cases (for example, AWS S3) it even avoids unnecessary partition discovery. It is also valuable together with the concept of Dynamic Partition Pruning in Spark 3.0. But sometimes these optimizations can make things worse, e.g. recursive scanning of the file system for metadata to understand partitions for an initial query can take a long time (in the case of a large number of partitions). Also, all table metadata must be materialized into the memory of the driver process, which can significantly increase its memory burden.

Second, when saving a DataFrame to disk, pay particular attention to partition sizes. During writing, Spark produces one file per task (i.e. one file per partition) and reads at least one file per task while reading. The problem is that if the cluster on which the DataFrame was saved had more aggregate memory and could therefore handle large partition sizes without any problems, a smaller cluster may struggle to read the saved DataFrame.

For example, you have a large preprocessing cluster and a smaller, more cost-effective service cluster. In this situation, the solution is to repartition the DataFrame into more partitions before writing, so that the next cluster does not get choked (see the sketch below).
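
A sketch of that approach (the partition count 400 and the output path are illustrative assumptions, not recommendations):

```scala
// Write more, smaller files so a smaller downstream cluster can read them comfortably
df.repartition(400)
  .write
  .mode("overwrite")
  .parquet("hdfs:///warehouse/events")
```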

Conclusion

  • if you are increasing the number of partitions, use repartition() (performs a full shuffle);
  • if you are decreasing the number of partitions, use coalesce() (minimizes shuffling).

Today we talked about partitions at the physical level. In the next post, we will move to a higher level and talk about how we can do partition tuning.

Source: https://luminousmen.com/post/spark-partitions

Spark/PySpark partitioning is a way to split the data into multiple partitions so that you can execute transformations on multiple partitions in parallel which allows completing the job faster. You can also write partitioned data into a file system (multiple sub-directories) for faster reads by downstream systems.

Spark has several partitioning methods to achieve parallelism; choose the one that fits your need.

Spark DataFrame Partitioning Methods (Scala)                              Key points
repartition(numPartitions : scala.Int)                                    Uses RoundRobinPartitioning
repartition(partitionExprs : Column*)                                     Uses HashPartitioner
repartition(numPartitions : scala.Int, partitionExprs : Column*)          partition = hash(partitionExprs) % numPartitions
coalesce(numPartitions : scala.Int)                                       Use only to reduce the number of partitions.
repartitionByRange(partitionExprs : Column*)                              Uses range partitioning. Ideal to use on numeric columns.
repartitionByRange(numPartitions : scala.Int, partitionExprs : Column*)
partitionBy(colNames : _root_.scala.Predef.String*)                       Use to write the data into sub-folders

Note: partitionBy() is a method of the DataFrameWriter class; all the others are methods of DataFrame/Dataset.

1. Understanding Spark Partitioning

  • By default, Spark/PySpark creates partitions that are equal to the number of CPU cores in the machine.
  • Data of each partition resides in a single machine.
  • Spark/PySpark creates a task for each partition.
  • Spark Shuffle operations move the data from one partition to other partitions.
  • Partitioning is an expensive operation as it creates a data shuffle (Data could move between the nodes)
  • By default, DataFrame shuffle operations create 200 partitions.

Spark/PySpark supports partitioning in memory (RDD/DataFrame) and partitioning on the disk (File system).

Partition in memory: You can partition or repartition the DataFrame by calling the repartition() or coalesce() transformations.

Partition on disk: While writing the PySpark DataFrame back to disk, you can choose how to partition the data based on columns by using partitionBy() of DataFrameWriter. This is similar to Hive partitions.

2. Spark Partitioning Advantages

As you are aware, Spark is designed to process large datasets 100x faster than traditional processing; this wouldn't have been possible without partitions. Below are some of the advantages of using Spark partitions in memory or on disk.

  • Fast access to the data.
  • Provides the ability to perform an operation on a smaller dataset.

Partitioning at rest (disk) is a feature of many databases and data processing frameworks and it is key to make reads faster.

3. Default Spark Partitions & Configurations

Spark by default partitions data based on a number of factors, and the factors differ depending on where you run your job and in what mode.

3.1 Local mode

When you run locally in standalone mode, Spark partitions data into the number of CPU cores you have on your system, or into the value you specify at the time of creating the SparkSession object (see the sketch below).
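
A minimal sketch of such a session (the app name is arbitrary):

```scala
import org.apache.spark.sql.SparkSession

// local[5] asks for 5 threads, and hence 5 default partitions,
// even if the machine has fewer physical cores.
val spark = SparkSession.builder()
  .master("local[5]")
  .appName("partition-demo")
  .getOrCreate()

val rdd = spark.sparkContext.parallelize(Range(0, 20))
println(rdd.getNumPartitions)   // 5
```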

The above example passes local[5] as an argument to the master() method, meaning the job runs locally with 5 partitions. Even if you have just 2 cores on your system, it still creates 5 partition tasks.

The above example yields 5 partitions.

3.2 HDFS cluster mode

When you run Spark jobs on a Hadoop cluster, the default number of partitions is based on the following:

  • On the HDFS cluster, by default, Spark creates one Partition for each block of the file.
  • In Version 1 Hadoop the HDFS block size is 64 MB and in Version 2 Hadoop the HDFS block size is 128 MB
  • Total number of cores on all executor nodes in a cluster or 2, whichever is larger

For example, if you have a 640 MB file and run it on Hadoop version 2, Spark creates 5 partitions, each corresponding to a 128 MB block (5 blocks * 128 MB = 640 MB). If you repartition to 10, it creates 2 partitions for each block.

3.3 Spark configuration

  • The spark.default.parallelism configuration defaults to the total number of cores on all nodes in a cluster; on local it is set to the number of cores on your system.
  • The spark.sql.shuffle.partitions configuration defaults to 200 and is used when you call shuffle operations like groupBy(), join(), and many more. This property applies only to the DataFrame API, not to RDDs.

You can change the values of these properties programmatically, using statements like the one sketched below.
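
A sketch of setting both properties on an existing session (note that spark.default.parallelism normally has to be set before the SparkContext is created, e.g. at submit time, to actually take effect):

```scala
spark.conf.set("spark.sql.shuffle.partitions", "100")
spark.conf.set("spark.default.parallelism", "100")
```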

You can also set these configurations when submitting the job, using the spark-submit command with --conf.

4. Dynamically Changing Spark Partitions

When you create an RDD/DataFrame from a file or table, Spark creates it with a certain number of partitions based on several parameters. Spark also provides ways to change the number of partitions at runtime in memory, and options to partition by one or more columns when writing to disk.

4.1 repartition() & coalesce()

While working with partitioned data, we often need to increase or decrease the number of partitions based on the data distribution. The repartition() and coalesce() methods help us do that.

You can find the dataset used in this article in the zipcodes.csv file on GitHub.

Note: When you want to reduce the number of partitions, it is recommended to use coalesce() over repartition(), as it uses fewer resources because it avoids a full shuffle.
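
A sketch of both calls on the zipcodes dataset (the local path is hypothetical):

```scala
val df = spark.read.option("header", "true").csv("src/main/resources/zipcodes.csv")

val more = df.repartition(10)      // full shuffle up to 10 partitions
println(more.rdd.getNumPartitions)

val fewer = more.coalesce(4)       // merge down to 4 partitions, no full shuffle
println(fewer.rdd.getNumPartitions)
```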

4.2 partitionBy()

Spark partitionBy() is a function of the DataFrameWriter class, used to partition data by one or more column values while writing a DataFrame to the disk/file system.

When you write a Spark DataFrame to disk by calling partitionBy(), Spark splits the records based on the partition column and stores each partition's data in a sub-directory (see the sketch below).
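
A sketch of a single-column partitioned write (the output path is hypothetical):

```scala
// One sub-directory per distinct value of "state"
df.write
  .partitionBy("state")
  .mode("overwrite")
  .csv("/tmp/zipcodes-by-state")
```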

Our DataFrame has a total of 6 different states, so this creates 6 directories. The name of each sub-directory is the partition column and its value (partition column=value).


4.3 partitionBy() Multiple Columns

You can also create partitions on multiple columns using Spark partitionBy(). Just pass the columns you want to partition by as arguments to this method (see the sketch below).
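
A sketch with two partition columns (again with a hypothetical output path):

```scala
// Folder layout: .../state=XX/city=YYY/part-...
df.write
  .partitionBy("state", "city")
  .mode("overwrite")
  .csv("/tmp/zipcodes-by-state-city")
```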

It creates a folder hierarchy for each partition: we specified state as the first partition column followed by city, so it creates a city folder inside each state folder (one folder for each city in a state).


4.4 repartitionByRange() – Range Partition

Below is a range partition example using the repartitionByRange() transformation.
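
A sketch of a range-partitioned DataFrame, using spark_partition_id() to peek at the rows that landed in the first partition (the column name zipcode and the partition count 3 are illustrative assumptions):

```scala
import org.apache.spark.sql.functions.{col, spark_partition_id}

val ranged = df.repartitionByRange(3, col("zipcode"))

// Rows in partition 0 cover one contiguous range of zipcode values
ranged.withColumn("partition_id", spark_partition_id())
  .filter(col("partition_id") === 0)
  .show()
```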

Let’s check data for one partition.

Above, all data for one contiguous range of values ends up in the same partition.

5. How to Choose Spark Partition Column?

When using partitionBy(), you have to be very cautious about the number of partitions it creates: too many partitions create too many sub-directories, which brings unnecessary overhead to the NameNode (if you are using Hadoop), since it must keep all file system metadata in memory.

Let's assume you have a US census table that contains zip code, city, state, and other columns. Partitioning on state splits the table into around 50 partitions, so searching for a zipcode within a state (state='CA' and zipCode='92704') is faster because Spark needs to scan only the state=CA partition directory.

Partition on zipcode may not be a good option as you might end up with too many partitions.

Another good example is partitioning on a date column: ideally, partition by year/month rather than by individual date.

6. Too Many Partitions Good?

  • As a beginner, you might think that more partitions always boost Spark job performance; in reality they won't, and too many is overkill.
  • Spark has to create one task per partition, and most of the time then goes into creating, scheduling, and managing the tasks rather than executing them.

7. Too Few Partitions Good?

  • Too few partitions are not good either, as you may not fully utilize your cluster resources.
  • Less parallelism
  • Applications may run longer as each partition takes more time to complete.

Conclusion

In this article, you have learned what Spark/PySpark partitioning is, the different ways to partition, how to change partitions dynamically, and examples of how to do it.

Hope you like it. Happy Learning !!


Spark Partitioning & Partition Understanding
Source: https://sparkbyexamples.com/spark/spark-partitioning-understanding/

Generally speaking, partitions are subsets of a file in memory or storage. However, Spark partitions serve more purposes than subsets do in a SQL database or Hive system. Spark uses partitions to run jobs in parallel and gain maximum performance. When operating on a Spark DataFrame, there are three main places Spark uses partitions: input, output, and shuffle.

Input and output partitions are easier to control, by setting maxPartitionBytes, using coalesce to shrink, repartition to increase the number of partitions, or even setting maxRecordsPerFile; but the shuffle partition count, whose default is 200, does not fit most usage scenarios. This blog introduces general ideas about how to set the right shuffle partition number and the impact of shuffle partitions on Spark jobs.

Key points for optimizing performance with the shuffle partition technique

  1. Each partition size should be smaller than 200 MB to gain optimized performance.
  2. Usually, the number of partitions should be 1x to 4x the number of cores you have to gain optimized performance (which means that creating a cluster that matches your data scale is also important).

Best practices for common scenarios

  • A limited-size cluster working with a small DataFrame: set the number of shuffle partitions to 1x or 2x the number of cores you have (each partition should be less than 200 MB for better performance).
    • e.g. input size: 2 GB with 20 cores, set shuffle partitions to 20 or 40
  • A limited-size cluster working with a huge DataFrame: set the number of shuffle partitions to Input Data Size / Partition Size (<= 200 MB per partition), ideally a multiple of the number of cores you have.
    • e.g. input size: 20 GB with 40 cores, set shuffle partitions to 120 or 160 (3x to 4x the cores, and each partition is less than 200 MB)
  • Powerful clusters with more cores than the number calculated above: set the number of shuffle partitions to 1x or 2x the number of cores.
    • e.g. input size: 80 GB with 400 cores, set shuffle partitions to 400 or 800.

Here is an example of how to improve performance by simply changing the number of shuffle partitions for a small DataFrame on a limited-size cluster (8 cores total).
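
A sketch of that tuning step (the DataFrame salesDf, the column storeId, and the value 8 are assumptions based on the 8-core example, not measured results):

```scala
// 8 cores in total and a small input: bring shuffle partitions down to ~1x the cores
spark.conf.set("spark.sql.shuffle.partitions", "8")

val result = salesDf
  .groupBy("storeId")
  .count()

println(result.rdd.getNumPartitions)   // 8 instead of the default 200
```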

Source: https://nealanalytics.com/blog/databricks-spark-jobs-optimization-techniques-shuffle-partition-technique-part-1/


How to Efficiently Re-Partition Spark DataFrames

Spark Partitioning in a nutshell

In order to achieve high parallelism, Spark splits the data into smaller chunks called partitions, which are distributed across different nodes in the Spark cluster. Every node can have more than one executor, each of which can execute a task.

The distribution of the work into multiple executors requires data to be partitioned and distributed across the executors, so that the work can be done in parallel in order to optimise the data processing for a specific job.

How to get current number of partitions

Before jumping into repartitioning, it is worth describing how to get the current number of partitions of a Spark DataFrame. As an example, let's assume we have the minimal Spark DataFrame sketched below.
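
A sketch of such a DataFrame (the values and column names are made up):

```scala
val df = spark.createDataFrame(Seq(
  (1, "Anne"), (2, "Bob"), (3, "Carol"), (4, "Dave")
)).toDF("id", "name")
```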

To get the number of partitions of the above DataFrame, all we have to do is run the following:
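
One way to check it, assuming the df sketched above:

```scala
println(df.rdd.getNumPartitions)
```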

Note that the output depends on your current setup and configuration, so you might see a different number.

How to increase the number of partitions

If you want to increase the number of partitions of your DataFrame, all you need to run is the repartition() function.

Returns a new DataFrame partitioned by the given partitioning expressions. The resulting DataFrame is hash partitioned.

The code below will increase the number of partitions to 1000:
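
A sketch of that call, continuing from the df above:

```scala
// Hash-repartition into 1000 partitions (full shuffle)
val repartitioned = df.repartition(1000)
println(repartitioned.rdd.getNumPartitions)   // 1000
```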

How to decrease the number of partitions

Now, if you want to repartition your Spark DataFrame so that it has fewer partitions, you can still use repartition(); however, there is a more efficient way to do so.

coalesce() results in a narrow dependency, which means that when it is used to reduce the number of partitions there is no shuffle, which is probably one of the most costly operations in Spark.

Returns a new DataFrame that has exactly N partitions.

In the example below we limit the partitions to 100. The Spark DataFrame that originally had 1000 partitions is repartitioned to 100 partitions without shuffling. By no shuffling we mean that each of the 100 new partitions is assigned 10 of the existing partitions. Therefore, it is far more efficient to call coalesce() when one wants to reduce the number of partitions of a Spark DataFrame.
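
A sketch of that step, continuing from the 1000-partition DataFrame above:

```scala
// Collapse 1000 partitions into 100 without a full shuffle;
// each new partition simply absorbs ~10 of the existing ones.
val coalesced = repartitioned.coalesce(100)
println(coalesced.rdd.getNumPartitions)   // 100
```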

Conclusion

In this article we discussed how data processing is optimised through partitions, which allow the work to be distributed across the executors of a Spark cluster. We also explored the two methods one can use to increase or decrease the number of partitions of a DataFrame.

repartition() can be used to increase or decrease the number of partitions of a Spark DataFrame. However, it involves shuffling, which is a costly operation.

On the other hand, coalesce() can be used when we want to reduce the number of partitions, as it is more efficient because it does not trigger data shuffling across the nodes of the Spark cluster.

Source: https://towardsdatascience.com/how-to-efficiently-re-partition-spark-dataframes-c036e8261418

Spark SQL, DataFrames and Datasets Guide

Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL including SQL and the Dataset API. When computing a result the same execution engine is used, independent of which API/language you are using to express the computation. This unification means that developers can easily switch back and forth between different APIs based on which provides the most natural way to express a given transformation.

All of the examples on this page use sample data included in the Spark distribution and can be run in the spark-shell, pyspark shell, or sparkR shell.

SQL

One use of Spark SQL is to execute SQL queries. Spark SQL can also be used to read data from an existing Hive installation. For more on how to configure this feature, please refer to the Hive Tables section. When running SQL from within another programming language the results will be returned as a Dataset/DataFrame. You can also interact with the SQL interface using the command-line or over JDBC/ODBC.

Datasets and DataFrames

A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL's optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have support for the Dataset API. But due to Python's dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally, row.columnName). The case for R is similar.

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row], while in the Java API users need to use Dataset<Row> to represent a DataFrame.

Throughout this document, we will often refer to Scala/Java Datasets of Rows as DataFrames.

Starting Point: SparkSession

The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder():

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder():

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.

The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.

The entry point into all functionality in Spark is the SparkSession class. To initialize a basic SparkSession, just call sparkR.session():

sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value"))

Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.

Note that when invoked for the first time, sparkR.session() initializes a global SparkSession singleton instance, and always returns a reference to this instance for successive invocations. In this way, users only need to initialize the SparkSession once; SparkR functions like read.df will then be able to access this global instance implicitly, and users don't need to pass the SparkSession instance around.

SparkSession in Spark 2.0 provides builtin support for Hive features, including the ability to write queries using HiveQL, access to Hive UDFs, and the ability to read data from Hive tables. To use these features, you do not need to have an existing Hive setup.

Creating DataFrames

With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.

As an example, the following creates a DataFrame based on the content of a JSON file:

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.

As an example, the following creates a DataFrame based on the content of a JSON file:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.

With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.

As an example, the following creates a DataFrame based on the content of a JSON file:

# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.

With a SparkSession, applications can create DataFrames from a local R data.frame, from a Hive table, or from Spark data sources.

As an example, the following creates a DataFrame based on the content of a JSON file:

df <- read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame
head(df)
##   age    name
## 1  NA Michael
## 2  30    Andy
## 3  19  Justin

# Another method to print the first few rows and optionally truncate the printing of long values
showDF(df)
## +----+-------+
## | age|   name|
## +----+-------+
## |null|Michael|
## |  30|   Andy|
## |  19| Justin|
## +----+-------+

Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.

Untyped Dataset Operations (aka DataFrame Operations)

DataFrames provide a domain-specific language for structured data manipulation in Scala, Java, Python and R.

As mentioned above, in Spark 2.0, DataFrames are just Datasets of Rows in the Scala and Java API. These operations are also referred to as "untyped transformations", in contrast to the "typed transformations" that come with strongly typed Scala/Java Datasets.

Here we include some basic examples of structured data processing using Datasets:

// This import is needed to use the $-notationimportspark.implicits._// Print the schema in a tree formatdf.printSchema()// root// |-- age: long (nullable = true)// |-- name: string (nullable = true)// Select only the "name" columndf.select("name").show()// +-------+// | name|// +-------+// |Michael|// | Andy|// | Justin|// +-------+// Select everybody, but increment the age by 1df.select($"name",$"age"+1).show()// +-------+---------+// | name|(age + 1)|// +-------+---------+// |Michael| null|// | Andy| 31|// | Justin| 20|// +-------+---------+// Select people older than 21df.filter($"age">21).show()// +---+----+// |age|name|// +---+----+// | 30|Andy|// +---+----+// Count people by agedf.groupBy("age").count().show()// +----+-----+// | age|count|// +----+-----+// | 19| 1|// |null| 1|// | 30| 1|// +----+-----+

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

For a complete list of the types of operations that can be performed on a Dataset refer to the API Documentation.

In addition to simple column references and expressions, Datasets also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the DataFrame Function Reference.

// col("...") is preferable to df.col("...")import staticorg.apache.spark.sql.functions.col;// Print the schema in a tree formatdf.printSchema();// root// |-- age: long (nullable = true)// |-- name: string (nullable = true)// Select only the "name" columndf.select("name").show();// +-------+// | name|// +-------+// |Michael|// | Andy|// | Justin|// +-------+// Select everybody, but increment the age by 1df.select(col("name"),col("age").plus(1)).show();// +-------+---------+// | name|(age + 1)|// +-------+---------+// |Michael| null|// | Andy| 31|// | Justin| 20|// +-------+---------+// Select people older than 21df.filter(col("age").gt(21)).show();// +---+----+// |age|name|// +---+----+// | 30|Andy|// +---+----+// Count people by agedf.groupBy("age").count().show();// +----+-----+// | age|count|// +----+-----+// | 19| 1|// |null| 1|// | 30| 1|// +----+-----+

Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.

For a complete list of the types of operations that can be performed on a Dataset refer to the API Documentation.

In addition to simple column references and expressions, Datasets also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the DataFrame Function Reference.

In Python it's possible to access a DataFrame's columns either by attribute (df.age) or by indexing (df['age']). While the former is convenient for interactive data exploration, users are highly encouraged to use the latter form, which is future proof and won't break with column names that are also attributes on the DataFrame class.

# spark, df are from the previous example# Print the schema in a tree formatdf.printSchema()# root# |-- age: long (nullable = true)# |-- name: string (nullable = true)# Select only the "name" columndf.select("name").show()# +-------+# | name|# +-------+# |Michael|# | Andy|# | Justin|# +-------+# Select everybody, but increment the age by 1df.select(df['name'],df['age']+1).show()# +-------+---------+# | name|(age + 1)|# +-------+---------+# |Michael| null|# | Andy| 31|# | Justin| 20|# +-------+---------+# Select people older than 21df.filter(df['age']>21).show()# +---+----+# |age|name|# +---+----+# | 30|Andy|# +---+----+# Count people by agedf.groupBy("age").count().show()# +----+-----+# | age|count|# +----+-----+# | 19| 1|# |null| 1|# | 30| 1|# +----+-----+

Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.

For a complete list of the types of operations that can be performed on a DataFrame refer to the API Documentation.

In addition to simple column references and expressions, DataFrames also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the DataFrame Function Reference.

# Create the DataFrame df <- read.json("examples/src/main/resources/people.json")# Show the content of the DataFramehead(df)## age name## 1 NA Michael## 2 30 Andy## 3 19 Justin# Print the schema in a tree format printSchema(df)## root## |-- age: long (nullable = true)## |-- name: string (nullable = true)# Select only the "name" columnhead(select(df,"name"))## name## 1 Michael## 2 Andy## 3 Justin# Select everybody, but increment the age by 1head(select(df, df$name, df$age +1))## name (age + 1.0)## 1 Michael NA## 2 Andy 31## 3 Justin 20# Select people older than 21head(where(df, df$age >21))## age name## 1 30 Andy# Count people by agehead(count(groupBy(df,"age")))## age count## 1 19 1## 2 NA 1## 3 30 1

Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.

For a complete list of the types of operations that can be performed on a DataFrame refer to the API Documentation.

In addition to simple column references and expressions, DataFrames also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the DataFrame Function Reference.

Running SQL Queries Programmatically

The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a Dataset<Row>.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.

The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.

The sql function enables applications to run SQL queries programmatically and returns the result as a SparkDataFrame.

df <- sql("SELECT * FROM table")

Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.

Global Temporary View

Temporary views in Spark SQL are session-scoped and will disappear if the session that creates them terminates. If you want to have a temporary view that is shared among all sessions and kept alive until the Spark application terminates, you can create a global temporary view. A global temporary view is tied to a system preserved database global_temp, and we must use the qualified name to refer to it, e.g. SELECT * FROM global_temp.view1.

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.

# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.

Creating Datasets

Datasets are similar to RDDs, however, instead of using Java serialization or Kryo they use a specialized Encoder to serialize the objects for processing or transmitting over the network. While both encoders and standard serialization are responsible for turning an object into bytes, encoders are code generated dynamically and use a format that allows Spark to perform many operations like filtering, sorting and hashing without deserializing the bytes back into an object.

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,// you can use custom classes that implement the Product interfacecaseclassPerson(name:String,age:Long)// Encoders are created for case classesvalcaseClassDS=Seq(Person("Andy",32)).toDS()caseClassDS.show()// +----+---+// |name|age|// +----+---+// |Andy| 32|// +----+---+// Encoders for most common types are automatically provided by importing spark.implicits._valprimitiveDS=Seq(1,2,3).toDS()primitiveDS.map(_+1).collect()// Returns: Array(2, 3, 4)// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by namevalpath="examples/src/main/resources/people.json"valpeopleDS=spark.read.json(path).as[Person]peopleDS.show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19| Justin|// +----+-------+

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

importjava.util.Arrays;importjava.util.Collections;importjava.io.Serializable;importorg.apache.spark.api.java.function.MapFunction;importorg.apache.spark.sql.Dataset;importorg.apache.spark.sql.Row;importorg.apache.spark.sql.Encoder;importorg.apache.spark.sql.Encoders;publicstaticclassPersonimplementsSerializable{privateStringname;privateintage;publicStringgetName(){returnname;}publicvoidsetName(Stringname){this.name=name;}publicintgetAge(){returnage;}publicvoidsetAge(intage){this.age=age;}}// Create an instance of a Bean classPersonperson=newPerson();person.setName("Andy");person.setAge(32);// Encoders are created for Java beansEncoder<Person>personEncoder=Encoders.bean(Person.class);Dataset<Person>javaBeanDS=spark.createDataset(Collections.singletonList(person),personEncoder);javaBeanDS.show();// +---+----+// |age|name|// +---+----+// | 32|Andy|// +---+----+// Encoders for most common types are provided in class EncodersEncoder<Integer>integerEncoder=Encoders.INT();Dataset<Integer>primitiveDS=spark.createDataset(Arrays.asList(1,2,3),integerEncoder);Dataset<Integer>transformedDS=primitiveDS.map((MapFunction<Integer,Integer>)value->value+1,integerEncoder);transformedDS.collect();// Returns [2, 3, 4]// DataFrames can be converted to a Dataset by providing a class. Mapping based on nameStringpath="examples/src/main/resources/people.json";Dataset<Person>peopleDS=spark.read().json(path).as(personEncoder);peopleDS.show();// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19| Justin|// +----+-------+

Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.

Interoperating with RDDs

Spark SQL supports two different methods for converting existing RDDs into Datasets. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection based approach leads to more concise code and works well when you already know the schema while writing your Spark application.

The second method for creating Datasets is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct Datasets when the columns and their types are not known until runtime.

Inferring the Schema Using Reflection

The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Seqs or Arrays. This RDD can be implicitly converted to a DataFrame and then registered as a table. Tables can be used in subsequent SQL statements.

// For implicit conversions from RDDs to DataFramesimportspark.implicits._// Create an RDD of Person objects from a text file, convert it to a DataframevalpeopleDF=spark.sparkContext.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(attributes=>Person(attributes(0),attributes(1).trim.toInt)).toDF()// Register the DataFrame as a temporary viewpeopleDF.createOrReplaceTempView("people")// SQL statements can be run by using the sql methods provided by SparkvalteenagersDF=spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")// The columns of a row in the result can be accessed by field indexteenagersDF.map(teenager=>"Name: "+teenager(0)).show()// +------------+// | value|// +------------+// |Name: Justin|// +------------+// or by field nameteenagersDF.map(teenager=>"Name: "+teenager.getAs[String]("name")).show()// +------------+// | value|// +------------+// |Name: Justin|// +------------+// No pre-defined encoders for Dataset[Map[K,V]], define explicitlyimplicitvalmapEncoder=org.apache.spark.sql.Encoders.kryo[Map[String, Any]]// Primitive types and case classes can be also defined as// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]teenagersDF.map(teenager=>teenager.getValuesMap[Any](List("name","age"))).collect()// Array(Map("name" -> "Justin", "age" -> 19))

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

Spark SQL supports automatically converting an RDD of JavaBeans into a DataFrame. The BeanInfo, obtained using reflection, defines the schema of the table. Currently, Spark SQL does not support JavaBeans that contain Map field(s). Nested JavaBeans and List or Array fields are supported though. You can create a JavaBean by creating a class that implements Serializable and has getters and setters for all of its fields.

importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.function.Function;importorg.apache.spark.api.java.function.MapFunction;importorg.apache.spark.sql.Dataset;importorg.apache.spark.sql.Row;importorg.apache.spark.sql.Encoder;importorg.apache.spark.sql.Encoders;// Create an RDD of Person objects from a text fileJavaRDD<Person>peopleRDD=spark.read().textFile("examples/src/main/resources/people.txt").javaRDD().map(line->{String[]parts=line.split(",");Personperson=newPerson();person.setName(parts[0]);person.setAge(Integer.parseInt(parts[1].trim()));returnperson;});// Apply a schema to an RDD of JavaBeans to get a DataFrameDataset<Row>peopleDF=spark.createDataFrame(peopleRDD,Person.class);// Register the DataFrame as a temporary viewpeopleDF.createOrReplaceTempView("people");// SQL statements can be run by using the sql methods provided by sparkDataset<Row>teenagersDF=spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");// The columns of a row in the result can be accessed by field indexEncoder<String>stringEncoder=Encoders.STRING();Dataset<String>teenagerNamesByIndexDF=teenagersDF.map((MapFunction<Row,String>)row->"Name: "+row.getString(0),stringEncoder);teenagerNamesByIndexDF.show();// +------------+// | value|// +------------+// |Name: Justin|// +------------+// or by field nameDataset<String>teenagerNamesByFieldDF=teenagersDF.map((MapFunction<Row,String>)row->"Name: "+row.<String>getAs("name"),stringEncoder);teenagerNamesByFieldDF.show();// +------------+// | value|// +------------+// |Name: Justin|// +------------+

Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.

Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the datatypes. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table, and the types are inferred by sampling the whole dataset, similar to the inference that is performed on JSON files.

from pyspark.sql import Row

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin

Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.

Programmatically Specifying the Schema

When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

  1. Create an RDD of Rows from the original RDD;
  2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
  3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.

For example:

importorg.apache.spark.sql.types._// Create an RDDvalpeopleRDD=spark.sparkContext.textFile("examples/src/main/resources/people.txt")// The schema is encoded in a stringvalschemaString="name age"// Generate the schema based on the string of schemavalfields=schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,nullable=true))valschema=StructType(fields)// Convert records of the RDD (people) to RowsvalrowRDD=peopleRDD.map(_.split(",")).map(attributes=>Row(attributes(0),attributes(1).trim))// Apply the schema to the RDDvalpeopleDF=spark.createDataFrame(rowRDD,schema)// Creates a temporary view using the DataFramepeopleDF.createOrReplaceTempView("people")// SQL can be run over a temporary view created using DataFramesvalresults=spark.sql("SELECT name FROM people")// The results of SQL queries are DataFrames and support all the normal RDD operations// The columns of a row in the result can be accessed by field index or by field nameresults.map(attributes=>"Name: "+attributes(0)).show()// +-------------+// | value|// +-------------+// |Name: Michael|// | Name: Andy|// | Name: Justin|// +-------------+

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

When JavaBean classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a Dataset<Row> can be created programmatically with three steps.

  1. Create an RDD of Rows from the original RDD;
  2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
  3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.

For example:

importjava.util.ArrayList;importjava.util.List;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.function.Function;importorg.apache.spark.sql.Dataset;importorg.apache.spark.sql.Row;importorg.apache.spark.sql.types.DataTypes;importorg.apache.spark.sql.types.StructField;importorg.apache.spark.sql.types.StructType;// Create an RDDJavaRDD<String>peopleRDD=spark.sparkContext().textFile("examples/src/main/resources/people.txt",1).toJavaRDD();// The schema is encoded in a stringStringschemaString="name age";// Generate the schema based on the string of schemaList<StructField>fields=newArrayList<>();for(StringfieldName:schemaString.split(" ")){StructFieldfield=DataTypes.createStructField(fieldName,DataTypes.StringType,true);fields.add(field);}StructTypeschema=DataTypes.createStructType(fields);// Convert records of the RDD (people) to RowsJavaRDD<Row>rowRDD=peopleRDD.map((Function<String,Row>)record->{String[]attributes=record.split(",");returnRowFactory.create(attributes[0],attributes[1].trim());});// Apply the schema to the RDDDataset<Row>peopleDataFrame=spark.createDataFrame(rowRDD,schema);// Creates a temporary view using the DataFramepeopleDataFrame.createOrReplaceTempView("people");// SQL can be run over a temporary view created using DataFramesDataset<Row>results=spark.sql("SELECT name FROM people");// The results of SQL queries are DataFrames and support all the normal RDD operations// The columns of a row in the result can be accessed by field index or by field nameDataset<String>namesDS=results.map((MapFunction<Row,String>)row->"Name: "+row.getString(0),Encoders.STRING());namesDS.show();// +-------------+// | value|// +-------------+// |Name: Michael|// | Name: Andy|// | Name: Justin|// +-------------+

Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.

When a dictionary of kwargs cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

  1. Create an RDD of tuples or lists from the original RDD;
  2. Create the schema represented by a StructType matching the structure of tuples or lists in the RDD created in Step 1.
  3. Apply the schema to the RDD via the createDataFrame method provided by SparkSession.

For example:

# Import data types
from pyspark.sql.types import *

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

results.show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.

Aggregations

The built-in DataFrames functions provide common aggregations such as count(), countDistinct(), avg(), max(), min(), etc. While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in Scala and Java to work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.

Untyped User-Defined Aggregate Functions

Users have to extend the UserDefinedAggregateFunction abstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like:

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object MyAverage extends UserDefinedAggregateFunction {
  // Data types of input arguments of this aggregate function
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // Data types of values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // The data type of the returned value
  def dataType: DataType = DoubleType
  // Whether this function always returns the same output on the identical input
  def deterministic: Boolean = true
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Calculates the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// Register the function to access it
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala" in the Spark repo.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public static class MyAverage extends UserDefinedAggregateFunction {

  private StructType inputSchema;
  private StructType bufferSchema;

  public MyAverage() {
    List<StructField> inputFields = new ArrayList<>();
    inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
    inputSchema = DataTypes.createStructType(inputFields);

    List<StructField> bufferFields = new ArrayList<>();
    bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
    bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
    bufferSchema = DataTypes.createStructType(bufferFields);
  }
  // Data types of input arguments of this aggregate function
  public StructType inputSchema() {
    return inputSchema;
  }
  // Data types of values in the aggregation buffer
  public StructType bufferSchema() {
    return bufferSchema;
  }
  // The data type of the returned value
  public DataType dataType() {
    return DataTypes.DoubleType;
  }
  // Whether this function always returns the same output on the identical input
  public boolean deterministic() {
    return true;
  }
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  public void initialize(MutableAggregationBuffer buffer) {
    buffer.update(0, 0L);
    buffer.update(1, 0L);
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  public void update(MutableAggregationBuffer buffer, Row input) {
    if (!input.isNullAt(0)) {
      long updatedSum = buffer.getLong(0) + input.getLong(0);
      long updatedCount = buffer.getLong(1) + 1;
      buffer.update(0, updatedSum);
      buffer.update(1, updatedCount);
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
    long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
    buffer1.update(0, mergedSum);
    buffer1.update(1, mergedCount);
  }
  // Calculates the final result
  public Double evaluate(Row buffer) {
    return ((double) buffer.getLong(0)) / buffer.getLong(1);
  }
}

// Register the function to access it
spark.udf().register("myAverage", new MyAverage());

Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedUntypedAggregation.java" in the Spark repo.

Type-Safe User-Defined Aggregate Functions

User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class. For example, a type-safe user-defined average can look like:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala" in the Spark repo.

import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;

public static class Employee implements Serializable {
  private String name;
  private long salary;

  // Constructors, getters, setters...
}

public static class Average implements Serializable {
  private long sum;
  private long count;

  // Constructors, getters, setters...
}

public static class MyAverage extends Aggregator<Employee, Average, Double> {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  public Average zero() {
    return new Average(0L, 0L);
  }
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  public Average reduce(Average buffer, Employee employee) {
    long newSum = buffer.getSum() + employee.getSalary();
    long newCount = buffer.getCount() + 1;
    buffer.setSum(newSum);
    buffer.setCount(newCount);
    return buffer;
  }
  // Merge two intermediate values
  public Average merge(Average b1, Average b2) {
    long mergedSum = b1.getSum() + b2.getSum();
    long mergedCount = b1.getCount() + b2.getCount();
    b1.setSum(mergedSum);
    b1.setCount(mergedCount);
    return b1;
  }
  // Transform the output of the reduction
  public Double finish(Average reduction) {
    return ((double) reduction.getSum()) / reduction.getCount();
  }
  // Specifies the Encoder for the intermediate value type
  public Encoder<Average> bufferEncoder() {
    return Encoders.bean(Average.class);
  }
  // Specifies the Encoder for the final output value type
  public Encoder<Double> outputEncoder() {
    return Encoders.DOUBLE();
  }
}

Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);
ds.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
Dataset<Double> result = ds.select(averageSalary);
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java" in the Spark repo.

Spark SQL supports operating on a variety of data sources through the DataFrame interface. A DataFrame can be operated on using relational transformations and can also be used to create a temporary view. Registering a DataFrame as a temporary view allows you to run SQL queries over its data. This section describes the general methods for loading and saving data using the Spark Data Sources and then goes into specific options that are available for the built-in data sources.

Generic Load/Save Functions

In the simplest form, the default data source (parquet, unless otherwise configured by spark.sql.sources.default) will be used for all operations.

val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.

df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

df <- read.df("examples/src/main/resources/users.parquet") write.df(select(df,"name","favorite_color"),"namesAndFavColors.parquet")

Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.

Manually Specifying Options

You can also manually specify the data source that will be used, along with any extra options that you would like to pass to it. Data sources are specified by their fully qualified name (i.e., org.apache.spark.sql.parquet), but for built-in sources you can also use their short names (json, parquet, jdbc, orc, libsvm, csv, text). DataFrames loaded from any data source type can be converted into other types using this syntax.

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Dataset<Row> peopleDF = spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.

df=spark.read.load("examples/src/main/resources/people.json",format="json")df.select("name","age").write.save("namesAndAges.parquet",format="parquet")

Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

df <- read.df("examples/src/main/resources/people.json","json") namesAndAges <- select(df,"name","age") write.df(namesAndAges,"namesAndAges.parquet","parquet")

Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.

Run SQL on files directly

Instead of using the read API to load a file into a DataFrame and query it, you can also query that file directly with SQL.

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Dataset<Row> sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");

Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.

df=spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.

Save Modes

Save operations can optionally take a SaveMode, which specifies how to handle existing data if present. It is important to realize that these save modes do not use any locking and are not atomic. Additionally, when performing an Overwrite, the data will be deleted before the new data is written out.

The Scala/Java constant, its string form usable from any language, and its meaning are:

  • SaveMode.ErrorIfExists ("error" or "errorifexists", the default): when saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.
  • SaveMode.Append ("append"): when saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to the existing data.
  • SaveMode.Overwrite ("overwrite"): when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.
  • SaveMode.Ignore ("ignore"): when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a CREATE TABLE IF NOT EXISTS in SQL.
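
As an illustrative sketch (assuming a SparkSession named spark and the people.json file used earlier; the output path is hypothetical), a save mode can be passed either as a SaveMode constant or as its string form:

import org.apache.spark.sql.SaveMode

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// Fail if the output already exists (the default behaviour)
peopleDF.write.mode(SaveMode.ErrorIfExists).parquet("people_out.parquet")

// Equivalent string form; "overwrite" deletes the existing data before writing the new data
peopleDF.write.mode("overwrite").parquet("people_out.parquet")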

Saving to Persistent Tables

DataFrames can also be saved as persistent tables into the Hive metastore using the saveAsTable command. Notice that an existing Hive deployment is not necessary to use this feature: Spark will create a default local Hive metastore (using Derby) for you. Unlike the createOrReplaceTempView command, saveAsTable will materialize the contents of the DataFrame and create a pointer to the data in the Hive metastore. Persistent tables will still exist even after your Spark program has restarted, as long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the table method on a SparkSession with the name of the table.

For file-based data sources, e.g. text, parquet, json, etc., you can specify a custom table path via the path option, e.g. df.write.option("path", "/some/path").saveAsTable("t"). When the table is dropped, the custom table path will not be removed and the table data will still be there. If no custom table path is specified, Spark will write data to a default table path under the warehouse directory; in that case, when the table is dropped, the default table path will be removed too.
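
A minimal sketch (the table name users_tbl and the path /some/path are hypothetical):

val usersDF = spark.read.load("examples/src/main/resources/users.parquet")

// Persist as an external table: the data stays at the custom path even if the table is dropped
usersDF.write.option("path", "/some/path").saveAsTable("users_tbl")

// A DataFrame for the persistent table can later be obtained from the metastore by name
val fromMetastore = spark.table("users_tbl")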

Starting from Spark 2.1, persistent datasource tables have per-partition metadata stored in the Hive metastore. This brings several benefits:

  • Since the metastore can return only necessary partitions for a query, discovering all the partitions on the first query to the table is no longer needed.
  • Hive DDLs such as ALTER TABLE PARTITION ... SET LOCATION are now available for tables created with the Datasource API.

Note that partition information is not gathered by default when creating external datasource tables (those with a path option). To sync the partition information in the metastore, you can invoke MSCK REPAIR TABLE.
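
For example (assuming a hypothetical partitioned external table named users_by_color already registered in the metastore):

// Registers any partition directories that exist on disk but are missing from the metastore
spark.sql("MSCK REPAIR TABLE users_by_color")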

Bucketing, Sorting and Partitioning

For file-based data sources it is also possible to bucket and sort or partition the output. Bucketing and sorting are applicable only to persistent tables:

peopleDF.write.bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed")

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

peopleDF.write().bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed");

Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.

df.write.bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed")

Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

Partitioning can be used with both save and saveAsTable when using the Dataset APIs.

usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

usersDF.write().partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet");

Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.

df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

It is possible to use both partitioning and bucketing for a single table:

usersDF.write.partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("users_partitioned_bucketed")

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

peopleDF.write().partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("people_partitioned_bucketed");

Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.

df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df
    .write
    .partitionBy("favorite_color")
    .bucketBy(42, "name")
    .saveAsTable("people_partitioned_bucketed"))

Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

partitionBy creates a directory structure as described in the Partition Discovery section. Thus, it has limited applicability to columns with high cardinality. In contrast, bucketBy distributes data across a fixed number of buckets and can be used when the number of unique values is unbounded.

Parquet Files

Parquet is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data. When writing Parquet files, all columns are automatically converted to be nullable for compatibility reasons.

Loading Data Programmatically

Using the data from the above example:

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");

// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.

peopleDF = spark.read.json("examples/src/main/resources/people.json")

# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")

# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+

Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

df <- read.df("examples/src/main/resources/people.json","json")# SparkDataFrame can be saved as Parquet files, maintaining the schema information. write.parquet(df,"people.parquet")# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.# The result of loading a parquet file is also a DataFrame. parquetFile <- read.parquet("people.parquet")# Parquet files can also be used to create a temporary view and then used in SQL statements. createOrReplaceTempView(parquetFile,"parquetFile") teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")head(teenagers)## name## 1 Justin# We can also run custom R-UDFs on Spark DataFrames. Here we prefix all the names with "Name:" schema <- structType(structField("name","string")) teenNames <- dapply(df,function(p){cbind(paste("Name:", p$name))}, schema)for(teenName in collect(teenNames)$name){cat(teenName,"\n")}## Name: Michael## Name: Andy## Name: Justin

Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.

Partition Discovery

Table partitioning is a common optimization approach used in systems like Hive. In a partitioned table, data are usually stored in different directories, with partitioning column values encoded in the path of each partition directory. All built-in file sources (including Text/CSV/JSON/ORC/Parquet) are able to discover and infer partitioning information automatically. For example, we can store all our previously used population data into a partitioned table using the following directory structure, with two extra columns, gender and country, as partitioning columns:
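
As an illustration only (actual paths and file names will differ), such a layout might look like:

path/to/table/
    gender=male/
        country=US/
            data.parquet
        country=CN/
            data.parquet
    gender=female/
        country=US/
            data.parquet
        country=CN/
            data.parquet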

By passing path/to/table to either SparkSession.read.parquet or SparkSession.read.load, Spark SQL will automatically extract the partitioning information from the paths, and the partitioning columns appear in the schema of the returned DataFrame.
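
As a sketch (the non-partition columns shown here, name and age, are illustrative and depend on the stored data):

val df = spark.read.parquet("path/to/table")
df.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- age: long (nullable = true)
//  |-- gender: string (nullable = true)
//  |-- country: string (nullable = true)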

Notice that the data types of the partitioning columns are automatically inferred. Currently, numeric data types, date, timestamp and string type are supported. Sometimes users may not want to automatically infer the data types of the partitioning columns. For these use cases, the automatic type inference can be configured by spark.sql.sources.partitionColumnTypeInference.enabled, which defaults to true. When type inference is disabled, string type will be used for the partitioning columns.

Starting from Spark 1.6.0, partition discovery only finds partitions under the given paths by default. For the above example, if users pass path/to/table/gender=male to either SparkSession.read.parquet or SparkSession.read.load, gender will not be considered as a partitioning column. If users need to specify the base path that partition discovery should start with, they can set basePath in the data source options. For example, when path/to/table/gender=male is the path of the data and users set basePath to path/to/table/, gender will be a partitioning column.
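
A minimal sketch of the basePath option (paths are illustrative):

// Only the male partition is read, but gender is still recovered as a partitioning column
// because basePath tells partition discovery where the table root is
val malesDF = spark.read
  .option("basePath", "path/to/table")
  .parquet("path/to/table/gender=male")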

Schema Merging

Like ProtocolBuffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with a simple schema, and gradually add more columns to the schema as needed. In this way, users may end up with multiple Parquet files with different but mutually compatible schemas. The Parquet data source is now able to automatically detect this case and merge schemas of all these files.

Since schema merging is a relatively expensive operation, and is not a necessity in most cases, we turned it off by default starting from 1.5.0. You may enable it by

  1. setting the data source option mergeSchema to true when reading Parquet files (as shown in the examples below), or
  2. setting the global SQL option spark.sql.parquet.mergeSchema to true.
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)

Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public static class Square implements Serializable {
  private int value;
  private int square;

  // Getters and setters...
}

public static class Cube implements Serializable {
  private int value;
  private int cube;

  // Getters and setters...
}

List<Square> squares = new ArrayList<>();
for (int value = 1; value <= 5; value++) {
  Square square = new Square();
  square.setValue(value);
  square.setSquare(value * value);
  squares.add(square);
}

// Create a simple DataFrame, store into a partition directory
Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
squaresDF.write().parquet("data/test_table/key=1");

List<Cube> cubes = new ArrayList<>();
for (int value = 6; value <= 10; value++) {
  Cube cube = new Cube();
  cube.setValue(value);
  cube.setCube(value * value * value);
  cubes.add(cube);
}

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
cubesDF.write().parquet("data/test_table/key=2");

// Read the partitioned table (the original snippet is truncated here; the read mirrors the Scala example)
Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
mergedDF.printSchema();
Source: https://spark.apache.org/docs/2.2.2/sql-programming-guide.html


The Spark SQL shuffle is a mechanism for redistributing or re-partitioning data so that the data is grouped differently across partitions. Depending on your data size, you may need to reduce or increase the number of partitions of an RDD/DataFrame, either through configuration or through code.

Spark shuffle is a very expensive operation, as it moves data between executors or even between worker nodes in a cluster, so try to avoid it where possible. When you have a performance issue with Spark jobs, you should look at the Spark transformations that involve shuffling.

In this tutorial, you will learn what triggers a shuffle on RDD and DataFrame transformations, using Scala examples. The same approach can also be used with PySpark (Spark with Python).

What is Spark Shuffle?

Shuffling is a mechanism Spark uses to redistribute data across different executors and even across machines. Spark shuffling is triggered by transformation operations like reduceByKey(), groupByKey(), join(), groupBy(), etc.

Spark Shuffle is an expensive operation since it involves the following

  • Disk I/O
  • Involves data serialization and deserialization
  • Network I/O

When creating an RDD, Spark doesn’t necessarily store the data for all keys in a particular partition, since at the time of creation there is no way to set the key for the data set.

Hence, when we run the reduceByKey() operation to aggregate the data on keys, Spark does the following (see the sketch after this list).

  • Spark first runs map tasks on all partitions, which group all values for a single key.
  • The results of the map tasks are kept in memory.
  • When the results do not fit in memory, Spark stores the data on disk.
  • Spark shuffles the mapped data across partitions; sometimes it also stores the shuffled data on disk for reuse when it needs to recalculate.
  • Runs garbage collection.
  • Finally runs reduce tasks on each partition based on key.
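
A minimal sketch of such an aggregation (assuming a SparkSession named spark, as in the examples above):

// Build a small pair RDD and aggregate the values per key
val rdd = spark.sparkContext.parallelize(
  Seq(("a", 1), ("b", 1), ("a", 1), ("c", 1), ("b", 1)))

// reduceByKey combines values map-side first, then shuffles the partial sums across partitions
val counts = rdd.reduceByKey(_ + _)
counts.collect().foreach(println)   // e.g. (a,2), (b,2), (c,1); order may vary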

Spark RDD Shuffle

Spark RDD triggers a shuffle for operations like repartition(), groupByKey(), reduceByKey(), cogroup(), and join(), but not for countByKey().

Such operations return the same number of partitions as their parent: although reduceByKey() triggers a data shuffle, it doesn’t change the partition count, because RDDs inherit the partition count from the parent RDD.

You may get a different partition count, depending on your setup and on how Spark creates partitions.
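
For example (a sketch; the counts shown assume spark.default.parallelism is not explicitly set):

val rdd = spark.sparkContext.parallelize(1 to 100, 4)
println(rdd.getNumPartitions)                   // 4

// repartition() triggers a full shuffle and changes the partition count
println(rdd.repartition(8).getNumPartitions)    // 8

// reduceByKey() shuffles data but typically keeps the parent RDD's partition count
val counts = rdd.map(i => (i % 10, 1)).reduceByKey(_ + _)
println(counts.getNumPartitions)                // 4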

Spark SQL DataFrame Shuffle

Unlike RDDs, the Spark SQL DataFrame API increases the number of partitions when a transformation performs shuffling. DataFrame operations that trigger shuffling include join() and aggregate functions such as groupBy().

With the default configuration, the partition count after such a shuffle is 200, as shown below.
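
For example (a sketch assuming a SparkSession named spark and default settings):

import spark.implicits._

val df = Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4)).toDF("key", "value")
val grouped = df.groupBy("key").sum("value")

// groupBy() triggers a shuffle, so the result uses spark.sql.shuffle.partitions partitions
println(grouped.rdd.getNumPartitions)   // 200 by default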

Spark Default Shuffle Partition

A DataFrame increases the partition number to 200 automatically when a Spark operation performs data shuffling (join(), aggregation functions, and similar wide transformations). This default shuffle partition number comes from the Spark SQL configuration spark.sql.shuffle.partitions, which is set to 200 by default.

You can change this default shuffle partition value using the conf method of the SparkSession object or using Spark Submit Command Configurations.
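
For example (100 is only an illustrative value):

// Set dynamically on an existing SparkSession
spark.conf.set("spark.sql.shuffle.partitions", 100)

// Or pass it at submit time:
// spark-submit --conf spark.sql.shuffle.partitions=100 ...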

Shuffle partition size

Based on your dataset size, number of cores, and memory, Spark shuffling can benefit or harm your jobs. When you are dealing with a small amount of data, you should typically reduce the number of shuffle partitions; otherwise you will end up with many partitions holding only a few records each, which results in running many tasks with little data to process.

On the other hand, when you have a lot of data and too few partitions, you get fewer, longer-running tasks, and sometimes you may also run out of memory.

Getting the shuffle partition size right is always tricky and usually takes several runs with different values to find an optimal number. This is one of the key properties to look at when you have performance issues with Spark jobs.
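
One rough heuristic, offered here as an assumption rather than an official rule, is to aim for a fixed amount of shuffle data per partition (on the order of 100–200 MB) and derive the partition count from an estimate of the shuffled data size:

// Illustrative only: estimate the shuffle size and divide by a target partition size
val shuffleSizeBytes = 50L * 1024 * 1024 * 1024      // e.g. ~50 GB of shuffle data (estimated)
val targetPartitionBytes = 200L * 1024 * 1024        // aim for ~200 MB per shuffle partition
val shufflePartitions = math.max(1L, shuffleSizeBytes / targetPartitionBytes)

spark.conf.set("spark.sql.shuffle.partitions", shufflePartitions)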

Conclusion

In this article, you have learned what a Spark SQL shuffle is, how some Spark operations trigger a re-partitioning of the data, how to change the default Spark shuffle partition count, and finally how to choose the right partition size.


Source: https://sparkbyexamples.com/spark/spark-shuffle-partitions/

