Skip to content

Commit d601264

Browse files
committed
Added a C++11 enabled thread_group (cf. boost's thread_group).
1 parent 6bff172 commit d601264

File tree

3 files changed

+176
-89
lines changed

3 files changed

+176
-89
lines changed

boost/network/utils/thread_group.hpp

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright (c) Glyn Matthews 2016.
2+
// (C) Copyright 2007-9 Anthony Williams
3+
// Distributed under the Boost Software License, Version 1.0.
4+
// (See accompanying file LICENSE_1_0.txt or copy at
5+
// http://www.boost.org/LICENSE_1_0.txt)
6+
7+
#ifndef BOOST_NETWORK_UTILS_THREAD_GROUP_INC
8+
#define BOOST_NETWORK_UTILS_THREAD_GROUP_INC
9+
10+
#include <thread>
11+
#include <mutex>
12+
#include <memory>
13+
#include <list>
14+
#include <algorithm>
15+
16+
namespace boost {
17+
namespace network {
18+
namespace utils {
19+
class thread_group {
20+
private:
21+
thread_group(thread_group const&);
22+
thread_group& operator=(thread_group const&);
23+
24+
public:
25+
thread_group() {}
26+
~thread_group() {}
27+
28+
template <typename F>
29+
std::thread* create_thread(F threadfunc) {
30+
std::lock_guard<std::mutex> guard(m);
31+
std::unique_ptr<std::thread> new_thread(new std::thread(threadfunc));
32+
threads.push_back(std::move(new_thread));
33+
return threads.back().get();
34+
}
35+
36+
void add_thread(std::thread* thrd) {
37+
if (thrd) {
38+
std::lock_guard<std::mutex> guard(m);
39+
threads.push_back(std::unique_ptr<std::thread>(thrd));
40+
}
41+
}
42+
43+
void remove_thread(std::thread* thrd) {
44+
std::lock_guard<std::mutex> guard(m);
45+
auto const it = std::find_if(threads.begin(), threads.end(),
46+
[&thrd] (std::unique_ptr<std::thread> &arg) {
47+
return arg.get() == thrd;
48+
});
49+
if (it != threads.end()) {
50+
threads.erase(it);
51+
}
52+
}
53+
54+
void join_all() {
55+
std::unique_lock<std::mutex> guard(m);
56+
57+
for (auto &thread : threads) {
58+
if (thread->joinable()) {
59+
thread->join();
60+
}
61+
}
62+
}
63+
64+
size_t size() const {
65+
std::unique_lock<std::mutex> guard(m);
66+
return threads.size();
67+
}
68+
69+
private:
70+
std::list<std::unique_ptr<std::thread>> threads;
71+
mutable std::mutex m;
72+
};
73+
74+
} // namespace utils
75+
} // namespace network
76+
} // namespace boost
77+
78+
#endif // BOOST_NETWORK_UTILS_THREAD_GROUP_INC

boost/network/utils/thread_pool.hpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,22 @@
66
// (See accompanying file LICENSE_1_0.txt or copy at
77
// http://www.boost.org/LICENSE_1_0.txt)
88

9+
#include <cstddef>
910
#include <memory>
1011
#include <functional>
1112
#include <boost/asio/io_service.hpp>
1213
#include <boost/function.hpp>
1314
#include <boost/network/tags.hpp>
1415
#include <boost/scope_exit.hpp>
15-
#include <boost/thread/thread.hpp>
16-
#include <cstddef>
16+
//#include <boost/thread/thread.hpp>
17+
#include <boost/network/utils/thread_group.hpp>
1718

1819
namespace boost {
1920
namespace network {
2021
namespace utils {
2122

2223
typedef std::shared_ptr<boost::asio::io_service> io_service_ptr;
23-
typedef std::shared_ptr<boost::thread_group> worker_threads_ptr;
24+
typedef std::shared_ptr<utils::thread_group> worker_threads_ptr;
2425
typedef std::shared_ptr<boost::asio::io_service::work> sentinel_ptr;
2526

2627
template <class Tag>
@@ -46,7 +47,7 @@ struct basic_thread_pool {
4647
sentinel_.reset();
4748
io_service_.reset();
4849
if (worker_threads_.get()) {
49-
worker_threads_->interrupt_all();
50+
// worker_threads_->interrupt_all();
5051
worker_threads_->join_all();
5152
}
5253
worker_threads_.reset();
@@ -59,7 +60,7 @@ struct basic_thread_pool {
5960
}
6061

6162
if (!worker_threads_.get()) {
62-
worker_threads_.reset(new boost::thread_group);
63+
worker_threads_.reset(new utils::thread_group);
6364
}
6465

6566
if (!sentinel_.get()) {

libs/network/example/http/hello_world_async_server_with_work_queue.cpp

Lines changed: 92 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,38 @@
99

1010
#include <memory>
1111
#include <mutex>
12+
#include <chrono>
1213
#include <functional>
14+
#include <boost/network/utils/thread_group.hpp>
1315
#include <boost/network/include/http/server.hpp>
1416
#include <boost/network/uri.hpp>
15-
1617
#include <boost/asio.hpp>
17-
#include <boost/thread.hpp>
1818
#include <iostream>
1919
#include <list>
2020
#include <signal.h>
2121

22-
#define Log(line) \
23-
do { \
24-
std::cout << line << std::endl; \
25-
} while (false)
22+
// This is needed to terminate the worker queue, and must be visible to the signal handler.
23+
bool running = true;
2624

2725
struct handler;
2826
typedef boost::network::http::server<handler> server;
2927

28+
struct server_data {
29+
boost::network::http::server<handler> server;
30+
31+
server_data(const server::options &options)
32+
: server(options) {}
33+
34+
void run() {
35+
server.run();
36+
}
37+
38+
void stop() {
39+
running = false;
40+
server.stop();
41+
}
42+
};
43+
3044
/**
3145
* request + connection encapsulation (work item)
3246
*/
@@ -36,7 +50,7 @@ struct request_data {
3650

3751
typedef std::shared_ptr<request_data> pointer;
3852

39-
request_data(server::request req, server::connection_ptr conn)
53+
request_data(server::request req, server::connection_ptr conn)
4054
: req(std::move(req)), conn(std::move(conn)) {}
4155
};
4256

@@ -49,24 +63,24 @@ struct work_queue {
4963
list requests;
5064
std::mutex mutex;
5165

52-
inline void put(const request_data::pointer& p_rd) {
66+
inline void put(const request_data::pointer& request) {
5367
std::unique_lock<std::mutex> lock(mutex);
54-
requests.push_back(p_rd);
68+
requests.push_back(request);
5569
(void)lock;
5670
}
5771

5872
inline request_data::pointer get() {
5973
std::unique_lock<std::mutex> lock(mutex);
6074

61-
request_data::pointer p_ret;
75+
request_data::pointer request;
6276
if (!requests.empty()) {
63-
p_ret = requests.front();
77+
request = requests.front();
6478
requests.pop_front();
6579
}
6680

6781
(void)lock;
6882

69-
return p_ret;
83+
return request;
7084
}
7185
};
7286

@@ -92,11 +106,11 @@ struct handler {
92106
*
93107
* @param error
94108
* @param signal
95-
* @param p_server_instance
109+
* @param server
96110
*/
97-
void shut_me_down(const boost::system::error_code& error, int,
98-
std::shared_ptr<server> p_server_instance) {
99-
if (!error) p_server_instance->stop();
111+
void shut_me_down(const boost::system::error_code& error, int signal,
112+
std::shared_ptr<server_data> server) {
113+
if (!error) server->stop();
100114
}
101115

102116
/**
@@ -105,85 +119,79 @@ void shut_me_down(const boost::system::error_code& error, int,
105119
* @param queue
106120
*/
107121
void process_request(work_queue& queue) {
108-
while (!boost::this_thread::interruption_requested()) {
109-
request_data::pointer p_req(queue.get());
110-
if (p_req) {
122+
while (running) {
123+
request_data::pointer request(queue.get());
124+
if (request) {
111125

112126
// some heavy work!
113-
boost::this_thread::sleep(boost::posix_time::seconds(10));
127+
std::this_thread::sleep_for(std::chrono::seconds(10));
114128

115-
p_req->conn->set_status(server::connection::ok);
116-
p_req->conn->write("Hello, world!");
129+
request->conn->set_status(server::connection::ok);
130+
request->conn->write("Hello, world!");
117131
}
118132

119-
boost::this_thread::sleep(boost::posix_time::microseconds(1000));
133+
std::this_thread::sleep_for(std::chrono::microseconds(1000));
120134
}
121135
}
122136

123-
int main(void) try {
124-
// the thread group
125-
std::shared_ptr<boost::thread_group> p_threads(
126-
std::make_shared<boost::thread_group>());
127-
128-
// setup asio::io_service
129-
std::shared_ptr<boost::asio::io_service> p_io_service(
130-
std::make_shared<boost::asio::io_service>());
131-
std::shared_ptr<boost::asio::io_service::work> p_work(
132-
std::make_shared<boost::asio::io_service::work>(
133-
boost::ref(*p_io_service)));
134-
135-
// io_service threads
136-
{
137-
int n_threads = 5;
138-
while (0 < n_threads--) {
139-
p_threads->create_thread([=] () { p_io_service->run(); });
137+
int main() {
138+
try {
139+
// the thread group
140+
auto threads(std::make_shared<boost::network::utils::thread_group>());
141+
142+
// setup asio::io_service
143+
auto io_service(std::make_shared<boost::asio::io_service>());
144+
auto work(std::make_shared<boost::asio::io_service::work>(std::ref(*io_service)));
145+
146+
// io_service threads
147+
{
148+
int n_threads = 5;
149+
while (0 < n_threads--) {
150+
threads->create_thread([=] () { io_service->run(); });
151+
}
140152
}
141-
}
142153

143-
// the shared work queue
144-
work_queue queue;
154+
// the shared work queue
155+
work_queue queue;
145156

146-
// worker threads that will process the request; off the queue
147-
{
148-
int n_threads = 5;
149-
while (0 < n_threads--) {
150-
p_threads->create_thread([&queue] () { process_request(queue); });
157+
// worker threads that will process the request; off the queue
158+
{
159+
int n_threads = 5;
160+
while (0 < n_threads--) {
161+
threads->create_thread([&queue] () { process_request(queue); });
162+
}
151163
}
152-
}
153164

154-
// setup the async server
155-
handler request_handler(queue);
156-
std::shared_ptr<server> p_server_instance(std::make_shared<server>(
157-
server::options(request_handler)
158-
.address("0.0.0.0")
159-
.port("8800")
160-
.io_service(p_io_service)
161-
.reuse_address(true)
162-
.thread_pool(std::make_shared<boost::network::utils::thread_pool>(
163-
2, p_io_service, p_threads))));
164-
165-
// setup clean shutdown
166-
boost::asio::signal_set signals(*p_io_service, SIGINT, SIGTERM);
167-
signals.async_wait([=] (boost::system::error_code const &ec, int signal) {
168-
shut_me_down(ec, signal, p_server_instance);
169-
});
170-
171-
// run the async server
172-
p_server_instance->run();
173-
174-
// we are stopped - shutting down
175-
176-
p_threads->interrupt_all();
177-
178-
p_work.reset();
179-
p_io_service->stop();
180-
181-
p_threads->join_all();
182-
183-
Log("Terminated normally");
184-
exit(EXIT_SUCCESS);
185-
}
186-
catch (const std::exception& e) {
187-
Log("Abnormal termination - exception:" << e.what());
188-
exit(EXIT_FAILURE);
165+
// setup the async server
166+
handler request_handler(queue);
167+
auto server(std::make_shared<server_data>(
168+
server::options(request_handler)
169+
.address("0.0.0.0")
170+
.port("8800")
171+
.io_service(io_service)
172+
.reuse_address(true)
173+
.thread_pool(std::make_shared<boost::network::utils::thread_pool>(
174+
2, io_service, threads))));
175+
176+
// setup clean shutdown
177+
boost::asio::signal_set signals(*io_service, SIGINT, SIGTERM);
178+
signals.async_wait([=] (boost::system::error_code const &ec, int signal) {
179+
shut_me_down(ec, signal, server);
180+
});
181+
182+
// run the async server
183+
server->run();
184+
185+
work.reset();
186+
io_service->stop();
187+
188+
threads->join_all();
189+
190+
std::cout << "Terminated normally" << std::endl;
191+
exit(EXIT_SUCCESS);
192+
}
193+
catch (const std::exception& e) {
194+
std::cerr << "Abnormal termination - exception:" << e.what() << std::endl;
195+
exit(EXIT_FAILURE);
196+
}
189197
}

0 commit comments

Comments
 (0)