Spark SQL works as a module in Apache Spark to integrate relational processing with Spark's API. It is designed for read-only Online Analytical Processing (OLAP) which involves large scale datasets. It uses DataFrame API to unitify the interface for procedural code (e.g. Scala, Java) in Spark and relational code (e.g SQL). It also designs an extensible optimizer called Catalyst to make adding optimization rules easier. Spark SQL combines the strength of Spark (rounded data analytics programming interfaces and tools) and the strength of SQL (declarative interfaces and the possibility of optimization).
- Source Code
- https://github.com/apache/spark[02]
- Governance
- Apache Software Foundation
- Country of Origin
- US
- Project Type
- Open Source
- Supported Languages
- Scala
- Operating System
- All OS with Java VM
- License
- Apache v2
Spark SQL works as a module in Apache Spark to integrate relational processing with Spark's API. It is designed for read-only Online Analytical Processing (OLAP) which involves large scale datasets. It uses DataFrame API to unitify the interface for procedural code (e.g. Scala, Java) in Spark and relational code (e.g SQL). It also designs an extensible optimizer called Catalyst to make adding optimization rules easier. Spark SQL combines the strength of Spark (rounded data analytics programming interfaces and tools) and the strength of SQL (declarative interfaces and the possibility of optimization).
History
Spark SQL is built based on Shark. Shark was originally developed as an academic project at AMPLab at the University of California, Berkeley from 2011. Shark is built on Hive codebase to support query on Spark, and Spark SQL tries to decouple it from Hive. Spark SQL was first integrated into Spark 1.0 in May 2014, and Shark's development was ended in July that year. Spark SQL, as part of Spark, is open-sourced and maintained by Apache Software Foundation. UC Berkeley's AMPLab and Databricks make a lot of contribution to it. Is is still active.
Concurrency Control[05]
Spark SQL does not support concurrency control because it is designed for read-only OLAP. It provides an interface to save DataFrames to data sources like parquet, or persist tables. It has different save modes for the way handling existing data if present. But these save modes do not utilize any locking and are not atomic.
Data Model
Spark SQL uses a nested data model based on Hive or tables and DataFrames. It supports all major SQL data types, as well as complex data types: structs, arrays, maps, and unions. The user can also define User-Defined Types (UTPs) by nesting other data types. Spark SQL provides first-class support for complex data types in the query language and the API. This helps Spark SQL model data from all kinds of sources and formats (e.g. Hive, other databases through JDBC, CSV, native objects of the host language).
Foreign Keys[06][05]
Foreign keys are not supported because Spark SQL is designed for read-only OLAPs.
Indexes[07][05]
Indexes are not supported in Spark SQL. In the official Spark SQL, DataFrames and Datasets Guide, it argues that indexes are "less important due to Spark SQL’s in-memory computational model". Other resources mentioned "If data source supports indexing it can be indirectly utilized by Spark through mechanisms like predicate pushdown."
Isolation Levels[05]
Discussion about isolation levels is not applicable for Spark SQL because it is designed for read-only OLAP. But Spark SQL can use a JDBC interface to access data from other databases, in which isolation levels could be configured for writing to outside databases.
Joins[08][09]
Physical plan for join is selected based on joining keys and size in the logical plan. If at least some of the predicates can be evaluated by matching join keys, the implementation will choose from broadcast, shuffle hash join, and sort merge. Otherwise, there are no joining keys, the implementation will choose from broadcast nested loop join, and cartesian product.
Broadcast is used if one side of the join is estimated to be small (the threshold could be configured). The smaller side will be broadcasted and the other side will be streamed without shuffling. Shuffle hash join is used if the average size of a single partition is small enough to build a hash table. Sort merge is used if the matching join keys are sortable. One blog post mentioned that one reason sort-merge join is sometimes better than shuffle hash join is that the shuffling is implemented by sort-based shuffle, so additional sorting is not needed.
Broadcast nested loop join is used if one side of join could be broadcasted, while the cartesian product is used for inner joins.
Logging[05]
Logging is not supported because Spark SQL is designed for read-only OLAPs. But Spark keeps the lineage of each partition of DataFrames. When some DataFrames are missing because of a worker node fails, it can re-compute that DataFrame according to the lineage information, without redoing all computations. This is a variant of command/logical logging.
Query Compilation
Spark SQL does query compilation by using abstract syntax trees (ASTs) as the intermediate representation.
It first accepts a tree representation of the SQL expression and transforms it to an AST. This step is relatively simple in Scala because it has a feature called quasiquotes, which allows constructing ASTs programmatically. Quasiquotes checks the types at compile time, which makes it more useable than string concatenation, and saves time by not parsing during run time.
It then sends the generated ASTs to Scala compiler at runtime to generate bytecode. The compiler can do expression-level optimization here. Finally, it runs the generated byte code.
Here is an example provided in Spark SQL's original paper: To evaluate an expression like 1 + x, the expression tree may looks like Add(Literal(1), Attribute("x")). Spark SQL implements the transformation from the expression tree to an AST by the following code:
scala
def compile(node: Node): AST = node match {
case Literal(value) => q"$value"
case Attribute(name) => q"row.get($name)"
case Add(left, right) => q"${compile(left)} + ${compile(right)}" }
So the expression tree above could be transformed to an AST for Scala code like 1+row.get("x"). It is important to notice again that it does not generate a string like "1+row.get("x")", but generate an AST before sending to the compiler.
Query Interface
DataFrame is the main abstraction of Spark SQL. It is defined as a distributed collection of rows with a homogeneous schema, which is actually a table in a traditional database.
DataFrames support all common relational operators by a domain-specific language. For example, it can write a query like this, which is self-explanatory:
scala
employees
.join(dept, employees("deptId") === dept("id"))
.where(employees("gender") === "female")
.groupBy(dept("id"), dept("name"))
.agg(count("name"))
One important feature of DataFrame operators is it is actually only defining a logical plan (which is not the final logical plan), rather than execting the operations. This makes optimizations possible when the DataFram needs to be materialized and operations need to be executed.
DataFrames provide the same operations as SQL, but it is easier to be integrated with a full programming language (Scala, Java, Python, R). This improves the expressiveness of the query interface and makes it be a programming interface. DataFrames will also analyze logical plans eagerly to detect problems as soon as possible, even though evaluation is done lazily.
In a word, DataFrame provides a unified interface for procedural and relational code and makes it easy to use.
Also, SQL interface including UDFs are provided, so Spark SQL could also be used in the traditional SQL query interfaces.
Storage Architecture
Though Spark is famous for its optimization of in-memory processing, it is still should be cataloged as a disk-oriented database, or at least a hybrid one. The datasets of Spark SQL are typically stored in a file system (usually a distributed file system like Hadoop File System) in all kinds of formats including CSV, Avro, Parquet. The entire dataset may not be able to fit in memory.
The characteristic of in-memory processing is showed from the fact that Spark SQL can cache tables (or DataFrams) in memory during computation, so it can be reused again in the feature. Spark SQL will not cache if it is not specified by programmers. To cache, it will use suitable columnar compression to minimize memory usage and GC pressure. The cached table will try to fit in memory if possible, otherwise, it will be spilled to disks. Unlike caching RDDs by default storage level (which does not store at all if the RDD cannot fit in memory), cached tables are guaranteed to be stored.
Storage Model
Typically, the storage of DataFrames is columnar, so the storage model is Decomposition Storage Model. Spark's native cache is simply stored as JVM objects. DataFrames, however, uses columnar cache to reduce memory footprint. Because in columnar storage, it can apply columnar compression schemes like dictionary encoding and run-length encoding. DataFrame can also be accessed as a sequence of Row objects, but this does not mean the data is stored in rows because the Row objects are generated on the fly.
However, DataFrame might be transformed from and/or to types of data source like RDD, CSV, Avro, Parquet, JDBC, etc.. Since Spark SQL does not execute eagerly, the storage models of original data source remains.
Stored Procedures
Spark SQL support inline definition of stored procedures by passing Scala, Java, Python, or R functions. Normally, stored procedures are defined in a separate programming environment from the primary query interface. But Spark SQL could support stored procedures without complicated packaging and registration process because the query execution is also expressed in the same language. To be similar to common databases, it also supports registering a stored procedure in general purpose language and then using it via the JDBC/ODBC interface.
System Architecture[10][11]
Spark SQL, as a part of Spark, uses shared-nothing architecture. Spark applications run as independent sets of processes on a cluster, coordinated by the driver program.
The workflow for Spark is 1. The driver program connects to cluster managers, which allocate resources across applications. 2. Application code and tasks are sent to the executors on nodes. 3. Executors are processes that run computations and store data.
Each worker node has its own memory and disks, and all communication between workers happens through the network. This architecture makes it easier to scale up without interfering other nodes, eliminates single points of failure and avoids unexpected downtime.
Views
Spark SQL supports views because the values in DataFrames are computed lazily. When using the relational DSL to transform DataFrames, it does not execute on the fly. Instead, it keeps the lineage between DataFrams as a logical plan. DataFrames can be registered as temporary tables in the system catalog and queried using SQL. The DataFrames registered in the catalog are still not materialized, so that optimizations can happen across SQL and the original DataFrame expressions.
Spark SQL also supports materialized views by caching hot data in memory. It can be available without concerning about updates because DataFrames are read only. When a user call cache() on a DataFrame, it will try to keep the value of this DataFrame in memory if possible, when this DataFrame is being materialized. If it cannot fit in memory, DataFrame is spilled to the disk. This is equivalent to the storage level MEMORY_AND_DISK for RDDs, which can choose other storage levels.
Citations
18 sources- https://spark.apache.org/sql apache.org
- GitHub - apache/spark: Apache Spark - A unified analytics engine for large-scale data processing · GitHub github.com
- Overview - Spark 4.1.2 Documentation apache.org
- Apache Spark - Wikipedia wikipedia.org
- Spark SQL and DataFrames - Spark 4.1.2 Documentation apache.org
- Columns constraints in Spark tables - Stack Overflow stackoverflow.com
- Why Spark SQL considers the support of indexes unimportant? - Stack Overflow stackoverflow.com
- spark/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala at master · apache/spark · GitHub github.com
- Optimizing Apache Spark SQL Joins | PPTX slideshare.net
- IT Resource Library - Technology Business Research | HPE morpheusdata.com
- Cluster Mode Overview - Spark 4.1.2 Documentation apache.org
- Apache Spark - Wikipedia wikipedia.org
- Spark 1.0.0 released | Apache Spark apache.org
- https://github.com/apache/spark/commit/c05170350a0536567d57aa809f6f1c766f81f697 github.com
- https://github.com/apache/spark/commit/0255283f7a4164a3706114ec7726518801071c21 github.com
- https://github.com/apache/spark/commit/c082f824d4e2e837792fdbb2c381fb77a71ccd9f github.com
- https://github.com/apache/spark/commit/ad886be8a84f6c7a333d57551bb270fdd103ad45 github.com
- https://github.com/apache/spark/commit/8a6cb265af60a405ed61ead29d0da5f1b98b5c4f github.com