# Spark API (SparkR)

# Setup Spark context

# Setup Spark context in R

To start working with Spark's distributed dataframes, you must connect your R program to an existing Spark cluster.

```r
sc <- sparkR.init()               # connection to Spark context
sqlContext <- sparkRSQL.init(sc)  # connection to SQL context
```

There are also instructions available on how to connect your IDE to a Spark cluster.
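As a quick smoke test of the connection, you can distribute one of R's built-in data frames and read a few rows back. This is a minimal sketch assuming a working local Spark installation; `faithful` is a standard R dataset:

```r
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

# distribute a local R data frame across the cluster:
df <- createDataFrame(sqlContext, faithful)

# pull the first rows back into the local R session:
head(df)

# stop the Spark context when you are done:
sparkR.stop()
```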

# Get Spark Cluster

There is an Apache Spark introduction topic with install instructions. Basically, you can run a Spark cluster locally via Java (see the install instructions) or use (non-free) cloud offerings (e.g. Microsoft Azure, IBM).

# Cache data


Caching can optimize computation in Spark. Caching stores data in memory and is a special case of persistence. The Spark programming guide explains in detail what happens when you cache an RDD in Spark.


Basically, caching saves an interim result of your original data, usually after transformations. So, when you use the cached RDD, the already transformed data is read from memory instead of recomputing the earlier transformations.


Here is an example of how to quickly access large data (here, a 3 GB csv file) from in-memory storage when accessing it more than once:

```r
# next line is needed for direct csv import:
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"')
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

# loading a 3 GB csv file:
train <- read.df(sqlContext, "/train.csv", source = "com.databricks.spark.csv", inferSchema = "true")
cache(train)

system.time(head(train))
# output: time elapsed: 125 s. This action invokes the caching at this point.
system.time(head(train))
# output: time elapsed: 0.2 s (!!)
```
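`cache()` is shorthand for in-memory-only persistence. If the data does not fit in memory, SparkR's `persist()` lets you choose a different storage level; a short sketch, reusing the `train` dataframe from above:

```r
# persist with spill-to-disk instead of memory-only caching:
persist(train, "MEMORY_AND_DISK")

# release the cached data when it is no longer needed:
unpersist(train)
```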

# Create RDDs (Resilient Distributed Datasets)

# From dataframe:

```r
mtrdd <- createDataFrame(sqlContext, mtcars)
```
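Once created, the distributed dataframe supports the usual selection and filtering operations. A small sketch using columns from the built-in `mtcars` dataset:

```r
head(mtrdd)                           # first rows of the distributed dataframe
count(mtrdd)                          # number of rows
heavy <- filter(mtrdd, mtrdd$wt > 3)  # keep only cars heavier than 3 (1000 lbs)
collect(heavy)                        # pull the result back into local R
```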

# From csv:

For csv files, you need to add the spark-csv package to the environment before initiating the Spark context:

```r
# set up the context for csv import:
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"')
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
```

Then, you can load the csv either by inferring the data schema from the data in the columns:

```r
train <- read.df(sqlContext, "/train.csv", header = "true", source = "com.databricks.spark.csv", inferSchema = "true")
```

Or by specifying the data schema beforehand:

```r
customSchema <- structType(
    structField("margin", "integer"),
    structField("gross", "integer"),
    structField("name", "string"))

train <- read.df(sqlContext, "/train.csv", header = "true", source = "com.databricks.spark.csv", schema = customSchema)
```
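With the dataframe loaded, you can also query it with SQL through the SQL context. A short sketch using the columns defined in the custom schema above; the table name `train` is an arbitrary choice:

```r
# register the dataframe as a temporary table to query it with SQL:
registerTempTable(train, "train")

top <- sql(sqlContext, "SELECT name, gross FROM train WHERE gross > 1000")
head(top)
```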

# Remarks

The SparkR package lets you work with distributed data frames on top of a Spark cluster. These allow you to run operations like selection, filtering, and aggregation on very large datasets. For further reading, see the SparkR overview and the SparkR package documentation.