PahoMqttCpp
MQTT C++ Client for POSIX and Windows
Loading...
Searching...
No Matches
thread_queue.h
Go to the documentation of this file.
1
8
9/*******************************************************************************
10 * Copyright (c) 2017-2022 Frank Pagliughi <fpagliughi@mindspring.com>
11 *
12 * All rights reserved. This program and the accompanying materials
13 * are made available under the terms of the Eclipse Public License v2.0
14 * and Eclipse Distribution License v1.0 which accompany this distribution.
15 *
16 * The Eclipse Public License is available at
17 * http://www.eclipse.org/legal/epl-v20.html
18 * and the Eclipse Distribution License is available at
19 * http://www.eclipse.org/org/documents/edl-v10.php.
20 *
21 * Contributors:
22 * Frank Pagliughi - initial implementation and documentation
23 *******************************************************************************/
24
25#ifndef __mqtt_thread_queue_h
26#define __mqtt_thread_queue_h
27
28#include <algorithm>
29#include <condition_variable>
30#include <deque>
31#include <limits>
32#include <mutex>
33#include <queue>
34#include <thread>
35
36namespace mqtt {
37
42class queue_closed : public std::runtime_error
43{
44public:
45 queue_closed() : std::runtime_error("queue is closed") {}
46};
47
49
84template <typename T, class Container = std::deque<T>>
86{
87public:
89 using container_type = Container;
91 using value_type = T;
93 using size_type = typename Container::size_type;
94
96 static constexpr size_type MAX_CAPACITY = std::numeric_limits<size_type>::max();
97
98private:
100 mutable std::mutex lock_;
102 std::condition_variable notEmptyCond_;
104 std::condition_variable notFullCond_;
108 bool closed_{false};
109
111 std::queue<T, Container> que_;
112
114 using guard = std::lock_guard<std::mutex>;
116 using unique_guard = std::unique_lock<std::mutex>;
117
119 bool is_done() const { return closed_ && que_.empty(); }
120
121public:
133 explicit thread_queue(size_t cap) : cap_(std::max<size_type>(cap, 1)) {}
139 bool empty() const {
140 guard g{lock_};
141 return que_.empty();
142 }
148 guard g{lock_};
149 return cap_;
150 }
157 void capacity(size_type cap) {
158 guard g{lock_};
159 cap_ = cap;
160 }
165 size_type size() const {
166 guard g{lock_};
167 return que_.size();
168 }
175 void close() {
176 guard g{lock_};
177 closed_ = true;
178 notFullCond_.notify_all();
179 notEmptyCond_.notify_all();
180 }
188 bool closed() const {
189 guard g{lock_};
190 return closed_;
191 }
198 bool done() const {
199 guard g{lock_};
200 return is_done();
201 }
206 void clear() {
207 guard g{lock_};
208 while (!que_.empty()) que_.pop();
209 notFullCond_.notify_all();
210 }
217 void put(value_type val) {
218 unique_guard g{lock_};
219 notFullCond_.wait(g, [this] { return que_.size() < cap_ || closed_; });
220 if (closed_)
221 throw queue_closed{};
222
223 que_.emplace(std::move(val));
224 notEmptyCond_.notify_one();
225 }
232 bool try_put(value_type val) {
233 guard g{lock_};
234 if (que_.size() >= cap_ || closed_)
235 return false;
236
237 que_.emplace(std::move(val));
238 notEmptyCond_.notify_one();
239 return true;
240 }
250 template <typename Rep, class Period>
251 bool try_put_for(value_type val, const std::chrono::duration<Rep, Period>& relTime) {
252 unique_guard g{lock_};
253 bool to = !notFullCond_.wait_for(g, relTime, [this] {
254 return que_.size() < cap_ || closed_;
255 });
256 if (to || closed_)
257 return false;
258
259 que_.emplace(std::move(val));
260 notEmptyCond_.notify_one();
261 return true;
262 }
273 template <class Clock, class Duration>
275 value_type val, const std::chrono::time_point<Clock, Duration>& absTime
276 ) {
277 unique_guard g{lock_};
278 bool to = !notFullCond_.wait_until(g, absTime, [this] {
279 return que_.size() < cap_ || closed_;
280 });
281
282 if (to || closed_)
283 return false;
284
285 que_.emplace(std::move(val));
286 notEmptyCond_.notify_one();
287 return true;
288 }
295 bool get(value_type* val) {
296 if (!val)
297 return false;
298
299 unique_guard g{lock_};
300 notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; });
301 if (que_.empty()) // We must be done
302 return false;
303
304 *val = std::move(que_.front());
305 que_.pop();
306 notFullCond_.notify_one();
307 return true;
308 }
316 unique_guard g{lock_};
317 notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; });
318 if (que_.empty()) // We must be done
319 throw queue_closed{};
320
321 value_type val = std::move(que_.front());
322 que_.pop();
323 notFullCond_.notify_one();
324 return val;
325 }
334 bool try_get(value_type* val) {
335 if (!val)
336 return false;
337
338 guard g{lock_};
339 if (que_.empty())
340 return false;
341
342 *val = std::move(que_.front());
343 que_.pop();
344 notFullCond_.notify_one();
345 return true;
346 }
357 template <typename Rep, class Period>
358 bool try_get_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) {
359 if (!val)
360 return false;
361
362 unique_guard g{lock_};
363 notEmptyCond_.wait_for(g, relTime, [this] { return !que_.empty() || closed_; });
364
365 if (que_.empty())
366 return false;
367
368 *val = std::move(que_.front());
369 que_.pop();
370 notFullCond_.notify_one();
371 return true;
372 }
383 template <class Clock, class Duration>
385 value_type* val, const std::chrono::time_point<Clock, Duration>& absTime
386 ) {
387 if (!val)
388 return false;
389
390 unique_guard g{lock_};
391 notEmptyCond_.wait_until(g, absTime, [this] { return !que_.empty() || closed_; });
392 if (que_.empty())
393 return false;
394
395 *val = std::move(que_.front());
396 que_.pop();
397 notFullCond_.notify_one();
398 return true;
399 }
400};
401
403} // namespace mqtt
404
405#endif // __mqtt_thread_queue_h
Definition thread_queue.h:43
queue_closed()
Definition thread_queue.h:45
Definition thread_queue.h:86
typename Container::size_type size_type
Definition thread_queue.h:93
T value_type
Definition thread_queue.h:91
size_type size() const
Definition thread_queue.h:165
void capacity(size_type cap)
Definition thread_queue.h:157
bool done() const
Definition thread_queue.h:198
Container container_type
Definition thread_queue.h:89
bool try_put(value_type val)
Definition thread_queue.h:232
bool try_get_for(value_type *val, const std::chrono::duration< Rep, Period > &relTime)
Definition thread_queue.h:358
bool try_put_for(value_type val, const std::chrono::duration< Rep, Period > &relTime)
Definition thread_queue.h:251
static constexpr size_type MAX_CAPACITY
Definition thread_queue.h:96
bool try_get(value_type *val)
Definition thread_queue.h:334
thread_queue()
Definition thread_queue.h:126
bool try_put_until(value_type val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition thread_queue.h:274
void clear()
Definition thread_queue.h:206
bool closed() const
Definition thread_queue.h:188
bool get(value_type *val)
Definition thread_queue.h:295
bool empty() const
Definition thread_queue.h:139
void close()
Definition thread_queue.h:175
size_type capacity() const
Definition thread_queue.h:147
thread_queue(size_t cap)
Definition thread_queue.h:133
void put(value_type val)
Definition thread_queue.h:217
value_type get()
Definition thread_queue.h:315
bool try_get_until(value_type *val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition thread_queue.h:384
Definition async_client.h:60