Hi,
I am new to this mailing list. I am developing an MQTT client application with a large number of file descriptors used.
As referenced in
GitLab issue #1299 this is causing the MQTT library to return MOSQ_ERR_INVAL errors.
Following an advice in this GitLab issue, I am now trying to use my own event loop, but am having some issues with it. I found the client is not sending the PING command, causing the MQTT server to disconnect the client.
Server log shows:
1632236557: Client mosq-dDa3ooaeKWKrdxSdQa has exceeded timeout, disconnecting.
I am reproducing the issue with the sample code below (also in attachment).
The on_disconnect callback is called before the message is published.
When replacing my custom loop with a call to
mosquito_loop_start, this all works fine.
This is with libmosquitto version 1.6.9, on Ubuntu 20.04
Would you have any pointers indicating what I’m doing wrong?
Many thanks,
Jeremy
#include <mosquittopp.h>
#include <atomic>
#include <cassert>
#include <iostream>
#include <sys/epoll.h>
#include <thread>
static void on_connect(struct mosquitto*, void*, int rc)
{
std::cerr << "on_connect " + std::to_string(rc) << std::endl;
}
static void on_disconnect(struct mosquitto*, void*, int rc)
{
std::cerr << "on_disconnect " + std::to_string(rc) << std::endl;
}
static void on_publish(struct mosquitto*, void*, int mid)
{
std::cerr << "on_publish " + std::to_string(mid) << std::endl;
}
static void on_message(struct mosquitto*, void*, const mosquitto_message*)
{
std::cerr << "on_message" << std::endl;
}
static void on_subscribe(struct mosquitto*, void*, int, int, const int *)
{
std::cerr << "on_subscribe" << std::endl;
}
int main()
{
auto ret = mosquitto_lib_init();
assert(ret == MOSQ_ERR_SUCCESS);
struct mosquitto * mosq = mosquitto_new(NULL, true, NULL);
assert(mosq != nullptr);
mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_disconnect_callback_set(mosq, on_disconnect);
mosquitto_publish_callback_set(mosq, on_publish);
mosquitto_message_callback_set(mosq, on_message);
mosquitto_subscribe_callback_set(mosq, on_subscribe);
ret = mosquitto_connect(mosq, "127.0.0.1", 1883, 5);
assert(ret == MOSQ_ERR_SUCCESS);
/**
* Use our own thread with a polling logic.
*/
std::atomic_bool ended(false);
auto worker = std::thread([mosq, &ended]
{
/**
* Tell the library we are using threads, but not mosquitto_loop_start.
*/
mosquitto_threaded_set(mosq, true);
struct epoll_event ev;
auto epfd = epoll_create1(0);
assert(epfd >= 0);
int sock = mosquitto_socket(mosq);
ev.data.fd = sock;
ev.events = EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLET;
auto err = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);
assert(err == 0);
while (!ended)
{
auto err = epoll_wait(epfd, &ev, 1, 250);
if (err > 0)
{
if (ev.events & EPOLLIN)
mosquitto_loop_read(mosq, 1);
if (ev.events & EPOLLOUT)
{
if (mosquitto_want_write(mosq))
mosquitto_loop_write(mosq, 1);
}
}
mosquitto_loop_misc(mosq);
}
});
/**
* Wait for keepalive * 2 to see if Ping requests are working.
*/
std::this_thread::sleep_for(std::chrono::seconds{10});
std::string msg = "test message";
ret = mosquitto_publish(mosq, NULL, "test-topic", msg.size(), msg.c_str(), 2, false);
std::cerr << "publish result=" + std::to_string(ret) << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds{500});
mosquitto_disconnect(mosq);
ended = true;
worker.join();
mosquitto_lib_cleanup();
}