spark-local
API that enables switching between the Spark execution engine and a local implementation based on Scala collections.
Table of contents
- Goals
- Getting started
- Examples
- IO operations
- Aggregations
- Supported Spark versions
- Supported Spark operations
- Benchmarks
- Contributing
Goals
- Speed up unit testing for Spark applications
- Enable switching between the Spark execution engine and Scala collections depending on the use case (especially the size of the data) without changing the implementation (see the sketch after this list)
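Both goals boil down to writing business logic against the library's API rather than against Spark directly. A minimal sketch of the idea, using the session and DataSetAPI shown later in this README (the Employee case class and the sumOfAges function are made up for illustration):
import com.datawizards.sparklocal.dataset.DataSetAPI
import com.datawizards.sparklocal.session.{ExecutionEngine, SparkSessionAPI}

case class Employee(name: String, age: Int)

// the logic under test depends only on DataSetAPI, not on a concrete engine
def sumOfAges(ds: DataSetAPI[Employee]): Int = ds.map(_.age).reduce(_ + _)

// in a unit test: the ScalaEager engine runs on plain Scala collections
val session = SparkSessionAPI
  .builder(ExecutionEngine.ScalaEager)
  .master("local")
  .getOrCreate()
import session.implicits._

val employees = session.createDataset(Seq(Employee("p1", 10), Employee("p2", 20)))
assert(sumOfAges(employees) == 30)

// for production data volumes, switch the engine to ExecutionEngine.Spark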
Getting started
Spark 2.1.1
Include dependency:
"com.github.piotr-kalanski" % "spark-local_2.1.1_2.11" % "0.6.0"
or
<dependency>
<groupId>com.github.piotr-kalanski</groupId>
<artifactId>spark-local_2.1.1_2.11</artifactId>
<version>0.6.0</version>
</dependency>
Spark 2.1.0
Include dependency:
"com.github.piotr-kalanski" % "spark-local_2.1.0_2.11" % "0.6.0"
or
<dependency>
<groupId>com.github.piotr-kalanski</groupId>
<artifactId>spark-local_2.1.0_2.11</artifactId>
<version>0.6.0</version>
</dependency>
Examples
Creating Session
The entry point for the library is a session object, similar to the SparkSession object from Apache Spark.
The process of creating a Session is also similar to Apache Spark.
When creating the Session object you can choose between different execution engines. Currently supported:
- Spark - a wrapper over Spark, which can be used at production data volumes
- ScalaEager - implementation based on Scala collections with eager transformations, which makes it fast for unit testing
- ScalaLazy - implementation based on Scala collections with lazy transformations, dedicated to working with small/mid-size data
- ScalaParallel - implementation based on Scala parallel collections with eager transformations
- ScalaParallelLazy - implementation based on Scala parallel collections with lazy transformations
import com.datawizards.sparklocal.session.ExecutionEngine.ExecutionEngine
import com.datawizards.sparklocal.session.{ExecutionEngine, SparkSessionAPI}
import com.datawizards.sparklocal.datastore.CSVDataStore

// just change this value to start using a different execution engine
val engine: ExecutionEngine = ExecutionEngine.ScalaEager

val session = SparkSessionAPI
  .builder(engine)
  .master("local")
  .getOrCreate()

// Person is a user-defined case class, file is the path to a CSV file
val ds = session.read[Person](CSVDataStore(file))
RDD API
import com.datawizards.sparklocal.rdd.RDDAPI
import org.apache.spark.sql.SparkSession

object ExampleRDD1 {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    val data = Seq(1, 2, 3)
    val rdd = spark.sparkContext.parallelize(data)

    assertEquals(
      calculateSum(RDDAPI(data)),
      calculateSum(RDDAPI(rdd))
    )
    assertEquals(
      calculateSumOfSquares(RDDAPI(data)),
      calculateSumOfSquares(RDDAPI(rdd))
    )
  }

  def assertEquals[T](r1: T, r2: T): Unit = {
    println(r1)
    assert(r1 == r2)
  }

  def calculateSum(ds: RDDAPI[Int]): Int = ds.reduce(_ + _)

  def calculateSumOfSquares(ds: RDDAPI[Int]): Int = ds.map(x => x * x).reduce(_ + _)
}
Dataset API
Simple example
import com.datawizards.sparklocal.dataset.DataSetAPI
import org.apache.spark.sql.SparkSession

object ExampleDataset1 {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    val data = Seq(1, 2, 3)
    val ds = data.toDS()

    assertEquals(
      calculateSum(DataSetAPI(data)),
      calculateSum(DataSetAPI(ds))
    )
    assertEquals(
      calculateSumOfSquares(DataSetAPI(data)),
      calculateSumOfSquares(DataSetAPI(ds))
    )
  }

  def assertEquals[T](r1: T, r2: T): Unit = {
    println(r1)
    assert(r1 == r2)
  }

  def calculateSum(ds: DataSetAPI[Int]): Int = ds.reduce(_ + _)

  def calculateSumOfSquares(ds: DataSetAPI[Int]): Int = ds.map(x => x * x).reduce(_ + _)
}
Example report
import com.datawizards.sparklocal.dataset.DataSetAPI
import org.apache.spark.sql.SparkSession

case class Person(id: Int, name: String, gender: String)
case class WorkExperience(personId: Int, year: Int, title: String)
case class HRReport(year: Int, title: String, gender: String, count: Int)

object ExampleHRReport {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    // SampleData is assumed to provide sample Seq[Person] and Seq[WorkExperience] collections
    val people = SampleData.people
    val peopleDs = people.toDS()
    val experience = SampleData.experience
    val experienceDs = experience.toDS()

    calculateReport(DataSetAPI(people), DataSetAPI(experience))
    calculateReport(DataSetAPI(peopleDs), DataSetAPI(experienceDs))
  }

  def calculateReport(people: DataSetAPI[Person], workExperience: DataSetAPI[WorkExperience]): DataSetAPI[HRReport] = {
    workExperience
      .join(people)(_.personId, _.id)
      .groupByKey(wp => (wp._1.year, wp._1.title, wp._2.gender))
      .mapGroups { case ((year, title, gender), vals) => HRReport(year, title, gender, vals.size) }
  }
}
IO operations
The library provides a dedicated API for input/output operations, with implementations for both Spark and Scala collections. Each supported format has a corresponding data store class (e.g. CSVDataStore, JsonDataStore) that is passed to the read and write methods shown below.
Supported formats:
- CSV
- JSON
- Parquet
- Avro
- Hive
- JDBC
  - H2
  - MySQL
- Elasticsearch
CSV
Read CSV file
val reader: Reader = ReaderScalaImpl // Scala implementation
//val reader: Reader = ReaderSparkImpl // Spark implementation
reader.read[Person](
  CSVDataStore(
    path = "people.csv",
    delimiter = ';',
    header = false,
    columns = Seq("name", "age")
  )
)
Write to CSV file
ds.write(CSVDataStore(file), SaveMode.Overwrite)
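Putting the two together, a minimal round trip could look like the sketch below (assuming a Person case class and the session object from the Creating Session example; the file name and default CSVDataStore settings are used only for illustration):
import com.datawizards.sparklocal.datastore.CSVDataStore
import org.apache.spark.sql.SaveMode

case class Person(name: String, age: Int)

import session.implicits._

val people = session.createDataset(Seq(Person("p1", 10), Person("p2", 20)))

// write the data set to a CSV file, overwriting it if it already exists
people.write(CSVDataStore("people_example.csv"), SaveMode.Overwrite)

// read the file back into a typed data set and print it
val peopleAgain = session.read[Person](CSVDataStore("people_example.csv"))
peopleAgain.show()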
JSON
Read JSON file
reader.read[Person](JsonDataStore("people.json"))
Write to JSON file
ds.write(JsonDataStore("people.json"), SaveMode.Overwrite)
Avro
The current implementation produces different binary files for Spark and Scala. Spark by default compresses files with Snappy, while the spark-local implementation is based on https://github.com/sksamuel/avro4s, which saves data without compression.
Read Avro file
reader.read[Person](AvroDataStore("people.avro"))
Write to Avro file
ds.write(AvroDataStore("people.avro"), SaveMode.Overwrite)
Parquet
Read Parquet file
reader.read[Person](ParquetDataStore("people.parquet"))
Write to Parquet file
ds.write(ParquetDataStore("people.parquet"), SaveMode.Overwrite)
Hive
Read Hive table
reader.read[Person](HiveDataStore("db", "table"))
Write to Hive table
ds.write(HiveDataStore("db", "table"), SaveMode.Overwrite)
JDBC
Read JDBC table
val connectionString = "jdbc:h2:mem:test" // example H2 connection string (illustrative)
val database = "public"
val table = "people"
val properties = new java.util.Properties()

reader.read[Person](H2DataStore(connectionString, database, table, properties))
Write to JDBC table
ds.write(H2DataStore(connectionString, database, table, properties), SaveMode.Append)
Elasticsearch
Write to Elasticsearch index
val indexName = "people"
val typeName = "person"
ds.write(ElasticsearchSimpleIndexDataStore("localhost", indexName, typeName), SaveMode.Append)
Data model versioning
The library supports reading data using both old and new versions of the data model.
Reading with an old version of the data model is straightforward: only the existing fields are read. On the other hand, when reading with a new version of the data model, the new columns should be of Option type, and the value None is assigned to those fields.
Example
import com.datawizards.sparklocal.datastore.CSVDataStore
import com.datawizards.sparklocal.session.{ExecutionEngine, SparkSessionAPI}
import org.apache.spark.sql.SaveMode
case class Person(name: String, age: Int)
case class PersonV2(name: String, age: Int, title: Option[String])
case class PersonV3(name: String, age: Int, title: Option[String], salary: Option[Long])
val session = SparkSessionAPI
  .builder(ExecutionEngine.ScalaEager)
  .master("local")
  .getOrCreate()

import session.implicits._

val peopleV2 = session.createDataset(Seq(
  PersonV2("p1", 10, Some("Mr")),
  PersonV2("p2", 20, Some("Ms")),
  PersonV2("p3", 30, None),
  PersonV2("p4", 40, Some("Mr"))
))
val dataStore = CSVDataStore("people_v2.csv")
peopleV2.write(dataStore, SaveMode.Overwrite)
// Read old version:
val peopleV1 = session.read[Person](dataStore)
// Read new version:
val peopleV3 = session.read[PersonV3](dataStore)
peopleV1.show()
peopleV3.show()
+----+---+
|name|age|
+----+---+
|p1 |10 |
|p2 |20 |
|p3 |30 |
|p4 |40 |
+----+---+
+----+---+-----+------+
|name|age|title|salary|
+----+---+-----+------+
|p1 |10 |Mr | |
|p2 |20 |Ms | |
|p3 |30 | | |
|p4 |40 |Mr | |
+----+---+-----+------+
Column mapping
The library supports changing the names of fields when writing and reading data.
The mapping is provided using the column annotation from the project https://github.com/piotr-kalanski/data-model-generator.
Example
import com.datawizards.dmg.annotations.column
import com.datawizards.sparklocal.dataset.io.ModelDialects
import com.datawizards.sparklocal.datastore.JsonDataStore
import com.datawizards.sparklocal.session.{ExecutionEngine, SparkSessionAPI}
import org.apache.spark.sql.SaveMode
case class PersonWithMapping(
  @column("personName", dialect = ModelDialects.JSON)
  name: String,
  @column("personAge", dialect = ModelDialects.JSON)
  age: Int
)
val session = SparkSessionAPI
  .builder(ExecutionEngine.ScalaEager)
  .master("local")
  .getOrCreate()

import session.implicits._

val people = session.createDataset(Seq(
  PersonWithMapping("p1", 10),
  PersonWithMapping("p2", 20),
  PersonWithMapping("p3", 30),
  PersonWithMapping("p4", 40)
))
val dataStore = JsonDataStore("people_mapping.json")
people.write(dataStore, SaveMode.Overwrite)
The field names in the JSON file are consistent with the column names provided in the annotations:
{"personName":"p1","personAge":10}
{"personName":"p2","personAge":20}
{"personName":"p3","personAge":30}
{"personName":"p4","personAge":40}
Reading data:
val people2 = session.read[PersonWithMapping](dataStore)
people2.show()
The column names are the same as the case class fields:
+----+---+
|name|age|
+----+---+
|p1 |10 |
|p2 |20 |
|p3 |30 |
|p4 |40 |
+----+---+
Aggregations
The library provides a custom type-safe API for aggregations.
Example operations:
import com.datawizards.sparklocal.dataset.agg._
ds
  .groupByKey(_.name)
  .agg(sum(_.age), count(), mean(_.age), max(_.age))
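For a more complete picture, the sketch below wires the aggregation into a small, self-contained example (the Employee case class and the data are made up for illustration; the exact element type of the aggregated result is not documented here, so it is only assigned to a value):
import com.datawizards.sparklocal.dataset.agg._
import com.datawizards.sparklocal.session.{ExecutionEngine, SparkSessionAPI}

case class Employee(name: String, age: Int)

val session = SparkSessionAPI
  .builder(ExecutionEngine.ScalaEager)
  .master("local")
  .getOrCreate()
import session.implicits._

val employees = session.createDataset(Seq(
  Employee("p1", 10),
  Employee("p1", 20),
  Employee("p2", 30)
))

// one result row per name, with the sum, count, mean and max of age
val agesByName = employees
  .groupByKey(_.name)
  .agg(sum(_.age), count(), mean(_.age), max(_.age))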
Supported Spark versions
spark-local version | Spark version
--- | ---
0.7 | 2.1.1, 2.1.0
0.6 | 2.1.1, 2.1.0
0.5 | 2.1.0
0.4 | 2.1.0
0.3 | 2.1.0
0.2 | 2.1.0
0.1 | 2.1.0
Bugs
Please report any bugs to the spark-local GitHub issue tracker.