Skip to content

Commit 9a8b4aa

Browse files
committed
Adding implementation of a real thread pool, supporting the adding of work tasks.
1 parent 96367d1 commit 9a8b4aa

File tree

2 files changed

+105
-1
lines changed

2 files changed

+105
-1
lines changed

boost/network/utils/thread_pool.hpp

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,98 @@
88

99
#include <cstddef>
1010
#include <boost/network/tags.hpp>
11+
#include <boost/thread/thread.hpp>
12+
#include <boost/shared_ptr.hpp>
13+
#include <boost/function.hpp>
14+
#include <boost/asio/io_service.hpp>
15+
#include <boost/scope_exit.hpp>
1116

1217
namespace boost { namespace network { namespace utils {
18+
19+
typedef boost::shared_ptr<boost::asio::io_service> io_service_ptr;
20+
typedef boost::shared_ptr<boost::thread_group> worker_threads_ptr;
21+
typedef boost::shared_ptr<boost::asio::io_service::work> sentinel_ptr;
1322

1423
template <class Tag>
1524
struct basic_thread_pool {
16-
basic_thread_pool(std::size_t threads = 1) : threads_(threads) {}
25+
basic_thread_pool(
26+
std::size_t threads = 1,
27+
io_service_ptr io_service = io_service_ptr(),
28+
worker_threads_ptr worker_threads = worker_threads_ptr()
29+
)
30+
: threads_(threads)
31+
, io_service_(io_service)
32+
, worker_threads_(worker_threads)
33+
, sentinel_()
34+
{
35+
bool commit = false;
36+
BOOST_SCOPE_EXIT_TPL((&commit)(&io_service_)(&worker_threads_)(&sentinel_)) {
37+
if (!commit) {
38+
sentinel_.reset();
39+
io_service_.reset();
40+
if (worker_threads_.get()) {
41+
worker_threads_->interrupt_all();
42+
worker_threads_->join_all();
43+
}
44+
worker_threads_.reset();
45+
}
46+
} BOOST_SCOPE_EXIT_END
47+
48+
if (!io_service_.get()) {
49+
io_service_.reset(new boost::asio::io_service);
50+
}
51+
52+
if (!worker_threads_.get()) {
53+
worker_threads_.reset(new boost::thread_group);
54+
}
55+
56+
if (!sentinel_.get()) {
57+
sentinel_.reset(new boost::asio::io_service::work(*io_service_));
58+
}
59+
60+
for (std::size_t counter = 0; counter < threads_; ++counter)
61+
worker_threads_->create_thread(
62+
boost::bind(
63+
&boost::asio::io_service::run,
64+
io_service_
65+
)
66+
);
67+
68+
commit = true;
69+
}
70+
1771
std::size_t const thread_count() const {
1872
return threads_;
1973
}
74+
75+
void post(boost::function<void()> f) {
76+
io_service_->post(f);
77+
}
78+
79+
~basic_thread_pool() throw () {
80+
sentinel_.reset();
81+
try {
82+
worker_threads_->join_all();
83+
} catch (...) {
84+
BOOST_ASSERT(false && "A handler was not supposed to throw, but one did.");
85+
}
86+
}
87+
88+
void swap(basic_thread_pool & other) {
89+
std::swap(other.threads_, threads_);
90+
std::swap(other.io_service_, io_service_);
91+
std::swap(other.worket_threads_, worker_threads_);
92+
std::swap(other.sentinel_, sentinel_);
93+
}
2094
protected:
2195
std::size_t threads_;
96+
io_service_ptr io_service_;
97+
worker_threads_ptr worker_threads_;
98+
sentinel_ptr sentinel_;
99+
100+
private:
101+
basic_thread_pool(basic_thread_pool const &); // no copies please
102+
basic_thread_pool & operator=(basic_thread_pool); // no assignment please
22103
};
23104

24105
typedef basic_thread_pool<tags::default_> thread_pool;

libs/network/test/utils_thread_pool.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <boost/config/warning_disable.hpp>
99
#include <boost/test/unit_test.hpp>
1010
#include <boost/network/utils/thread_pool.hpp>
11+
#include <boost/bind.hpp>
1112

1213
using namespace boost::network;
1314

@@ -24,3 +25,25 @@ BOOST_AUTO_TEST_CASE( default_constructor ) {
2425
BOOST_CHECK_EQUAL(pool.thread_count(), 1);
2526
}
2627

28+
struct foo {
29+
foo() : val_(0) {}
30+
void bar(int val) {
31+
val_ += val;
32+
}
33+
int const val() const {
34+
return val_;
35+
}
36+
protected:
37+
int val_;
38+
};
39+
40+
BOOST_AUTO_TEST_CASE( post_work ) {
41+
foo instance;
42+
{
43+
utils::thread_pool pool;
44+
BOOST_CHECK_NO_THROW(pool.post(boost::bind(&foo::bar, &instance, 1)));
45+
BOOST_CHECK_NO_THROW(pool.post(boost::bind(&foo::bar, &instance, 2)));
46+
// require that pool is destroyed here, RAII baby
47+
}
48+
BOOST_CHECK_EQUAL(instance.val(), 3);
49+
}

0 commit comments

Comments
 (0)