When working with Big Data, we often need to aggregate data in real time, whether it comes from a specific service such as a social network (Twitter, Facebook…) or from more diverse sources, like a weather station. A good way to process these large amounts of information is Spark Streaming, which delivers the data in real time, but it has one drawback: you have to program the processing yourself.
How to process and aggregate data
To avoid this, we can use Stratio Sparta to process and aggregate data. It reduces the raw information and makes it useful once it’s processed. We’ll then persist the result in a MongoDB cluster and stream it to the browser through WebSockets with Node.js.
Stratio Sparta is a flexible, simple-to-use tool that lets us transform raw information with Morphlines through its web interface, and aggregate data across different dimensions and time ranges.
First Steps with Stratio Sparta
First of all, we must install Stratio Sparta as indicated in Stratio’s documentation. For this example we’ve used the Stratio Manager to install a 3-machine cluster with MongoDB and Stratio Sparta. It also lets us monitor the resources of our different services.
Once it’s installed, we can run it. It serves its web interface on port 9090, where we can start setting up the different parameters:
- Input: The origin of the data. It can come from Flume, Kafka, RabbitMQ, a socket, a WebSocket or directly from Twitter.
- Output: We can persist or show the information in different ways, such as MongoDB, Elasticsearch, Cassandra, Parquet, Redis, a CSV file or on the screen.
- Policies: The most complex part of the set-up. We can do the following:
  - Configure the input.
  - Configure the outputs.
  - Make transformations with Morphlines by type of data and date.
  - Make aggregations over different dimensions and time granularities. We can also apply common functions to the grouped data, such as count, sum, max, min and others.
 
In this example we’ll use the meetup.com data API, which provides a WebSocket with the information that is being created or updated in the service. We’ll then aggregate it by country and compute an hourly count.
Afterwards we’ll add the 3 MongoDB nodes as an output and set the name of the collection that it will generate.
Of all the raw information provided by the WebSocket, we’ll take only 3 attributes and give them the proper format. We’ll define the output of the transformation as country, response and modified. The last one lets us easily retrieve the updates from Node.js.
{
  "morphline": {
    "id": "morphline1",
    "importCommands": [
      "org.kitesdk.**"
    ],
    "commands": [
      {
        "readJson": {}
      },
      {
        "extractJsonPaths": {
          "paths": {
            "response": "/response",
            "country": "/group/group_country",
            "modified": "/mtime"
          }
        }
      },
      {
        "removeFields": {
          "blacklist": [
            "literal:_attachment_body"
          ]
        }
      }
    ]
  }
}
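To make the effect of this transformation concrete, here is a minimal JavaScript sketch of what the readJson and extractJsonPaths steps do to one message. It is not how Morphlines is implemented. The sample event and the helper names (extractPath, transform) are assumptions made for illustration, and the event only contains the fields that the paths above refer to.

```javascript
// Hypothetical RSVP event, trimmed to the fields the morphline reads.
const rawMessage = JSON.stringify({
  response: 'yes',
  mtime: 1450000000000,
  group: { group_country: 'es', group_name: 'Example group' },
  member: { member_name: 'Jane' }
});

// Resolve a path such as "/group/group_country" against a parsed object.
function extractPath(obj, path) {
  return path
    .split('/')
    .filter(Boolean)
    .reduce((node, key) => (node == null ? undefined : node[key]), obj);
}

// Same mapping as the "paths" object in the morphline above.
const paths = {
  response: '/response',
  country: '/group/group_country',
  modified: '/mtime'
};

function transform(message) {
  const event = JSON.parse(message);            // readJson
  const result = {};
  for (const [field, path] of Object.entries(paths)) {
    result[field] = extractPath(event, path);   // extractJsonPaths
  }
  return result;
}

console.log(transform(rawMessage));
```

Only response, country and modified survive; every other attribute of the original event is dropped.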
After we establish what the structure of the processed information is going to be like, we’ll set up an aggregation in the following way:
- Time dimension: name of the attribute that will hold the start of each aggregation period (day, hour…).
- Granularity: time interval over which the information is aggregated. In this case it’s hourly.
- Dimensions: We’ll group by country, though it’s also possible to aggregate by country and response.
- Operators: We’ll apply several operations. The first one is the event count (count). The second one stores the last value of the modified entry, which was taken from the mtime attribute of the WebSocket (with lastValue).
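The settings above can be pictured with a small JavaScript sketch that groups already-transformed events by country and hour, counts them, and keeps the last modified value. This only illustrates the idea and is not Sparta’s implementation. The sample events, the field names of the result, and the choice of the modified timestamp as the source of the hourly bucket are assumptions.

```javascript
const HOUR_MS = 60 * 60 * 1000;

// Truncate a millisecond timestamp to the start of its hour (the granularity).
function hourBucket(timestampMs) {
  return Math.floor(timestampMs / HOUR_MS) * HOUR_MS;
}

// Group events by (country, hour); keep a count and the last "modified" seen.
function aggregate(events) {
  const cube = new Map();
  for (const event of events) {
    const hour = hourBucket(event.modified);
    const key = event.country + '|' + hour;
    const row = cube.get(key) ||
      { country: event.country, hour: hour, count: 0, modified: null };
    row.count += 1;                 // count operator
    row.modified = event.modified;  // lastValue operator: latest event wins
    cube.set(key, row);
  }
  return Array.from(cube.values());
}

// Hypothetical, already-transformed events (country, response, modified).
const events = [
  { country: 'es', response: 'yes', modified: 1450000000000 },
  { country: 'es', response: 'no',  modified: 1450000060000 },
  { country: 'us', response: 'yes', modified: 1450000120000 },
  { country: 'es', response: 'yes', modified: 1450003600000 }
];

console.log(aggregate(events));
```

Adding response to the key would give the country and response aggregation mentioned in the list.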
Once all of this is done, we only have to set an output (in this case MongoDB) and run the policy.
With this information the aggregations should start in the following way:
We’ll get the result in real time in our MongoDB cluster, and we can make queries to it from any other technology.
WebSocket Server with Node.js
To set up our WebSocket server connected to MongoDB, we first need to install both modules. Then we’ll be able to read the database records that will be sent through the WebSocket:
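var MongoClient = require('mongodb').MongoClient;
// Callback-style API of the 2.x driver, where connect() yields a Db object.
MongoClient.connect('mongodb://localhost:27017/sparta', function(err, db) {
  if (err) throw err; // stop if the connection fails
  db
    .collection('id_country_hour')
    .find()
    .toArray(function(err, docs){
      if (err) throw err;
      console.log(docs);
      db.close(); // release the connection so the script can exit
    });
});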
When we run the script with node, it prints the MongoDB content to the console.
After we check the access to the information, we’ll create the WebSocket server in order to transmit the changes produced by Stratio Sparta in real-time.
To serve all the streaming data we must do the following:
- First, when a new client connects we must store the connection generated by the WebSocket API so that we can broadcast to it later. We must also remove the connection once the client disconnects.
- It’s also important to send all the records that exist to date.
- Every so often we send the changes that have taken place in MongoDB, using the modified attribute to find them.
- For the example, we’ll filter the countries to reduce the amount of data that will be sent.
- We’ll also filter the attributes that get sent, since we’ll only be using country, hour and count.
The example is brief and you can consult it here.
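Independently of that full example, the bookkeeping described in the list above can be sketched without any WebSocket library. In this sketch a client is any object with a send method, which is an assumption, as are the country list, the function names and the document fields (country, hour, count, modified).

```javascript
// Hypothetical settings: which countries and attributes reach the browser.
const COUNTRIES = ['es', 'us', 'gb'];
const FIELDS = ['country', 'hour', 'count'];

const clients = new Set();   // open connections, used for broadcasting
let lastModified = 0;        // highest "modified" value sent so far

// Keep only the selected countries and attributes of each document.
function toPayload(docs) {
  return docs
    .filter(doc => COUNTRIES.includes(doc.country))
    .map(doc => {
      const row = {};
      FIELDS.forEach(field => { row[field] = doc[field]; });
      return row;
    });
}

// MongoDB filter for documents changed since the last broadcast.
function changesQuery() {
  return { modified: { $gt: lastModified } };
}

// A new client is stored and receives every existing record once.
function onConnect(client, allDocs) {
  clients.add(client);
  client.send(JSON.stringify(toPayload(allDocs)));
}

// The connection is forgotten when the client leaves.
function onClose(client) {
  clients.delete(client);
}

// Called periodically with the documents matched by changesQuery().
function broadcastChanges(changedDocs) {
  if (changedDocs.length === 0) return;
  lastModified = Math.max(lastModified, ...changedDocs.map(doc => doc.modified));
  const payload = toPayload(changedDocs);
  if (payload.length === 0) return;
  const message = JSON.stringify(payload);
  clients.forEach(client => client.send(message));
}
```

In a real server, onConnect and onClose would be wired to the connection events of whichever WebSocket module is used, and a timer such as setInterval would run find(changesQuery()) on the collection and pass the resulting documents to broadcastChanges.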
WebSocket Client with JavaScript
Once we have all the data in our front-end we can show it in a table, a chart or the console. In this case we’ll show it in a table with console.table to keep the visualization simple.
var ws = new WebSocket('ws://127.0.0.1:8008/');
var data = {};
ws.onmessage = function(response){
  var responseData = JSON.parse(response.data);
  responseData
    .map(function(row){
      row.hour = new Date(row.hour).toISOString().slice(0, 16).replace('T',' ');
      return row;
    })
    .forEach(function(row){
      if(!data[row.hour]){
        data[row.hour] = {};
      }
      data[row.hour][row.country] = row.count;
    });
  console.clear();
  console.log("Last update:", new Date().toLocaleString());
  console.table(data);
}
The result is simple but functional:
We can also build a more attractive visualization with charts. A quick way to make them is with Chart.js. We simply have to format the data to fit the Chart.js data structure, and we’ll get the following result.
You can find this simple data visualization example here.
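As a rough idea of the formatting step, the following sketch converts the data object built by the WebSocket client (hour, then country, then count) into the labels and datasets structure that Chart.js charts take. The function name toChartData and the sample values are assumptions, and styling options such as colors are left out.

```javascript
// Convert { hour: { country: count } } into Chart.js { labels, datasets }.
function toChartData(data) {
  const labels = Object.keys(data).sort();
  const countries = Array.from(
    new Set(labels.flatMap(hour => Object.keys(data[hour])))
  ).sort();
  const datasets = countries.map(country => ({
    label: country,
    // Hours without events for a country are plotted as 0.
    data: labels.map(hour => data[hour][country] || 0)
  }));
  return { labels: labels, datasets: datasets };
}

// Hypothetical sample in the shape built by the client above.
const sample = {
  '2015-12-13 09:00': { es: 2, us: 1 },
  '2015-12-13 10:00': { es: 1 }
};

console.log(JSON.stringify(toChartData(sample)));
```

Each country becomes one dataset and each hour becomes one label on the x-axis, so the returned object can be passed as the data of a line or bar chart.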
It’s really simple to handle information from JavaScript, and there are many data visualization libraries. This makes it very easy to use and visualize the data generated by Stratio Sparta aggregations and to build statistical reports, which would be very costly without a tool that processes data in real time.