1. General Description
This project extends the Spark DataFrame API with indexing capabilities. The implemented index is an equality index, though the design could in principle be extended to range indexing. The supported operations are:
- Index Creation
- Point Lookups
- Appends
2. Building the Project
This project is a standalone sbt project and can be built by running sbt compile or sbt package from the command line. The test suite can be run with sbt test.
3. The API
Assuming we have a Spark dataframe df, the API is as follows (a short usage sketch follows the list):
- indexedDF = df.createIndex(columnNumber: Int); the returned object is an indexed dataframe built on the given column.
- regularDF = indexedDF.getRows(key: AnyVal); this method returns a regular dataframe containing the rows that are indexed by the given key.
- newIndexedDF = indexedDF.appendRows(df: DataFrame); this method returns an indexed dataframe in which the rows of df have been appended to indexedDF.
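For orientation, here is a minimal sketch of how these calls compose. The dataframes df and df2 and the key value 42 are illustrative assumptions, not part of the API:

// df and df2 are assumed to be existing dataframes whose column 0 holds the key
val indexedDF = df.createIndex(0)               // build an equality index on column 0
val matches = indexedDF.getRows(42)             // regular dataframe of the rows whose key is 42
val grownIndexedDF = indexedDF.appendRows(df2)  // indexed dataframe with df2's rows appended
matches.show()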
4. Example Code
import org.apache.spark.sql.SparkSession
// plus this project's own imports, which bring createIndex, IndexedOperators,
// and ConvertToIndexedOperators into scope (package names omitted here)

val sparkSession = SparkSession.builder
  .master("local")
  .appName("indexed df test app")
  // this sets the number of partitions the indexed dataframe will have, so choose it judiciously
  .config("spark.sql.shuffle.partitions", "4")
  .getOrCreate()
// we have to register the IndexedOperators strategy and the ConvertToIndexedOperators rule;
// otherwise, Spark would not know how to plan the operators on the indexed dataframes
sparkSession.experimental.extraStrategies =
  Seq(IndexedOperators) ++ sparkSession.experimental.extraStrategies
sparkSession.experimental.extraOptimizations =
  Seq(ConvertToIndexedOperators) ++ sparkSession.experimental.extraOptimizations
// read a dataframe
val df = sparkSession.read
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.load("/path/to/data")
// index it on column 0 and cache the indexed result
val indexedDF = df.createIndex(0).cache()
// load another dataframe
val anotherDF = sparkSession.read
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.load("/path/to/data2")
indexedDF.createOrReplaceTempView("indexedtable")
anotherDF.createOrReplaceTempView("nonindexedtable")
// assuming that indexedDF has a column col1 and anotherDF has a column col2, we can write the following join:
val result = sparkSession.sql(
  "SELECT * FROM indexedtable JOIN nonindexedtable ON indexedtable.col1 = nonindexedtable.col2")
result.show()
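The example above exercises index creation and a join; the remaining operations fit into the same session. The key value 42 below is an assumption about the sample data (which is not shown), and the append presumes anotherDF's schema is compatible with indexedDF's:

// point lookup: fetch the rows whose indexed key equals 42 as a regular dataframe
val rowsForKey = indexedDF.getRows(42)
rowsForKey.show()
// append anotherDF's rows, yielding a new indexed dataframe
// (assumes anotherDF has a schema compatible with indexedDF)
val updatedIndexedDF = indexedDF.appendRows(anotherDF)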