This post presents the winning solution for the Stratio Challenge 2015, developed by Marco Piva, Leonardo Biagioli, Fabio Fantoni and Andrea De Marco (BitBang).

The work describes the data model and the architecture of a Big Data Analytics solution that can help online advertisers get fast answers to their analytical questions about impressions, clicks and purchases, based on the Stratio Challenge requirements.

The emphasis is placed on the technologies that have been used, with particular focus on Spark and Spray.

Big Data analytics tools in Online Advertising

We’re living in a world of 24/7 connectivity, accessing our content on our own terms, and we like it that way. Around the globe, 76% of respondents in a Nielsen online survey say they enjoy the freedom of being connected anywhere, anytime. While consumers love this flexibility, it represents a huge challenge for brands and online advertisers vying for our attention in a fragmented viewing arena.

Online Advertising is a relatively new discipline, born in the early 1990s along with the rise of the World Wide Web. After a few years dominated by basic web banners displayed on the major web sites, online companies started to experiment with and consolidate multiple formats and strategies to advertise their products, also defining standards and best practices for display advertising, search engine marketing, etc. According to the IAB Internet Advertising Revenue Report, Internet advertising revenues in the USA reached a high of $12.4 billion in the third quarter of 2014, continuing the year-over-year growth observed since 2010.

In this highly demanding and ultra-competitive industry, tools that support analysis and intelligence are essential: nobody wants to spend money on poor publishers, and every advertiser needs a scientific way to demonstrate how effective campaigns are in terms of ROAS (Return On Advertising Spend).

Analytics tools are becoming more and more complex and sophisticated in order to provide valuable insights on the marketing activities and results.

There are three major requirements for analytics tools today:

  1. Real-time: queries must run on datasets that are updated in real-time;
  2. Volume: collected data are frequently quantified in terms of terabytes (TBs) if not petabytes (PBs) on a daily basis;
  3. Velocity: queries on very large datasets must return results in a short time, according to business needs.
Stratio Challenge requirements:
The solution must meet the following requirements (a query sketch illustrating them follows the list):
  1. Provide the number of impressions, clicks and purchases in a given timeframe;
  2. Provide the metrics at point 1 grouped by country, user-agent, campaign, advertiser or publisher;
  3. Provide combinations of two or more of the dimensions at point 2;
  4. Receive queries and new data at the same time, without any impact on performance;
  5. Use one hour as the time granularity.
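As an illustration of requirements 1, 2, 3 and 5, the sketch below shows the kind of hourly-granularity query that could be run through Spark SQL once the collected events are exposed as a table; the table and column names ("impressions", "ts", etc.) are assumptions made for this example and are not part of the challenge data:
// Hypothetical query: hourly impression counts per country and campaign in a given timeframe,
// assuming "ts" is a string formatted as "yyyy-MM-dd HH:mm:ss"
val hourlyImpressions = sqlContext.sql("""
  SELECT country, campaign, substr(ts, 1, 13) AS hour, COUNT(*) AS impressions
  FROM impressions
  WHERE ts BETWEEN '2015-05-01 00:00:00' AND '2015-05-07 23:59:59'
  GROUP BY country, campaign, substr(ts, 1, 13)
""")
hourlyImpressions.show()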
Data Model:
Data are modelled using an Entity-Relationship diagram, along with an additional basic schema that includes the assumptions provided by the Stratio Challenge.
[Figure: challenge_datamodel - Entity-Relationship diagram of Impression, Click and Purchase]
The Entity-Relationship diagram is designed using the composition relation (instead of aggregation) to emphasize the strong link between these three entities.

Due to this choice, a Click cannot exist without its “parent” Impression, in the same way that a Purchase cannot exist without its “parent” Click.

Regarding cardinality, a single Impression can lead to multiple Clicks (0..N), but every Click must be generated by a single Impression (1..1); Click and Purchase follow the same cardinality rules.
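As a minimal illustration of this model (field names are assumptions made for the example, not the challenge's exact schema), the composition between the three entities could be sketched in Scala as follows:
// Hypothetical sketch: composition expressed as nesting, with the cardinalities described above
case class Purchase(purchaseId: String, timestamp: Long)

case class Click(clickId: String, timestamp: Long,
                 purchases: Seq[Purchase])        // a Click leads to 0..N Purchases

case class Impression(impressionId: String, country: String, userAgent: String,
                      campaign: String, advertiser: String, publisher: String,
                      timestamp: Long,
                      clicks: Seq[Click])         // an Impression leads to 0..N Clicks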

The following schema shows the assumptions that exist between Impression, Click and Purchase.

[Figure: challenge_figure2 - assumptions between Impression, Click and Purchase]
Architecture:
The main technologies used to build the architecture are the following:
  • Akka
    • asynchronous and distributed design;
    • remote and/or local supervisor hierarchies;
    • actors are lightweight;
  • Spray
    • flexible, asynchronous and actor-based REST API on top of Akka;
  • HDFS
    • distributed and fault tolerant file system;
  • Spark
    • Spark Streaming
      • can receive messages from Akka actors;
      • provides window-based operations for real-time analytics;
    • Spark Engine
      • processes data in memory for batch-based analysis;
    • Spark SQL
      • engine for interactive or long queries;
      • provides JDBC and ODBC connectivity;
The solution can be built in Scala on top of the Akka, Spray and Spark libraries.
Spray provides the ability to create a flexible, asynchronous and actor-based REST API: the embedded HTTP server spray-can can be used to receive impression, click and purchase data, and the solution can take advantage of Spray's routing DSL to describe the behaviour of the API.
Every event received by the Spray server is sent as a message from an Akka actor to the Spark Streaming context: in this way the solution exploits the main features of Akka actors, such as their asynchronous, distributed and lightweight design and the possibility to create remote and local supervisor hierarchies.
Spark Streaming produces a stream of RDDs (a DStream) and makes it possible to execute window-based operations on it; the solution can therefore perform real-time analysis through Spark SQL queries without waiting for long batch processing times.
At the same time, each RDD coming from the stream is stored into an HDFS storage layer, which allows batch processing jobs to be executed for more complex analyses. Here the assumptions provided by the Stratio Challenge (a maximum of 6 hours between impression and click, and 1 month between click and purchase) are applied using Spark core functions.
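For example, persisting each incoming batch could be done along these lines (a sketch: the stream variable, the namenode address and the directory layout are assumptions made for the example):
// Hypothetical sketch: every RDD of the input DStream is written to HDFS for later batch jobs
eventStream.foreachRDD { rdd =>
  if (!rdd.isEmpty())
    rdd.saveAsTextFile(s"hdfs://namenode:8020/events/impressions/${System.currentTimeMillis()}")
}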
The Spark Thrift Server can also be used to let external tools connect over JDBC/ODBC and build data visualizations.
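For instance, once the Thrift Server is running, an external tool (or a small Scala client using the Hive JDBC driver) could query the registered tables; in the sketch below the host, the default port 10000 and the "impressions" table are assumptions made for the example:
// Hypothetical sketch: querying Spark SQL through the Thrift Server over JDBC
import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "")
val stmt = conn.createStatement()
val rs   = stmt.executeQuery("SELECT country, COUNT(*) AS impressions FROM impressions GROUP BY country")
while (rs.next()) println(s"${rs.getString(1)} -> ${rs.getLong(2)}")
conn.close()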
[Figure: challenge_figure3 - solution architecture]
Code snippets:
Spray route example for Impression API:
import java.util.Calendar
import akka.actor.{ActorSystem, Props}
import spray.routing.SimpleRoutingApp

// startServer is provided by the SimpleRoutingApp trait
implicit val system = ActorSystem("LocalSystem")
val localActor = system.actorOf(Props[LocalActor], name = "LocalActor")
localActor ! "START"

startServer("localhost", port = 8082) {
  get {
    path("api") {
      // each impression arrives as a set of query parameters
      parameters('id_impression, 'country, 'campaign, 'advertiser, 'publisher) {
        (id_impression, country, campaign, advertiser, publisher) =>
          val timestamp = Calendar.getInstance().getTime()
          // forward the event to the actor that relays it to the Spark Streaming receiver
          localActor ! s"$id_impression,$country,$campaign,$advertiser,$publisher,$timestamp"
          complete("Sample impression response")
      }
    }
  }
}
API server actor class:
import akka.actor.Actor

class LocalActor extends Actor {
  // remote Spark Streaming receiver actor (host, port and path depend on the Spark deployment)
  val remote =
    context.actorSelection("akka.tcp://sparkDriver@SparkMaster:7777/user/Supervisor0/receiver")

  def receive = {
    case "START" =>
      remote ! "Spray API server has started"
    case msg: String =>
      remote ! msg
  }
}
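The receiver actor on the Spark side, referenced as Props[Receiver] in the next snippet, is not shown in the original post; a minimal sketch using Spark Streaming's ActorHelper (Spark 1.x) could look like this:
// Hypothetical sketch: the Spark-side actor that pushes incoming messages into the DStream
import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

class Receiver extends Actor with ActorHelper {
  def receive = {
    case msg: String => store(msg)   // hand the event over to Spark Streaming
  }
}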
Window-based operation on DStream:
import akka.actor.Props
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(conf, Seconds(30))   // 30-second batch interval
val actorName = "receiver"
val actorStream = ssc.actorStream[String](Props[Receiver], actorName)

// window-based operations: 10-minute window, sliding every 30 seconds
val impressions = actorStream.window(Seconds(600), Seconds(30))
impressions.foreachRDD { (rdd: RDD[String]) =>
  // some RDD operation for each window
}
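One possible realisation of the "some RDD operation for each window" step is to register each window as a temporary table and query it with Spark SQL (Spark 1.x API); the case class, the comma-separated field layout and the sqlContext below are assumptions made for the example:
// Hypothetical sketch: real-time aggregation over the 10-minute window with Spark SQL
case class ImpressionEvent(impressionId: String, country: String, campaign: String,
                           advertiser: String, publisher: String, timestamp: String)

impressions.foreachRDD { rdd =>
  import sqlContext.implicits._
  val df = rdd.map(_.split(','))
    .map(f => ImpressionEvent(f(0), f(1), f(2), f(3), f(4), f(5)))
    .toDF()
  df.registerTempTable("impressions_window")
  sqlContext.sql(
    "SELECT country, COUNT(*) AS impressions FROM impressions_window GROUP BY country"
  ).show()
}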
Simple example of batch-based analysis (clicks per country):
// key impressions by impressionId: (impressionId, country, timestamp)
val impressions = impressionData
  .map(_.split(';'))
  .map(f => (f(0), f(1), f(2)))
  .map { case (impressionId, country, timestamp) => impressionId -> (impressionId, country, timestamp) }

// key clicks by impressionId: (clickId, timestamp, impressionId)
val clicks = clickData
  .map(_.split(';'))
  .map(f => (f(0), f(1), f(2)))
  .map { case (clickId, timestamp, impressionId) => impressionId -> (clickId, timestamp, impressionId) }

// join every click with its parent impression
val joined = impressions.join(clicks).map(_._2)

// country -> number of clicks that happened within 6 hours of the impression
val resultByCountry = joined
  .map { case (impression, click) => (impression._2, getMinutesDiff(impression._3, click._2)) }
  .filter { case (_, minutes) => minutes <= 360 } // 6-hour interval filter
  .countByKey()
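The getMinutesDiff helper used above is not shown in the original snippets; a possible implementation, assuming timestamps formatted as "yyyy-MM-dd HH:mm:ss", is:
// Hypothetical helper: minutes between two timestamp strings (format is an assumption)
import java.text.SimpleDateFormat

def getMinutesDiff(start: String, end: String): Long = {
  val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  math.abs(fmt.parse(end).getTime - fmt.parse(start).getTime) / (60 * 1000)
}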
Conclusions:
The proposed solution effectively solves the data management and analytics problem posed by the competition.
The architecture described above allows analysts to perform their analyses without waiting many hours for data processing, and also leaves them free to choose between real-time analysis (e.g. for ad performance monitoring) and batch-based analysis that leverages much more complex queries.
Next steps:
The following optimizations and technologies could be evaluated to improve the efficiency and reliability of the solution:
  • Fine-tuning of Spark jobs: choosing the most efficient actions and transformations in terms of computational performance;
  • Apache Kafka or Apache Flume: to add durability and reliability to data collected through the APIs.
