"""
The logic for the broker, in abstract form, without the details of the MQTT packets.

  Copyright (c) 2013, 2014 IBM Corp.

  All rights reserved. This program and the accompanying materials
  are made available under the terms of the Eclipse Public License v1.0
  and Eclipse Distribution License v1.0 which accompany this distribution.

  The Eclipse Public License is available at
     http://www.eclipse.org/legal/epl-v10.html
  and the Eclipse Distribution License is available at
     http://www.eclipse.org/org/documents/edl-v10.php.

  Contributors:
     Ian Craggs - initial implementation and/or documentation
"""

import types, time, logging

from . import Topics
from .SubscriptionEngines import SubscriptionEngines

logger = logging.getLogger('MQTT broker')
class Brokers:

  def __init__(self, overlapping_single=True):
    """Create a broker with an empty client table and a new subscription engine.

    overlapping_single: stored on the instance; publish() reads it to choose
      between delivering one copy per subscriber and one copy per matching
      subscription.
    """
    # NOTE(review): the assignment target was lost from the source line
    # (only "= SubscriptionEngines()" survived); `self.se` is a
    # reconstruction - confirm the attribute name against the other methods.
    self.se = SubscriptionEngines()
    self.__clients = {} # clientid -> client
    self.overlapping_single = overlapping_single

  def reinitialize(self):
    self.__clients = {}

  def getClient(self, clientid):
    return self.__clients[clientid] if (clientid in self.__clients.keys()) else None

  def cleanSession(self, aClientid):
    "clear any outstanding subscriptions and publications"
    if len("#")) > 0:"[MQTT-3.1.2-7] retained messages not cleaned up as part of session state for client %s", aClientid)

  def connect(self, aClient):
    aClient.connected = True
    aClient.timestamp = time.clock()
    self.__clients[] = aClient
    if aClient.cleansession:

  def terminate(self, aClientid):
    "Abrupt disconnect which also causes a will msg to be sent out"
    if aClientid in self.__clients.keys() and self.__clients[aClientid].connected:
      if self.__clients[aClientid].will != None:"[MQTT-3.1.2-8] sending will message for client %s", aClientid)
        willtopic, willQoS, willmsg, willRetain = self.__clients[aClientid].will
        if willRetain:
"[MQTT-3.1.2-15] sending will message retained for client %s", aClientid)
"[MQTT-3.1.2-14] sending will message non-retained for client %s", aClientid)
        self.publish(aClientid, willtopic, willmsg, willQoS, willRetain)

  def disconnect(self, aClientid):
    """Mark a client as disconnected and discard or keep its session.

    Unknown client ids are ignored. A clean-session client is removed from
    the client table; any other client stays in the table with a fresh
    timestamp and its will message cleared.

    aClientid: id of the client that is disconnecting.
    """
    client = self.__clients.get(aClientid)
    if client is None:
      return
    client.connected = False
    # NOTE(review): the logging call names were lost from the source lines;
    # `logger.info` is a reconstruction - confirm the level.
    if client.cleansession:
      logger.info("[MQTT-3.1.2-6] broker must discard the session data for client %s", aClientid)
      # Drop the session's subscriptions before forgetting the client, so
      # publish() never looks up a subscriber that has no client record.
      # NOTE(review): this call is not visible in the damaged source - confirm.
      self.cleanSession(aClientid)
      del self.__clients[aClientid]
    else:
      logger.info("[MQTT-3.1.2-4] broker must store the session data for client %s", aClientid)
      # time.clock() was removed in Python 3.8; keep using it where it still
      # exists and fall back to time.perf_counter() otherwise.
      clock = getattr(time, "clock", None) or time.perf_counter
      client.timestamp = clock()
      logger.info("[MQTT-3.1.2-10] will message is deleted after use or disconnect, for client %s", aClientid)
      logger.info("[MQTT-3.14.4-3] on receipt of disconnect, will message is deleted")
      client.will = None

  def disconnectAll(self):
    for c in self.__clients.keys()[:]: # copy the array because disconnect will remove an element

  def publish(self, aClientid, topic, message, qos, retained=False):
    """publish to all subscribed connected clients
       also to any disconnected non-cleansession clients with qos in [1,2]

    aClientid: id of the publishing client (not read by this method).
    topic, message: the publication; handed to each subscriber's
      publishArrived().
    qos: QoS of the publication; each delivery uses the lower of this and
      the subscription's QoS.
    retained: when true the message is also stored in the subscription
      engine as the retained message for the topic.
    """
    # NOTE(review): every `self.se.*` call and every `logger.info` call in
    # this method was truncated in the source; the engine method names
    # (setRetained, subscribers, getSubscriptions, qosOf) and the log level
    # are reconstructions - confirm against SubscriptionEngines.
    if retained:
      logger.info("[MQTT-2.1.2-6] store retained message and QoS")
      self.se.setRetained(topic, message, qos)
    else:
      logger.info("[MQTT-2.1.2-12] non-retained message - do not store")

    for subscriber in self.se.subscribers(topic): # all subscribed clients
      # qos is lower of publication and subscription
      if len(self.se.getSubscriptions(topic, subscriber)) > 1:
        logger.info("[MQTT-3.3.5-1] overlapping subscriptions")
      if retained:
        logger.info("[MQTT-2.1.2-10] outgoing publish does not have retained flag set")
      if self.overlapping_single:
        # One copy per subscriber, however many of its subscriptions match.
        out_qos = min(self.se.qosOf(subscriber, topic), qos)
        self.__clients[subscriber].publishArrived(topic, message, out_qos)
      else:
        # One copy per matching subscription. The `else:` was missing from
        # the source; the loop's indentation puts it under a branch of the
        # `if` above.
        for subscription in self.se.getSubscriptions(topic, subscriber):
          out_qos = min(subscription.getQoS(), qos)
          self.__clients[subscriber].publishArrived(topic, message, out_qos)

  def __doRetained__(self, aClientid, topic, qos):
    """Send a client the retained publications that match its subscription(s).

    aClientid: id of the subscribing client; must be in the client table
      when a retained topic matches.
    topic: one subscription topic, or a list of them.
    qos: the requested QoS for `topic`, or a list parallel to a topic list.
      Each delivery uses the lower of the retained and the requested QoS.
    """
    # topic can be single, or a list
    if not isinstance(topic, list):
      topic = [topic]
      qos = [qos]
    # NOTE(review): the `self.se.*` calls were truncated in the source;
    # getRetainedTopics(t) and getRetained(s) are reconstructions - confirm
    # names and arguments against SubscriptionEngines.
    for i, t in enumerate(topic): # t is a wildcard subscription topic
      topicsUsed = set()
      for s in self.se.getRetainedTopics(t): # s is a non-wildcard retained topic
        if s not in topicsUsed and Topics.topicMatches(t, s):
          # topic has retained publication
          # Record it so the `not in topicsUsed` check above can take
          # effect; the source never added anything to the collection.
          topicsUsed.add(s)
          (ret_msg, ret_qos) = self.se.getRetained(s)
          thisqos = min(ret_qos, qos[i])
          # Final argument True: presumably the retained flag - confirm
          # against the client's publishArrived signature.
          self.__clients[aClientid].publishArrived(s, ret_msg, thisqos, True)

  def subscribe(self, aClientid, topic, qos):
    rc =, topic, qos)
    self.__doRetained__(aClientid, topic, qos)
    return rc

  def unsubscribe(self, aClientid, topic):, topic)

  def getSubscriptions(self, aClientid=None):