Hi,
 
I am new to this mailing list. I am developing an MQTT client application that uses a large number of file descriptors.
As referenced in
GitLab issue #1299, this is causing the MQTT library to return MOSQ_ERR_INVAL errors.
 
Following the advice given in this GitLab issue, I am now trying to use my own event loop, but I am having some issues with it. I found that the client is not sending the PING command, which causes the MQTT server to disconnect the client.
 Server log shows:
1632236557: Client mosq-dDa3ooaeKWKrdxSdQa has exceeded timeout, disconnecting.
 
I am reproducing the issue with the sample code below (also in attachment).
The on_disconnect callback is called before the message is published.
When I replace my custom loop with a call to
mosquitto_loop_start, this all works fine.
This is with libmosquitto version 1.6.9, on Ubuntu 20.04.
 
Would you have any pointers indicating what I’m doing wrong?
 
Many thanks,
Jeremy
 
#include <mosquittopp.h>
 
#include <sys/epoll.h>
#include <unistd.h>
 
#include <atomic>
#include <cassert>
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
 
/**
* Connect callback: writes "on_connect <rc>" to stderr.
*
* @param rc  Result code passed in by the caller; printed as a decimal number.
*/
static void on_connect(struct mosquitto*, void*, int rc)
{
      const std::string line = "on_connect " + std::to_string(rc);
      std::cerr << line << std::endl;
}
 
/**
* Disconnect callback: writes "on_disconnect <rc>" to stderr.
*
* @param rc  Result code passed in by the caller; printed as a decimal number.
*/
static void on_disconnect(struct mosquitto*, void*, int rc)
{
      const std::string line = "on_disconnect " + std::to_string(rc);
      std::cerr << line << std::endl;
}
 
/**
* Publish callback: writes "on_publish <mid>" to stderr.
*
* @param mid  Message id passed in by the caller; printed as a decimal number.
*/
static void on_publish(struct mosquitto*, void*, int mid)
{
      const std::string line = "on_publish " + std::to_string(mid);
      std::cerr << line << std::endl;
}
 
/**
* Message callback: writes "on_message" to stderr; all arguments are ignored.
*/
static void on_message(struct mosquitto*, void*, const mosquitto_message*)
{
      std::cerr << "on_message" << '\n' << std::flush;
}
 
/**
* Subscribe callback: writes "on_subscribe" to stderr; all arguments are ignored.
*/
static void on_subscribe(struct mosquitto*, void*, int, int, const int *)
{
      std::cerr << "on_subscribe" << '\n' << std::flush;
}
 
int main()
{
      auto ret = mosquitto_lib_init();
      assert(ret == MOSQ_ERR_SUCCESS);
 
      struct mosquitto * mosq = mosquitto_new(NULL, true, NULL);
      assert(mosq != nullptr);
 
      mosquitto_connect_callback_set(mosq, on_connect);
      mosquitto_disconnect_callback_set(mosq, on_disconnect);
      mosquitto_publish_callback_set(mosq, on_publish);
      mosquitto_message_callback_set(mosq, on_message);
      mosquitto_subscribe_callback_set(mosq, on_subscribe);
 
      ret = mosquitto_connect(mosq, "127.0.0.1", 1883, 5);
      assert(ret == MOSQ_ERR_SUCCESS);
 
      /**
      * Use our own thread with a polling logic.
      */
      std::atomic_bool ended(false);
      auto worker = std::thread([mosq, &ended]
      {
            /**
            * Tell the library we are using threads, but not mosquitto_loop_start.
            */
            mosquitto_threaded_set(mosq, true);
 
            struct epoll_event ev;
            auto epfd = epoll_create1(0);
            assert(epfd >= 0);
 
            int sock = mosquitto_socket(mosq);
            ev.data.fd = sock;
            ev.events = EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLET;
            auto err = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);
            assert(err == 0);
 
            while (!ended)
            {
                  auto err = epoll_wait(epfd, &ev, 1, 250);
 
                  if (err > 0)
                  {
                        if (ev.events & EPOLLIN)
                             mosquitto_loop_read(mosq, 1);
                        if (ev.events & EPOLLOUT)
                        {
                             if (mosquitto_want_write(mosq))
                                   mosquitto_loop_write(mosq, 1);
                        }
                  }
 
                  mosquitto_loop_misc(mosq);
            }
      });
 
      /**
      * Wait for keepalive * 2 to see if Ping requests are working.
      */
      std::this_thread::sleep_for(std::chrono::seconds{10});
 
      std::string msg = "test message";
      ret = mosquitto_publish(mosq, NULL, "test-topic", msg.size(), msg.c_str(), 2, false);
      std::cerr << "publish result=" + std::to_string(ret) << std::endl;
 
      std::this_thread::sleep_for(std::chrono::milliseconds{500});
 
      mosquitto_disconnect(mosq);
 
      ended = true;
      worker.join();
 
      mosquitto_lib_cleanup();
}