Pointstream: From PostgreSQL to OpenLayers in real-time with Go

  2016-01-01


From PostgreSQL via RabbitMQ and Server-Sent Events to OpenLayers with Go

Last year we wrote a small tech demo for a customer who wanted to visualize geographical point data, surveyed in real time, in a streaming manner with an OpenLayers 2 based web client. A further constraint was scalability to a larger number of users in a cloud-based setup.

We chose Go as the implementation language as it is a good fit for this kind of task. You can find the sources here.

The points are stored in a PostgreSQL database. The model of the real-world application is a little more complex, so we simplified it for demo purposes to a minimum: an id, a timestamp, and a point consisting of a pair of lat/lon components. For the sake of simplicity we did not use the corresponding simple feature GIS type here; the real application does [1].

CREATE TABLE points (
  id   serial    PRIMARY KEY,
  time timestamp NOT NULL,
  lat  numeric   NOT NULL,
  lng  numeric   NOT NULL
);

The first tool of the demo is pointgenerator, which inserts randomly distributed points into the database at a given rate. While not very fancy, this helped us simulate pressure scenarios with incoming data rates higher than those of the real application, to prove that the system stays stable under such circumstances.
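A minimal sketch of such a generator, assuming a lib/pq-backed database/sql connection; the flag names and the DSN are made up for illustration and not the actual tool's interface:

// Hypothetical sketch of a point generator.
package main

import (
  "database/sql"
  "flag"
  "log"
  "math/rand"
  "time"

  _ "github.com/lib/pq" // PostgreSQL driver
)

func main() {
  rate := flag.Int("rate", 10, "points per second")
  dsn := flag.String("dsn", "dbname=points sslmode=disable",
    "PostgreSQL DSN")
  flag.Parse()
  rand.Seed(time.Now().UnixNano())

  db, err := sql.Open("postgres", *dsn)
  if err != nil {
    log.Fatal(err)
  }
  defer db.Close()

  ticker := time.NewTicker(time.Second / time.Duration(*rate))
  defer ticker.Stop()

  for t := range ticker.C {
    // Uniformly distributed over the whole globe.
    lat := rand.Float64()*180 - 90
    lng := rand.Float64()*360 - 180
    if _, err := db.Exec(
      `INSERT INTO points (time, lat, lng) VALUES ($1, $2, $3)`,
      t, lat, lng); err != nil {
      log.Fatal(err)
    }
  }
}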

To get the points out of the database the second tool, pointpoller, is used. As the name suggests, the program has a mode to poll the data at a given rate. The more interesting mode uses the NOTIFY/LISTEN feature of PostgreSQL combined with its JSON support: each time a point is inserted into the database, a trigger sends the JSONified row into a notification channel. pointpoller listens on this channel, so the points are received directly, without an extra polling latency window, and pushed into a RabbitMQ message queue.

CREATE OR REPLACE FUNCTION notify_points_inserts()
RETURNS TRIGGER AS $$
BEGIN
  PERFORM pg_notify('points_inserts', row_to_json(NEW)::text);
  RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER points_inserts_trigger
AFTER INSERT ON points
  FOR EACH ROW EXECUTE PROCEDURE notify_points_inserts();
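On the Go side, receiving these notifications could look roughly like the sketch below, based on the Listener type of github.com/lib/pq. The forward function is a hypothetical stand-in for the RabbitMQ publishing that pointpoller does:

package main

import (
  "log"
  "time"

  "github.com/lib/pq"
)

// forward is a placeholder; the real pointpoller would
// publish the payload to RabbitMQ here.
func forward(payload string) {
  log.Println("point:", payload)
}

func main() {
  dsn := "dbname=points sslmode=disable"

  listener := pq.NewListener(dsn, 10*time.Second, time.Minute,
    func(ev pq.ListenerEventType, err error) {
      if err != nil {
        log.Println("listener event:", err)
      }
    })
  if err := listener.Listen("points_inserts"); err != nil {
    log.Fatal(err)
  }

  for n := range listener.Notify {
    if n == nil {
      // A nil notification is sent after a re-established
      // connection.
      continue
    }
    // n.Extra carries the JSONified row from the trigger.
    forward(n.Extra)
  }
}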

Message queuing is used to enable scaling up the number of web endpoints, called pointbroadcaster, which serve the points to the web clients. RabbitMQ in particular was chosen because of its availability in the cloud solution of our customer. It may not be the fastest message queue around and is a little too feature-rich for our purposes, but it transports the data without introducing much extra latency.
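For illustration, the publishing side could look like this sketch using the github.com/streadway/amqp client; the broker URL and queue name are assumptions, not the demo's actual configuration:

package main

import (
  "log"

  "github.com/streadway/amqp"
)

func main() {
  conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  if err != nil {
    log.Fatal(err)
  }
  defer conn.Close()

  ch, err := conn.Channel()
  if err != nil {
    log.Fatal(err)
  }
  defer ch.Close()

  // Declare a transient queue for the points.
  q, err := ch.QueueDeclare("points", false, false, false, false, nil)
  if err != nil {
    log.Fatal(err)
  }

  // Publish one JSONified point to the default exchange.
  body := []byte(`{"id":1,"time":"2016-01-01T00:00:00Z","lat":52.5,"lng":13.4}`)
  if err := ch.Publish("", q.Name, false, false, amqp.Publishing{
    ContentType: "application/json",
    Body:        body,
  }); err != nil {
    log.Fatal(err)
  }
}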

A single broadcasting node was sufficient to fulfill the current needs of our customer. If the number of web clients increases in the future, adding more nodes will be easy.

The pointbroadcaster endpoint serves an OpenLayers 2 client and a simple text-based web app for debugging purposes. The point data itself is brought to these clients with Server-Sent Events (SSE). They are the lesser-known sibling of HTML5 WebSockets, being unidirectional from server to client only. They are much easier to tunnel through proxies, as they are in fact just GET requests that are left open, with a special message-based content format.

As even modern Microsoft browser products do not support SSE, we had to incorporate a little polyfill to make it work on these platforms, too.

Each connected SSE client is served by a goroutine that receives points from a broker via a Go channel.

// ServeHTTP is the handler used to do the streaming
// to a SSE client.
func (brk *broker) ServeHTTP(rw http.ResponseWriter,
  req *http.Request) {

  // Make sure the request body gets closed
  // when this handler exits.
  defer req.Body.Close()

  // Make sure that the writer supports flushing.
  flusher, ok := rw.(http.Flusher)
  if !ok {
    http.Error(rw, "Streaming unsupported!",
      http.StatusInternalServerError)
    return
  }

  rw.Header().Set("Content-Type", "text/event-stream")
  rw.Header().Set("Cache-Control", "no-cache")
  rw.Header().Set("Connection", "keep-alive")
  rw.Header().Set("Access-Control-Allow-Origin", "*")

  // Each connection registers its own message 
  // channel with the broker's connections registry
  messageChan := make(chan []byte, 2)

  // Signal the brk that we have a new connection
  brk.newClients <- messageChan

  // Remove this client from the map of connected clients
  // when this handler exits.
  defer func() {
    brk.closingClients <- messageChan
  }()

  // Listen to connection close and un-register messageChan
  notify := rw.(http.CloseNotifier).CloseNotify()

  for {
    select {
    case msg := <-messageChan:
      if writeSSE(rw, msg) != nil {
        return
      }
      flusher.Flush()
    case <-notify:
      return
    }
  }
}

The handler is inspired by this Gist from Ismael Celis, with some improved body close handling.
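The writeSSE helper used in the handler is not shown above. A minimal version, writing one message in the SSE wire format (a data: line terminated by a blank line), could look like this:

import (
  "fmt"
  "io"
)

// writeSSE writes a single message in the SSE wire format:
// a "data:" line followed by an empty line ends one event.
func writeSSE(w io.Writer, msg []byte) error {
  _, err := fmt.Fprintf(w, "data: %s\n\n", msg)
  return err
}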

To compensate for the latency differences of the upstream PostgreSQL/RabbitMQ network stack, the timestamps associated with the points are combined with some internal time measurement and a debt value to replay them in a consistent way. See func (brk *broker) replay() in the source for details. This works pretty well even if a local traffic spike lets the events pile up for a while.
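The pacing idea can be sketched as follows; the point type here is a hypothetical minimal stand-in, and the real replay() additionally maintains the mentioned debt value to catch up after spikes:

// point is a minimal stand-in for the demo's point type;
// only the timestamp matters for the pacing.
type point struct {
  Time     time.Time
  Lat, Lng float64
}

// pace delays each point so the intervals between emitted
// points match the intervals between their original timestamps.
func pace(in <-chan point, out chan<- point) {
  var lastEvent time.Time // timestamp of the previous point
  var lastEmit time.Time  // wall clock when we emitted it
  for p := range in {
    if !lastEvent.IsZero() {
      if wait := p.Time.Sub(lastEvent) - time.Since(lastEmit); wait > 0 {
        time.Sleep(wait)
      }
    }
    lastEvent, lastEmit = p.Time, time.Now()
    out <- p
  }
}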

Besides some buffering there are no countermeasures to deal with slow clients. Those would require feedback from the clients to be propagated back into the broadcaster, which does not seem necessary at the moment. The real world may prove us wrong here. ;-)
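Should that change, one common pattern (not necessarily what we would end up using) is a non-blocking send that simply drops messages for clients whose buffers are full; this assumes the broker keeps its clients in a map of message channels, as the handler's comments suggest:

// broadcast fans a message out to all registered clients.
func (brk *broker) broadcast(msg []byte) {
  for clientChan := range brk.clients {
    select {
    case clientChan <- msg:
    default:
      // Buffer full: drop the point for this slow client
      // instead of stalling everyone else.
    }
  }
}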

The system was able to serve thousands of clients with a pretty untuned setup [2], with up to a thousand points per minute on a single node. The overall server-side latency from pointpoller to pointbroadcaster is within 10 to 100 ms, which is small compared to the time spans from the sensors to the database, which are above 10 seconds.

Hopefully we will report back from the real production setup this year. The first tests look promising.

Have a happy 2016! :-)


  [1] The real points have about 15 physical attributes associated with them.
  [2] We only raised the ulimit -n count to allow more open file handles.