Connection model

You can integrate your Ditto instance with external messaging services such as Eclipse Hono, a RabbitMQ broker or an Apache Kafka broker via custom “connections”.

Additionally, you may invoke foreign HTTP endpoints by using the HTTP connection type.

A connection represents a communication channel for the exchange of messages between any service and Ditto. It requires a transport protocol, which is used to transmit Ditto Protocol messages. Ditto supports one-way and two-way communication over connections. This enables consumer/producer scenarios as well as fully-fledged command and response use cases. Nevertheless, those options might be limited by the transport protocol or the other endpoint’s capabilities.

All connections are configured and supervised via Ditto’s Connectivity service. The following model defines the connection itself:

Connection types

The top design priority of this model is to be as generic as possible, while still allowing protocol-specific customizations and tweaks. This enables the implementation of different customizable connection types and support for custom payload formats. Currently the following connection types are supported:

The address formats of sources and targets depend on the connectionType and therefore have connectionType-specific limitations. Those are documented with the corresponding protocol bindings.

Sources

Sources are used to connect to message brokers / external systems in order to consume messages from them.

Source messages can be of the following type:

Sources contain:

  • several addresses (depending on the connection type those are interpreted differently, e.g. as queues, topics, etc.),
  • a consumer count defining how many consumers should be attached to each source address,
  • an authorization context (see authorization) specifying which authorization subject is used to authorize messages from the source,
  • enforcement information that allows filtering the messages that are consumed in this source,
  • acknowledgement requests this source requires in order to ensure QoS 1 (“at least once”) processing of consumed messages before technically acknowledging them to the channel,
  • declared labels of acknowledgements the source is allowed to send,
  • header mapping for mapping headers of source messages to internal headers, and
  • a reply-target to configure publication of any responses of incoming commands.

Source enforcement

Messages received from external systems are mapped to Ditto internal format, either by applying some custom mapping or the default mapping for Ditto Protocol messages.

During this mapping, the digital twin of the device is determined, i.e. which thing is accessed or modified as a result of the message. By default, there is no sanity check whether this target thing corresponds to the device that originally sent the message. In some use cases this might be valid, but in other scenarios you might want to enforce that a device only sends data to its digital twin. Note that this could also be achieved by assigning a specific policy to each device and using placeholders in the authorization subject, but this can get cumbersome to maintain for a large number of devices.

With an enforcement, you can use a single policy for all devices and still make sure that a device only modifies its associated digital twin. Enforcement is only feasible if the message contains the verified identity of the sending device (e.g. in a message header). This verification has to be done by the external system e.g. by properly authenticating the devices and providing the identity in the messages sent to Ditto.

The enforcement configuration consists of two fields:

  • input: Defines where the device identity is extracted from.
  • filters: Defines the filters that are matched against the input. At least one filter must match the input value, otherwise the message is rejected.

The following placeholders are available for the input field:

Placeholder | Description | Example
{{ header:<name> }} | Any header from the message received via the source (case-insensitive). | {{ header:device_id }}
{{ source:address }} | The address on which the message was received. | devices/sensors/temperature1

The following placeholders are available for the filters field:

Placeholder | Description | Example
{{ thing:id }} | Full ID composed of namespace + ":" as a separator + name | eclipse.ditto:thing-42
{{ thing:namespace }} | Namespace (i.e. first part of an ID) | eclipse.ditto
{{ thing:name }} | Name (i.e. second part of an ID) | thing-42

Assume a device sensor:temperature1 pushes its telemetry data to Ditto, where it is stored in a thing named sensor:temperature1. The device identity is provided in a header field device_id. To enforce that the device can only send data to the thing sensor:temperature1, the following enforcement configuration can be used:

{
  "addresses": [ "telemetry/hono_tenant" ],
  "authorizationContext": ["ditto:inbound-auth-subject"],
  "enforcement": {
    "input": "{{ header:device_id }}",
    "filters": [ "{{ thing:id }}" ]
  }
}

Note: This example assumes that there is a valid user named ditto:inbound-auth-subject in Ditto. If you want to use a user for the basic auth (from the HTTP API) use the prefix nginx:, e.g. nginx:ditto. See Basic Authentication for more information.
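The matching rule described above can be illustrated with a minimal Python sketch. It is not Ditto's implementation: the function names `resolve` and `enforce` are hypothetical, and the sketch assumes that an unresolvable placeholder simply raises an error.

```python
import re

# Matches placeholders such as "{{ header:device_id }}" or "{{thing:id}}".
PLACEHOLDER = re.compile(r"\{\{\s*([a-z]+):([^}\s]+)\s*\}\}")


def resolve(template, values):
    """Replace each {{ prefix:name }} placeholder with its value (hypothetical helper)."""
    def substitute(match):
        # Names are lowercased so that header lookups are case-insensitive.
        return values[(match.group(1), match.group(2).lower())]
    return PLACEHOLDER.sub(substitute, template)


def enforce(enforcement, headers, source_address, thing_id):
    """Return True if at least one filter matches the resolved input value."""
    namespace, _, name = thing_id.partition(":")
    input_values = {("header", k.lower()): v for k, v in headers.items()}
    input_values[("source", "address")] = source_address
    filter_values = {
        ("thing", "id"): thing_id,
        ("thing", "namespace"): namespace,
        ("thing", "name"): name,
    }
    identity = resolve(enforcement["input"], input_values)
    return any(resolve(f, filter_values) == identity
               for f in enforcement["filters"])


enforcement = {"input": "{{ header:device_id }}", "filters": ["{{ thing:id }}"]}
headers = {"device_id": "sensor:temperature1"}
accepted = enforce(enforcement, headers, "telemetry/hono_tenant", "sensor:temperature1")
rejected = enforce(enforcement, headers, "telemetry/hono_tenant", "sensor:other")
```

With the configuration from the example, a message targeting sensor:temperature1 passes the check, while the same device addressing any other thing would be rejected.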

Source acknowledgement requests

A source can be configured to add additional acknowledgement requests to each incoming message.

That is desirable whenever incoming messages should be processed with a higher “quality of service” than the default, which is “at most once” (or QoS 0).

In order to process messages from sources with “at least once” (or QoS 1) semantics, configure the source’s "acknowledgementRequests/includes" to add the “twin-persisted” acknowledgement request. A message consumed over this source is then technically acknowledged only if the twin was successfully updated/persisted by Ditto.

How the technical acknowledgement is done is specific to the connection type in use and is documented in the scope of that connection type.

In addition to the "includes" defining which acknowledgements to request for each incoming message, the optional "filter" holds an fn:filter() function defining when to request acknowledgements at all for an incoming message. This filter is applied on both acknowledgements: those requested in the message and the ones requested via the configured "includes" array.

The JSON for a source with acknowledgement requests could look like this. The "filter" in the example causes acknowledgements to be requested only if the “qos” header is either not present or does not equal 0:

{
  "addresses": [
    "<source>"
  ],
  "authorizationContext": ["ditto:inbound-auth-subject"],
  "headerMapping": {
    "qos": "{{ header:qos }}"
  },
  "acknowledgementRequests": {
    "includes": [
      "twin-persisted",
      "receiver-connection-id:my-custom-ack"
    ],
    "filter": "fn:filter(header:qos,'ne',0)"
  }
}
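The effect of this particular configuration can be sketched in Python as follows. The function `effective_ack_requests` is hypothetical and hard-codes the example's filter (qos not equal to 0) instead of evaluating arbitrary fn:filter() expressions; it only illustrates that the filter applies both to the requests carried by the message and to the configured "includes".

```python
def effective_ack_requests(message_requested, includes, headers):
    """Sketch of the example filter fn:filter(header:qos,'ne',0).

    Returns the acknowledgement labels that would be requested for one
    incoming message. Header values are compared as strings (assumption).
    """
    qos = headers.get("qos")
    if qos is not None and str(qos) == "0":
        # Filter does not match: no acknowledgements are requested at all,
        # neither those from the message nor those from "includes".
        return []
    merged = list(message_requested)
    for label in includes:
        if label not in merged:
            merged.append(label)
    return merged


includes = ["twin-persisted", "receiver-connection-id:my-custom-ack"]
without_qos = effective_ack_requests([], includes, {})
with_qos_0 = effective_ack_requests(["twin-persisted"], includes, {"qos": "0"})
with_qos_1 = effective_ack_requests(["twin-persisted"], includes, {"qos": "1"})
```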

Source declared acknowledgement labels

The acknowledgements sent via a source must have their labels declared in the field declaredAcks as a JSON array.
If the label of an acknowledgement is not in the declaredAcks array, then the acknowledgement is rejected with an error. The declared labels must be prefixed by the connection ID followed by a colon or the {{connection:id}} placeholder followed by a colon. For example:


{
  "addresses": [
    "<source>"
  ],
  "authorizationContext": ["ditto:inbound-auth-subject"],
  "declaredAcks": [
    "{{connection:id}}:my-custom-ack"
  ]
}
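The prefix rule can be checked with a few lines of Python. This is a minimal sketch under the assumption that {{connection:id}} is the only placeholder used in the labels; `resolve_declared_acks` and `is_declared` are hypothetical helper names, not part of Ditto.

```python
import re

# Accepts the placeholder with or without inner spaces.
CONNECTION_ID = re.compile(r"\{\{\s*connection:id\s*\}\}")


def resolve_declared_acks(connection_id, declared_acks):
    """Resolve the connection ID placeholder and verify the required prefix."""
    resolved = []
    for label in declared_acks:
        # A callable replacement avoids backslash handling in the ID.
        candidate = CONNECTION_ID.sub(lambda _m: connection_id, label)
        if not candidate.startswith(connection_id + ":"):
            raise ValueError("label must start with '%s:': %r" % (connection_id, label))
        resolved.append(candidate)
    return resolved


def is_declared(ack_label, resolved_labels):
    """An acknowledgement is accepted only if its label was declared."""
    return ack_label in resolved_labels


labels = resolve_declared_acks("my-connection", ["{{connection:id}}:my-custom-ack"])
```

Here `labels` contains "my-connection:my-custom-ack"; an acknowledgement with any other label would not pass `is_declared` and would therefore be rejected.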

Source header mapping

For incoming messages, an optional header mapping may be applied. Mapped headers are added to the headers of the Ditto protocol message obtained by payload mapping. The default Ditto payload mapper does not retain any external header; in this case all Ditto protocol headers come from the header mapping.

The JSON for a source with header mapping could look like this:

{
  "addresses": [
    "<source>"
  ],
  "authorizationContext": ["ditto:inbound-auth-subject"],
  "headerMapping": {
    "correlation-id": "{{ header:message-id }}",
    "content-type": "{{ header:content-type }}"
  }
}

Source reply target

A source may define a reply target to publish the responses of incoming commands. The address and header mapping are defined in the reply target itself, whereas its payload mapping is inherited from the parent source, because a payload mapping definition specifies the transformation for both incoming and outgoing messages.

For example, to publish responses at the target address equal to the reply-to header of incoming commands, define source header mapping and reply target as follows. If an incoming command does not have the reply-to header, then its response is dropped.

{
  "headerMapping": {
    "reply-to": "{{ header:reply-to }}"
  },
  "replyTarget": {
    "enabled": true,
    "address": "{{ header:reply-to }}"
  }
}

The reply target may contain its own header mapping ("headerMapping") in order to map response headers.

In addition, the reply target contains the expected response types ("expectedResponseTypes") which should be published to the reply target.
The following response types are available to choose from:

  • response: Send back successful responses (e.g. responses after a Thing was successfully modified, but also responses for query commands). Includes positive acknowledgements.
  • error: Send back error responses (e.g. thing not modifiable due to lacking permissions)
  • nack: If negative acknowledgement responses should be delivered.

This is an example "replyTarget" containing both header mapping and expected response types:

{
  "replyTarget": {
    "enabled": true,
    "address": "{{ header:reply-to }}",
    "headerMapping": {
      "correlation-id": "{{ header:correlation-id }}"
    },
    "expectedResponseTypes": [
      "response",
      "error",
      "nack"
    ]
  }
}

Targets

Targets are used to connect to message brokers / external systems in order to publish messages to them.

Target messages can be of the following type:

Targets contain:

  • one address (that is interpreted differently depending on the connection type, e.g. as queue, topic, etc.),
  • topics that will be sent to the target,
  • an authorization context (see authorization) specifying which authorization subject is used to authorize messages to the target, and
  • header mapping to compute external headers from Ditto protocol headers.

Target topics and filtering

The types of messages that are published to the target address can be defined via configuration.

In order to only consume specific events like described in change notifications, the following parameters can additionally be provided when specifying the topics of a target:

Description | Topic | Filter by namespaces | Filter by RQL expression
Subscribe for events/change notifications | _/_/things/twin/events | |
Subscribe for messages | _/_/things/live/messages | |
Subscribe for live commands | _/_/things/live/commands | |
Subscribe for live events | _/_/things/live/events | |

The parameters are specified similar to HTTP query parameters, the first one separated with a ? and all following ones with &. You need to URL-encode the filter values before using them in a configuration.

For example, the following target registers for all events in the namespace org.eclipse.ditto whose attribute “counter” is greater than 42, and for events enriched with attributes/placement that match a filter on that attribute. Additionally, it subscribes to messages in the namespace org.eclipse.ditto:

{
  "address": "<target-address>",
  "topics": [
    "_/_/things/twin/events?namespaces=org.eclipse.ditto&filter=gt(attributes/counter,42)",
    "_/_/things/twin/events?extraFields=attributes/placement&filter=gt(attributes/placement,'Kitchen')",
    "_/_/things/live/messages?namespaces=org.eclipse.ditto"
  ],
  "authorizationContext": ["ditto:outbound-auth-subject", "..."]
}
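Since filter values need to be URL-encoded, a topic string can be assembled programmatically. The following Python sketch uses the standard library's `urllib.parse.quote`; the helper `build_topic` is hypothetical, and encoding every reserved character (including "/" and ",") is an assumption about what "URL-encode" means here.

```python
from urllib.parse import quote


def build_topic(topic, **params):
    """Append URL-encoded parameters to a topic, query-string style.

    The first parameter is separated with '?', all following ones with '&'.
    Keyword order is preserved (Python 3.7+).
    """
    encoded = [name + "=" + quote(value, safe="") for name, value in params.items()]
    if not encoded:
        return topic
    return topic + "?" + "&".join(encoded)


events_topic = build_topic(
    "_/_/things/twin/events",
    namespaces="org.eclipse.ditto",
    filter="gt(attributes/counter,42)",
)
# events_topic ==
# "_/_/things/twin/events?namespaces=org.eclipse.ditto&filter=gt%28attributes%2Fcounter%2C42%29"
```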

Target topics and enrichment

When extra fields should be added to outgoing messages on a connection, an extraFields parameter can be added to the topic. This is supported for all topics:

Description | Topic | Enrich by extra fields
Subscribe for events/change notifications | _/_/things/twin/events | yes
Subscribe for messages | _/_/things/live/messages | yes
Subscribe for live commands | _/_/things/live/commands | yes
Subscribe for live events | _/_/things/live/events | yes

Example:

{
  "address": "<target-address>",
  "topics": [
    "_/_/things/twin/events?extraFields=attributes/placement",
    "_/_/things/live/messages?extraFields=features/ConnectionStatus"
  ],
  "authorizationContext": ["ditto:outbound-auth-subject", "..."]
}

Target issued acknowledgement label

A target can be configured to automatically issue acknowledgements for each published/emitted message, once the underlying channel confirmed that the message was successfully received.

That is desirable whenever outgoing messages (e.g. events) are handled in the scope of a command sent with “at least once” (QoS 1) semantics, so that the command is only acknowledged if the event was successfully forwarded into another system.

For more details on that topic, please refer to the acknowledgements section.

Whether an outgoing message is treated as successfully sent or not is specific to the connection type in use and is documented in the scope of that connection type.

The issued acknowledgement label must be prefixed by the connection ID followed by a colon or the {{connection:id}} placeholder followed by a colon.
The JSON for a target with issued acknowledgement labels could look like this:


{
  "address": "<target>",
  "topics": [
    "_/_/things/twin/events"
  ],
  "authorizationContext": ["ditto:inbound-auth-subject"],
  "issuedAcknowledgementLabel": "{{connection:id}}:my-custom-ack"
}

Target header mapping

For outgoing messages, an optional header mapping may be applied. Mapped headers are added to the external headers. The default Ditto payload mapper does not define any external header; in this case, all external headers come from the header mapping.

The JSON for a target with header mapping could look like this:

{
  "address": "<target>",
  "topics": [
    "_/_/things/twin/events",
    "_/_/things/live/messages?namespaces=org.eclipse.ditto"
  ],
  "authorizationContext": ["ditto:inbound-auth-subject"],
  "headerMapping": {
    "message-id": "{{ header:correlation-id }}",
    "content-type": "{{ header:content-type }}",
    "subject": "{{ topic:subject }}",
    "reply-to": "all-replies"
  }
}

Authorization

A connection is initiated by the connectivity service. This obviates the need for client authorization, because Ditto becomes the client in this case. Nevertheless, to access resources within Ditto, the connection must know on whose behalf it is acting. This is controlled via the configured authorizationContext, which holds a list of self-assigned authorization subjects. Before a connection can access a Ditto resource, one of its authorizationSubjects must be granted the access rights by an authorization mechanism such as ACLs or Policies.

A connection target can only send data for things to which it has READ rights, as data flows from a thing to a target. A connection source can only receive data for things to which it has WRITE rights, as data flows from a source to a thing.

Specific configuration

Some connection types require specific configuration that is not supported for other connection types. Such settings are put into the specificConfig field.

Payload Mapping

For more information on mapping message payloads see the corresponding Payload Mapping Documentation.

Placeholders

The configuration of a connection allows the use of placeholders at certain places. This gives more fine-grained control over how messages are consumed or where they are published to. The general syntax of a placeholder is {{ placeholder }}. Have a look at the placeholders concept for more details.

Placeholder for source authorization subjects

Processing the messages received via a source using the same fixed authorization subject may not be suitable for every scenario. For example, if you want to declare fine-grained write permissions per device, this would not be possible with a fixed global subject. For this use case, we have introduced placeholder substitution for authorization subjects of source addresses that are resolved when processing messages from a source. Of course, this requires the sender of the message to provide necessary information about the original issuer of the message.

You can access any header value of the incoming message by using a placeholder like {{ header:name }}.

Example:

Assuming the messages received from the source telemetry contain a device_id header (e.g. sensor-123), you may configure your source’s authorization subject as follows:

{
  "id": "auth-subject-placeholder-example",
  "sources": [
    {
      "addresses": [ "telemetry" ],
      "authorizationContext": ["device:{{ header:device_id }}"]
    }
  ]
}

The placeholder is then replaced by the value from the message headers, and the message is forwarded and processed under the subject device:sensor-123. If the header cannot be resolved or contains unexpected characters, an exception is thrown. This exception is sent back to the sender as an error message if a valid reply-to header was provided; otherwise the message is dropped.
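As a rough illustration of this substitution, the following Python sketch resolves header placeholders in an authorization context. The function `resolve_subjects` is hypothetical; the sketch assumes case-insensitive header names and signals an unresolvable header with a `LookupError`, without modelling the check for unexpected characters.

```python
import re

HEADER_PLACEHOLDER = re.compile(r"\{\{\s*header:([^}\s]+)\s*\}\}")


def resolve_subjects(authorization_context, headers):
    """Substitute {{ header:<name> }} placeholders in each authorization subject."""
    lowered = {name.lower(): value for name, value in headers.items()}

    def substitute(match):
        name = match.group(1).lower()
        if name not in lowered:
            # Corresponds to the error case described in the text.
            raise LookupError("header not found: " + name)
        return lowered[name]

    return [HEADER_PLACEHOLDER.sub(substitute, subject)
            for subject in authorization_context]


subjects = resolve_subjects(
    ["device:{{ header:device_id }}"],
    {"device_id": "sensor-123"},
)
```

For the message from the example, `subjects` is `["device:sensor-123"]`.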

Placeholder for target addresses

Another use case for placeholders may be to publish twin events or live commands and events to a target address containing thing-specific information e.g. you can distribute things from different namespaces to different target addresses. You can use the placeholders {{ thing:id }}, {{ thing:namespace }} and {{ thing:name }} in the target address for this purpose. For a thing with the ID org.eclipse.ditto:device-123 these placeholders would be resolved as follows:

Placeholder | Description | Resolved value
thing:id | Full ID composed of namespace, ":" (as a separator), and name | org.eclipse.ditto:device-123
thing:namespace | Namespace (i.e. first part of an ID) | org.eclipse.ditto
thing:name | Name (i.e. second part of an ID) | device-123

In addition to the placeholders mentioned above, all documented connection placeholders may be used in target addresses. However, if any placeholder in the target address fails to resolve, the message is dropped.

Example:

Sending live commands and events to a target address that contains the thing’s namespace.

{
  "id": "target-placeholder-example",
  "targets": [
    {
      "address": "live/{{ thing:namespace }}",
      "authorizationContext": ["ditto:auth-subject"],
      "topics": [ "_/_/things/live/events", "_/_/things/live/commands" ]
    }
  ]
}
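How a target address with thing placeholders is resolved can be sketched in Python. The helper `resolve_target_address` is hypothetical and only handles the three thing placeholders; it returns None to represent a dropped message when any placeholder stays unresolved, and it assumes the thing ID is split at its first colon.

```python
import re
from typing import Optional

THING_PLACEHOLDER = re.compile(r"\{\{\s*thing:(id|namespace|name)\s*\}\}")


def resolve_target_address(address: str, thing_id: str) -> Optional[str]:
    """Resolve thing placeholders; None means the message would be dropped."""
    namespace, separator, name = thing_id.partition(":")
    if not separator:
        return None  # not a valid "namespace:name" ID
    values = {"id": thing_id, "namespace": namespace, "name": name}
    resolved = THING_PLACEHOLDER.sub(lambda m: values[m.group(1)], address)
    if "{{" in resolved:
        return None  # some placeholder could not be resolved by this sketch
    return resolved


address = resolve_target_address("live/{{ thing:namespace }}", "org.eclipse.ditto:device-123")
dropped = resolve_target_address("live/{{ header:unknown }}", "org.eclipse.ditto:device-123")
```

For the thing org.eclipse.ditto:device-123, `address` is "live/org.eclipse.ditto", while `dropped` is None because the sketch cannot resolve the header placeholder.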