In this post we will show how to use the different SQL contexts to query data with Spark.

We will begin with Spark SQL and follow up with HiveContext. We will also run queries against several NoSQL databases and look at the advantages and disadvantages of each approach. So, without further ado, let’s get started!

First of all we need to create a Spark context whose configuration includes the options for connecting to Cassandra:

import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf()
    .setAppName("sparkSQLExamples")
    .setMaster("local[*]")
    .setIfMissing("hive.execution.engine", "spark")
    .setIfMissing("spark.cassandra.connection.host", "127.0.0.1")
    .setIfMissing("spark.cassandra.connection.port", "9042")
val sparkContext = new SparkContext(sparkConf)

Spark's SQLContext lets us connect to different data sources to read or write data, but it has a limitation: every link to a data source that we create is temporary, so when the program ends or the Spark shell is closed those links are gone and will not be available in the next session.

This limitation is solved with HiveContext, since it uses a MetaStore to store the information about those “external” tables. In our example, this MetaStore is MySQL. The configuration lives in a resource file (hive-site.xml) used by Hive. You can see the different properties, such as the user, in the GitHub project; be careful if you also set environment variables such as HADOOP_USER_NAME and HADOOP_CONF_DIR.

At first glance it seems that everything is solved, but we have lost high availability. If you do not want to give that up, you can use XDContext with Stratio Crossdata, which can store the MetaStore in ZooKeeper.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext

val sqlContext = new SQLContext(sparkContext)
val hiveContext = new HiveContext(sparkContext)

To use the data sources API we need to know how to create DataFrames. Two concepts are basic:

  1. Schema:

In Spark, a DataFrame is nothing more than an RDD composed of Rows plus a schema in which we indicate the name and type of each column of those Rows.

import org.apache.spark.sql.types.{StringType, StructField, StructType}
val schema = new StructType(Array(StructField("id", StringType, false)))

  2. RDD[Row]:

Each element of the RDD has to be a Row, which is a set of values. Normally we have to transform an RDD of another type to an RDD of Rows.

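import org.apache.spark.sql.Row

// Build 10001 string ids ("0" to "10000") and wrap each one in a single-column Row
val registers = for (a <- 0 to 10000) yield a.toString
val rdd = sparkContext.parallelize(registers)
val rddOfRow = rdd.map(Row(_))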

With all this we are able to create a DataFrame with SQLContext as well as with HiveContext:

val dataFrame = sqlContext.createDataFrame(rddOfRow, schema)
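
A real schema usually has more than one column. The following is only a minimal sketch, assuming Spark 1.5 is on the classpath; the `length` column and the `SchemaExample` object are hypothetical names introduced for illustration and are not part of the original project. It shows that the values in each Row must follow the order of the schema fields, and how the nullable flag is declared per column.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SchemaExample {
  // "id" must always be present; "length" (hypothetical column) may be null.
  val schema = StructType(Seq(
    StructField("id", StringType, nullable = false),
    StructField("length", IntegerType, nullable = true)
  ))

  // Each Row lists its values in the same order as the schema fields.
  def build(sqlContext: SQLContext): DataFrame = {
    val rows = sqlContext.sparkContext
      .parallelize(Seq("1", "22", "333"))
      .map(id => Row(id, id.length))
    sqlContext.createDataFrame(rows, schema)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("schemaExample").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val df = build(new SQLContext(sc))
      df.printSchema() // column names, types and nullability
      df.show()        // the three rows
    } finally sc.stop()
  }
}
```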

Another option to create DataFrames is using RDD[Case Class]. Each element of the RDD has to be a case class. Normally we have to transform an RDD of another type to an RDD of our case class.

case class IdClass(id: String)  
val registers = for (a <- 0 to 10000) yield a.toString
val rdd = sparkContext.parallelize(registers)
val rddOfClass = rdd.map(IdClass(_))
val dataFrame = sqlContext.createDataFrame(rddOfClass)
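
With case classes there is also a shorter route through the implicit conversions that SQLContext provides. This is a sketch under the assumption of Spark 1.5; the `ToDfExample` object is a hypothetical wrapper added only so the snippet compiles on its own.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

// Defined at the top level so Spark can derive the schema by reflection.
case class IdClass(id: String)

object ToDfExample {
  def build(sqlContext: SQLContext): DataFrame = {
    // Brings the RDD-to-DataFrame conversion (toDF) into scope.
    import sqlContext.implicits._
    sqlContext.sparkContext
      .parallelize(0 to 10)
      .map(i => IdClass(i.toString))
      .toDF()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("toDfExample").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val df = build(new SQLContext(sc))
      df.printSchema() // a single string column named "id"
    } finally sc.stop()
  }
}
```

The column names and types come from the case class fields, so no separate StructType is needed.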

With a few simple configuration parameters we can store any DataFrame we have created in tables, indexes or collections in Cassandra, Elasticsearch or MongoDB, respectively, thanks to the Spark Packages implementations that DataStax, Elastic and Stratio offer us.

MongoDB:

val mongoDbOptions = Map(
    "host" -> "localhost:27017",
    "database" -> "mongodatabase",
    "collection" -> "mongoclient"
)

dataFrame.write
    .format("com.stratio.datasource.mongodb")
    .mode(SaveMode.Append)
    .options(mongoDbOptions)
    .save()

Note: We can also insert items in a collection using the functions that the Stratio library offers us.

val mongoDbOptionsLib = Map(
    "host" -> "localhost:27017",
    "database" -> "mongodatabase",
    "collection" -> "mongoclientlib"
)
val libraryConfig = MongodbConfigBuilder(mongoDbOptionsLib)
dataFrame.saveToMongodb(libraryConfig.build)

Elasticsearch:

val elasticOptions = Map("es.mapping.id" -> "id",
    "es.nodes" -> "localhost",
    "es.port" -> "9200",
    "es.index.auto.create" -> "yes"
)

dataFrame.write.format("org.elasticsearch.spark.sql")
    .mode(SaveMode.Append)
    .options(elasticOptions)
    .save(s"$elasticIndex/$elasticMapping")

Note: We can also insert documents into an index using the functions that the Elastic library offers us.

dataFrame.saveToEs(s"$elasticIndex/$elasticMappingLib", elasticOptions)

Cassandra:

val cassandraOptions = Map("table" -> cassandraTable, "keyspace" -> cassandraKeyspace)

dataFrame.write
    .format("org.apache.spark.sql.cassandra")
    .mode(SaveMode.Append)
    .options(cassandraOptions)
    .save()

Now that we know how to write information to each of the NoSQL databases, let's see how we can query and read from each of them:

1. Using the new API functions of DataFrames.

val dataFrameSelectElastic = sqlContext.read.format("org.elasticsearch.spark.sql")
    .options(elasticOptions)
    .load()
    .select("id")

val dataFrameSelectMongo = sqlContext.read.format("com.stratio.datasource.mongodb")
    .options(mongoDbOptions)
    .load()
    .select("id")

val dataFrameSelectCassandra = sqlContext.read.format("org.apache.spark.sql.cassandra")
    .options(cassandraOptions)
    .load()
    .select("id")

dataFrameSelectElastic.registerTempTable("tempelastic")
dataFrameSelectMongo.registerTempTable("tempmongo")
dataFrameSelectCassandra.registerTempTable("tempcassandra")

sqlContext.sql("select * from tempelastic")
sqlContext.sql("select * from tempmongo")
sqlContext.sql("select * from tempcassandra")
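
Note that `sqlContext.sql` only returns a DataFrame; nothing is executed until an action such as `collect()` or `show()` is called. The sketch below illustrates this with an in-memory DataFrame standing in for one of the databases, so it runs without any of them installed. The table name `tempids` and the `TempTableQuery` object are hypothetical, and Spark 1.5 is assumed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object TempTableQuery {
  def idsEqualToTwo(sqlContext: SQLContext): Seq[String] = {
    import sqlContext.implicits._
    // In-memory stand-in for a DataFrame loaded from a data source.
    val df = sqlContext.sparkContext
      .parallelize(Seq("1", "2", "3"))
      .map(Tuple1(_))
      .toDF("id")
    df.registerTempTable("tempids")

    // Building the query is lazy: no job runs on this line.
    val query = sqlContext.sql("SELECT id FROM tempids WHERE id = '2'")

    // collect() is the action that triggers execution.
    query.collect().map(_.getString(0)).toSeq
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("tempTableQuery").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try println(idsEqualToTwo(new SQLContext(sc)))
    finally sc.stop()
  }
}
```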

2. Creating physical tables and temporary external tables within the Spark SQLContext is experimental, and with HiveContext only the temporary table is created. To use this feature correctly you can use CrossdataContext (XDContext).

XDContext.createExternalTable("externalelastic", "org.elasticsearch.spark.sql", schema, elasticOptions)
XDContext.createExternalTable("externalmongo", "com.stratio.datasource.mongodb", schema, mongoDbOptions)
XDContext.createExternalTable("externalcassandra", "org.apache.spark.sql.cassandra", schema, cassandraOptions)

XDContext.sql("select * from externalelastic")
XDContext.sql("select * from externalmongo")
XDContext.sql("select * from externalcassandra")

3. Using HiveContext to create a link to the physical tables and store it in Hive’s MetaStore.

hiveContext.sql(s"""CREATE TABLE IF NOT EXISTS testElastic(id STRING)
		       |USING org.elasticsearch.spark.sql
		       |OPTIONS (
		       |   path '$elasticIndex/$elasticMapping', readMetadata 'true', nodes '127.0.0.1', port '9200', cluster 'default'
		       | )
		     """.stripMargin)

hiveContext.sql(s"""CREATE TABLE IF NOT EXISTS testCassandra(id STRING)
		        |USING org.apache.spark.sql.cassandra
		        |OPTIONS (
		        |   table 'cassandraclient', keyspace 'testkeyspace'
		        | )
		     """.stripMargin)

hiveContext.sql(s"""CREATE TABLE IF NOT EXISTS testMongo(id STRING)
		        |USING com.stratio.datasource.mongodb
		        |OPTIONS (
		        |  host 'localhost:27017', database 'mongodatabase', collection 'mongoclient'
		        | )
		     """.stripMargin)

val queryElastic = hiveContext.sql(s"SELECT id FROM testElastic limit 100")
val queryMongo = hiveContext.sql(s"SELECT id FROM testMongo limit 100")
val queryCassandra = hiveContext.sql(s"SELECT id FROM testCassandra limit 100")
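
The three CREATE TABLE statements above share the same shape: a table name, a column list, a provider class and a set of options. As an illustration only, that shape can be captured in a small helper that renders the statement from the same option maps used for writing. `DdlBuilder` and `createTable` are hypothetical names, not part of Spark or of the original project, and option values are not escaped, so this sketch should only be fed trusted values.

```scala
object DdlBuilder {
  /** Renders a `CREATE TABLE ... USING ... OPTIONS (...)` statement as a string. */
  def createTable(
      table: String,
      columns: Seq[(String, String)],
      provider: String,
      options: Map[String, String]): String = {
    // Column list such as "id STRING, name STRING".
    val cols = columns.map { case (name, tpe) => s"$name $tpe" }.mkString(", ")
    // Options sorted by key so the output is deterministic.
    val opts = options.toSeq
      .sortBy(_._1)
      .map { case (key, value) => s"$key '$value'" }
      .mkString(", ")
    s"CREATE TABLE IF NOT EXISTS $table($cols) USING $provider OPTIONS ($opts)"
  }

  def main(args: Array[String]): Unit = {
    val mongoDbOptions = Map(
      "host" -> "localhost:27017",
      "database" -> "mongodatabase",
      "collection" -> "mongoclient"
    )
    println(createTable("testMongo", Seq("id" -> "STRING"),
      "com.stratio.datasource.mongodb", mongoDbOptions))
  }
}
```

The resulting string could then be passed to `hiveContext.sql(...)` in the same way as the hand-written statements.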

In this way we get access to a SQL language with more functionality than each data source provides natively. For optimal access I recommend using Crossdata, as it optimizes the queries that can run natively on each of the three NoSQL databases.

The biggest advantage it offers, apart from running the queries in memory on a Spark cluster, is that we can do JOINs across the various NoSQL databases:

val joinElasticCassandraMongo = hiveContext.sql(s"SELECT tc.id from testCassandra as tc" +
    	s" JOIN testElastic as te ON tc.id = te.id" +
    	s" JOIN testMongo tm on tm.id = te.id")
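
The same join can also be expressed with the DataFrame API instead of a SQL string. The sketch below assumes Spark 1.5 and uses three small in-memory DataFrames as stand-ins for the Cassandra, Elasticsearch and MongoDB tables, so it runs without the databases; `JoinExample` and `joinOnId` are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object JoinExample {
  // Inner join of the three inputs on their shared "id" column.
  def joinOnId(cassandra: DataFrame, elastic: DataFrame, mongo: DataFrame): DataFrame =
    cassandra.join(elastic, "id").join(mongo, "id").select("id")

  // Builds a single-column DataFrame from a list of ids.
  def idFrame(sqlContext: SQLContext, ids: Seq[String]): DataFrame = {
    import sqlContext.implicits._
    sqlContext.sparkContext.parallelize(ids).map(Tuple1(_)).toDF("id")
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("joinExample").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      val joined = joinOnId(
        idFrame(sqlContext, Seq("1", "2", "3")), // stand-in for Cassandra
        idFrame(sqlContext, Seq("2", "3", "4")), // stand-in for Elasticsearch
        idFrame(sqlContext, Seq("3", "4", "5"))  // stand-in for MongoDB
      )
      joined.show() // only ids present in all three inputs remain
    } finally sc.stop()
  }
}
```

In the real project the three arguments would be the DataFrames loaded with `sqlContext.read.format(...)` shown earlier.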

All the code of the project is available on GitHub.

To run it, you need to have Elasticsearch, Cassandra and MongoDB installed and running.

Versions used:

* Scala 2.10.4

* Spark 1.5.2

* Spark-MongoDb 0.11.1

* Spark-ElasticSearch 2.2.0

* Spark-Cassandra 1.5.0

* Elasticsearch 1.7.2

* Cassandra 2.2.5

* MongoDB 3.0.7
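
For readers building with sbt, the library versions above might translate into something like the following fragment. This is an assumption on my part rather than the project's actual build file: the coordinates are the ones these connectors are commonly published under, and they should be checked against the GitHub project before use.

```scala
// build.sbt (sketch; coordinates are assumptions, verify against the project)
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark"       %% "spark-sql"                 % "1.5.2",
  "org.apache.spark"       %% "spark-hive"                % "1.5.2",
  "com.stratio.datasource" %% "spark-mongodb"             % "0.11.1",
  "org.elasticsearch"      %% "elasticsearch-spark"       % "2.2.0",
  "com.datastax.spark"     %% "spark-cassandra-connector" % "1.5.0"
)
```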

I hope I have clarified the different ways to access and write data with Spark in each of the three major NoSQL databases.

Stratio
Author
