Real-Time Analytics With MongoDB

MongoDB is one of the most popular NoSQL databases. The features it offers make it remarkably easy to implement real-time analytics. While the natural tendency is to gravitate towards the map-reduce feature provided by MongoDB to aggregate data, I will focus on the upsert feature and the $inc operator to demonstrate how complex aggregations and groupings can be performed in near real-time.

We can update total counts and averages, and find distinct entities for a given period (per second, minute, or hour) in a single upsert statement. This data is then immediately available to draw real-time charts or to take automated actions based on pre-configured thresholds.

To keep things simple, I will use web log analytics as an example. The code examples below are in Java.

Let us say that we want to obtain the following information every second for our application:

  1. Total hits
  2. Unique users
  3. Unique IP addresses
  4. Average response time for each URL

We will start by setting up some sample requests. Here, I am hardcoding them; in the real world, we could tail and parse a log file. Alternatively, this information can be sent directly from the application to our analytics system over something like ZeroMQ.

Request r1 = new Request("2012_01_31_19_23_01", "10_1_2_3",  "/contextRoot/feature1", "user1", 1);
Request r2 = new Request("2012_01_31_19_23_01", "10_1_2_3",  "/contextRoot/feature1", "user2", 2);
Request r3 = new Request("2012_01_31_19_23_01", "10_1_2_3",  "/contextRoot/feature2", "user1", 1);
Request r4 = new Request("2012_01_31_19_23_01", "10_1_2_3",  "/contextRoot/feature2", "user2", 1);
Request r5 = new Request("2012_01_31_19_23_01", "10_1_2_3",  "/contextRoot/feature1", "user1", 1);
Request r6 = new Request("2012_01_31_19_23_01", "192_168_1_20",  "/contextRoot/feature1", "user3", 4);
Request r7 = new Request("2012_01_31_19_23_01", "10_1_2_3",  "/contextRoot/feature3", "user1", 1);
Request r8 = new Request("2012_01_31_19_23_02", "10_1_2_3",  "/contextRoot/feature1", "user4", 2);
Request requests[] = { r1, r2, r3, r4, r5, r6, r7, r8 };

Note that the periods in the IP addresses have been replaced with underscores. This is done because we will be using these values as keys in our JSON documents, and MongoDB does not allow periods in field names.
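Since field names cannot contain periods, a small helper can sanitize such values before they are used in a field path. The following is a minimal sketch; the toKey helper is illustrative and not part of the listings below.

public static String toKey(String value) {
    // MongoDB field names may not contain '.' (or start with '$'),
    // so replace periods with underscores before building a key path.
    return value.replace('.', '_');
}

For example, toKey("10.1.2.3") returns "10_1_2_3", which can safely be embedded in a path such as "ip.10_1_2_3.hits".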

Next, we need to decide the aggregation interval. This determines how we choose our document IDs.
In this example, we have decided to aggregate every second, so we can choose our ID as the request time that includes second-level information (2012_01_31_19_23_01). If we were aggregating at an hourly interval, we could simply use an ID that contains only hour-level information, such as "2012_01_31_19".
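As a sketch of how such IDs could be derived, assuming the request time is available as a java.util.Date (the helper names are illustrative and not part of the listings below):

import java.text.SimpleDateFormat;
import java.util.Date;

// Per-second resolution: one document per second.
public static String idForSecond(Date requestTime) {
    return new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss").format(requestTime);
}

// Hourly resolution: one document per hour.
public static String idForHour(Date requestTime) {
    return new SimpleDateFormat("yyyy_MM_dd_HH").format(requestTime);
}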

Based on the chosen ID, all requests arriving within a second will update a single document. While there may be hundreds of requests per second, we end up with one document per second that represents the aggregated view for that second.

This has a very nice side effect: MongoDB index sizes are directly proportional to the number of documents. Since we maintain one document per second instead of storing each request as a separate document, we greatly reduce the required index size, and it's much easier to fit the index in memory.

So, let us see how the aggregation and counting happen. The following function creates the upsert document for each record.

public static DBObject createUpsert(Request request) {
    DBObject upsert = new BasicDBObject();
    DBObject inc = new BasicDBObject();
    // Count total hits
    inc.put("hits", 1);
    // Get distinct IP's & count hits per IP
    inc.put("ip." + request.getIp() + ".hits", 1);
    // Get distinct users & count hits per user
    inc.put("users." + request.getUser() + ".hits", 1);
    // Get distinct URL's & count hits per URL
    inc.put("urls." + request.getUrl() + ".hits", 1);
    // Total time taken to process 'n' requests.
    // Divide it by the count obtained above to get the average processing time for each URL.
    inc.put("urls." + request.getUrl() + ".totalTime", request.getProcessTime());
    upsert.put("$inc", inc);
    return upsert;
}

The first entry in the $inc document simply increments the ‘hits’ field, giving us the total hits in a second.
The next entry gets more creative. For the current request, if the field ip.‘ip_address’ (e.g. ip.10_1_2_3) exists, then its ‘hits’ field is incremented. Otherwise, the field ip.‘ip_address’ gets added under ‘ip’. At the end of the second, we have collected all distinct IPs from which the application received requests, as well as the number of requests received from each IP.

We do the same for users and URLs. Finally, the last entry accumulates the total processing time of all requests for each ‘url’ in the field urls.‘url’.totalTime.
By simply dividing the accumulated time by the ‘hits’ count later, we can get the average response time.
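To make the read side concrete, here is a minimal sketch of computing those averages from the aggregated document, using the same legacy Java driver classes as the listings below; the printAverages helper is illustrative only.

public static void printAverages(String id, DBCollection collection) {
    // Fetch the single aggregated document for the given second.
    DBObject doc = collection.findOne(new BasicDBObject("_id", id));
    if (doc == null) {
        return;
    }
    DBObject urls = (DBObject) doc.get("urls");
    for (String url : urls.keySet()) {
        DBObject stats = (DBObject) urls.get(url);
        int hits = ((Number) stats.get("hits")).intValue();
        int totalTime = ((Number) stats.get("totalTime")).intValue();
        // Average response time = accumulated totalTime / hits.
        System.out.println(url + " : " + ((double) totalTime / hits));
    }
}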

Here’s the function that gets invoked for each request, updating or inserting the records in the database.

public static void upsert(String id, Request request, DBCollection collection) {
    BasicDBObject query = new BasicDBObject();
    query.put("_id", id);
    DBObject dbo = createUpsert(request);
    collection.update(query, dbo, true, false);
}

As each request is received and upserted, the aggregates are updated immediately. Below is the final aggregated document after processing all requests for the time period represented by ‘2012_01_31_19_23_01’.

{
        "_id" : "2012_01_31_19_23_01",
        "hits" : 7,
        "ip" : {
                "10_1_2_3" : {
                        "hits" : 6
                },
                "192_168_1_20" : {
                        "hits" : 1
                }
        },
        "urls" : {
                "/contextRoot/feature1" : {
                        "hits" : 4,
                        "totalTime" : 8
                },
                "/contextRoot/feature2" : {
                        "hits" : 2,
                        "totalTime" : 2
                },
                "/contextRoot/feature3" : {
                        "hits" : 1,
                        "totalTime" : 1
                }
        },
        "users" : {
                "user1" : {
                        "hits" : 4
                },
                "user2" : {
                        "hits" : 2
                },
                "user3" : {
                        "hits" : 1
                }
        }
}

In this approach, each request is a single write operation. With each write we do multiple aggregations and generate the latest aggregated view with almost no delay. There is absolutely no need to run any batch processes or expensive Map-Reduce to get this information. It is right there and can be read from the database and presented to the UI in less than a second.

Other Considerations

In the real world, you may also want to dump the raw records into a capped collection.
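As a rough sketch using the same legacy driver API (the collection name raw_requests and the 1 GB cap are assumptions for illustration):

DB db = mongo.getDB("realtime");
if (!db.collectionExists("raw_requests")) {
    // Capped collections are fixed-size and automatically discard the
    // oldest documents, which suits a rolling store of raw requests.
    DBObject options = new BasicDBObject("capped", true)
            .append("size", 1024 * 1024 * 1024); // size is in bytes
    db.createCollection("raw_requests", options);
}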

MongoDB documents have a size limit of 16 MB. Depending on the type of data being aggregated, aggregating over large time periods can cause these documents to grow very large, which isn't desirable.

If you choose to implement sharding, the document IDs would have to be chosen differently.
For example, in the current scenario, we might append the host name to the time, effectively storing each host's data on a different shard. At read time, we can do a simple aggregation at the application level to present a unified view, while distributing the load evenly across multiple database servers.
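A hypothetical sketch of such an ID, using the local host name (the shardedId helper is illustrative and not part of the listings below; actual distribution across shards still depends on the shard key you configure):

import java.net.InetAddress;

// Appending the host name gives each host its own per-second document,
// so writes from different hosts can be spread across shards.
public static String shardedId(String time) throws Exception {
    String host = InetAddress.getLocalHost().getHostName();
    return time + "_" + host;
}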

Complete Code Listing

Below are the complete code listings for your reference.

Request.java

package com.samarthbhargava.blog.mongodb.realtime;

public class Request {
	private String time;
	private String ip;
	private String url;
	private String user;
	private int processTime;

	public Request(String time, String ip, String url, String user,
			int processTime) {
		this.ip = ip;
		this.time = time;
		this.user = user;
		this.url = url;
		this.processTime = processTime;
	}

	public String getTime() {
		return time;
	}

	public String getIp() {
		return ip;
	}

	public String getUrl() {
		return url;
	}

	public String getUser() {
		return user;
	}

	public int getProcessTime() {
		return processTime;
	}

}

Aggregator.java

package com.samarthbhargava.blog.mongodb.realtime;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class Aggregator {
	public static void main(String[] args) {
		Request r1 = new Request("2012_01_31_19_23_01", "10_1_2_3",
				"/contextRoot/feature1", "user1", 1);
		Request r2 = new Request("2012_01_31_19_23_01", "10_1_2_3",
				"/contextRoot/feature1", "user2", 2);
		Request r3 = new Request("2012_01_31_19_23_01", "10_1_2_3",
				"/contextRoot/feature2", "user1", 1);
		Request r4 = new Request("2012_01_31_19_23_01", "10_1_2_3",
				"/contextRoot/feature2", "user2", 1);
		Request r5 = new Request("2012_01_31_19_23_01", "10_1_2_3",
				"/contextRoot/feature1", "user1", 1);
		Request r6 = new Request("2012_01_31_19_23_01", "192_168_1_20",
				"/contextRoot/feature1", "user3", 4);
		Request r7 = new Request("2012_01_31_19_23_01", "10_1_2_3",
				"/contextRoot/feature3", "user1", 1);
		Request r8 = new Request("2012_01_31_19_23_02", "10_1_2_3",
				"/contextRoot/feature1", "user4", 2);
		Request requests[] = { r1, r2, r3, r4, r5, r6, r7, r8 };
		DBCollection collection = getCollection();
		for (Request request : requests) {
			upsert(request.getTime(), request, collection);
		}
	}

	public static void upsert(String id, Request request,
			DBCollection collection) {
		BasicDBObject query = new BasicDBObject();
		query.put("_id", id);
		DBObject dbo = createUpsert(request);
		collection.update(query, dbo, true, false);

	}

	public static DBObject createUpsert(Request request) {
		DBObject upsert = new BasicDBObject();
		DBObject inc = new BasicDBObject();
		// Count total hits
		inc.put("hits", 1);
		// Get distinct IP's & Count hits per IP
		inc.put("ip." + request.getIp() + ".hits", 1);
		// Get Distinct Users & Count hits per user
		inc.put("users." + request.getUser() + ".hits", 1);

		// Get Distinct URL's & Count hits per URL
		inc.put("urls." + request.getUrl() + ".hits", 1);
		// Total time taken to process 'n' requests.
		// Divide it by the count obtained above to get the average processing
		// time for each URL.
		inc.put("urls." + request.getUrl() + ".totalTime",
				request.getProcessTime());
		upsert.put("$inc", inc);
		return upsert;
	}

	public static DBCollection getCollection() {

		Mongo mongo;
		try {
			mongo = new Mongo();
		} catch (Exception e) {
			throw new RuntimeException(e);
		}

		DB db = mongo.getDB("realtime");
		return db.getCollection("requests");
	}

}

Discussion

11 thoughts on “Real-Time Analytics With MongoDB”

  1. If we want to serve queries that provide aggregated data per second, per minute, per hour, or even for a time range, then what’s the approach? From what I understand, we need to decide at the time of loading what our aggregation approach would be. Do we do the aggregation at runtime (at the time of serving the query – effectively something like map-reduce)?

    Posted by Prakash | March 6, 2012, 8:47 pm
    • I’ll recommend that you aggregate at multiple resolutions of 1 sec, 1 min and 1 hr. The resolutions will depend upon your requirements. This will still be much cheaper than maintaining raw records. Any further aggregations can be done on these pre-aggregated values using map-reduce or via the new aggregation framework that will be part of a future MongoDB release.

      Posted by Samarth Bhargava | March 6, 2012, 9:53 pm
  2. “The unique features it offers make it super easy to implement real-time analytics”
    Why is an upsert a unique feature (I’m assuming this is the whole point of the article … showing how aggregation can be done in real time using upsert)? You can achieve the exact same thing with a SELECT … ON DUPLICATE KEY UPDATE with MySQL and a MyISAM storage engine.

    Posted by Alex | March 22, 2012, 7:12 pm
  3. Thanks for this post. It’s close to what I had in mind for my analytics solution and it definitely saved implementation time. Cheers!

    Posted by Matt Walker | April 24, 2012, 2:10 am
  4. Thanks Samarth. Great write-up!

    Posted by Bjorn Harvold | June 14, 2012, 10:00 pm
  5. So let’s say you’re doing a daily aggregation (assuming you’ve got a lot of unique visits per day) – how do you count unique visitors without pulling the entire object and check the “length” of the “ip” field?

    Posted by Oren Ellenbogen | October 24, 2012, 2:48 am
    • Daily aggregation for getting unique visitors will result in a single huge document where we may hit the 16 MB limit pretty quickly. I would probably aggregate at more frequent intervals and then run a map-reduce on aggregated data to get the final daily unique visitor count. Since the MR will run on pre-aggregated data, it will be much faster as compared to running it on raw data.

      Posted by Samarth Bhargava | December 1, 2012, 3:46 pm
  6. Nice post. I like the solution. How would you handle inserting numbers that continue to increment (i.e. RX packets and TX packets) and determine the difference between the last number and current and record that?

    Posted by James Manring | November 29, 2012, 4:04 am
  7. I love the thinking behind this, but I’m missing something syntactical – how is the increment on multiple levels of embedded docs working? I’m trying this in the mongo console, and get errors if I try to “reach” more than one level down into the bucket. i.e. .update({query},{$inc:{"ip.127_0_0_1_hits":1}},{upsert:true}) works, but .update({query},{$inc:{"ip.127_0_0_1.hits":1}},{upsert:true}) fails. Same issue using findAndModify, too. Interestingly, if I do something wacky, like .update({query},{$inc:{"'ip.127_0_0_1'.hits":1}},{upsert:true}), it works, but now I have a new key of ‘ip with a subdoc of 127_0_0_1’, which itself has the correct subdoc of hits = .

    Posted by Greg Bryant | March 16, 2013, 12:27 am
    • Just figured it out – double quotes work for the single-level, but not for multiple levels. Single quotes work for all cases. Thanks for the great article.

      Posted by Greg Bryant | March 16, 2013, 1:24 am
