Building a content aggregation service with node.js

Fetching, aggregating and transforming data for delivery is a seemingly complex task. Imagine a service that serves aggregated search results from Twitter, Google and Bing, where the response has to be tailored for mobile and web. One has to fetch data from different sources, parse and compose the results, then transform them into the right markup for delivery to a specific client platform.
To cook this up I’ll need:
– a web server
– a clean way to aggregate web service responses (pipelining would be nice)
– a component to transform the raw aggregated representation into a tailored client response.

I could take a stab at it with Apache/Tomcat, Java (using Apache HttpClient 4.0), a servlet dispatcher (Spring WebMVC) and Velocity templating, but that sounds too complex.

Enter Node.js. It’s an event-driven server platform built on Google’s V8 engine. It’s fast, it’s scalable, and you develop on it using familiar JavaScript.
While Node.js is still new, the community has built a rich ecosystem of extensions (modules) that greatly ease the pain of using it. If you’re unfamiliar with the technology, check out the Hello World example, it should get you started.
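
For reference, the canonical Hello World looks roughly like this (a minimal sketch in the spirit of the node.js docs of the time; the port is arbitrary):

    var sys = require('sys'),
        http = require('http');

    http.createServer(function (req, res) {
      //answer every request with plain text
      res.writeHead(200, {'Content-Type': 'text/plain'});
      res.end('Hello World\n');
    }).listen(8124, "127.0.0.1");
    sys.puts('Server running at http://127.0.0.1:8124/');
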
Back to the task at hand, here are the modules I’ll need:
Restler to get me data.
async to parallelize requests for efficient data fetching.
Haml-js for view generation.


To restate the requirement, we want to aggregate results from Google and Twitter using an arbitrary search predicate.
First we need node.js itself:

git clone http://github.com/ry/node.git
cd node
./configure
make
sudo make install

Let’s bring in the necessary modules. Create a project directory, then fetch them into it:

git clone http://github.com/caolan/async.git
git clone http://github.com/danwrong/restler.git
git clone http://github.com/creationix/haml-js.git

Most of the action takes place in one file that fires up the web server and contains all the event logic. My little server is called servus, so I’ll call the file just that: servus.js.

Update Jun/28: Servus now lives on GitHub, grab it here.

First, import all the modules, assuming they all live in the root of the project directory, and create the basic port listener (all code goes in servus.js):

var sys = require('sys'),
  http = require('http'),
  rest = require('./restler/lib/restler'),
  async = require('./async/lib/async'),
  haml = require('./haml-js/lib/haml'),
  url = require('url'),
  fs = require('fs');

http.createServer(function (req, res) {
    //keep the server alive on unexpected errors and answer with a 500
    process.addListener('uncaughtException', function (err) {
      sys.puts('Caught exception: ' + err);
      res.writeHead(500, {'Content-Type': 'text/plain'});
      res.end('error!');
    });

   res.writeHead(200, {'Content-Type': 'text/plain'});
   res.end('Servus\n');
}).listen(8124, "127.0.0.1");
sys.puts('Server running at http://127.0.0.1:8124/');

Listening for ‘uncaughtException’ events ensures that the server keeps running when an exception occurs. Run this with node servus.js and you should see the server responding with ‘Servus’ when browsing to http://localhost:8124/.
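
One caveat worth noting: calling process.addListener inside the request handler registers a fresh listener on every request. For a long-running server you would typically register a single handler once, outside http.createServer, roughly like this (with the trade-off that res is no longer in scope to send a 500):

    //registered once at startup instead of per request
    process.addListener('uncaughtException', function (err) {
      sys.puts('Caught exception: ' + err);
    });
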

Let’s add some real code. First, we define the request URLs for Twitter and Google. Next, we parse the URL and extract the query predicate. Servus will respond to /aggregate?q= requests, querying both search services in parallel using async. Replace the res.writeHead and res.end lines with:

    var serviceUrls = {};
    serviceUrls.twit = 'http://search.twitter.com/search.json?q=';
    serviceUrls.goo = 'http://ajax.googleapis.com/ajax/services/search/web?v=1.0&q=';
    var urlParsed = url.parse(req.url, true);
    var path = urlParsed.pathname;

    switch(path) {
      case '/aggregate':
        //extract search predicate
        var query = urlParsed.query;
        var predicate;
        if(query !== undefined && query.q !== undefined) {
          //re-encode the predicate for safe use in the outbound request URLs
          predicate = encodeURIComponent(query.q);
        } else {
          res.writeHead(400);
          res.end('Query expected. Use q=... in the URL');
          return;
        }
        async.parallel([
          function(callback) {
            callRestService(serviceUrls['goo'] + predicate, 'goo', callback);
          },
          function(callback) {
            callRestService(serviceUrls['twit'] + predicate, 'twit', callback);
          }
        ],
        //callback
        function (err, results) {
             //use the results argument and a haml template to form the view.
        }
      );
      break;

Through the async module, both GET requests are fired in parallel. The callback function is called after all functions in async.parallel’s first argument have completed; the ‘results’ argument of the callback accumulates the search results in an array. The function ‘callRestService’ gets the JSON results and parses them into hashes of the form {url: …, text: …}. Let’s implement it:

    function callRestService(url, serviceName, callback) {
        var request = rest.get(url);
        request.addListener('success', function(data) {
            var searchResults = [];
            if(serviceName == 'goo') {
              //Google returns a JSON string: parse it and pick out the results array
              var dataJson = JSON.parse(data).responseData.results;
              for(var sr in dataJson) {
                var searchResult = {};
                searchResult.url = dataJson[sr].url;
                searchResult.text = dataJson[sr].title;
                searchResults.push(searchResult);
              }
            } else if(serviceName == 'twit') {
              //Twitter results arrive already parsed by restler
              var dataJson = data.results;
              for(var sr in dataJson) {
                var searchResult = {};
                searchResult.url = 'http://twitter.com/' + dataJson[sr].from_user +
                  '/status/' + dataJson[sr].id;
                searchResult.text = dataJson[sr].text;
                searchResults.push(searchResult);
              }
            }
            callback(null, searchResults);
        });
        request.addListener('error', function(data) {
          sys.puts('Error fetching [' + url + ']. Body:\n' + data);
          callback(null, []);   //empty result set so the aggregation can continue
        });
    }
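
As a quick sanity check (hypothetical, not part of servus.js), callRestService can be exercised on its own, assuming serviceUrls — or just the literal search URL — is in scope:

    //fetch Google results for 'nodejs' and print what got parsed
    callRestService(serviceUrls.goo + 'nodejs', 'goo', function(err, items) {
      sys.puts('Got ' + items.length + ' results');
      for(var i = 0; i < items.length; i++) {
        sys.puts(items[i].text + ' -> ' + items[i].url);
      }
    });
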

The callback function in async.parallel takes the search results and uses a haml template to produce the view for web serving. Here’s the template (search-res.haml):

%h1 Search results
  :if items.length === 0
    %p There are no search results
  :if items.length > 0
    %table
      :each sr in items
        %tr
          %td %a{href: sr.url}&= sr.text

We’ll load this template and use it subsequently in the callback function:

    //load template; this goes before http.createServer
    var searchResHamlTemplate;
    fs.readFile('./search-res.haml', function(e, c) {
      searchResHamlTemplate = c.toString();
    });
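
Since the read is asynchronous, there is a tiny window where a request could arrive before the template is in memory. If that bothers you, a synchronous read at startup is a simple alternative (a small variation, not what servus.js does):

    //block once at startup so the template is guaranteed to be loaded
    var searchResHamlTemplate = fs.readFileSync('./search-res.haml', 'utf8');
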

The callback function above is implemented like this:

        //callback
        function (err, results) {
          //results has accumulated two arrays of search results,
          //for Google and Twitter respectively
          res.writeHead(200, {'Content-Type': 'text/html'});
          var searchItems = [];
          for(var i = 0; i < results.length; i++) {
            for(var sr in results[i]) {
              searchItems.push(results[i][sr]);
            }
          }
          res.end(haml.render(searchResHamlTemplate, {locals: {items: searchItems}}));
        }

Done. You can grab the complete code here: servus.js. The final version contains a few more bits to properly respond to GET queries. Querying the server with http://localhost:8124/aggregate?q=worldcup will render a few Google results on top, followed by all Twitter search results.

Adding a Service Level Agreement guarantee
What if we want to add an SLA (service level agreement) guarantee to our little server? Some web service clients require that servers respond within a guaranteed time. Turns out it’s not hard at all. We only have to add one more function to the async.parallel call that fires after a preset timeout using setTimeout:

    var timeoutReached = false;
    var partialResults = [];   //search results accumulate here as they come in
...
        async.parallel([
          function(callback) {
            callRestService(serviceUrls['goo'] + predicate, 'goo', callback);
          },
          function(callback) {
            callRestService(serviceUrls['twit'] + predicate, 'twit', callback);
          },
          function(callback) {
            setTimeout(function() {
              if(!timeoutReached) {
                //serve search results if any
                serveContent(partialResults);
                sys.puts("Timeout reached");
              }
            }, 200);
          }
        ]);
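
A small refinement (not in the attached code) would be to keep a handle on the timer so it can be cancelled once both services have answered, instead of letting it fire and be ignored:

    var slaTimer = setTimeout(function() {
      if(!timeoutReached) {
        serveContent(partialResults);   //serve whatever arrived in time
        sys.puts("Timeout reached");
      }
    }, 200);
    //...and once partialResults.length reaches 2:
    //clearTimeout(slaTimer);
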

partialResults is an array where search results are accumulated as they come in. If this array’s length reaches 2, we know that both fetch/parse operations have completed and the result can be served to the view:


    function callRestService(url, serviceName, callback) {
        var request = rest.get(url);
        request.addListener('success', function(data) {

...same as before...

            partialResults.push(searchResults);
            if(partialResults.length == 2 && !timeoutReached) {
              sys.puts("Timeout not reached");
              serveContent(partialResults);
            }
        });

    function serveContent(results) {

...same as before...

      timeoutReached = true;  //prevent further search results from being processed
    }

Lastly, the timeoutReached flag is set in serveContent to prevent further results from being processed.
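
One way to make that guard explicit (a small variation on the code above) is to check and set the flag at the very top of serveContent:

    function serveContent(results) {
      if(timeoutReached) return;   //already served or SLA expired: drop late results
      timeoutReached = true;       //mark as served so late callbacks are ignored

      //...render the haml template and res.end() as before...
    }
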

I’ve attached the final version of servus.js with SLA support: servus-sla.js. The code is now available in a GitHub project called servus. The SLA timeout is specified as an argument when starting node. If none is specified, the default is 2 seconds.
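
Reading that argument is a one-liner; something along these lines (the variable name is illustrative, the exact handling lives in the github project):

    //e.g. `node servus.js 500` for a 500 ms SLA; default is 2000 ms
    var slaTimeout = parseInt(process.argv[2], 10) || 2000;
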

If you’re behind a corporate proxy, use proxychains. You can find more details about this issue here.