PahoMqttCpp
MQTT C++ Client for POSIX and Windows
Loading...
Searching...
No Matches
client.h
Go to the documentation of this file.
1
7
8/*******************************************************************************
9 * Copyright (c) 2013-2023 Frank Pagliughi <fpagliughi@mindspring.com>
10 *
11 * All rights reserved. This program and the accompanying materials
12 * are made available under the terms of the Eclipse Public License v2.0
13 * and Eclipse Distribution License v1.0 which accompany this distribution.
14 *
15 * The Eclipse Public License is available at
16 * http://www.eclipse.org/legal/epl-v20.html
17 * and the Eclipse Distribution License is available at
18 * http://www.eclipse.org/org/documents/edl-v10.php.
19 *
20 * Contributors:
21 * Frank Pagliughi - initial implementation and documentation
22 *******************************************************************************/
23
24#ifndef __mqtt_client_h
25#define __mqtt_client_h
26
27#include <future>
28
29#include "mqtt/async_client.h"
30
31namespace mqtt {
32
34
39class client : private callback
40{
42 PAHO_MQTTPP_EXPORT static const std::chrono::seconds DFLT_TIMEOUT;
44 PAHO_MQTTPP_EXPORT static const int DFLT_QOS; // =1;
45
47 async_client cli_;
49 std::chrono::milliseconds timeout_;
51 callback* userCallback_;
52
63 template <typename T>
64 std::shared_ptr<T> ptr(const T& val) {
65 return std::shared_ptr<T>(const_cast<T*>(&val), [](T*) {});
66 }
67
68 // User callbacks
69 // Most are launched in a separate thread, for convenience, except
70 // message_arrived, for performance.
71 void connected(const string& cause) override {
72 std::async(std::launch::async, &callback::connected, userCallback_, cause).wait();
73 }
74 void connection_lost(const string& cause) override {
75 std::async(std::launch::async, &callback::connection_lost, userCallback_, cause)
76 .wait();
77 }
78 void message_arrived(const_message_ptr msg) override {
79 userCallback_->message_arrived(msg);
80 }
81 void delivery_complete(delivery_token_ptr tok) override {
82 std::async(std::launch::async, &callback::delivery_complete, userCallback_, tok)
83 .wait();
84 }
85
87 client() = delete;
88 client(const async_client&) = delete;
89 client& operator=(const async_client&) = delete;
90
91public:
93 using ptr_t = std::shared_ptr<client>;
96
99
109 const string& serverURI, const string& clientId = string{},
110 const persistence_type& persistence = NO_PERSISTENCE
111 );
126 const string& serverURI, const string& clientId, int maxBufferedMessages,
127 const persistence_type& persistence = NO_PERSISTENCE
128 );
141 const string& serverURI, const string& clientId, const create_options& opts,
142 const persistence_type& persistence = NO_PERSISTENCE
143 );
148 client(const create_options& opts);
152 virtual ~client() {}
170 virtual void disconnect();
177 virtual void disconnect(int timeoutMS);
184 template <class Rep, class Period>
185 void disconnect(const std::chrono::duration<Rep, Period>& to) {
187 }
192 virtual string get_client_id() const { return cli_.get_client_id(); }
197 virtual string get_server_uri() const { return cli_.get_server_uri(); }
202 virtual std::chrono::milliseconds get_timeout() const { return timeout_; }
218 const string& top, int qos = message::DFLT_QOS, bool retained = message::DFLT_RETAINED
219 ) {
220 return topic(cli_, top, qos, retained);
221 }
227 virtual bool is_connected() const { return cli_.is_connected(); }
236
246 virtual void publish(
247 string_ref top, const void* payload, size_t n, int qos, bool retained
248 ) {
249 if (!cli_.publish(std::move(top), payload, n, qos, retained)->wait_for(timeout_))
250 throw timeout_error();
251 }
259 virtual void publish(string_ref top, const void* payload, size_t n) {
260 if (!cli_.publish(std::move(top), payload, n)->wait_for(timeout_))
261 throw timeout_error();
262 }
267 virtual void publish(const_message_ptr msg) {
268 if (!cli_.publish(msg)->wait_for(timeout_))
269 throw timeout_error();
270 }
278 virtual void publish(const message& msg) { cli_.publish(ptr(msg))->wait(); }
284 virtual void set_callback(callback& cb);
289 virtual void set_timeout(int timeoutMS) {
290 timeout_ = std::chrono::milliseconds(timeoutMS);
291 }
296 template <class Rep, class Period>
297 void set_timeout(const std::chrono::duration<Rep, Period>& to) {
298 timeout_ = to_milliseconds(to);
299 }
308 const string& topicFilter, const subscribe_options& opts = subscribe_options(),
309 const properties& props = properties()
310 );
320 const string& topicFilter, int qos,
321 const subscribe_options& opts = subscribe_options(),
322 const properties& props = properties()
323 );
333 const string_collection& topicFilters,
334 const std::vector<subscribe_options>& opts = std::vector<subscribe_options>(),
335 const properties& props = properties()
336 );
346 const string_collection& topicFilters, const qos_collection& qos,
347 const std::vector<subscribe_options>& opts = std::vector<subscribe_options>(),
348 const properties& props = properties()
349 );
357 const string& topicFilter, const properties& props = properties()
358 );
366 const string_collection& topicFilters, const properties& props = properties()
367 );
373 virtual void start_consuming() { cli_.start_consuming(); }
379 virtual void stop_consuming() { cli_.stop_consuming(); }
393 return cli_.try_consume_message(msg);
394 }
402 template <typename Rep, class Period>
404 const_message_ptr* msg, const std::chrono::duration<Rep, Period>& relTime
405 ) {
406 return cli_.try_consume_message_for(msg, relTime);
407 }
415 template <class Clock, class Duration>
417 const_message_ptr* msg, const std::chrono::time_point<Clock, Duration>& absTime
418 ) {
419 return cli_.try_consume_message_until(msg, absTime);
420 }
421};
422
425
427} // namespace mqtt
428
429#endif // __mqtt_client_h
Definition async_client.h:137
bool try_consume_message(const_message_ptr *msg) override
void stop_consuming() override
connect_options get_connect_options() const
Definition async_client.h:527
bool try_consume_message_for(const_message_ptr *msg, const std::chrono::duration< Rep, Period > &relTime)
Definition async_client.h:944
std::function< bool(connect_data &)> update_connection_handler
Definition async_client.h:151
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n, int qos, bool retained, const properties &props=properties()) override
string get_client_id() const override
Definition async_client.h:506
bool try_consume_message_until(const_message_ptr *msg, const std::chrono::time_point< Clock, Duration > &absTime)
Definition async_client.h:990
const_message_ptr consume_message() override
string get_server_uri() const override
Definition async_client.h:511
void set_update_connection_handler(update_connection_handler cb)
bool is_connected() const override
Definition async_client.h:535
void start_consuming() override
Definition callback.h:43
virtual void connected(const string &)
Definition callback.h:61
virtual void connection_lost(const string &)
Definition callback.h:65
virtual void delivery_complete(delivery_token_ptr)
Definition callback.h:74
virtual void message_arrived(const_message_ptr)
Definition callback.h:69
Definition client.h:40
virtual subscribe_response subscribe(const string &topicFilter, const subscribe_options &opts=subscribe_options(), const properties &props=properties())
std::shared_ptr< client > ptr_t
Definition client.h:93
client(const string &serverURI, const string &clientId=string{}, const persistence_type &persistence=NO_PERSISTENCE)
virtual void stop_consuming()
Definition client.h:379
bool try_consume_message_until(const_message_ptr *msg, const std::chrono::time_point< Clock, Duration > &absTime)
Definition client.h:416
virtual bool try_consume_message(const_message_ptr *msg)
Definition client.h:392
virtual void publish(const message &msg)
Definition client.h:278
void set_update_connection_handler(update_connection_handler cb)
Definition client.h:233
virtual void disconnect(int timeoutMS)
virtual void publish(const_message_ptr msg)
Definition client.h:267
virtual subscribe_response subscribe(const string_collection &topicFilters, const qos_collection &qos, const std::vector< subscribe_options > &opts=std::vector< subscribe_options >(), const properties &props=properties())
virtual void disconnect()
virtual const_message_ptr consume_message()
Definition client.h:385
bool try_consume_message_for(const_message_ptr *msg, const std::chrono::duration< Rep, Period > &relTime)
Definition client.h:403
virtual void set_callback(callback &cb)
void disconnect(const std::chrono::duration< Rep, Period > &to)
Definition client.h:185
virtual string get_server_uri() const
Definition client.h:197
virtual void start_consuming()
Definition client.h:373
virtual connect_response connect(connect_options opts)
virtual subscribe_response subscribe(const string &topicFilter, int qos, const subscribe_options &opts=subscribe_options(), const properties &props=properties())
virtual unsubscribe_response unsubscribe(const string_collection &topicFilters, const properties &props=properties())
async_client::update_connection_handler update_connection_handler
Definition client.h:98
virtual topic get_topic(const string &top, int qos=message::DFLT_QOS, bool retained=message::DFLT_RETAINED)
Definition client.h:217
void set_timeout(const std::chrono::duration< Rep, Period > &to)
Definition client.h:297
connect_options get_connect_options() const
Definition client.h:208
virtual std::chrono::milliseconds get_timeout() const
Definition client.h:202
virtual subscribe_response subscribe(const string_collection &topicFilters, const std::vector< subscribe_options > &opts=std::vector< subscribe_options >(), const properties &props=properties())
virtual bool is_connected() const
Definition client.h:227
virtual unsubscribe_response unsubscribe(const string &topicFilter, const properties &props=properties())
virtual void publish(string_ref top, const void *payload, size_t n, int qos, bool retained)
Definition client.h:246
virtual connect_response connect()
client(const string &serverURI, const string &clientId, const create_options &opts, const persistence_type &persistence=NO_PERSISTENCE)
async_client::qos_collection qos_collection
Definition client.h:95
virtual connect_response reconnect()
client(const create_options &opts)
virtual void publish(string_ref top, const void *payload, size_t n)
Definition client.h:259
virtual ~client()
Definition client.h:152
client(const string &serverURI, const string &clientId, int maxBufferedMessages, const persistence_type &persistence=NO_PERSISTENCE)
virtual string get_client_id() const
Definition client.h:192
virtual void set_timeout(int timeoutMS)
Definition client.h:289
Definition connect_options.h:50
Definition server_response.h:73
Definition create_options.h:77
std::vector< int > qos_collection
Definition iasync_client.h:66
Definition message.h:57
static constexpr bool DFLT_RETAINED
Definition message.h:62
static constexpr int DFLT_QOS
Definition message.h:60
Definition properties.h:293
Definition string_collection.h:45
Definition subscribe_options.h:49
Definition exception.h:196
Definition topic.h:45
Definition server_response.h:172
#define PAHO_MQTTPP_EXPORT
Definition export.h:40
Definition async_client.h:60
client::ptr_t client_ptr
Definition client.h:424
delivery_token::ptr_t delivery_token_ptr
Definition delivery_token.h:127
constexpr no_persistence NO_PERSISTENCE
Definition create_options.h:43
message::const_ptr_t const_message_ptr
Definition message.h:372
std::chrono::milliseconds to_milliseconds(const std::chrono::duration< Rep, Period > &dur)
Definition types.h:84
std::variant< no_persistence, string, iclient_persistence * > persistence_type
Definition create_options.h:53
long to_milliseconds_count(const std::chrono::duration< Rep, Period > &dur)
Definition types.h:95
Definition server_response.h:131