# Spark API (SparkR)

# Setup Spark context

## Setup Spark context in R

To start working with Spark's distributed data frames, you must connect your R program to an existing Spark cluster.

library(SparkR)
sc <- sparkR.init() # connection to Spark context
sqlContext <- sparkRSQL.init(sc) # connection to SQL context
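When you are finished, the connection can be shut down again with sparkR.stop:

sparkR.stop() # shuts down the Spark context when you are done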

There is more documentation available on how to connect your IDE to a Spark cluster.
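As a rough sketch, connecting an IDE such as RStudio usually amounts to pointing R at your Spark installation before loading SparkR (the install path below is an assumption; adjust it to your setup):

Sys.setenv(SPARK_HOME = "/opt/spark") # assumed install location of Spark
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master = "local[*]") # or the master URL of your cluster
sqlContext <- sparkRSQL.init(sc)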

## Get Spark Cluster

There is an Apache Spark introduction topic with install instructions. Basically, you can run a Spark cluster locally via Java (see the instructions there) or use (non-free) cloud offerings (e.g. Microsoft Azure, IBM).
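Whichever route you take, the cluster is addressed through the master argument of sparkR.init. A minimal sketch, where the host name and the standard standalone port 7077 are assumptions:

library(SparkR)

# purely local "cluster" using all available cores:
sc <- sparkR.init(master = "local[*]")

# or connect to a standalone cluster (host name is hypothetical):
# sc <- sparkR.init(master = "spark://my-cluster-master:7077", appName = "MySparkRJob")

sqlContext <- sparkRSQL.init(sc)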

# Cache data

What:

Caching can optimize computation in Spark. Caching stores data in memory and is a special case of persistence; the Spark documentation explains in detail what happens when you cache an RDD.
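To illustrate the relationship, caching is simply persistence with a default storage level; in SparkR you can also set the level explicitly with persist and release the data again with unpersist (a small sketch):

df <- createDataFrame(sqlContext, mtcars)
persist(df, "MEMORY_ONLY") # explicit storage level; cache(df) is the shorthand with the default level
count(df)                  # an action triggers the actual materialisation
unpersist(df)              # drop the cached copy when it is no longer needed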

Why:

Basically, caching saves an interim result of your original data, usually obtained after transformations. So, when you use the cached RDD, the already transformed data is read from memory without recomputing the earlier transformations.

How:

Here is an example of how to quickly access large data (here, a 3 GB csv file) from in-memory storage when accessing it more than once:

library(SparkR)
# next line is needed for direct csv import:
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"')
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

# loading the 3 GB csv file:
train <- read.df(sqlContext, "/train.csv", source = "com.databricks.spark.csv", inferSchema = "true")
cache(train)
system.time(head(train))
# output: time elapsed: 125 s. This first action triggers the actual caching.
system.time(head(train))
# output: time elapsed: 0.2 s (!!)

# Create RDDs (Resilient Distributed Datasets)

## From dataframe

mtrdd <- createDataFrame(sqlContext, mtcars)
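Once created, the distributed data frame lives on the cluster; a small sketch of inspecting it and pulling data back into plain R:

head(mtrdd)                # first rows, returned as a local R data.frame
local_df <- collect(mtrdd) # pulls the entire distributed data frame back into R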

## From csv

For csv files, you need to add the spark-csv package to the environment before initiating the Spark context:

Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"') # make the spark-csv package available for csv import
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

Then, you can load the csv either by inferring the schema of the data in the columns:

train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", inferSchema = "true")

Or by specifying the data schema beforehand:


customSchema <- structType(
    structField("margin", "integer"),
    structField("gross", "integer"),
    structField("name", "string"))

train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", schema = customSchema)
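Either way, you can verify the result with printSchema (a quick, purely illustrative check):

printSchema(train) # prints the column names and types, e.g. margin: integer, gross: integer, name: string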

# Remarks

The SparkR package lets you work with distributed data frames on top of a Spark cluster. These allow you to perform operations like selection, filtering and aggregation on very large datasets. For more details, see the SparkR overview and the SparkR package documentation.
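As a brief illustration of such operations on a distributed data frame (using the built-in mtcars data, purely as a sketch):

df <- createDataFrame(sqlContext, mtcars)

sub   <- select(df, "mpg", "cyl", "hp")       # selection of columns
fast  <- filter(df, df$hp > 150)              # filtering of rows
byCyl <- agg(groupBy(df, "cyl"), avg(df$mpg)) # aggregation per group
head(byCyl)                                   # collect a few result rows back to R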