Spark SQL is a module in Apache Spark that integrates relational processing with Spark's programming API. It is designed for read-only Online Analytical Processing (OLAP) over large-scale datasets. It uses the DataFrame API to unify the interface for procedural code (e.g., Scala, Java) in Spark and relational code (e.g., SQL). It also provides an extensible optimizer called Catalyst that makes adding optimization rules easier. Spark SQL thus combines the strengths of Spark (rich data analytics programming interfaces and tools) with the strengths of SQL (declarative interfaces and opportunities for optimization).
Spark SQL grew out of Shark. Shark was originally developed as an academic project at the AMPLab at the University of California, Berkeley, starting in 2011. Shark was built on the Hive codebase to support queries on Spark, and Spark SQL decouples this functionality from Hive. Spark SQL was first integrated into Spark 1.0 in May 2014, and Shark's development ended in July of that year. Spark SQL, as part of Spark, is open source and maintained by the Apache Software Foundation, with UC Berkeley's AMPLab and Databricks among its major contributors. It is still actively developed.
DataFrame is the main abstraction of Spark SQL. It is defined as a distributed collection of rows with a homogeneous schema, which is conceptually equivalent to a table in a traditional relational database.
DataFrames support all common relational operators through a domain-specific language. For example, one can write a query like the following, which is largely self-explanatory:
```scala
import org.apache.spark.sql.functions.count

employees
  .join(dept, employees("deptId") === dept("id"))
  .where(employees("gender") === "female")
  .groupBy(dept("id"), dept("name"))
  .agg(count("name"))
```
One important property of DataFrame operators is that they only define a logical plan (not yet the final logical plan) rather than executing the operations immediately. This makes optimization possible when the DataFrame eventually needs to be materialized and the operations are executed.
DataFrames provide the same operations as SQL, but they are easier to integrate with a full programming language (Scala, Java, Python, R). This improves the expressiveness of the query interface and turns it into a true programming interface. DataFrames also analyze logical plans eagerly to detect problems (such as references to non-existent columns) as early as possible, even though evaluation itself is done lazily.
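As a hedged sketch (the dataset path and column names are assumptions), the snippet below illustrates both points: transformations only record a logical plan, which can be inspected with explain(), while referencing a column that does not exist fails during analysis rather than at execution time.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("lazy-plans").master("local[*]").getOrCreate()
val employees = spark.read.parquet("employees.parquet")   // hypothetical dataset

// No job runs here: the filter only extends the logical plan.
val females = employees.where(employees("gender") === "female")
females.explain(true)   // prints the parsed, analyzed, optimized, and physical plans

// Analysis is eager: this line would throw an AnalysisException immediately,
// before any action (e.g. show() or count()) is executed.
// employees.select("no_such_column")
```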
In short, DataFrames provide a unified, easy-to-use interface for procedural and relational code.
A SQL interface, including user-defined functions (UDFs), is also provided, so Spark SQL can be used through traditional SQL query interfaces as well.
Spark SQL does not support concurrency control because it is designed for read-only OLAP. It provides an interface to save DataFrames to data sources such as Parquet, or to persist them as tables. It offers different save modes that control how existing data is handled, but these save modes do not use any locking and are not atomic.
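A minimal sketch of that save interface, with hypothetical paths and table names; the save mode only chooses how existing data is treated and provides no locking or atomicity guarantees.

```scala
import org.apache.spark.sql.SaveMode

employees.write
  .mode(SaveMode.Overwrite)            // how existing data is handled; not atomic, no locking
  .parquet("hdfs:///warehouse/employees")

employees.write
  .mode("append")                      // the string form of the mode is also accepted
  .saveAsTable("employees_persisted")  // persist as a table in the catalog
```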
Indexes are not supported in Spark SQL. The official Spark SQL, DataFrames and Datasets Guide argues that indexes are "less important due to Spark SQL’s in-memory computational model". Other resources note that "If data source supports indexing it can be indirectly utilized by Spark through mechanisms like predicate pushdown."
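As a hedged illustration of that indirect use (the Parquet path is hypothetical), the sketch below filters a DataFrame and inspects the physical plan, where pushed-down predicates appear under the PushedFilters entry in the explain output.

```scala
val orders = spark.read.parquet("hdfs:///data/orders")   // hypothetical dataset

// The comparison can be pushed down to the Parquet reader; check the
// "PushedFilters" entry in the physical plan to confirm.
orders.filter(orders("amount") > 100).explain()
```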
Virtual Views, Materialized Views
Spark SQL supports views because the values in DataFrames are computed lazily. When the relational DSL is used to transform DataFrames, the transformations are not executed on the fly; instead, the lineage between DataFrames is recorded as a logical plan. DataFrames can be registered as temporary tables in the system catalog and queried using SQL. DataFrames registered in the catalog are still not materialized, so optimizations can happen across SQL and the original DataFrame expressions.
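A minimal sketch of that workflow, using assumed table and column names: the DataFrame is registered as a temporary view and then queried with SQL, without being materialized.

```scala
employees.createOrReplaceTempView("employees")   // registers a non-materialized view
val perDept = spark.sql(
  "SELECT deptId, COUNT(*) AS cnt FROM employees WHERE gender = 'female' GROUP BY deptId")
perDept.show()   // the SQL query and the original DataFrame expressions are optimized together
```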
Spark SQL also supports materialized views by caching hot data in memory. This works without any concern about updates because DataFrames are read-only. When a user calls cache() on a DataFrame, Spark SQL tries to keep the DataFrame's contents in memory, if possible, once the DataFrame is materialized. If the data cannot fit in memory, it is spilled to disk. This is equivalent to the MEMORY_AND_DISK storage level for RDDs, which, in contrast, can be assigned other storage levels.
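A minimal sketch of the caching API, reusing the assumed names from the earlier examples:

```scala
import org.apache.spark.storage.StorageLevel

employees.cache()    // for DataFrames this is equivalent to persist(StorageLevel.MEMORY_AND_DISK)
employees.count()    // the first action materializes the in-memory columnar cache
// spark.catalog.cacheTable("employees")    // a registered table can be cached via the catalog
// dept.persist(StorageLevel.MEMORY_ONLY)   // persist() allows choosing a different storage level
```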
Spark SQL supports inline definition of stored procedures by passing Scala, Java, Python, or R functions. Normally, stored procedures are defined in a programming environment separate from the primary query interface, but Spark SQL can support stored procedures without a complicated packaging and registration process because query execution is expressed in the same language. As in conventional databases, it also supports registering a stored procedure written in a general-purpose language and then using it via the JDBC/ODBC interface.
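A minimal sketch, with hypothetical function and column names, of defining such an inline procedure as a UDF in Scala and using it from both the DataFrame DSL and SQL:

```scala
import org.apache.spark.sql.functions.udf

// Inline "stored procedure": an ordinary Scala function wrapped as a UDF.
val normalizeName = udf((s: String) => s.trim.toLowerCase)
employees.select(normalizeName(employees("name"))).show()

// Registering it by name makes it callable from SQL (and thus from JDBC/ODBC clients).
spark.udf.register("normalize_name", (s: String) => s.trim.toLowerCase)
spark.sql("SELECT normalize_name(name) FROM employees").show()
```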
Decomposition Storage Model (Columnar)
Typically, the storage of DataFrames is columnar, so the storage model is the Decomposition Storage Model. Spark's native cache simply stores data as JVM objects; DataFrames, however, use a columnar cache to reduce the memory footprint, because columnar storage permits columnar compression schemes such as dictionary encoding and run-length encoding. A DataFrame can also be accessed as a sequence of Row objects, but this does not mean the data is stored in rows, because the Row objects are generated on the fly.
However, DataFrames may be created from and/or written to many kinds of data sources, such as RDDs, CSV, Avro, Parquet, JDBC, etc. Since Spark SQL does not execute eagerly, the storage models of the original data sources remain unchanged.
Though Spark is famous for its optimization of in-memory processing, it should still be categorized as a disk-oriented database, or at least a hybrid one. The datasets of Spark SQL are typically stored in a file system (usually a distributed file system such as HDFS) in all kinds of formats, including CSV, Avro, and Parquet, and the entire dataset may not fit in memory.
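For illustration, a hedged sketch of reading such datasets from a distributed file system (the paths are hypothetical; Avro additionally requires the spark-avro package):

```scala
val logs  = spark.read.option("header", "true").csv("hdfs:///logs/2014/*.csv")
val sales = spark.read.parquet("hdfs:///warehouse/sales")
// val events = spark.read.format("avro").load("hdfs:///events")   // needs spark-avro on the classpath
```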
The in-memory processing characteristic shows in the fact that Spark SQL can cache tables (or DataFrames) in memory during computation so that they can be reused later. Spark SQL does not cache unless the programmer requests it. When caching, it applies suitable columnar compression to minimize memory usage and GC pressure. A cached table is kept in memory if possible and otherwise spilled to disk. Unlike RDDs cached with the default storage level (which does not store partitions that cannot fit in memory and recomputes them instead), cached tables are guaranteed to be stored.
Spark SQL performs query compilation using abstract syntax trees (ASTs) as the intermediate representation.
It first accepts a tree representation of the SQL expression and transforms it into an AST. This step is relatively simple in Scala because of a feature called quasiquotes, which allows ASTs to be constructed programmatically. Quasiquotes are type-checked at compile time, which makes them more robust than string concatenation, and they save time by avoiding parsing at run time.
It then sends the generated ASTs to the Scala compiler at runtime to generate bytecode. The compiler can perform expression-level optimization at this point. Finally, Spark SQL runs the generated bytecode.
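The same mechanism can be demonstrated outside Spark with plain Scala 2 reflection; the sketch below only illustrates quasiquotes plus runtime compilation and is not Spark SQL's actual code generation path.

```scala
import scala.reflect.runtime.universe._
import scala.tools.reflect.ToolBox

object QuasiquoteDemo extends App {
  val tb  = runtimeMirror(getClass.getClassLoader).mkToolBox()
  val x   = 40
  val ast = q"$x + 2"     // builds a typed AST, not the string "40 + 2"
  println(showRaw(ast))   // e.g. Apply(Select(Literal(Constant(40)), TermName("$plus")), ...)
  println(tb.eval(ast))   // compiled to bytecode at runtime; prints 42
}
```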
Here is an example from Spark SQL's original paper: to evaluate an expression such as 1 + x, the expression tree might look like Add(Literal(1), Attribute("x")). Spark SQL implements the transformation from the expression tree to an AST with the following code:
```scala
def compile(node: Node): AST = node match {
  case Literal(value) => q"$value"
  case Attribute(name) => q"row.get($name)"
  case Add(left, right) => q"${compile(left)} + ${compile(right)}"
}
```
So the expression tree above is transformed into an AST for the Scala code 1 + row.get("x"). It is important to note, again, that this does not generate the string "1 + row.get("x")" but rather an AST, which is then sent to the compiler.
Spark SQL, as a part of Spark, uses a shared-nothing architecture. Spark applications run as independent sets of processes on a cluster, coordinated by the driver program.
The workflow for Spark is:
1. The driver program connects to the cluster manager, which allocates resources across applications.
2. Application code and tasks are sent to the executors on the worker nodes.
3. Executors are processes that run computations and store data.
Each worker node has its own memory and disks, and all communication between workers happens through the network. This architecture makes it easier to scale out without interfering with other nodes, eliminates single points of failure, and avoids unexpected downtime.
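A minimal sketch of the driver side of this architecture (the standalone master URL and memory setting are assumptions): the driver builds a SparkSession that connects to a cluster manager, which in turn launches executors on the worker nodes.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("spark-sql-demo")
  .master("spark://master-host:7077")      // hypothetical standalone cluster manager URL
  .config("spark.executor.memory", "4g")   // each executor uses its own node's memory
  .getOrCreate()
```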
Nested Loop Join, Hash Join, Sort-Merge Join, Broadcast Join
The physical plan for a join is selected based on the join keys and size estimates in the logical plan. If at least some of the predicates can be evaluated as equi-join key matches, the implementation chooses among broadcast hash join, shuffle hash join, and sort-merge join. Otherwise, when there are no join keys, it chooses between broadcast nested loop join and Cartesian product.
Broadcast is used if one side of the join is estimated to be small (the threshold is configurable). The smaller side is broadcast, and the other side is streamed without shuffling. Shuffle hash join is used if the average size of a single partition is small enough to build a hash table. Sort-merge join is used if the join keys are sortable. One blog post mentioned that one reason sort-merge join is sometimes better than shuffle hash join is that the shuffle is implemented as a sort-based shuffle, so no additional sorting is needed.
Broadcast nested loop join is used if one side of the join can be broadcast, while the Cartesian product is used for inner joins.
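As a hedged sketch reusing the earlier employees/dept example, a broadcast hint can steer the planner toward a broadcast hash join when one side is known to be small:

```scala
import org.apache.spark.sql.functions.broadcast

val joined = employees.join(broadcast(dept), employees("deptId") === dept("id"))
joined.explain()   // the physical plan should show a BroadcastHashJoin
```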