Paho C++  1.0
The Paho MQTT C++ Client Library
 All Classes Files Functions Variables Typedefs Friends
client.h
Go to the documentation of this file.
1 
8 /*******************************************************************************
9  * Copyright (c) 2013-2017 Frank Pagliughi <fpagliughi@mindspring.com>
10  *
11  * All rights reserved. This program and the accompanying materials
12  * are made available under the terms of the Eclipse Public License v1.0
13  * and Eclipse Distribution License v1.0 which accompany this distribution.
14  *
15  * The Eclipse Public License is available at
16  * http://www.eclipse.org/legal/epl-v10.html
17  * and the Eclipse Distribution License is available at
18  * http://www.eclipse.org/org/documents/edl-v10.php.
19  *
20  * Contributors:
21  * Frank Pagliughi - initial implementation and documentation
22  *******************************************************************************/
23 
24 #ifndef __mqtt_client_h
25 #define __mqtt_client_h
26 
27 #include "mqtt/async_client.h"
28 #include <future>
29 
30 namespace mqtt {
31 
33 
38 class client : private callback
39 {
41  static const std::chrono::minutes DFLT_TIMEOUT;
43  static constexpr int DFLT_QOS = 1;
44 
46  async_client cli_;
48  std::chrono::milliseconds timeout_;
50  callback* userCallback_;
51 
62  template <typename T>
63  std::shared_ptr<T> ptr(const T& val) {
64  return std::shared_ptr<T>(const_cast<T*>(&val), [](T*){});
65  }
66 
67  // User callbacks
68  // Most are launched in a separate thread, for convenience, except
69  // message_arrived, for performance.
70  void connected(const string& cause) override {
71  std::async(std::launch::async, &callback::connected, userCallback_, cause);
72  }
73  void connection_lost(const string& cause) override {
74  std::async(std::launch::async,
75  &callback::connection_lost, userCallback_, cause);
76  }
77  void message_arrived(const_message_ptr msg) override {
78  userCallback_->message_arrived(msg);
79  }
80  void delivery_complete(delivery_token_ptr tok) override {
81  std::async(std::launch::async, &callback::delivery_complete, userCallback_, tok);
82  }
83 
85  client() =delete;
86  client(const async_client&) =delete;
87  client& operator=(const async_client&) =delete;
88 
89 public:
91  using ptr_t = std::shared_ptr<client>;
94 
106  client(const string& serverURI, const string& clientId,
107  iclient_persistence* persistence=nullptr);
118  client(const string& serverURI, const string& clientId,
119  const string& persistDir);
134  client(const string& serverURI, const string& clientId,
135  int maxBufferedMessages, iclient_persistence* persistence=nullptr);
148  client(const string& serverURI, const string& clientId,
149  int maxBufferedMessages, const string& persistDir);
153  virtual ~client() {}
157  virtual void connect() {
158  cli_.connect()->wait_for(timeout_);
159  cli_.start_consuming();
160  }
165  virtual void connect(connect_options opts) {
166  cli_.connect(std::move(opts))->wait_for(timeout_);
167  cli_.start_consuming();
168  }
173  virtual void reconnect() { cli_.reconnect()->wait_for(timeout_); }
177  virtual void disconnect() {
178  cli_.stop_consuming();
179  cli_.disconnect()->wait_for(timeout_);
180  }
187  virtual void disconnect(int timeoutMS) {
188  cli_.stop_consuming();
189  cli_.disconnect(timeoutMS)->wait_for(timeout_);
190  }
197  template <class Rep, class Period>
198  void disconnect(const std::chrono::duration<Rep, Period>& to) {
199  disconnect((int) to_milliseconds_count(to));
200  }
205  virtual string get_client_id() const { return cli_.get_client_id(); }
210  virtual string get_server_uri() const { return cli_.get_server_uri(); }
215  virtual std::chrono::milliseconds get_timeout() const { return timeout_; }
222  virtual topic get_topic(const string& top) { return topic(cli_, top); }
228  virtual bool is_connected() const { return cli_.is_connected(); }
229 
239  virtual void publish(string_ref top, const void* payload, size_t n,
240  int qos, bool retained) {
241  cli_.publish(std::move(top), payload, n, qos, retained)->wait_for(timeout_);
242  }
250  virtual void publish(string_ref top, const void* payload, size_t n) {
251  cli_.publish(std::move(top), payload, n)->wait_for(timeout_);
252  }
257  virtual void publish(const_message_ptr msg) {
258  cli_.publish(msg)->wait_for(timeout_);
259  }
267  virtual void publish(const message& msg) {
268  cli_.publish(ptr(msg))->wait();
269  }
275  virtual void set_callback(callback& cb);
280  virtual void set_timeout(int timeoutMS) {
281  timeout_ = std::chrono::milliseconds(timeoutMS);
282  }
287  template <class Rep, class Period>
288  void set_timeout(const std::chrono::duration<Rep, Period>& to) {
289  timeout_ = to_milliseconds(to);
290  }
295  virtual void subscribe(const string& topicFilter) {
296  cli_.subscribe(topicFilter, DFLT_QOS)->wait_for(timeout_);
297  }
303  virtual void subscribe(const string& topicFilter, int qos) {
304  cli_.subscribe(topicFilter, qos)->wait_for(timeout_);
305  }
311  virtual void subscribe(const string_collection& topicFilters);
317  virtual void subscribe(const string_collection& topicFilters,
318  const qos_collection& qos) {
319  cli_.subscribe(ptr(topicFilters), qos)->wait_for(timeout_);
320  }
325  virtual void unsubscribe(const string& topicFilter) {
326  cli_.unsubscribe(topicFilter)->wait_for(timeout_);
327  }
332  virtual void unsubscribe(const string_collection& topicFilters) {
333  cli_.unsubscribe(ptr(topicFilters))->wait_for(timeout_);
334  }
340  void start_consuming() { cli_.start_consuming(); }
346  void stop_consuming() { cli_.stop_consuming(); }
352  const_message_ptr consume_message() { return cli_.consume_message(); }
359  bool try_consume_message(const_message_ptr* msg) {
360  return cli_.try_consume_message(msg);
361  }
369  template <typename Rep, class Period>
370  bool try_consume_message_for(const_message_ptr* msg,
371  const std::chrono::duration<Rep, Period>& relTime) {
372  return cli_.try_consume_message_for(msg, relTime);
373  }
381  template <class Clock, class Duration>
382  bool try_consume_message_until(const_message_ptr* msg,
383  const std::chrono::time_point<Clock,Duration>& absTime) {
384  return cli_.try_consume_message_until(msg, absTime);
385  }
386 };
387 
389 using client_ptr = client::ptr_t;
390 
392 // end namespace mqtt
393 }
394 
395 #endif // __mqtt_client_h
396 
std::shared_ptr< callback > ptr_t
Smart/shared pointer to an object of this type.
Definition: callback.h:45
virtual void unsubscribe(const string &topicFilter)
Requests the server unsubscribe the client from a topic.
Definition: client.h:325
Lightweight client for talking to an MQTT server using non-blocking methods that allow an operation t...
Definition: async_client.h:60
Represents a persistent data store, used to store outbound and inbound messages while they are in fli...
Definition: iclient_persistence.h:54
string get_server_uri() const override
Returns the address of the server used by this client.
Definition: async_client.h:339
token_ptr reconnect() override
Reconnects the client using options from the previous connect.
const_message_ptr consume_message()
Read the next message from the queue.
Definition: async_client.h:566
bool try_consume_message_for(const_message_ptr *msg, const std::chrono::duration< Rep, Period > &relTime)
Waits a limited time for a message to arrive.
Definition: client.h:370
virtual ~client()
Virtual destructor.
Definition: client.h:153
virtual topic get_topic(const string &top)
Get a topic object which can be used to publish messages on this client.
Definition: client.h:222
virtual void connect(connect_options opts)
Connects to an MQTT server using the specified options.
Definition: client.h:165
virtual void connection_lost(const string &cause)
This method is called when the connection to the server is lost.
Definition: callback.h:63
bool is_connected() const override
Determines if this client is currently connected to the server.
Definition: async_client.h:344
token_ptr connect() override
Connects to an MQTT server using the default options.
bool try_consume_message_until(const_message_ptr *msg, const std::chrono::time_point< Clock, Duration > &absTime)
Waits until a specific time for a message to occur.
Definition: client.h:382
Holds the set of options that control how the client connects to a server.
Definition: connect_options.h:46
virtual std::chrono::milliseconds get_timeout() const
Return the maximum time to wait for an action to complete.
Definition: client.h:215
virtual void subscribe(const string_collection &topicFilters, const qos_collection &qos)
Subscribes to multiple topics, each of which may include wildcards.
Definition: client.h:317
Represents a topic destination, used for publish/subscribe messaging.
Definition: topic.h:42
virtual void set_callback(callback &cb)
Sets the callback listener to use for events that happen asynchronously.
virtual void publish(string_ref top, const void *payload, size_t n, int qos, bool retained)
Publishes a message to a topic on the server and return once it is delivered.
Definition: client.h:239
virtual void connected(const string &cause)
This method is called when the client is connected.
Definition: callback.h:58
virtual void publish(const message &msg)
Publishes a message to a topic on the server.
Definition: client.h:267
virtual bool is_connected() const
Determines if this client is currently connected to the server.
Definition: client.h:228
void start_consuming()
Start consuming messages.
Type for a collection of topics.
Definition: string_collection.h:40
virtual void publish(string_ref top, const void *payload, size_t n)
Publishes a message to a topic on the server and return once it is delivered.
Definition: client.h:250
Declaration of MQTT async_client class.
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n, int qos, bool retained) override
Publishes a message to a topic on the server.
virtual void reconnect()
Reconnects the client using options from the previous connect.
Definition: client.h:173
void stop_consuming()
Stop consuming messages.
Definition: client.h:346
void disconnect(const std::chrono::duration< Rep, Period > &to)
Disconnects from the server.
Definition: client.h:198
bool try_consume_message(const_message_ptr *msg)
Try to read the next message from the queue without blocking.
Definition: async_client.h:573
virtual void publish(const_message_ptr msg)
Publishes a message to a topic on the server.
Definition: client.h:257
virtual void disconnect()
Disconnects from the server.
Definition: client.h:177
virtual void subscribe(const string &topicFilter, int qos)
Subscribe to a topic, which may include wildcards.
Definition: client.h:303
bool try_consume_message_for(const_message_ptr *msg, const std::chrono::duration< Rep, Period > &relTime)
Waits a limited time for a message to arrive.
Definition: async_client.h:584
async_client::qos_collection qos_collection
Type for a collection of QOS values.
Definition: client.h:93
An MQTT message holds everything required for an MQTT PUBLISH message.
Definition: message.h:52
void set_timeout(const std::chrono::duration< Rep, Period > &to)
Set the maximum time to wait for an action to complete.
Definition: client.h:288
void stop_consuming()
Stop consuming messages.
virtual void delivery_complete(delivery_token_ptr tok)
Called when delivery for a message has been completed, and all acknowledgments have been received...
Definition: callback.h:74
std::shared_ptr< client > ptr_t
Smart pointer type for this object.
Definition: client.h:91
virtual void disconnect(int timeoutMS)
Disconnects from the server.
Definition: client.h:187
token_ptr subscribe(const_string_collection_ptr topicFilters, const qos_collection &qos) override
Subscribe to multiple topics, each of which may include wildcards.
Lightweight client for talking to an MQTT server using methods that block until an operation complete...
Definition: client.h:38
bool try_consume_message(const_message_ptr *msg)
Try to read the next message from the queue without blocking.
Definition: client.h:359
virtual void set_timeout(int timeoutMS)
Set the maximum time to wait for an action to complete.
Definition: client.h:280
Provides a mechanism for tracking the completion of an asynchronous action.
Definition: callback.h:41
token_ptr unsubscribe(const string &topicFilter) override
Requests the server unsubscribe the client from a topic.
virtual string get_server_uri() const
Gets the address of the server used by this client.
Definition: client.h:210
bool try_consume_message_until(const_message_ptr *msg, const std::chrono::time_point< Clock, Duration > &absTime)
Waits until a specific time for a message to occur.
Definition: async_client.h:596
string get_client_id() const override
Returns the client ID used by this client.
Definition: async_client.h:334
const_message_ptr consume_message()
Read the next message from the queue.
Definition: client.h:352
virtual void connect()
Connects to an MQTT server using the default options.
Definition: client.h:157
std::vector< int > qos_collection
Type for a collection of QOS values.
Definition: iasync_client.h:64
token_ptr disconnect() override
Disconnects from the server.
Definition: async_client.h:235
virtual void subscribe(const string &topicFilter)
Subscribe to a topic, which may include wildcards using a QoS of 1.
Definition: client.h:295
virtual void message_arrived(const_message_ptr msg)
This method is called when a message arrives from the server.
Definition: callback.h:68
virtual void unsubscribe(const string_collection &topicFilters)
Requests the server unsubscribe the client from one or more topics.
Definition: client.h:332
void start_consuming()
Start consuming messages.
Definition: client.h:340
virtual string get_client_id() const
Gets the client ID used by this client.
Definition: client.h:205