[
Date Prev][
Date Next][
Thread Prev][
Thread Next][
Date Index][
Thread Index]
[
List Home]
[mosquitto-dev] checking wildcard subscriptions against wildcard ACLs
|
There has been a little discussion about applying ACLs to subscriptions on the list.
And I saw a comment in read_handle_server.c:
...mosquitto_topic_matches_sub, which can't cope with checking subscriptions that have wildcards against ACLs that have wildcards...
Based on my work on topics for a persistence layer I thought this possible and came up with an algorithm implemented in the attached python script with test data, which may be useful.
Of course, it may not be bulletproof, and probably could be improved - if anyone sees a hole or finds a case that fails, please let me know.
ml
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
def _check_for_subtree(tpl):
if tpl[-1] == u'#':
del tpl[-1] # trim '#'
return True
else:
return False
def acl_check(ap, sp): # ap: acl pattern; sp: subscription pattern
apl = ap.split(u'/') # make a list
ap_stf = _check_for_subtree(apl) # set the subtree flag and trim if True
apn = len(apl)
spl = sp.split(u'/') # make a list
sp_stf = _check_for_subtree(spl) # set the subtree flag and trim if True
spn = len(spl)
if sp_stf and not ap_stf:
return False # subscription may have subtree only if acl does also
if apn > spn: # subscription must have at least as many levels as acl
return False
elif spn > apn:
if ap_stf:
apl += [u'+'] * (spn - apn) # pad the apl with '+' for matching
else: # if no acl subtree, subscription levels cannot exceed acl's
return False
else: # apn == spn
pass
for i in range(spn): # check each level - all levels must match
if apl[i] == u'+': # any value in sub matches; '+' also matches
pass
elif apl[i] == spl[i]: # an explicit match
pass
else: # an explict failure to match at this level
return False
return True
if __name__ == u"__main__":
test_data_tuple_list = [
# (acl pattern, subscription pattern, expected result)
(u'foo/+/bar', u'foo/#', False),
(u'foo/+/ba℞/#', u'foo/baz/ba℞', True),
(u'foo/+/ba℞/#', u'foo/baz/ba℞/+', True),
(u'foo/+/ba℞/#', u'foo/baz/ba℞/#', True),
(u'foo/+/ba℞/#', u'foo/baz/+/#', False),
(u'/+//#', u'/foo///#', True),
(u'#', u'#', True),
(u'#', u'+', True),
(u'+', u'#', False),
(u'+', u'+', True),
]
for tdt in test_data_tuple_list:
result = acl_check(*tdt[:2])
test_result = u'passed' if result == tdt[2] else u'failed'
print(
(
u"ap: {}; sp: {}; Expectation: {}; "
u"Result: {}; Test Result: {}"
).format(tdt[0], tdt[1], tdt[2], result, test_result)
)
sys.exit()