9
9
10
10
#include < memory>
11
11
#include < mutex>
12
+ #include < chrono>
12
13
#include < functional>
14
+ #include < boost/network/utils/thread_group.hpp>
13
15
#include < boost/network/include/http/server.hpp>
14
16
#include < boost/network/uri.hpp>
15
-
16
17
#include < boost/asio.hpp>
17
- #include < boost/thread.hpp>
18
18
#include < iostream>
19
19
#include < list>
20
20
#include < signal.h>
21
21
22
- #define Log (line ) \
23
- do { \
24
- std::cout << line << std::endl; \
25
- } while (false )
22
+ // This is needed to terminate the worker queue, and must be visible to the signal handler.
23
+ bool running = true ;
26
24
27
25
struct handler ;
28
26
typedef boost::network::http::server<handler> server;
29
27
28
+ struct server_data {
29
+ boost::network::http::server<handler> server;
30
+
31
+ server_data (const server::options &options)
32
+ : server(options) {}
33
+
34
+ void run () {
35
+ server.run ();
36
+ }
37
+
38
+ void stop () {
39
+ running = false ;
40
+ server.stop ();
41
+ }
42
+ };
43
+
30
44
/* *
31
45
* request + connection encapsulation (work item)
32
46
*/
@@ -36,7 +50,7 @@ struct request_data {
36
50
37
51
typedef std::shared_ptr<request_data> pointer;
38
52
39
- request_data (server::request req, server::connection_ptr conn)
53
+ request_data (server::request req, server::connection_ptr conn)
40
54
: req(std::move(req)), conn(std::move(conn)) {}
41
55
};
42
56
@@ -49,24 +63,24 @@ struct work_queue {
49
63
list requests;
50
64
std::mutex mutex;
51
65
52
- inline void put (const request_data::pointer& p_rd ) {
66
+ inline void put (const request_data::pointer& request ) {
53
67
std::unique_lock<std::mutex> lock (mutex);
54
- requests.push_back (p_rd );
68
+ requests.push_back (request );
55
69
(void )lock;
56
70
}
57
71
58
72
inline request_data::pointer get () {
59
73
std::unique_lock<std::mutex> lock (mutex);
60
74
61
- request_data::pointer p_ret ;
75
+ request_data::pointer request ;
62
76
if (!requests.empty ()) {
63
- p_ret = requests.front ();
77
+ request = requests.front ();
64
78
requests.pop_front ();
65
79
}
66
80
67
81
(void )lock;
68
82
69
- return p_ret ;
83
+ return request ;
70
84
}
71
85
};
72
86
@@ -92,11 +106,11 @@ struct handler {
92
106
*
93
107
* @param error
94
108
* @param signal
95
- * @param p_server_instance
109
+ * @param server
96
110
*/
97
- void shut_me_down (const boost::system::error_code& error, int ,
98
- std::shared_ptr<server> p_server_instance ) {
99
- if (!error) p_server_instance ->stop ();
111
+ void shut_me_down (const boost::system::error_code& error, int signal ,
112
+ std::shared_ptr<server_data> server ) {
113
+ if (!error) server ->stop ();
100
114
}
101
115
102
116
/* *
@@ -105,85 +119,79 @@ void shut_me_down(const boost::system::error_code& error, int,
105
119
* @param queue
106
120
*/
107
121
void process_request (work_queue& queue) {
108
- while (! boost::this_thread::interruption_requested () ) {
109
- request_data::pointer p_req (queue.get ());
110
- if (p_req ) {
122
+ while (running ) {
123
+ request_data::pointer request (queue.get ());
124
+ if (request ) {
111
125
112
126
// some heavy work!
113
- boost ::this_thread::sleep ( boost::posix_time ::seconds (10 ));
127
+ std ::this_thread::sleep_for ( std::chrono ::seconds (10 ));
114
128
115
- p_req ->conn ->set_status (server::connection::ok);
116
- p_req ->conn ->write (" Hello, world!" );
129
+ request ->conn ->set_status (server::connection::ok);
130
+ request ->conn ->write (" Hello, world!" );
117
131
}
118
132
119
- boost ::this_thread::sleep ( boost::posix_time ::microseconds (1000 ));
133
+ std ::this_thread::sleep_for ( std::chrono ::microseconds (1000 ));
120
134
}
121
135
}
122
136
123
- int main (void ) try {
124
- // the thread group
125
- std::shared_ptr<boost::thread_group> p_threads (
126
- std::make_shared<boost::thread_group>());
127
-
128
- // setup asio::io_service
129
- std::shared_ptr<boost::asio::io_service> p_io_service (
130
- std::make_shared<boost::asio::io_service>());
131
- std::shared_ptr<boost::asio::io_service::work> p_work (
132
- std::make_shared<boost::asio::io_service::work>(
133
- boost::ref (*p_io_service)));
134
-
135
- // io_service threads
136
- {
137
- int n_threads = 5 ;
138
- while (0 < n_threads--) {
139
- p_threads->create_thread ([=] () { p_io_service->run (); });
137
+ int main () {
138
+ try {
139
+ // the thread group
140
+ auto threads (std::make_shared<boost::network::utils::thread_group>());
141
+
142
+ // setup asio::io_service
143
+ auto io_service (std::make_shared<boost::asio::io_service>());
144
+ auto work (std::make_shared<boost::asio::io_service::work>(std::ref (*io_service)));
145
+
146
+ // io_service threads
147
+ {
148
+ int n_threads = 5 ;
149
+ while (0 < n_threads--) {
150
+ threads->create_thread ([=] () { io_service->run (); });
151
+ }
140
152
}
141
- }
142
153
143
- // the shared work queue
144
- work_queue queue;
154
+ // the shared work queue
155
+ work_queue queue;
145
156
146
- // worker threads that will process the request; off the queue
147
- {
148
- int n_threads = 5 ;
149
- while (0 < n_threads--) {
150
- p_threads->create_thread ([&queue] () { process_request (queue); });
157
+ // worker threads that will process the request; off the queue
158
+ {
159
+ int n_threads = 5 ;
160
+ while (0 < n_threads--) {
161
+ threads->create_thread ([&queue] () { process_request (queue); });
162
+ }
151
163
}
152
- }
153
164
154
- // setup the async server
155
- handler request_handler (queue);
156
- std::shared_ptr<server> p_server_instance (std::make_shared<server>(
157
- server::options (request_handler)
158
- .address (" 0.0.0.0" )
159
- .port (" 8800" )
160
- .io_service (p_io_service)
161
- .reuse_address (true )
162
- .thread_pool (std::make_shared<boost::network::utils::thread_pool>(
163
- 2 , p_io_service, p_threads))));
164
-
165
- // setup clean shutdown
166
- boost::asio::signal_set signals (*p_io_service, SIGINT, SIGTERM);
167
- signals.async_wait ([=] (boost::system::error_code const &ec, int signal) {
168
- shut_me_down (ec, signal, p_server_instance);
169
- });
170
-
171
- // run the async server
172
- p_server_instance->run ();
173
-
174
- // we are stopped - shutting down
175
-
176
- p_threads->interrupt_all ();
177
-
178
- p_work.reset ();
179
- p_io_service->stop ();
180
-
181
- p_threads->join_all ();
182
-
183
- Log (" Terminated normally" );
184
- exit (EXIT_SUCCESS);
185
- }
186
- catch (const std::exception& e) {
187
- Log (" Abnormal termination - exception:" << e.what ());
188
- exit (EXIT_FAILURE);
165
+ // setup the async server
166
+ handler request_handler (queue);
167
+ auto server (std::make_shared<server_data>(
168
+ server::options (request_handler)
169
+ .address (" 0.0.0.0" )
170
+ .port (" 8800" )
171
+ .io_service (io_service)
172
+ .reuse_address (true )
173
+ .thread_pool (std::make_shared<boost::network::utils::thread_pool>(
174
+ 2 , io_service, threads))));
175
+
176
+ // setup clean shutdown
177
+ boost::asio::signal_set signals (*io_service, SIGINT, SIGTERM);
178
+ signals.async_wait ([=] (boost::system::error_code const &ec, int signal) {
179
+ shut_me_down (ec, signal, server);
180
+ });
181
+
182
+ // run the async server
183
+ server->run ();
184
+
185
+ work.reset ();
186
+ io_service->stop ();
187
+
188
+ threads->join_all ();
189
+
190
+ std::cout << " Terminated normally" << std::endl;
191
+ exit (EXIT_SUCCESS);
192
+ }
193
+ catch (const std::exception& e) {
194
+ std::cerr << " Abnormal termination - exception:" << e.what () << std::endl;
195
+ exit (EXIT_FAILURE);
196
+ }
189
197
}
0 commit comments