Skip to content

Commit 3a94563

Browse files
committed
Create hello_world_async_server_with_work_queue.cpp
A minimalist example to show how to employ background threads to deal with requests; useful when you need to employ more complex request processing strategy. (moved from master branch)
1 parent f8346ae commit 3a94563

File tree

1 file changed

+200
-0
lines changed

1 file changed

+200
-0
lines changed
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
* sample application to show the usage of work queues along with async http server
3+
*
4+
* (C) Copyright Dino Korah 2013.
5+
* Distributed under the Boost Software License, Version 1.0. (See copy at
6+
* http://www.boost.org/LICENSE_1_0.txt)
7+
*/
8+
9+
#include <boost/network/include/http/server.hpp>
10+
#include <boost/network/uri.hpp>
11+
12+
#include <boost/asio.hpp>
13+
#include <boost/thread.hpp>
14+
#include <boost/bind.hpp>
15+
#include <boost/shared_ptr.hpp>
16+
#include <iostream>
17+
#include <list>
18+
#include <signal.h>
19+
20+
#define Log(line) \
21+
do { std::cout << line << std::endl; } while(false)
22+
23+
struct handler;
24+
typedef boost::network::http::async_server<handler> server;
25+
26+
/**
27+
* request + connection encapsulation (work item)
28+
*/
29+
struct request_data
30+
{
31+
const server::request req;
32+
server::connection_ptr conn;
33+
34+
typedef boost::shared_ptr< request_data > pointer;
35+
36+
request_data(server::request const& req, const server::connection_ptr& conn) :
37+
req(req), conn(conn)
38+
{
39+
}
40+
};
41+
42+
/**
43+
* A basic work queue
44+
*/
45+
struct work_queue
46+
{
47+
typedef std::list<request_data::pointer> list;
48+
49+
list requests;
50+
boost::mutex mutex;
51+
52+
inline void put(const request_data::pointer& p_rd)
53+
{
54+
boost::unique_lock< boost::mutex > lock(mutex);
55+
requests.push_back(p_rd);
56+
(void)lock;
57+
}
58+
59+
inline request_data::pointer get()
60+
{
61+
boost::unique_lock< boost::mutex > lock(mutex);
62+
63+
request_data::pointer p_ret;
64+
if (!requests.empty()) {
65+
p_ret = requests.front();
66+
requests.pop_front();
67+
}
68+
69+
(void)lock;
70+
71+
return p_ret;
72+
}
73+
};
74+
75+
struct handler
76+
{
77+
work_queue& queue;
78+
79+
handler(work_queue& queue) : queue(queue) { }
80+
81+
/**
82+
* Feed the work queue
83+
*
84+
* @param req
85+
* @param conn
86+
*/
87+
void operator()(server::request const& req, const server::connection_ptr& conn)
88+
{
89+
queue.put(boost::make_shared<request_data>(req, conn));
90+
}
91+
};
92+
93+
/**
94+
* Clean shutdown signal handler
95+
*
96+
* @param error
97+
* @param signal
98+
* @param p_server_instance
99+
*/
100+
void shut_me_down(
101+
const boost::system::error_code& error
102+
, int signal, boost::shared_ptr< server > p_server_instance)
103+
{
104+
if (!error)
105+
p_server_instance->stop();
106+
}
107+
108+
/**
109+
* Process request; worker (thread)
110+
*
111+
* @param queue
112+
*/
113+
void process_request(work_queue& queue)
114+
{
115+
while(!boost::this_thread::interruption_requested()) {
116+
request_data::pointer p_req(queue.get());
117+
if (p_req) {
118+
119+
// some heavy work!
120+
boost::this_thread::sleep(boost::posix_time::seconds(10));
121+
122+
p_req->conn->set_status(server::connection::ok);
123+
p_req->conn->write("Hello, world!");
124+
}
125+
126+
boost::this_thread::sleep(boost::posix_time::microseconds(1000));
127+
}
128+
}
129+
130+
int main(void) try
131+
{
132+
// the thread group
133+
boost::shared_ptr< boost::thread_group > p_threads(
134+
boost::make_shared< boost::thread_group>());
135+
136+
// setup asio::io_service
137+
boost::shared_ptr< boost::asio::io_service > p_io_service(
138+
boost::make_shared< boost::asio::io_service >());
139+
boost::shared_ptr< boost::asio::io_service::work > p_work(
140+
boost::make_shared< boost::asio::io_service::work >(
141+
boost::ref(*p_io_service)));
142+
143+
// io_service threads
144+
{
145+
int n_threads = 5;
146+
while(0 < n_threads--) {
147+
p_threads->create_thread(
148+
boost::bind(&boost::asio::io_service::run, p_io_service));
149+
}
150+
}
151+
152+
// the shared work queue
153+
work_queue queue;
154+
155+
// worker threads that will process the request; off the queue
156+
{
157+
int n_threads = 5;
158+
while(0 < n_threads--) {
159+
p_threads->create_thread(
160+
boost::bind(process_request, boost::ref(queue)));
161+
}
162+
}
163+
164+
// setup the async server
165+
handler request_handler(queue);
166+
boost::shared_ptr< server > p_server_instance(
167+
boost::make_shared<server>(
168+
server::options(request_handler).
169+
address("0.0.0.0")
170+
.port("8800")
171+
.io_service(p_io_service)
172+
.reuse_address(true)
173+
.thread_pool(
174+
boost::make_shared<boost::network::utils::thread_pool>(
175+
2 , p_io_service, p_threads))));
176+
177+
// setup clean shutdown
178+
boost::asio::signal_set signals(*p_io_service, SIGINT, SIGTERM);
179+
signals.async_wait(boost::bind(shut_me_down, _1, _2, p_server_instance));
180+
181+
// run the async server
182+
p_server_instance->run();
183+
184+
// we are stopped - shutting down
185+
186+
p_threads->interrupt_all();
187+
188+
p_work.reset();
189+
p_io_service->stop();
190+
191+
p_threads->join_all();
192+
193+
Log("Terminated normally");
194+
exit(EXIT_SUCCESS);
195+
}
196+
catch(const std::exception& e)
197+
{
198+
Log("Abnormal termination - exception:"<<e.what());
199+
exit(EXIT_FAILURE);
200+
}

0 commit comments

Comments
 (0)