From PostgreSQL via RabbitMQ and Server-Sent Events to OpenLayers with Go
Last year we wrote a small tech demo for a customer who wanted to visualize geographical point data, surveyed in real time, in a streaming manner with an OpenLayers 2 based web client. A further constraint was scalability to a larger number of users in a cloud-based setup.
We chose Go as the implementation language as it is a good fit for this kind of task. You can find the sources here.
The points are stored in a PostgreSQL database. The model of the real-world application is a little more complex; for demo purposes we simplified it to a minimum: an id, a timestamp and a point consisting of a pair of lat/lon components. For the sake of simplicity we did not use the corresponding simple feature GIS type here; the real application does [1].
CREATE TABLE points (
id serial PRIMARY KEY,
time timestamp NOT NULL,
lat numeric NOT NULL,
lng numeric NOT NULL
);
The first tool of the demo is pointgenerator, which inserts randomly distributed points into the database at a given rate. Although not very fancy, it helped us simulate load scenarios with incoming data rates higher than those of the real application, and so show that the system stays stable under such conditions.
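The idea behind pointgenerator can be sketched as follows. This is a minimal, hypothetical version, not the code from the sources: it emits uniformly distributed points inside an arbitrary bounding box (lat 47 to 55, lng 6 to 15, chosen only for illustration) at a fixed rate driven by a `time.Ticker`. The names `Point`, `randomPoint` and `generate` are our own, and the database insert is only indicated in a comment.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Point holds the columns of the demo table except the serial id,
// which PostgreSQL assigns on insert.
type Point struct {
	Time time.Time
	Lat  float64
	Lng  float64
}

// randomPoint returns a point uniformly distributed in the given
// bounding box. The uniform distribution is an assumption of this sketch.
func randomPoint(rng *rand.Rand, minLat, maxLat, minLng, maxLng float64) Point {
	return Point{
		Time: time.Now(),
		Lat:  minLat + rng.Float64()*(maxLat-minLat),
		Lng:  minLng + rng.Float64()*(maxLng-minLng),
	}
}

// generate sends n points on out at perSecond points per second
// and closes out when it is done.
func generate(n int, perSecond float64, out chan<- Point) {
	defer close(out)
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	ticker := time.NewTicker(time.Duration(float64(time.Second) / perSecond))
	defer ticker.Stop()
	for i := 0; i < n; i++ {
		<-ticker.C
		out <- randomPoint(rng, 47.0, 55.0, 6.0, 15.0)
	}
}

func main() {
	points := make(chan Point)
	go generate(5, 50, points)
	for p := range points {
		// A real generator would insert the row here, e.g. with database/sql:
		// db.Exec("INSERT INTO points (time, lat, lng) VALUES ($1, $2, $3)", p.Time, p.Lat, p.Lng)
		fmt.Printf("%s %.5f %.5f\n", p.Time.Format(time.RFC3339), p.Lat, p.Lng)
	}
}
```

Raising `perSecond` is all it takes to simulate the higher-than-real load scenarios described above.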
The second tool, pointpoller, gets the points out of the database. As the name suggests, it has a mode that polls the data at a given rate. The more interesting mode uses the NOTIFY/LISTEN feature of PostgreSQL combined with its JSON support: each time a point is inserted into the database, a trigger sends the row, converted to JSON, into a notification channel. pointpoller listens on this channel, so the points are received immediately, without the latency of a polling interval, and are published to a RabbitMQ message queue.
CREATE OR REPLACE FUNCTION notify_points_inserts()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('points_inserts', row_to_json(NEW)::text);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER points_inserts_trigger
AFTER INSERT ON points
FOR EACH ROW EXECUTE PROCEDURE notify_points_inserts();
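On the Go side, the notification payload is just the JSON text produced by `row_to_json(NEW)`. A minimal decoding sketch, assuming the payload arrives as a string (how it is received, for example through a PostgreSQL driver's listener, is left out here): `numeric` columns become JSON numbers, while the `timestamp` column becomes a string without a time zone, which Go's default `time.Time` decoding (RFC 3339) rejects. The hypothetical `pgTime` type therefore accepts both the ISO 8601 `T` separator of newer PostgreSQL versions and the space separator of older ones, and interprets the value as UTC, which is an assumption about the database setup.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"time"
)

// pgTime decodes a PostgreSQL "timestamp without time zone" as emitted
// in JSON. Fractional seconds are accepted by time.Parse even though the
// layouts below do not mention them. The value is assumed to be UTC.
type pgTime struct{ time.Time }

func (t *pgTime) UnmarshalJSON(b []byte) error {
	s := strings.Trim(string(b), `"`)
	for _, layout := range []string{"2006-01-02T15:04:05", "2006-01-02 15:04:05"} {
		if parsed, err := time.ParseInLocation(layout, s, time.UTC); err == nil {
			t.Time = parsed
			return nil
		}
	}
	return fmt.Errorf("unrecognized timestamp %q", s)
}

// point matches the JSON object row_to_json(NEW) builds for the points table.
// Decoding numeric into float64 may lose precision; fine for map display.
type point struct {
	ID   int64   `json:"id"`
	Time pgTime  `json:"time"`
	Lat  float64 `json:"lat"`
	Lng  float64 `json:"lng"`
}

// decodePayload turns one notification payload into a point.
func decodePayload(payload string) (point, error) {
	var p point
	err := json.Unmarshal([]byte(payload), &p)
	return p, err
}

func main() {
	p, err := decodePayload(`{"id":42,"time":"2015-12-30T14:15:16.5","lat":50.1109,"lng":8.6821}`)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(p.ID, p.Time.Format(time.RFC3339Nano), p.Lat, p.Lng)
}
```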
Message queuing lets us scale out the number of web endpoints, called pointbroadcaster, that serve the points to the web clients. We chose RabbitMQ in particular because it is available in our customer's cloud platform. It may not be the fastest message queue around, and it is a bit too feature-rich for this job, but it transports the data without adding much latency.
A single broadcasting node was sufficient for the current needs of our customer. If the number of web clients increases in the future, adding more nodes will be easy.
The pointbroadcaster endpoint serves an OpenLayers 2 client and, for debugging purposes, a simple text-based web app. The point data itself is delivered to these clients with Server-Sent Events (SSE). SSE is the slightly less well-known sibling of HTML5 WebSockets; unlike WebSockets, it is unidirectional, from server to client only. SSE streams are much easier to tunnel through proxies, as they are essentially long-lived GET requests with a special message-based content format.
As even Microsoft's modern browsers do not support SSE, we had to include a small polyfill to make it work on these platforms, too.
Each connected SSE client is served by a goroutine that receives points from a broker via a Go channel.
// ServeHTTP is the handler used to stream points
// to an SSE client.
func (brk *broker) ServeHTTP(rw http.ResponseWriter,
	req *http.Request) {
	// Release the request body when the handler returns.
	defer req.Body.Close()
	// Make sure that the writer supports flushing.
	flusher, ok := rw.(http.Flusher)
	if !ok {
		http.Error(rw, "Streaming unsupported!",
			http.StatusInternalServerError)
		return
	}
	rw.Header().Set("Content-Type", "text/event-stream")
	rw.Header().Set("Cache-Control", "no-cache")
	rw.Header().Set("Connection", "keep-alive")
	rw.Header().Set("Access-Control-Allow-Origin", "*")
	// Each connection registers its own message
	// channel with the broker's connections registry.
	messageChan := make(chan []byte, 2)
	// Signal the broker that we have a new connection.
	brk.newClients <- messageChan
	// Remove this client from the map of connected clients
	// when this handler exits.
	defer func() {
		brk.closingClients <- messageChan
	}()
	// The request context is canceled when the client
	// disconnects. This replaces the deprecated
	// http.CloseNotifier, whose unchecked type assertion
	// would panic on writers that do not implement it.
	done := req.Context().Done()
	for {
		select {
		case msg := <-messageChan:
			if writeSSE(rw, msg) != nil {
				return
			}
			flusher.Flush()
		case <-done:
			return
		}
	}
}
It is inspired by this Gist by Ismael Celis, with somewhat improved handling of the request body.
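The handler above relies on a broker goroutine that owns the set of connected clients. A minimal sketch of that counterpart: the field names `newClients` and `closingClients` and the `chan []byte` element type come from the handler, while `notifier`, `clients`, `newBroker` and `run` are hypothetical names. In the real system, a RabbitMQ consumer goroutine would push each delivery body into `notifier`; that part is left out. One deliberate choice of this sketch: the fan-out uses a non-blocking send, so a client whose two-slot buffer is full misses a message. With a blocking send, a handler that has stopped reading and is waiting to send on `closingClients` could deadlock against a broker that is waiting to send to that handler's full channel.

```go
package main

import "fmt"

// broker fans incoming messages out to all registered client channels.
type broker struct {
	notifier       chan []byte              // messages to broadcast
	newClients     chan chan []byte         // client registrations
	closingClients chan chan []byte         // client deregistrations
	clients        map[chan []byte]struct{} // owned by run only
}

func newBroker() *broker {
	return &broker{
		notifier:       make(chan []byte),
		newClients:     make(chan chan []byte),
		closingClients: make(chan chan []byte),
		clients:        make(map[chan []byte]struct{}),
	}
}

// run is the only goroutine that touches the clients map, so no mutex is
// needed. It returns when notifier is closed.
func (brk *broker) run() {
	for {
		select {
		case c := <-brk.newClients:
			brk.clients[c] = struct{}{}
		case c := <-brk.closingClients:
			delete(brk.clients, c)
		case msg, ok := <-brk.notifier:
			if !ok {
				return
			}
			for c := range brk.clients {
				// Non-blocking send: a slow client loses this message
				// instead of stalling the broker and every other client.
				select {
				case c <- msg:
				default:
				}
			}
		}
	}
}

func main() {
	brk := newBroker()
	go brk.run()
	client := make(chan []byte, 2)
	brk.newClients <- client
	brk.notifier <- []byte("hello")
	fmt.Println(string(<-client))
	brk.closingClients <- client
	close(brk.notifier)
}
```

Since `broker` would also carry the `ServeHTTP` method shown above, it can be registered directly with `http.Handle`.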
To compensate for latency variations in the upstream PostgreSQL/RabbitMQ network stack, the broadcaster combines the timestamps associated with the points with its own internal time measurement and a debt value, and replays the points with consistent timing. See func (brk *broker) replay() in the source for details. This works pretty well, even when a local traffic spike lets the events pile up for a while.
Apart from some buffering, there are no countermeasures against slow clients. Such countermeasures would require feedback from the clients to be propagated back into the broadcaster, which does not seem necessary at the moment. The real world may prove us wrong here. ;-)
With a fairly untuned setup [2], the system was able to serve thousands of clients with up to a thousand points per minute on a single node. The overall server-side latency from pointpoller to pointbroadcaster is between 10 and 100 ms, which is small compared to the time it takes data to travel from the sensors to the database, more than 10 seconds.
Hopefully we will report back on the real production setup this year. The first tests look promising.
Have a happy 2016! :-)