7
7
#ifndef NETWORK_UTILS_THREAD_POOL_IPP_20111021
8
8
#define NETWORK_UTILS_THREAD_POOL_IPP_20111021
9
9
10
+ #include < vector>
11
+ #include < thread>
10
12
#include < network/utils/thread_pool.hpp>
11
13
12
14
namespace network { namespace utils {
13
15
14
16
struct thread_pool_pimpl {
15
- thread_pool_pimpl (
16
- std::size_t threads = 1 ,
17
+ thread_pool_pimpl (std::size_t threads = 1 ,
17
18
io_service_ptr io_service = io_service_ptr(),
18
- worker_threads_ptr worker_threads = worker_threads_ptr()
19
- )
19
+ std::vector<std::thread> worker_threads = {})
20
20
: threads_(threads)
21
21
, io_service_(io_service)
22
- , worker_threads_(worker_threads)
22
+ , worker_threads_(std::move( worker_threads) )
23
23
, sentinel_()
24
24
{
25
25
bool commit = false ;
26
26
BOOST_SCOPE_EXIT ((&commit)(&io_service_)(&worker_threads_)(&sentinel_)) {
27
27
if (!commit) {
28
28
sentinel_.reset ();
29
29
io_service_.reset ();
30
- if (worker_threads_.get ()) {
31
- worker_threads_->interrupt_all ();
32
- worker_threads_->join_all ();
33
- }
34
- worker_threads_.reset ();
30
+ for (auto & thread : worker_threads_)
31
+ if (thread.joinable ()) thread.join ();
32
+ worker_threads_.clear ();
35
33
}
36
34
} BOOST_SCOPE_EXIT_END
37
35
38
- if (!io_service_.get ()) {
39
- io_service_.reset (new boost::asio::io_service);
40
- }
41
-
42
- if (!worker_threads_.get ()) {
43
- worker_threads_.reset (new boost::thread_group);
44
- }
45
-
46
- if (!sentinel_.get ()) {
36
+ if (!io_service_.get ()) io_service_.reset (new boost::asio::io_service);
37
+ if (!sentinel_.get ())
47
38
sentinel_.reset (new boost::asio::io_service::work (*io_service_));
48
- }
49
-
39
+ auto local_io_service = io_service_;
50
40
for (std::size_t counter = 0 ; counter < threads_; ++counter)
51
- worker_threads_->create_thread (
52
- boost::bind (
53
- &boost::asio::io_service::run,
54
- io_service_
55
- )
56
- );
41
+ worker_threads_.emplace_back ([local_io_service](){
42
+ local_io_service->run ();});
57
43
58
44
commit = true ;
59
45
}
60
46
47
+ thread_pool_pimpl (thread_pool_pimpl const &) = delete ;
48
+ thread_pool_pimpl & operator =(thread_pool_pimpl const &) = delete ;
49
+
50
+ thread_pool_pimpl (thread_pool_pimpl&& other) {
51
+ other.swap (*this );
52
+ }
53
+
61
54
std::size_t const thread_count () const {
62
55
return threads_;
63
56
}
@@ -69,34 +62,32 @@ namespace network { namespace utils {
69
62
~thread_pool_pimpl () {
70
63
sentinel_.reset ();
71
64
try {
72
- worker_threads_-> join_all ();
65
+ for ( auto & thread : worker_threads_) thread. join ();
73
66
} catch (...) {
74
67
BOOST_ASSERT (false && " A handler was not supposed to throw, but one did." );
75
68
std::abort ();
76
69
}
77
70
}
78
71
79
72
void swap (thread_pool_pimpl & other) {
80
- std::swap (other.threads_ , threads_);
81
- std::swap (other.io_service_ , io_service_);
82
- std::swap (other.worker_threads_ , worker_threads_);
83
- std::swap (other.sentinel_ , sentinel_);
73
+ using std::swap;
74
+ swap (other.threads_ , threads_);
75
+ swap (other.io_service_ , io_service_);
76
+ swap (other.worker_threads_ , worker_threads_);
77
+ swap (other.sentinel_ , sentinel_);
84
78
}
79
+
85
80
protected:
86
81
std::size_t threads_;
87
82
io_service_ptr io_service_;
88
- worker_threads_ptr worker_threads_;
83
+ std::vector<std::thread> worker_threads_;
89
84
sentinel_ptr sentinel_;
90
-
91
- private:
92
- thread_pool_pimpl (thread_pool_pimpl const &); // no copies please
93
- thread_pool_pimpl & operator =(thread_pool_pimpl); // no assignment please
94
85
};
95
86
96
87
thread_pool::thread_pool (std::size_t threads,
97
88
io_service_ptr io_service,
98
- worker_threads_ptr worker_threads)
99
- : pimpl(new (std::nothrow) thread_pool_pimpl(threads, io_service, worker_threads))
89
+ std::vector<std::thread> worker_threads)
90
+ : pimpl(new (std::nothrow) thread_pool_pimpl(threads, io_service, std::move( worker_threads) ))
100
91
{}
101
92
102
93
std::size_t const thread_pool::thread_count () const {
@@ -110,11 +101,11 @@ namespace network { namespace utils {
110
101
void thread_pool::swap (thread_pool & other) {
111
102
std::swap (other.pimpl , this ->pimpl );
112
103
}
113
-
104
+
114
105
thread_pool::~thread_pool () {
115
106
delete pimpl;
116
107
}
117
-
108
+
118
109
} // namespace utils
119
110
} // namespace network
120
111
0 commit comments