Shuffle Hash and Sort Merge Joins in Apache Spark

28 Jun 2018
This post is the second in my series on Joins in Apache Spark SQL. The first part explored Broadcast Hash Join; this post will focus on Shuffle Hash Join & Sort Merge Join.
Although Broadcast Hash Join is the most performant join strategy, it applies to only a narrow set of scenarios. Shuffle Hash Join & Sort Merge Join are the true workhorses of Spark SQL; most join queries you encounter in Spark SQL will end up with a physical plan that uses one of these two strategies.
Let us take an example to understand the join strategies better. This time we will be using the Mondrian Foodmart dataset to write our queries against. For those unaware of it, the Foodmart dataset is a popular test dataset for OLAP scenarios; it originated as part of the test suite of the Pentaho Mondrian OLAP engine. You can check out its schema layout here. We will be concerned with only a couple of tables from the dataset: sales_fact_98 & customer.
In Spark REPL, you can create the tables as shown below:
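A minimal sketch of what this looks like in spark-shell; the CSV paths and the reader options are my assumptions about how the Foodmart tables were exported, not part of the original post:

```scala
// Load the two Foodmart tables and register them as SQL views.
// The file locations below are hypothetical.
val salesFact98 = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/foodmart/sales_fact_98.csv")
salesFact98.createOrReplaceTempView("sales_fact_98")

val customer = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/foodmart/customer.csv")
customer.createOrReplaceTempView("customer")
```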
The explain output on the join-table describes the physical plan of the join operation:
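The join and the explain call can be sketched as follows; the column names assume the standard Foodmart schema:

```scala
// Join the fact table to the customer dimension on customer_id.
val joined = spark.sql("""
  SELECT c.fullname, SUM(s.store_sales) AS total_sales
  FROM sales_fact_98 s
  JOIN customer c ON s.customer_id = c.customer_id
  GROUP BY c.fullname""")

// Print the physical plan chosen by the planner.
joined.explain()
```

On a setup like the one assumed above, the plan contains a SortMergeJoin node fed by an Exchange (hash partitioning on customer_id) and a Sort on each side; the exact operator tree will vary with your Spark version and data.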
The Spark SQL planner chooses to implement the join operation using ‘SortMergeJoin’. The precedence order for equi-join implementations (as of Spark 2.2.0) is as follows:
- Broadcast Hash Join
- Shuffle Hash Join: if the average size of a single partition is small enough to build a hash table.
- Sort Merge: if the matching join keys are sortable.
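The precedence can be observed directly: since Broadcast Hash Join is attempted first, turning off automatic broadcasting forces the planner further down the list. A sketch:

```scala
// Setting the broadcast threshold to -1 disables automatic Broadcast Hash
// Join, so the planner falls through to Shuffle Hash / Sort Merge.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
```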
Pick One, Please
There is some confusion over the choice between Shuffle Hash Join & Sort Merge Join, particularly after Spark 2.3. Part of the reason is the introduction of a new configuration, spark.sql.join.preferSortMergeJoin, which is internal and set to true by default.
This means that Sort Merge is chosen every time over Shuffle Hash in Spark 2.3.0.
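Because the flag is internal, it does not appear in the documented configuration list, but it can still be read (and, at your own risk, overridden) from the running session:

```scala
// Inspect the internal flag; it defaults to true.
spark.conf.get("spark.sql.join.preferSortMergeJoin")

// Flipping it lets the planner consider Shuffle Hash Join again,
// subject to the size conditions in the precedence list.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
```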
The preference of Sort Merge over Shuffle Hash in Spark is an ongoing discussion which has seen Shuffle Hash going in and out of Spark’s join implementations multiple times. It was first removed from Spark in version 1.6.0. It made a comeback in 2.0.0. In 2.3.0, it has again been voted out in favour of Sort Merge.
A reason for the preference of Sort Merge is that it is considered the more robust implementation: Shuffle Hash Join requires the hashed table to fit in memory, whereas Sort Merge Join can spill to disk. But Shuffle Hash does have its benefits, particularly when the build side is much smaller than the stream side. In that case, building a hash table on the smaller side should be faster than sorting the bigger side. And given this clear benefit, I am sure Shuffle Hash will rise again from the ashes.
I would like to spend this section on Sort Merge Join alone, since its presence is invariant across Spark versions. The implementation of Sort Merge Join in Spark is similar to that of any other SQL engine, except that it happens over partitions, because of the distributed nature of the data. This means the strategy performs best when the rows corresponding to the same join key are co-located; in every other case, it involves a shuffle operation to co-locate the data. We will have more to say on performance in the Caveats section.
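To make the mechanics concrete, here is a minimal, single-partition sketch of the merge phase in plain Scala. It illustrates the classic algorithm, not Spark's actual implementation:

```scala
// A single-partition sketch of the merge phase of Sort Merge Join.
// Both inputs must already be sorted by key; duplicate keys on either
// side produce a cross product for that key, as in a SQL inner join.
def sortMergeJoin[K: Ordering, A, B](
    left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, A, B)] = {
  val ord = implicitly[Ordering[K]]
  val out = scala.collection.mutable.ArrayBuffer.empty[(K, A, B)]
  var i = 0; var j = 0
  while (i < left.length && j < right.length) {
    val c = ord.compare(left(i)._1, right(j)._1)
    if (c < 0) i += 1            // left key is behind: advance left
    else if (c > 0) j += 1       // right key is behind: advance right
    else {
      val k = left(i)._1
      // Find the run of equal keys on each side, emit their cross product.
      val iEnd = left.indexWhere(_._1 != k, i) match { case -1 => left.length; case n => n }
      val jEnd = right.indexWhere(_._1 != k, j) match { case -1 => right.length; case n => n }
      for (x <- i until iEnd; y <- j until jEnd)
        out += ((k, left(x)._2, right(y)._2))
      i = iEnd; j = jEnd
    }
  }
  out.toSeq
}

// sortMergeJoin(Seq(1 -> "a", 2 -> "b"), Seq(2 -> "x", 3 -> "y"))
// → Seq((2, "b", "x"))
```

Spark runs this merge per partition, which is why co-locating rows with the same join key (or shuffling to achieve that) is a precondition for the strategy.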
Below is the code generated by Spark to perform the sort & merge operations. I give it to you without comment.
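If you want to reproduce this on your own setup, the generated Java source can be printed from spark-shell via the debug package:

```scala
import org.apache.spark.sql.execution.debug._

// Print the Java source that whole-stage code generation emits for each
// stage of the plan, including the sort and merge loops of the join.
spark.sql("""
  SELECT * FROM sales_fact_98 s
  JOIN customer c ON s.customer_id = c.customer_id""")
  .debugCodegen()
```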
The performance of Sort Merge Join, as with every distributed join strategy, is optimal under certain conditions and sub-par under certain others.
Best case scenarios:
In decreasing order of benefit,
- The Datasets have a known, shared partitioner; if the Datasets sharing the same partitioner are materialized by the same action, they will end up being co-located.
- The Datasets are distributed evenly on the join columns.
- The number of keys (combinations of join column values) is adequately large for the cluster and data-size at hand (since parallelism is proportional to the number of unique keys).
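The first of these conditions can be engineered: repartitioning both sides on the join column up front gives them the same hash partitioner. A sketch (the partition count of 200 is an arbitrary illustration):

```scala
import org.apache.spark.sql.functions.col

// Repartition both sides on the join key so they share a partitioner;
// the subsequent Sort Merge Join can then avoid an extra shuffle.
val s = spark.table("sales_fact_98").repartition(200, col("customer_id"))
val c = spark.table("customer").repartition(200, col("customer_id"))

s.join(c, "customer_id").explain()
```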
Worst case scenarios:
- Extremely uneven sharding of Datasets on the join columns.
- A large Dataset is joined with another Dataset such that a majority of the rows of the larger Dataset are not relevant to the join condition. (These non-relevant rows will still be shuffled across the cluster before being filtered out; hence the performance hit.)
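A common mitigation for the second worst case is to filter (or pre-aggregate) the large Dataset before the join, so irrelevant rows are dropped before the shuffle rather than after it. The predicate below is a made-up illustration, not from the Foodmart queries above:

```scala
import org.apache.spark.sql.functions.col

// Filtering first means the discarded rows are never shuffled across
// the cluster; the hypothetical predicate stands in for any condition
// that eliminates rows irrelevant to the join.
val relevantSales = spark.table("sales_fact_98")
  .filter(col("store_sales") > 0)

relevantSales.join(spark.table("customer"), "customer_id")
```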
Upcoming posts in this series will explore Cartesian Product, Broadcast Nested Loop Joins and others. Tune in for them. Please leave a comment for suggestions, opinions, or just to say hello. Until next time!