commit efab450b0127280fb0be404b0fb945cc7075898c
Author: James Halliday
Date: Mon Nov 8 22:16:39 2010 +0000

Everybody has their own PubSub module for node.js these days and so can you! First we'll make a PubSub for plain-old network sockets and then we'll retrofit the code to talk websockets too.

Updated March 27, 2011 for dnode 0.6 and browserify

If you want to run the code in this article for yourself, you'll need node.js and dnode. DNode is an RPC library that I wrote to make network programming awesome. It transparently wraps nested callbacks you supply in arguments to remote methods so that when you call a wrapped function, you're actually making an RPC call back to the other side of the connection! And the arguments to that wrapped function are also transparently wrapped!

Now then, let's build a PubSub with dnode!

First let's write a publish() function that just prints out the arguments that it gets called with. To make sure it works, we can set an interval to call publish() with the string 'data' and a random number from 0 to 99, inclusive.

// spew.js

function publish (ev, n) {
    console.log(ev + ': ' + n);
}

setInterval(function () {
    var n = Math.floor(Math.random() * 100);
    publish('data', n);
}, 1000);

Then we can run it to make sure it works:

$ node spew.js 
data: 55
data: 91
data: 92
data: 64
data: 89

^C

Yep! It prints out a random integer from 0 to 99 inclusive every second as expected.

Now we can write a simple dispatch for publish(). A hash subs will map unique subscription IDs to emitter functions. Using the hash library (now called hashish) from my last blog post to iterate over subs, the arguments to publish() are just passed along to each emitter function.

To test publish() now, I'll just hard-code subs with two subscriptions.

// pub.js

<b>var Hash = require('hashish');
var subs = {};</b>

function publish () {
<b>    var args = arguments;
    Hash(subs).forEach(function (emit) {
        emit.apply(emit, args);
    });</b>
}

setInterval(function () {
    var n = Math.floor(Math.random() * 100);
    publish('data', n);
}, 1000);

<b>// for testing purposes:
subs.bar = function (ev, n) { console.log('bar.' + ev + ': ' + n) };
subs.baz = function (ev, n) { console.log('baz.' + ev + ': ' + n) };</b>

Now let's run pub.js:

$ node pub.js
bar.data: 67
baz.data: 67
bar.data: 61
baz.data: 61
bar.data: 10
baz.data: 10

^C

Every second, both bar and baz subscriptions print a message. Looks good!

Now we can take out the test subscriptions and add a super-simple stub of a dnode server.

// stub.js

var Hash = require('hashish');
var subs = {};

function publish () {
    var args = arguments;
    Hash(subs).forEach(function (emit) {
        emit.apply(emit, args);
    });
}

setInterval(function () {
    var n = Math.floor(Math.random() * 100);
    publish('data', n);
}, 1000);

<b>var dnode = require('dnode');
dnode(function (client, conn) {
    // ...
}.listen(5050);</b>

Now dnode will listen on port 5050 with an empty handler function. The second argument to the handler, conn is an EventEmitter that represents the status of the connection so you can listen for changes in its status.

The first argument to the handler, client, is a transparently wrapped reference to the dnode object of a client that connects. client is just {} initially, an empty object, until after the function executes but you can listen for conn's "ready" event or use a reference in functions that you define.

But enough about dnode, let's write a subscription function!

// sub.js

var Hash = require('hashish');
var subs = {};

function publish () {
    var args = arguments;
    Hash(subs).forEach(function (emit) {
        emit.apply(emit, args);
    });
}

setInterval(function () {
    var n = Math.floor(Math.random() * 100);
    publish('data', n);
}, 1000);

var dnode = require('dnode');
dnode(function (client, conn) {
    <b>this.subscribe = function (emit) {
        subs[conn.id] = emit;

        conn.on('end', function () {
            delete subs[conn.id];
        });
    };</b>
}).listen(5050);

All the subscription function needs to do is store the emitter function that the client supplies and then delete the function when the client disconnects. DNode takes care of the rest!

To test this server, let's write a client!

// client.js

var dnode = require('dnode');
var EventEmitter = require('events').EventEmitter;

dnode.connect(5050, function (remote) {
    var em = new EventEmitter;
    em.on('data', function (n) {
        console.log('data: ' + n);
    });

    var emit = em.emit.bind(em);
    remote.subscribe(emit);
});

The client creates an event emitter and listens for 'data' events. Then the event emitter's emit function is passed to the server so it can publish to it. The .bind(em) part just makes sure that the this supplied to the EventEmitter prototype will be the em object.

Now we can run the server:

$ node sub.js

And in another shell launch the client:

$ node client.js
data: 86
data: 30
data: 12
data: 65

^C

Aww yiss, it works! Launch another client instance in another shell with the first instance still running and you should get the same data in both clients! We now have a working PubSub!

But wait, there's more! We can retrofit this example for browser clients with websockets and fallbacks for browsers that don't have them through dnode's socket.io support.

All we need to do is add some Connect code to the top of sub.js and add another .listen().

We'll use browserify to bundle up the dnode source at /browserify.js so we can require('dnode') and require('events').EventEmitters in the browser.

// web.js

<b>var connect = require('connect');
var webserver = connect.createServer();

webserver.use(connect.static(__dirname));
webserver.use(require('browserify')({ require : 'dnode' }));

webserver.listen(5051);
console.log('http://localhost:5051/');
</b>

var Hash = require('hashish');
var subs = {};

function publish () {
    var args = arguments;
    Hash(subs).forEach(function (emit) {
        emit.apply(emit, args);
    });
}

setInterval(function () {
    var n = Math.floor(Math.random() * 100);
    publish('data', n);
}, 1000);

var dnode = require('dnode');
dnode(function (client, conn) {
    this.subscribe = function (emit) {
        subs[conn.id] = emit;

        conn.on('end', function () {
            delete subs[conn.id];
        });
    };
}).listen(5050)<b>.listen(webserver)</b>;

Pow, that was easy! Now just drop an index.html into the same directory as web.js:

&lt;!-- index.html --&gt;
&lt;html&gt;
&lt;head&gt;
&lt;title&gt;Rolling your own PubSub with DNode!&lt;/title&gt;
&lt;script src="/browserify.js" type="text/javascript"&gt;&lt;/script&gt;
&lt;script type="text/javascript"&gt;
    var dnode = require('dnode');
    var EventEmitter = require('events').EventEmitter;

    window.onload = function () {
        dnode.connect(function (remote) {
            var em = new EventEmitter;

            em.on('data', function (n) {
                document.getElementById('output').innerHTML += n + ' ';
            });

            var emit = em.emit.bind(em);
            remote.subscribe(emit);
        });
    };
&lt;/script&gt;
&lt;/head&gt;
&lt;body&gt;

&lt;div id="output"&gt;&lt;/div&gt;

&lt;/body&gt;
&lt;/html&gt;

Now run it by going to http://localhost:5051. You can even still run node client.js and you'll get the same data that the browser gets!

Now we've rolled our own PubSub with dnode and it was super easy! It even works on the web browser with shiny websockets!

You can check out the code from this article at substack/dnode-pubsub on github.

You should also follow me on github!

more
git clone http://substack.net/blog.git