Re: [mosquitto-dev] Cassandra as a retained message store

An update on this.

I have rebased my POC on the current 1.4 branch, as of the commits made today. So far I'm only running it locally on my Mac.

In the POC, retained storage is handled solely by Cassandra. In the real world, Cassandra should probably handle only the subtrees specified by configuration, i.e. "global" retained storage.

It passes all broker tests to the same degree as "vanilla" 1.4 on my Mac (no TLS), except for those involving "bridge": 06-bridge-*.py and [...]. Obviously I don't understand bridges well enough yet and will have to study the code/docs more to fix this.

The mosquitto<->retain_server interaction is fully asynchronous using ZMQ. I augmented the IO loop to handle "retain inserts" from the retain_server in response to asynchronous subscriptions from mosquitto. Storing a retained message in mosquitto pushes it to the retain_server and clears the retain bit in mosquitto. The real world implementation will need a bit more stuff flying back and forth, e.g. heartbeats.
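The flow described above can be sketched roughly as follows. This is only an illustration, not the POC code: the two queues stand in for the ZMQ channels, the store is a plain dict instead of Cassandra, and every name (broker_publish, broker_subscribe, retain_server_step) is hypothetical.

```python
import queue

# Stand-ins for the two ZMQ channels; the POC uses ZMQ sockets instead.
to_retain_server = queue.Queue()    # broker -> retain_server
from_retain_server = queue.Queue()  # retain_server -> broker


def broker_publish(topic, payload, retain):
    """Broker side: push a retained message to the store, then clear the bit."""
    if retain:
        to_retain_server.put(("store", topic, payload))
        retain = False  # the broker itself no longer keeps the message
    return {"topic": topic, "payload": payload, "retain": retain}


def broker_subscribe(client_id, topic_filter):
    """Broker side: fire-and-forget query; the answers arrive later."""
    to_retain_server.put(("query", client_id, topic_filter))


def retain_server_step(store):
    """Retain server side: handle one pending request against a dict store."""
    request = to_retain_server.get_nowait()
    if request[0] == "store":
        _, topic, payload = request
        store[topic] = payload
    else:
        _, client_id, topic_filter = request
        # Exact-match lookup only; wildcard query planning is out of scope.
        if topic_filter in store:
            from_retain_server.put((client_id, topic_filter, store[topic_filter]))
```

Anything placed on from_retain_server corresponds to a "retain insert" that the broker's IO loop would pick up and deliver to the subscribing client.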

It's not much code: maybe 50 lines of mods to the existing source, plus a few hundred lines for the retain_server glue module. I did have to modify [...] and some tests to start/stop/clear the retain_server as needed. The python retain_server is more complicated than the mosquitto mods because of the query planning. There is more grunt work to be done there to generalize, modularize and parallelize, but the effort is straightforward and the algorithms are now proven.

I can see that this is possible now. So we are re-imagining our current nytfabrik which is based upon custom websocket gateways and message protocols. It's actually very simple: we autoscale gateways in multiple AWS regions - anywhere between 10 and 100. They are all connected to a global data store and a global message bus. I hope to do an MQTT version to run in parallel and compare.

I'm unsure about timing from this point, but the next step will probably be to involve others on the team and move to a dev environment in AWS.


On Thu, Dec 11, 2014 at 4:19 PM, Laing, Michael <michael.laing@xxxxxxxxxxx> wrote:
I have built a proof-of-concept C interface to my POC retained-store drivers for Cassandra and Postgres.

It is full duplex, non-blocking and asynchronous using zmq.

I am thinking that I will clone the mosquitto 1.4 repo and incorporate my interface as a POC.

My approach will be to send non-blocking PULLs from mosquitto upon subscription and non-blocking PUSHes upon publish (with retain).

For responses, I will add a polled item to the main polling loop and pull up to 100 or so messages per loop - I'll figure out how to send each directly to the client id.
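The idea of pulling a bounded number of responses per pass of the polling loop can be sketched like this. It is a minimal illustration in Python rather than the C the broker is written in; the queue stands in for the zmq socket, and drain_responses and deliver are hypothetical names.

```python
import queue

MAX_PER_LOOP = 100  # "up to 100 or so" per loop iteration


def drain_responses(responses, deliver, limit=MAX_PER_LOOP):
    """Pull at most `limit` pending responses without ever blocking."""
    handled = 0
    while handled < limit:
        try:
            client_id, topic, payload = responses.get_nowait()
        except queue.Empty:
            break  # nothing left; return to the rest of the main loop
        deliver(client_id, topic, payload)
        handled += 1
    return handled
```

Capping the batch keeps a large burst of retained messages from starving the other sockets serviced by the same loop; whatever remains is picked up on the next pass.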

Does that sound reasonable? It's been a decade or so since I did any C work so I am a bit rusty.


On Wed, Dec 3, 2014 at 5:33 PM, Laing, Michael <michael.laing@xxxxxxxxxxx> wrote:
Just an update.

I have increased the wildcard level span to 9 by combining inversion and indexing strategies. It is still very fast and retrieves all results in a single query.
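For reference, the wildcard semantics that any such query has to reproduce are those of MQTT topic filters: "+" matches exactly one level and "#" matches all remaining levels. The sketch below is an in-memory matcher for those rules only; it is not the inversion/indexing scheme, which is not spelled out in this thread, and topic_matches is a hypothetical name.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic filter matches a concrete topic name."""
    # Filters starting with a wildcard never match topics beginning with "$".
    if topic.startswith("$") and topic_filter[:1] in ("+", "#"):
        return False
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # multi-level wildcard: matches the rest, even if empty
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs
    # Without a trailing "#", both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)
```

A store could use a function like this as a correctness check on whatever rows a single wildcard query returns.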

To scale, it requires one or two more strategies to increase the cardinality of the partition keys (the 'row' in the big table model). In the current nytfabrik, we use the 0 level as an index into metadata that lets us do this, in effect moving some levels into the partition key and, sometimes, hashing a rarely wildcarded level into a partition key shard.
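One way to picture the partition key idea is the sketch below. It is an assumption-laden illustration, not the nytfabrik scheme: the parameters key_levels, shard_level and shards are hypothetical stand-ins for the per-tree metadata that the 0 level indexes, and partition_key is a made-up name.

```python
import hashlib


def partition_key(topic, key_levels=2, shard_level=3, shards=16):
    """Derive a (prefix..., shard) partition key from a topic name.

    key_levels:  how many leading levels move into the partition key
    shard_level: index of a rarely wildcarded level to hash into a shard
    shards:      number of shards per prefix
    """
    levels = topic.split("/")
    prefix = tuple(levels[:key_levels])
    if shard_level < len(levels):
        # Use a stable hash; Python's built-in hash() is salted per process.
        digest = hashlib.md5(levels[shard_level].encode("utf-8")).digest()
        shard = int.from_bytes(digest[:4], "big") % shards
    else:
        shard = 0  # topic too short to have the shard level
    return prefix + (shard,)
```

The trade-off in a layout like this is that a subscription which does wildcard the hashed level has to query every shard for that prefix, which is why a rarely wildcarded level is the one to pick.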

For contrast, I have done the same thing using Postgres - much easier, of course, as the db does the query planning. Also fast.

But now I am thinking about how to implement these in mosquitto. They could be embedded, but maybe it would be more flexible to use something like zmq to asynchronously stream queries and responses. Then I could do my Cassandra and Postgres backends in python - others may want to do mongo or redis, etc., using their language of choice.



On Mon, Dec 1, 2014 at 2:07 AM, Oegma2 <oegma2@xxxxxxxxx> wrote:
Hi Michael

I will definitely be interested to hear more ^_^

On Thu, Nov 27, 2014 at 10:43 AM, Roger Light <roger@xxxxxxxxxx> wrote:
Hi Michael,

I'd definitely be interested to hear more.



On Wed, Nov 26, 2014 at 3:23 PM, Laing, Michael
<michael.laing@xxxxxxxxxxx> wrote:
> I am experimenting with MQTT as a way to extend the nytfabrik at the edge,
> complementing and perhaps replacing our custom WebSocket/SockJS protocol.
> We use RabbitMQ with topic routing on the nytfabrik global backbone mesh, so
> we are familiar with wildcarding, subscription, etc.
> And we have lots of 'retained' messages - tens of millions growing to hundreds
> of millions.
> This is no problem for Cassandra but, as we look to support MQTT, we need a
> technical solution for query with wildcards on retained messages.
> Cassandra has a limited query language but can be very fast. I was looking
> for a way to run 'close to the metal' (no additional layers) and now have a
> working prototype architecture and algorithms for store and query that are
> simple, fast and which scale. So I think that technical blocker is overcome
> for our use cases.
> Anyway, if this is of interest, I'll keep this list informed, and maybe
> solicit some advice.
> We're not sure yet how we'll add MQTT to the nytfabrik, but most likely we
> would remix mosquitto with our own code, package it up, and run it under
> python on our autoscaling edge instances.
> Cheers,
> Michael
> _______________________________________________
> mosquitto-dev mailing list
> mosquitto-dev@xxxxxxxxxxx
> To change your delivery options, retrieve your password, or unsubscribe from
> this list, visit