Getting started with JCA Connectors

The Connector Architecture is probably one of the least known specifications in Jakarta EE and, as a result, is not as widely used as the others. However, this technology, when implemented properly, allows you to connect your application server to just about anything, and to do so in a portable manner. In this article, I’ll show you how to get started with a simple connector, introduce some examples that you can take and modify for your own needs, and finally wrap up with some example connectors to inspire you.

Packaging a connector

In this article, we’ll create four artifacts in order to deploy and use our connector. These are:

  • An API jar which will contain the interfaces for our connector. Any applications that use our connector will need to have these interfaces on the classpath.
  • A jar with the implementation of our resource adapter.
  • A rar file which will contain the implementation jar, along with a ra.xml deployment descriptor file.
  • An application which will send messages to and receive messages from the system we’re integrating with via the connector.

The sample code for this article is in the GitHub repository here: https://github.com/tomitribe/sample-slack-connector. In this example, we’ll create a connector to communicate with the popular messaging tool, Slack. The connector will use the allbegray Slack API. This API provides a real-time connection to Slack to both post messages and be notified when others post messages. Connections through this library will appear as ‘bots’ in Slack.

Creating the base connector classes

Let’s start by creating the base class for the ResourceAdapter in the implementation jar. This class should implement javax.resource.spi.ResourceAdapter, and be annotated with @javax.resource.spi.Connector. This annotation provides basic metadata for the adapter, such as the display name and a brief description.

There are five methods on the resource adapter. The start() and stop() methods are called by the application server when the adapter starts and stops. The endpointActivation() and endpointDeactivation() methods are called when message-driven beans (MDBs) associated with the adapter are started and stopped. We’ll focus on outbound communication to start with, so we’ll leave the implementation of these blank. Finally, there’s the getXAResources() method. For simplicity, we won’t implement transaction handling with this adapter, so this can return an empty array.

@Connector(description = "Sample Resource Adapter", displayName = "Sample Resource Adapter", eisType = "Sample Resource Adapter", version = "1.0")
public class SlackResourceAdapter implements ResourceAdapter {

    public void start(final BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
    }

    public void stop() {
    }

    public void endpointActivation(final MessageEndpointFactory messageEndpointFactory, final ActivationSpec activationSpec)
            throws ResourceException {
    }

    public void endpointDeactivation(final MessageEndpointFactory messageEndpointFactory, final ActivationSpec activationSpec) {
    }

    public XAResource[] getXAResources(final ActivationSpec[] activationSpecs) throws ResourceException {
        return new XAResource[0];
    }
}

The Slack API requires a token (which you can obtain by adding a bot in Slack’s admin interface – see the “Testing it out” section later). We can provide the token to the resource adapter as a configuration setting. This can be done by adding a field annotated with @javax.resource.spi.ConfigProperty.

We can fill out the start() method to connect to Slack through the allbegray API, and the stop() method to sign off (in this sample, it simply sets the bot’s presence to away). Finally, add a sendMessage() method to send a message through to Slack.

    // The Slack bot token, supplied by the application server as adapter configuration
    @ConfigProperty
    private String token;

    private SlackRealTimeMessagingClient slackRealTimeMessagingClient;
    private SlackWebApiClient webApiClient;
    private String user;

    public String getToken() {
        return token;
    }

    public void setToken(final String token) {
        this.token = token;
    }

    public void start(final BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
        webApiClient = SlackClientFactory.createWebApiClient(token);
        slackRealTimeMessagingClient = SlackClientFactory.createSlackRealTimeMessagingClient(token);

        // Look up the bot's own user name so that outgoing messages can be attributed to it
        final Authentication authentication = webApiClient.auth();
        user = authentication.getUser();

        webApiClient.setPresenceUser(Presence.AUTO);

        slackRealTimeMessagingClient.connect();
    }

    public void stop() {
        webApiClient.setPresenceUser(Presence.AWAY);
    }

    public void sendMessage(final String channel, final String message) {
        final ChatPostMessageMethod postMessage = new ChatPostMessageMethod(channel, message);
        postMessage.setUsername(user);
        webApiClient.postMessage(postMessage);
    }

Outbound communication

Next, we want to add the necessary code to enable an application to send a message through the adapter. This is done by obtaining a connection from a connection factory; connections are pooled by the application server. If you have sent a message using JMS before, then you’ll be familiar with this pattern.

For this part, we’ll need to create two interfaces for our connector’s custom API jar – the ConnectionFactory and the Connection. We also need four classes for the implementation jar: implementations of the ConnectionFactory and Connection, and also implementations of ManagedConnectionFactory and ManagedConnection.

The flow works as shown in the diagram below. The ConnectionFactory is injected into the application, which requests a connection through the getConnection() method. This delegates to the ConnectionManager, which is provided by the application server, and is responsible for managing connection pooling. The ConnectionManager delegates to the ManagedConnectionFactory to create and match connections.


The resource adapter is responsible for providing the Connection, ConnectionFactory, ManagedConnection and ManagedConnectionFactory classes, while the application server provides the ConnectionManager and is responsible for pooling connections. The application that interacts with the resource adapter will use the ConnectionFactory and Connection classes, via their interfaces. The ConnectionFactory can either be looked up from JNDI or injected using the @Resource annotation on a field.

These classes are fairly boilerplate and are shown below. You can use the barebones project here to get started rather than write these classes from scratch.

The ManagedConnectionFactory should be annotated with @javax.resource.spi.ConnectionDefinition, which specifies the Connection and ConnectionFactory API and implementation classes.

public interface SlackConnection {
    void sendMessage(final String channel, final String message);
    void close();
}

public interface SlackConnectionFactory extends Serializable, Referenceable {
    SlackConnection getConnection() throws ResourceException;
}

public class SlackConnectionImpl implements SlackConnection {
    private SlackManagedConnection mc;
    private SlackManagedConnectionFactory mcf;

    public SlackConnectionImpl(final SlackManagedConnection mc, final SlackManagedConnectionFactory mcf) {
        this.mc = mc;
        this.mcf = mcf;
    }

    public void sendMessage(final String channel, final String message) {
        mc.sendMessage(channel, message);
    }

    public void close() {
        mc.closeHandle(this);
    }
}

public class SlackConnectionFactoryImpl implements SlackConnectionFactory {
    private static final long serialVersionUID = 1L;
    private final Logger log = Logger.getLogger(SlackConnectionFactoryImpl.class.getName());
    private Reference reference;
    private SlackManagedConnectionFactory mcf;
    private ConnectionManager connectionManager;

    public SlackConnectionFactoryImpl(final SlackManagedConnectionFactory mcf, final ConnectionManager cxManager) {
        this.mcf = mcf;
        this.connectionManager = cxManager;
    }

    @Override
    public SlackConnection getConnection() throws ResourceException {
        log.finest("getConnection()");
        return (SlackConnection) connectionManager.allocateConnection(mcf, null);
    }

    @Override
    public Reference getReference() throws NamingException {
        log.finest("getReference()");
        return reference;
    }

    @Override
    public void setReference(Reference reference) {
        log.finest("setReference()");
        this.reference = reference;
    }
}

public class SlackManagedConnection implements ManagedConnection {

    private final Logger log = Logger.getLogger(SlackManagedConnection.class.getName());
    private PrintWriter logwriter;
    private SlackManagedConnectionFactory mcf;
    private List<ConnectionEventListener> listeners;
    private SlackConnectionImpl connection;

    public SlackManagedConnection(final SlackManagedConnectionFactory mcf) {
        this.mcf = mcf;
        this.logwriter = null;
        this.listeners = Collections.synchronizedList(new ArrayList<ConnectionEventListener>(1));
        this.connection = null;
    }

    public Object getConnection(final Subject subject, final ConnectionRequestInfo cxRequestInfo) throws ResourceException {
        log.finest("getConnection()");
        connection = new SlackConnectionImpl(this, mcf);
        return connection;
    }

    public void associateConnection(final Object connection) throws ResourceException {
        log.finest("associateConnection()");

        if (connection == null)
            throw new ResourceException("Null connection handle");

        if (!(connection instanceof SlackConnectionImpl))
            throw new ResourceException("Wrong connection handle");

        this.connection = (SlackConnectionImpl) connection;
    }

    public void cleanup() throws ResourceException {
        log.finest("cleanup()");
    }

    public void destroy() throws ResourceException {
        log.finest("destroy()");
    }

    public void addConnectionEventListener(final ConnectionEventListener listener) {
        log.finest("addConnectionEventListener()");

        if (listener == null) {
            throw new IllegalArgumentException("Listener is null");
        }

        listeners.add(listener);
    }

    public void removeConnectionEventListener(final ConnectionEventListener listener) {
        log.finest("removeConnectionEventListener()");
        if (listener == null) {
            throw new IllegalArgumentException("Listener is null");
        }

        listeners.remove(listener);
    }

    void closeHandle(final SlackConnection handle) {
        ConnectionEvent event = new ConnectionEvent(this, ConnectionEvent.CONNECTION_CLOSED);
        event.setConnectionHandle(handle);
        for (ConnectionEventListener cel : listeners) {
            cel.connectionClosed(event);
        }
    }

    public PrintWriter getLogWriter() throws ResourceException {
        log.finest("getLogWriter()");
        return logwriter;
    }

    public void setLogWriter(final PrintWriter out) throws ResourceException {
        log.finest("setLogWriter()");
        logwriter = out;
    }

    public LocalTransaction getLocalTransaction() throws ResourceException {
        throw new NotSupportedException("getLocalTransaction() not supported");
    }

    public XAResource getXAResource() throws ResourceException {
        throw new NotSupportedException("getXAResource() not supported");
    }

    public ManagedConnectionMetaData getMetaData() throws ResourceException {
        log.finest("getMetaData()");
        return new SlackManagedConnectionMetaData();
    }

    void sendMessage(final String channel, final String message) {
        log.finest("sendMessage()");

        final SlackResourceAdapter resourceAdapter = (SlackResourceAdapter) mcf.getResourceAdapter();
        resourceAdapter.sendMessage(channel, message);
    }
}

@ConnectionDefinition(connectionFactory = SlackConnectionFactory.class,
        connectionFactoryImpl = SlackConnectionFactoryImpl.class,
        connection = SlackConnection.class,
        connectionImpl = SlackConnectionImpl.class)
public class SlackManagedConnectionFactory implements ManagedConnectionFactory, ResourceAdapterAssociation {

    private static final long serialVersionUID = 1L;
    private final  Logger log = Logger.getLogger(SlackManagedConnectionFactory.class.getName());
    private ResourceAdapter ra;
    private PrintWriter logwriter;

    public Object createConnectionFactory(final ConnectionManager cxManager) throws ResourceException {
        log.finest("createConnectionFactory()");
        return new SlackConnectionFactoryImpl(this, cxManager);
    }

    public Object createConnectionFactory() throws ResourceException {
        throw new ResourceException("This resource adapter doesn't support non-managed environments");
    }

    public ManagedConnection createManagedConnection(final Subject subject, final ConnectionRequestInfo cxRequestInfo) throws ResourceException {
        log.finest("createManagedConnection()");
        return new SlackManagedConnection(this);
    }

    public ManagedConnection matchManagedConnections(final Set connectionSet, final Subject subject, final ConnectionRequestInfo cxRequestInfo) throws ResourceException {
        log.finest("matchManagedConnections()");
        ManagedConnection result = null;
        Iterator it = connectionSet.iterator();
        while (result == null && it.hasNext()) {
            ManagedConnection mc = (ManagedConnection) it.next();
            if (mc instanceof SlackManagedConnection) {
                result = mc;
            }

        }
        return result;
    }

    public PrintWriter getLogWriter() throws ResourceException {
        log.finest("getLogWriter()");
        return logwriter;
    }

    public void setLogWriter(final PrintWriter out) throws ResourceException {
        log.finest("setLogWriter()");
        logwriter = out;
    }

    public ResourceAdapter getResourceAdapter() {
        log.finest("getResourceAdapter()");
        return ra;
    }

    public void setResourceAdapter(ResourceAdapter ra) {
        log.finest("setResourceAdapter()");
        this.ra = ra;
    }
}

Note the sendMessage() method on the connection: it delegates to the ManagedConnection, which in turn calls the sendMessage() method we added to the resource adapter earlier.
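To make the division of labour in this section more concrete, here is a minimal, self-contained sketch of the allocate-and-release cycle. It is not the JCA API: every Demo* class is a hypothetical stand-in for the role of the same name, the pool is a plain stack with no locking, and the real container learns about a closed handle through a ConnectionEvent rather than the direct call used here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-ins for the JCA roles. None of these are the real javax.resource types.

/** Role of ManagedConnection: the physical connection that the server pools. */
class DemoManagedConnection {
    final int id;

    DemoManagedConnection(final int id) {
        this.id = id;
    }
}

/** Role of ManagedConnectionFactory: knows how to create physical connections. */
class DemoManagedConnectionFactory {
    private int created;

    DemoManagedConnection createManagedConnection() {
        created++;
        return new DemoManagedConnection(created);
    }

    int createdCount() {
        return created;
    }
}

/** Role of the server-provided ConnectionManager: owns the pool of idle connections. */
class DemoConnectionManager {
    private final Deque<DemoManagedConnection> idle = new ArrayDeque<>();

    DemoConnectionHandle allocateConnection(final DemoManagedConnectionFactory mcf) {
        // Reuse an idle physical connection if one exists, otherwise create a new one.
        final DemoManagedConnection mc = idle.isEmpty() ? mcf.createManagedConnection() : idle.pop();
        return new DemoConnectionHandle(mc, this);
    }

    void release(final DemoManagedConnection mc) {
        idle.push(mc);
    }
}

/** Role of the Connection handle that application code works with. */
class DemoConnectionHandle {
    final DemoManagedConnection managedConnection;
    private final DemoConnectionManager manager;

    DemoConnectionHandle(final DemoManagedConnection managedConnection, final DemoConnectionManager manager) {
        this.managedConnection = managedConnection;
        this.manager = manager;
    }

    void close() {
        // Closing the handle returns the physical connection to the pool; it is not destroyed.
        manager.release(managedConnection);
    }
}

/** Role of the ConnectionFactory that is injected into the application. */
class DemoConnectionFactory {
    private final DemoManagedConnectionFactory mcf;
    private final DemoConnectionManager manager;

    DemoConnectionFactory(final DemoManagedConnectionFactory mcf, final DemoConnectionManager manager) {
        this.mcf = mcf;
        this.manager = manager;
    }

    DemoConnectionHandle getConnection() {
        return manager.allocateConnection(mcf);
    }
}
```

In this sketch, getting a connection, closing it and getting another one yields the same underlying DemoManagedConnection. This is why an application should always close the handles it obtains.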

Testing it out

To test the resource adapter, we’ll need to package it. To do this, the implementation jar will need to be packaged inside a rar file, along with a ra.xml deployment descriptor in META-INF. The ra.xml tells the application server which classes in the resource adapter to use.

<connector xmlns="http://xmlns.jcp.org/xml/ns/javaee"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/connector_1_7.xsd"
           version="1.7">
    <description>Chatterbox Slack Connector</description>
    <display-name>Chatterbox Slack Connector</display-name>
    <eis-type>Slack Connector</eis-type>
    <resourceadapter-version>1.0</resourceadapter-version>
    <license>
        <license-required>false</license-required>
    </license>
    <resourceadapter>
        <resourceadapter-class>org.tomitribe.chatterbox.slack.adapter.SlackResourceAdapter</resourceadapter-class>
        <config-property>
            <config-property-name>token</config-property-name>
            <config-property-type>String</config-property-type>
        </config-property>
        <outbound-resourceadapter>
            <connection-definition>
                <managedconnectionfactory-class>org.tomitribe.chatterbox.slack.adapter.out.SlackManagedConnectionFactory</managedconnectionfactory-class>
                <connectionfactory-interface>org.tomitribe.chatterbox.slack.api.SlackConnectionFactory</connectionfactory-interface>
                <connectionfactory-impl-class>org.tomitribe.chatterbox.slack.adapter.out.SlackConnectionFactoryImpl</connectionfactory-impl-class>
                <connection-interface>org.tomitribe.chatterbox.slack.api.SlackConnection</connection-interface>
                <connection-impl-class>org.tomitribe.chatterbox.slack.adapter.out.SlackConnectionImpl</connection-impl-class>
            </connection-definition>
            <transaction-support>NoTransaction</transaction-support>
            <reauthentication-support>false</reauthentication-support>
        </outbound-resourceadapter>
    </resourceadapter>
</connector>

The API jar will need to be added to the classpath for both the resource adapter and the application. I’m using Apache TomEE, so I place the API jar in the lib directory, and the RAR file in the apps directory, and I enable the apps directory by uncommenting the following option in tomee.xml:

<!-- <Deployments dir="apps" /> -->

The final thing you’ll want to do before you start the application is provide your Slack token to the connector. The process for this varies from server to server, but in TomEE it is provided via a system property. The name of the system property is the name of the rar file with “RA” on the end, then a dot, and then the name of the property. So, using the adapter exactly as it is in the example project, I need to set the following system property:

slack-connector-rar-0.1-SNAPSHOTRA.token=XXXXXXXXXX
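The naming rule is easy to get wrong by hand, so here is a small sketch that derives the key from the rar file name. The class and method names are hypothetical, and dropping the .rar extension is an assumption based on the example key above rather than on TomEE documentation.

```java
/** Hypothetical helper that applies the naming convention described in the article. */
class TomEEPropertyNames {

    /** Returns "<rar name without extension>RA.<property>". */
    static String configPropertyKey(final String rarFileName, final String property) {
        // Assumption: the ".rar" extension is not part of the key, as in the article's example.
        final String base = rarFileName.endsWith(".rar")
                ? rarFileName.substring(0, rarFileName.length() - ".rar".length())
                : rarFileName;
        return base + "RA." + property;
    }
}
```

Because it is an ordinary system property, the resulting key can be supplied in any of the usual ways, for example as a -D argument to the JVM.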

You can obtain the token from Slack’s admin interface under: Customize Slack → Configure Apps → Custom Integrations → Bots

In order to send an outbound message, the application will need to obtain a connection to Slack from a connection factory. To do this, the ConnectionFactory can either be injected into a managed component or looked up in JNDI. The Connection to Slack can be obtained using the getConnection() method.

@Singleton
@Lock(LockType.READ)
@Path("sender")
public class Sender {
    private final Logger log = Logger.getLogger(Sender.class.getName());

    @Resource
    private SlackConnectionFactory cf;

    @Path("{channel}")
    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void sendMessage(@PathParam("channel") final String channel, final String message) {
        SlackConnection connection = null;
        try {
            connection = cf.getConnection();
            connection.sendMessage(channel, message);
        } catch (ResourceException e) {
            log.severe("Unexpected error sending message to Slack: " + e.getMessage());
        } finally {
            // Always hand the connection back to the pool, even if sending fails
            if (connection != null) {
                connection.close();
            }
        }
    }
}

Inbound communication

Inbound communication is also possible, with messages arriving in Slack calling a message-driven bean inside the application. Fortunately, there is less boilerplate code with inbound communication than there is with outbound.

We’ll start by adding a simple interface in the interface jar that our MDBs must implement in order to receive messages.

public interface InboundListener {
    void messageReceived(final String channel, final String message);
}

The implementation jar needs an ActivationSpec class added to it. The ActivationSpec provides the means to configure individual MDBs. In this example, the ActivationSpec is minimal and the adapter will broadcast all messages to all MDBs, but it would be possible to extend this to specify which Slack channel a particular MDB is interested in, for example.

@Activation(messageListeners = InboundListener.class)
public class SlackActivationSpec implements ActivationSpec {

    private ResourceAdapter resourceAdapter;

    @Override
    public void validate() throws InvalidPropertyException {
    }

    @Override
    public ResourceAdapter getResourceAdapter() {
        return resourceAdapter;
    }

    @Override
    public void setResourceAdapter(final ResourceAdapter ra) throws ResourceException {
        this.resourceAdapter = ra;
    }
}
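As a rough idea of what the channel extension mentioned above could look like, the sketch below adds a single JavaBean property and a matching check. ChannelFilterSpec is a hypothetical, JDK-only stand-in so that the example compiles on its own; in the adapter, these members would sit on SlackActivationSpec, and the value would have to be whatever channel identifier the incoming Slack event carries.

```java
/**
 * Hypothetical stand-in for the extra state a channel-aware SlackActivationSpec might carry.
 * In the real adapter these members would live on the class that implements ActivationSpec.
 */
class ChannelFilterSpec {
    // null means "deliver messages from every channel"
    private String channel;

    public String getChannel() {
        return channel;
    }

    // The container would call this setter when the MDB declares a "channel" activation property.
    public void setChannel(final String channel) {
        this.channel = channel;
    }

    /** Plays the role of ActivationSpec.validate(): reject a blank channel. */
    void validate() {
        if (channel != null && channel.trim().isEmpty()) {
            throw new IllegalArgumentException("channel must not be blank");
        }
    }

    /** Decides whether a message from the given channel should reach this MDB. */
    boolean accepts(final String incomingChannel) {
        return channel == null || channel.equals(incomingChannel);
    }
}
```

An MDB would then typically set the value with an @ActivationConfigProperty named channel, and the adapter would call accepts() before delivering each message.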

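You may remember that the resource adapter tracks the MDBs registered with it through the endpointActivation() and endpointDeactivation() methods. In the code below, targets is a map field on the resource adapter, keyed by ActivationSpec, which holds the endpoint created for each MDB.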

public void endpointActivation(final MessageEndpointFactory messageEndpointFactory, final ActivationSpec activationSpec)
        throws ResourceException {
    final SlackActivationSpec slackActivationSpec = (SlackActivationSpec) activationSpec;

    final MessageEndpoint messageEndpoint = messageEndpointFactory.createEndpoint(null);
    targets.put(slackActivationSpec, messageEndpoint);
}

public void endpointDeactivation(final MessageEndpointFactory messageEndpointFactory, final ActivationSpec activationSpec) {
    final SlackActivationSpec slackActivationSpec = (SlackActivationSpec) activationSpec;

    // Remove the endpoint so that onMessage() no longer delivers to it once it has been released
    final MessageEndpoint endpoint = targets.remove(slackActivationSpec);
    if (endpoint == null) {
        throw new IllegalStateException("No Endpoint to undeploy for ActivationSpec " + activationSpec);
    }

    endpoint.release();
}
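The targets field used above is not shown in the article. One plausible shape for it, assuming that Slack messages may arrive on a different thread from the one that deploys and undeploys MDBs, is a concurrent map wrapped in a small registry. The EndpointRegistry class and its generic parameters are hypothetical; S stands in for the ActivationSpec type and E for MessageEndpoint, so that the sketch compiles with the JDK alone.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical helper. S stands in for the ActivationSpec type and E for the MessageEndpoint type.
 */
class EndpointRegistry<S, E> {
    // Concurrent map: activation, deactivation and message delivery may run on different threads.
    private final Map<S, E> targets = new ConcurrentHashMap<>();

    /** Called from endpointActivation(). */
    void activate(final S spec, final E endpoint) {
        targets.put(spec, endpoint);
    }

    /** Called from endpointDeactivation(); returns the endpoint so that the caller can release it. */
    E deactivate(final S spec) {
        final E endpoint = targets.remove(spec);
        if (endpoint == null) {
            throw new IllegalStateException("No endpoint registered for " + spec);
        }
        return endpoint;
    }

    /** The endpoints that should currently receive messages. */
    Collection<E> endpoints() {
        return targets.values();
    }
}
```

Lookups depend on the key's equals() and hashCode(). SlackActivationSpec does not override them, so this relies on the container passing the same spec instance to both callbacks.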

We can make the resource adapter class receive messages from Slack itself by adding the following before the slackRealTimeMessagingClient.connect() call:

slackRealTimeMessagingClient.addListener(Event.MESSAGE, this);

This will require the resource adapter to implement the Slack API’s EventListener interface, which defines an onMessage() method. In the implementation of this method, we can push the message out to all the registered MDBs. There is one requirement: beforeDelivery() must be called before the call to the messageReceived() method on the MDB, and afterDelivery() must be called afterwards.

public void onMessage(final JsonNode jsonNode) {
    final String text = jsonNode.get("text").textValue();
    final String channel = jsonNode.get("channel").textValue();

    for (final MessageEndpoint endpoint : targets.values()) {
        boolean beforeDelivery = false;

        try {
            endpoint.beforeDelivery(messageReceivedMethod);
            beforeDelivery = true;

            ((InboundListener) endpoint).messageReceived(channel, text);
        } catch (Throwable t) {
            log.severe("Unable to deliver message to endpoint. Cause: " + t.getMessage());
        } finally {
            if (beforeDelivery) {
                try {
                    endpoint.afterDelivery();
                } catch (Throwable t) {
                    log.severe("Unable to call afterDelivery(). Cause: " + t.getMessage());
                }
            }
        }
    }
}
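The onMessage() code above passes a messageReceivedMethod to beforeDelivery(), but the article does not show where it comes from. MessageEndpoint.beforeDelivery() takes a java.lang.reflect.Method describing the listener method about to be invoked, so one plausible way to obtain it is a one-off reflective lookup. The ListenerMethods class and its constant are hypothetical names, and the interface is repeated here only so that the sketch compiles on its own.

```java
import java.lang.reflect.Method;

// Local copy of the article's listener interface so that the sketch compiles on its own.
interface InboundListener {
    void messageReceived(String channel, String message);
}

/** Hypothetical holder for the Method object handed to beforeDelivery(). */
class ListenerMethods {
    // Resolved once and reused for every delivery.
    static final Method MESSAGE_RECEIVED = lookup();

    private static Method lookup() {
        try {
            return InboundListener.class.getMethod("messageReceived", String.class, String.class);
        } catch (final NoSuchMethodException e) {
            // Only reachable if the interface changes without this lookup being updated.
            throw new IllegalStateException("InboundListener.messageReceived(String, String) not found", e);
        }
    }
}
```

Resolving the method once keeps the reflective lookup out of the per-message path.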

Taking it further

Requiring the MDB to implement a fixed interface is a little inflexible. It is possible to get the class of the MDB in the endpointActivation() method by using this code:

final EndpointTarget target = new EndpointTarget(messageEndpoint);
final Class<?> endpointClass = slackActivationSpec.getBeanClass() != null ? slackActivationSpec
    .getBeanClass() : messageEndpointFactory.getEndpointClass();

With this, the resource adapter could reflectively look at annotations on the methods, and use those to decide which method it is going to call. Using the Slack example, imagine multiple methods on the MDB, each with a @Channel annotation specifying which channel they are interested in. You can see examples of connectors using annotations in this way in the Chatterbox and CREST projects.
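The annotation-driven idea can be sketched with the JDK alone. Everything below is hypothetical and is not taken from the Chatterbox or CREST code: the @Channel annotation, the example bean and the dispatcher are illustrative names, and a real adapter would invoke the method on the MessageEndpoint proxy, between beforeDelivery() and afterDelivery(), rather than on the bean directly.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical annotation naming the channel a method is interested in. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Channel {
    String value();
}

/** Hypothetical bean with one handler method per channel. */
class ExampleChannelBean {
    final List<String> received = new ArrayList<>();

    @Channel("general")
    public void onGeneral(final String message) {
        received.add("general:" + message);
    }

    @Channel("random")
    public void onRandom(final String message) {
        received.add("random:" + message);
    }
}

/** Scans a class once for @Channel methods and routes messages to the matching one. */
class ChannelDispatcher {
    private final Map<String, Method> handlers = new HashMap<>();

    ChannelDispatcher(final Class<?> endpointClass) {
        for (final Method method : endpointClass.getMethods()) {
            final Channel channel = method.getAnnotation(Channel.class);
            // Only accept annotated methods that take the message text as their single argument.
            if (channel != null && method.getParameterCount() == 1
                    && method.getParameterTypes()[0] == String.class) {
                handlers.put(channel.value(), method);
            }
        }
    }

    /** Returns true if a handler was found and invoked for the channel. */
    boolean dispatch(final Object bean, final String channel, final String message) {
        final Method handler = handlers.get(channel);
        if (handler == null) {
            return false;
        }
        try {
            handler.invoke(bean, message);
        } catch (final ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to invoke " + handler, e);
        }
        return true;
    }
}
```

Building the channel-to-method map once, at activation time, keeps the per-message work down to a map lookup and a reflective call.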

Other features

This simple introduction has shown how to get started with creating a resource adapter, but has not covered some of the more advanced features of resource adapters, including security, transactions and work management. You can learn more about these in the specification documents or take a look at the Sheldon connector available here: https://tomitribe.io/projects/sheldon. The Sheldon connector makes use of the security aspects of the spec, as well as the work manager, to allow a user to SSH directly into the application server and issue commands which are executed as MDBs – this could be a neat way to provide a command line interface for management functions for your next application.

About the Author

Jonathan Gallimore
Tomitribe