24
24
#include < boost/optional.hpp>
25
25
#include < boost/utility/typed_in_place_factory.hpp>
26
26
#include < boost/thread/locks.hpp>
27
+ #include < boost/thread/recursive_mutex.hpp>
27
28
#include < list>
28
29
#include < vector>
29
30
#include < iterator>
@@ -127,11 +128,20 @@ namespace boost { namespace network { namespace http {
127
128
, handler(handler)
128
129
, thread_pool_(thread_pool)
129
130
, headers_already_sent(false )
131
+ , first_line_already_sent(false )
132
+ , headers_in_progress(false )
133
+ , first_line_in_progress(false )
130
134
, headers_buffer(BOOST_NETWORK_HTTP_SERVER_CONNECTION_HEADER_BUFFER_MAX_SIZE)
131
135
{
132
136
new_start = read_buffer_.begin ();
133
137
}
134
138
139
+ ~async_connection () throw () {
140
+ boost::system::error_code ignored;
141
+ socket_.shutdown (asio::ip::tcp::socket::shutdown_both, ignored);
142
+ socket_.close (ignored);
143
+ }
144
+
135
145
/* * Function: template <class Range> set_headers(Range headers)
136
146
* Precondition: headers have not been sent yet
137
147
* Postcondition: headers have been linearized to a buffer,
@@ -145,16 +155,11 @@ namespace boost { namespace network { namespace http {
145
155
template <class Range >
146
156
void set_headers (Range headers) {
147
157
lock_guard lock (headers_mutex);
158
+ if (first_line_in_progress || headers_in_progress || headers_already_sent)
159
+ boost::throw_exception (std::logic_error (" Headers have already been sent." ));
148
160
149
- if (headers_already_sent) boost::throw_exception (std::logic_error (" Headers have already been sent." ));
150
-
151
- if (error_encountered) boost::throw_exception (boost::system::system_error (*error_encountered));
152
-
153
- bool commit = false ;
154
- BOOST_SCOPE_EXIT_TPL ((&commit)(&headers_already_sent)) {
155
- if (!commit) headers_already_sent = false ;
156
- else headers_already_sent = true ;
157
- } BOOST_SCOPE_EXIT_END
161
+ if (error_encountered)
162
+ boost::throw_exception (boost::system::system_error (*error_encountered));
158
163
159
164
typedef constants<Tag> consts;
160
165
headers_buffer.consume (headers_buffer.size ());
@@ -169,14 +174,13 @@ namespace boost { namespace network { namespace http {
169
174
stream << consts::crlf ();
170
175
}
171
176
stream << consts::crlf ();
177
+ stream.flush ();
172
178
173
179
write_headers_only (
174
180
boost::bind (
175
181
&async_connection<Tag,Handler>::do_nothing
176
182
, async_connection<Tag,Handler>::shared_from_this ()
177
183
));
178
-
179
- commit = true ;
180
184
}
181
185
182
186
void set_status (status_t new_status) {
@@ -189,6 +193,7 @@ namespace boost { namespace network { namespace http {
189
193
190
194
template <class Range >
191
195
void write (Range const & range) {
196
+ lock_guard lock (headers_mutex);
192
197
if (error_encountered) boost::throw_exception (boost::system::system_error (*error_encountered));
193
198
194
199
boost::function<void (boost::system::error_code)> f =
@@ -205,6 +210,7 @@ namespace boost { namespace network { namespace http {
205
210
206
211
template <class Range , class Callback >
207
212
void write (Range const & range, Callback const & callback) {
213
+ lock_guard lock (headers_mutex);
208
214
if (error_encountered) boost::throw_exception (boost::system::system_error (*error_encountered));
209
215
boost::function<void (boost::system::error_code)> f = callback;
210
216
write_impl (boost::make_iterator_range (range), callback);
@@ -256,23 +262,25 @@ namespace boost { namespace network { namespace http {
256
262
typedef boost::shared_ptr<array_list> shared_array_list;
257
263
typedef boost::shared_ptr<std::vector<asio::const_buffer> > shared_buffers;
258
264
typedef request_parser<Tag> request_parser_type;
259
- typedef boost::lock_guard<boost::mutex> lock_guard;
265
+ typedef boost::lock_guard<boost::recursive_mutex> lock_guard;
266
+ typedef std::list<boost::function<void ()> > pending_actions_list;
260
267
261
268
asio::ip::tcp::socket socket_;
262
269
asio::io_service::strand strand;
263
270
Handler & handler;
264
271
utils::thread_pool & thread_pool_;
265
- bool headers_already_sent;
272
+ volatile bool headers_already_sent, first_line_already_sent, headers_in_progress, first_line_in_progress ;
266
273
asio::streambuf headers_buffer;
267
274
268
- boost::mutex headers_mutex;
275
+ boost::recursive_mutex headers_mutex;
269
276
buffer_type read_buffer_;
270
277
status_t status;
271
278
request_parser_type parser;
272
279
request request_;
273
280
buffer_type::iterator new_start;
274
281
string_type partial_parsed;
275
282
optional<boost::system::system_error> error_encountered;
283
+ pending_actions_list pending_actions;
276
284
277
285
template <class , class > friend struct async_server_base ;
278
286
@@ -281,6 +289,10 @@ namespace boost { namespace network { namespace http {
281
289
};
282
290
283
291
void start () {
292
+ typename ostringstream<Tag>::type ip_stream;
293
+ ip_stream << socket_.remote_endpoint ().address ().to_v4 ().to_string () << ' :'
294
+ << socket_.remote_endpoint ().port ();
295
+ request_.source = ip_stream.str ();
284
296
read_more (method);
285
297
}
286
298
@@ -462,7 +474,6 @@ namespace boost { namespace network { namespace http {
462
474
463
475
void parse_headers (string_type & input, typename request::headers_container_type & container) {
464
476
using namespace boost ::spirit::qi;
465
- std::vector<fusion::tuple<std::string,std::string> > headers;
466
477
parse (
467
478
input.begin (), input.end (),
468
479
*(
@@ -471,7 +482,7 @@ namespace boost { namespace network { namespace http {
471
482
>> +(alnum|space|punct)
472
483
>> lit (" \r\n " )
473
484
)
474
- , headers
485
+ , container
475
486
);
476
487
}
477
488
@@ -488,13 +499,17 @@ namespace boost { namespace network { namespace http {
488
499
489
500
template <class Callback >
490
501
void write_first_line (Callback callback) {
502
+ lock_guard lock (headers_mutex);
503
+ if (first_line_in_progress) return ;
504
+ first_line_in_progress = true ;
505
+
491
506
std::vector<asio::const_buffer> buffers;
492
507
typedef constants<Tag> consts;
493
508
typename ostringstream<Tag>::type first_line_stream;
494
509
first_line_stream
495
510
<< consts::http_slash () << 1 << consts::dot () << 1 << consts::space ()
496
511
<< status << consts::space () << status_message (status)
497
- << consts::space ()
512
+ << consts::crlf ()
498
513
;
499
514
std::string first_line = first_line_stream.str ();
500
515
buffers.push_back (asio::buffer (first_line));
@@ -505,6 +520,9 @@ namespace boost { namespace network { namespace http {
505
520
}
506
521
507
522
void write_headers_only (boost::function<void ()> callback) {
523
+ if (headers_in_progress) return ;
524
+ headers_in_progress = true ;
525
+
508
526
write_first_line (
509
527
strand.wrap (
510
528
boost::bind (
@@ -518,6 +536,7 @@ namespace boost { namespace network { namespace http {
518
536
void handle_first_line_written (boost::function<void ()> callback, boost::system::error_code const & ec, std::size_t bytes_transferred) {
519
537
lock_guard lock (headers_mutex);
520
538
if (!ec) {
539
+ first_line_already_sent = true ;
521
540
asio::async_write (
522
541
socket ()
523
542
, headers_buffer
@@ -538,7 +557,13 @@ namespace boost { namespace network { namespace http {
538
557
if (!ec) {
539
558
headers_buffer.consume (headers_buffer.size ());
540
559
headers_already_sent = true ;
541
- callback ();
560
+ thread_pool ().post (callback);
561
+ pending_actions_list::iterator start = pending_actions.begin ()
562
+ , end = pending_actions.end ();
563
+ while (start != end) {
564
+ thread_pool ().post (*start++);
565
+ }
566
+ pending_actions_list ().swap (pending_actions);
542
567
} else {
543
568
error_encountered = in_place<boost::system::system_error>(ec);
544
569
}
@@ -557,17 +582,25 @@ namespace boost { namespace network { namespace http {
557
582
558
583
template <class Range >
559
584
void write_impl (Range range, boost::function<void (boost::system::error_code)> callback) {
560
- if (!headers_already_sent) {
561
- boost::function<void (boost::system::error_code)> callback_function =
562
- callback;
585
+ lock_guard lock (headers_mutex);
586
+ boost::function<void (boost::system::error_code)> callback_function =
587
+ callback;
563
588
589
+ if (!headers_already_sent && !headers_in_progress) {
564
590
write_headers_only (
565
591
boost::bind (
566
592
&async_connection<Tag,Handler>::continue_write<Range>
567
593
, async_connection<Tag,Handler>::shared_from_this ()
568
594
, range, callback_function
569
595
));
570
596
return ;
597
+ } else if (headers_in_progress && !headers_already_sent) {
598
+ pending_actions.push_back (
599
+ boost::bind (
600
+ &async_connection<Tag,Handler>::continue_write<Range>
601
+ , async_connection<Tag,Handler>::shared_from_this ()
602
+ , range, callback_function));
603
+ return ;
571
604
}
572
605
573
606
// linearize the whole range into a vector
@@ -583,14 +616,15 @@ namespace boost { namespace network { namespace http {
583
616
// on doing I/O.
584
617
//
585
618
586
- static std::size_t const connection_buffer_size = BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE;
619
+ static std::size_t const connection_buffer_size =
620
+ BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE;
587
621
shared_array_list temporaries =
588
622
boost::make_shared<array_list>();
589
623
shared_buffers buffers =
590
624
boost::make_shared<std::vector<asio::const_buffer> >(0 );
591
625
592
626
std::size_t range_size = boost::distance (range);
593
- buffers->resize (
627
+ buffers->reserve (
594
628
(range_size / connection_buffer_size)
595
629
+ ((range_size % connection_buffer_size)?1 :0 )
596
630
);
0 commit comments