21
21
#include < boost/network/protocol/http/server/request_parser.hpp>
22
22
#include < boost/range/iterator_range.hpp>
23
23
#include < boost/spirit/include/qi.hpp>
24
+ #include < boost/optional.hpp>
25
+ #include < boost/utility/typed_in_place_factory.hpp>
26
+ #include < boost/thread/locks.hpp>
24
27
#include < list>
25
28
#include < vector>
26
29
#include < iterator>
@@ -64,6 +67,55 @@ namespace boost { namespace network { namespace http {
64
67
65
68
typedef typename string<Tag>::type string_type;
66
69
typedef basic_request<Tag> request;
70
+ typedef shared_ptr<async_connection> connection_ptr;
71
+
72
+ private:
73
+ static char const * status_message (status_t status) {
74
+ static char const
75
+ ok_[] = " OK"
76
+ , created_[] = " Created"
77
+ , accepted_[] = " Accepted"
78
+ , no_content_[] = " No Content"
79
+ , multiple_choices_[] = " Multiple Choices"
80
+ , moved_permanently_[] = " Moved Permanently"
81
+ , moved_temporarily_[] = " Moved Temporarily"
82
+ , not_modified_[] = " Not Modified"
83
+ , bad_request_[] = " Bad Request"
84
+ , unauthorized_[] = " Unauthorized"
85
+ , forbidden_[] = " Fobidden"
86
+ , not_found_[] = " Not Found"
87
+ , not_supported_[] = " Not Supported"
88
+ , not_acceptable_[] = " Not Acceptable"
89
+ , internal_server_error_[] = " Internal Server Error"
90
+ , not_implemented_[] = " Not Implemented"
91
+ , bad_gateway_[] = " Bad Gateway"
92
+ , service_unavailable_[] = " Service Unavailable"
93
+ , unknown_[] = " Unknown"
94
+ ;
95
+ switch (status) {
96
+ case ok: return ok_;
97
+ case created: return created_;
98
+ case accepted: return accepted_;
99
+ case no_content: return no_content_;
100
+ case multiple_choices: return multiple_choices_;
101
+ case moved_permanently: return moved_permanently_;
102
+ case moved_temporarily: return moved_temporarily_;
103
+ case not_modified: return not_modified_;
104
+ case bad_request: return bad_request_;
105
+ case unauthorized: return unauthorized_;
106
+ case forbidden: return forbidden_;
107
+ case not_found: return not_found_;
108
+ case not_supported: return not_supported_;
109
+ case not_acceptable: return not_acceptable_;
110
+ case internal_server_error: return internal_server_error_;
111
+ case not_implemented: return not_implemented_;
112
+ case bad_gateway: return bad_gateway_;
113
+ case service_unavailable: return service_unavailable_;
114
+ default : return unknown_;
115
+ }
116
+ }
117
+
118
+ public:
67
119
68
120
async_connection (
69
121
asio::io_service & io_service
@@ -82,21 +134,26 @@ namespace boost { namespace network { namespace http {
82
134
83
135
/* * Function: template <class Range> set_headers(Range headers)
84
136
* Precondition: headers have not been sent yet
85
- * Postcondition: headers have been linearized to a buffer.
137
+ * Postcondition: headers have been linearized to a buffer,
138
+ * and assumed to have been sent already when the function exits
86
139
* Throws: std::logic_error in case the headers have already been sent.
87
140
*
88
141
* A call to set_headers takes a Range where each element models the
89
142
* Header concept. This Range will be linearized onto a buffer, which is
90
143
* then sent as soon as the first call to `write` or `flush` commences.
91
144
*/
92
145
template <class Range >
93
- void set_headers (Range headers, bool immediate = true ) {
94
- if (headers_already_sent)
95
- boost::throw_exception (std::logic_error (" Headers have already been sent." ));
146
+ void set_headers (Range headers) {
147
+ lock_guard lock (headers_mutex);
148
+
149
+ if (headers_already_sent) boost::throw_exception (std::logic_error (" Headers have already been sent." ));
150
+
151
+ if (error_encountered) boost::throw_exception (boost::system::system_error (*error_encountered));
96
152
97
153
bool commit = false ;
98
154
BOOST_SCOPE_EXIT_TPL ((&commit)(&headers_already_sent)) {
99
155
if (!commit) headers_already_sent = false ;
156
+ else headers_already_sent = true ;
100
157
} BOOST_SCOPE_EXIT_END
101
158
102
159
typedef constants<Tag> consts;
@@ -113,50 +170,94 @@ namespace boost { namespace network { namespace http {
113
170
stream << consts::crlf ();
114
171
}
115
172
stream << consts::crlf ();
116
- if (immediate) write_headers_only ();
173
+
174
+ write_headers_only (
175
+ boost::bind (
176
+ &async_connection<Tag,Handler>::do_nothing
177
+ , async_connection<Tag,Handler>::shared_from_this ()
178
+ ));
117
179
118
180
commit = true ;
119
181
}
120
182
121
183
void set_status (status_t new_status) {
184
+ lock_guard lock (headers_mutex);
185
+ if (headers_already_sent) boost::throw_exception (std::logic_error (" Headers have already been sent, cannot reset status." ));
186
+ if (error_encountered) boost::throw_exception (boost::system::system_error (*error_encountered));
187
+
122
188
status = new_status;
123
189
}
124
190
125
191
template <class Range >
126
192
void write (Range const & range) {
127
- write_impl (
128
- boost::make_iterator_range (range)
129
- , boost::bind (
193
+ if (error_encountered) boost::throw_exception (boost::system::system_error (*error_encountered));
194
+
195
+ boost::function<void (boost::system::error_code)> f =
196
+ boost::bind (
130
197
&async_connection<Tag,Handler>::default_error
131
198
, async_connection<Tag,Handler>::shared_from_this ()
132
- , _1
133
- )
199
+ , _1);
200
+
201
+ write_impl (
202
+ boost::make_iterator_range (range)
203
+ , f
134
204
);
135
205
}
136
206
137
207
template <class Range , class Callback >
138
208
void write (Range const & range, Callback const & callback) {
139
- write_impl (
140
- boost::make_iterator_range (range)
141
- , callback
142
- );
209
+ if (error_encountered) boost::throw_exception (boost::system::system_error (*error_encountered));
210
+ boost::function<void (boost::system::error_code)> f = callback;
211
+ write_impl (boost::make_iterator_range (range), callback);
212
+ }
213
+
214
+ private:
215
+ typedef boost::array<char , BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE> buffer_type;
216
+
217
+ public:
218
+ typedef iterator_range<buffer_type::const_iterator> input_range;
219
+ typedef boost::function<void (input_range, boost::system::error_code, std::size_t , connection_ptr)> read_callback_function;
220
+
221
+ void read (read_callback_function callback) {
222
+ if (error_encountered) boost::throw_exception (boost::system::system_error (*error_encountered));
223
+ socket ().async_read_some (
224
+ asio::buffer (read_buffer_)
225
+ , strand.wrap (
226
+ boost::bind (
227
+ &async_connection<Tag,Handler>::wrap_read_handler
228
+ , async_connection<Tag,Handler>::shared_from_this ()
229
+ , callback
230
+ , asio::placeholders::error, asio::placeholders::bytes_transferred)));
143
231
}
144
232
145
233
asio::ip::tcp::socket & socket () { return socket_; }
146
234
utils::thread_pool & thread_pool () { return thread_pool_; }
235
+ bool has_error () { return (!!error_encountered); }
236
+ optional<boost::system::system_error> error ()
237
+ { return error_encountered; }
147
238
148
239
private:
149
240
241
+ void wrap_read_handler (read_callback_function callback, boost::system::error_code const & ec, std::size_t bytes_transferred) {
242
+ if (ec) error_encountered = in_place<boost::system::system_error>(ec);
243
+ thread_pool ().post (
244
+ boost::bind (
245
+ callback
246
+ , ec
247
+ , bytes_transferred
248
+ , async_connection<Tag,Handler>::shared_from_this ()));
249
+ }
250
+
150
251
void default_error (boost::system::error_code const & ec) {
151
- // TODO implement a sane default here, for now ignore the error
252
+ error_encountered = in_place<boost::system::system_error>(ec);
152
253
}
153
254
154
- typedef boost::array<char , BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE> buffer_type;
155
255
typedef boost::array<char , BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE> array;
156
256
typedef std::list<shared_ptr<array> > array_list;
157
257
typedef boost::shared_ptr<array_list> shared_array_list;
158
258
typedef boost::shared_ptr<std::vector<asio::const_buffer> > shared_buffers;
159
259
typedef request_parser<Tag> request_parser_type;
260
+ typedef boost::lock_guard<boost::mutex> lock_guard;
160
261
161
262
asio::ip::tcp::socket socket_;
162
263
asio::io_service::strand strand;
@@ -165,12 +266,14 @@ namespace boost { namespace network { namespace http {
165
266
bool headers_already_sent;
166
267
asio::streambuf headers_buffer;
167
268
269
+ boost::mutex headers_mutex;
168
270
buffer_type read_buffer_;
169
- boost:: uint16_t status;
271
+ status_t status;
170
272
request_parser_type parser;
171
273
request request_;
172
274
buffer_type::iterator new_start;
173
275
string_type partial_parsed;
276
+ optional<boost::system::system_error> error_encountered;
174
277
175
278
template <class , class > friend struct async_server_base ;
176
279
@@ -317,12 +420,45 @@ namespace boost { namespace network { namespace http {
317
420
BOOST_ASSERT (false && " This is a bug, report to the cpp-netlib devel mailing list!" );
318
421
std::abort ();
319
422
}
423
+ } else {
424
+ error_encountered = in_place<boost::system::system_error>(ec);
320
425
}
321
- // TODO log the error
322
426
}
323
427
324
428
void client_error () {
325
- // FIXME write out a client request error
429
+ status = bad_request;
430
+ write_first_line (
431
+ strand.wrap (
432
+ boost::bind (
433
+ &async_connection<Tag,Handler>::client_error_first_line_written
434
+ , async_connection<Tag,Handler>::shared_from_this ()
435
+ , asio::placeholders::error
436
+ , asio::placeholders::bytes_transferred)));
437
+ }
438
+
439
+ void client_error_first_line_written (boost::system::error_code const & ec, std::size_t bytes_transferred) {
440
+ static char const * bad_request =
441
+ " HTTP/1.0 400 Bad Request\r\n Connection: close\r\n Content-Type: text/plain\r\n Content-Length: 12\r\n\r\n Bad Request." ;
442
+
443
+ asio::async_write (
444
+ socket ()
445
+ , asio::buffer (bad_request, 115 )
446
+ , strand.wrap (
447
+ boost::bind (
448
+ &async_connection<Tag,Handler>::client_error_sent
449
+ , async_connection<Tag,Handler>::shared_from_this ()
450
+ , asio::placeholders::error
451
+ , asio::placeholders::bytes_transferred)));
452
+ }
453
+
454
+ void client_error_sent (boost::system::error_code const & ec, std::size_t bytes_transferred) {
455
+ if (!ec) {
456
+ boost::system::error_code ignored;
457
+ socket ().shutdown (asio::ip::tcp::socket::shutdown_both, ignored);
458
+ socket ().close (ignored);
459
+ } else {
460
+ error_encountered = in_place<boost::system::system_error>(ec);
461
+ }
326
462
}
327
463
328
464
void parse_headers (string_type & input, typename request::headers_container_type & container) {
@@ -339,38 +475,99 @@ namespace boost { namespace network { namespace http {
339
475
, headers
340
476
);
341
477
}
342
- template <class Range , class Callback >
343
- void write_headers (Range range, Callback callback) {
344
- // TODO send out the headers, then once that's done
345
- // call the write again on the range and callback
478
+
479
+ void do_nothing () {}
480
+
481
+ template <class Range >
482
+ void continue_write (Range range, boost::function<void (boost::system::error_code)> callback) {
483
+ thread_pool ().post (
484
+ boost::bind (
485
+ &async_connection<Tag,Handler>::write_impl<Range>
486
+ , async_connection<Tag,Handler>::shared_from_this ()
487
+ , range, callback));
346
488
}
347
489
348
- void write_headers_only () {
490
+ template <class Callback >
491
+ void write_first_line (Callback callback) {
492
+ std::vector<asio::const_buffer> buffers;
493
+ typedef constants<Tag> consts;
494
+ typename ostringstream<Tag>::type first_line_stream;
495
+ first_line_stream
496
+ << consts::http_slash () << 1 << consts::dot () << 1 << consts::space ()
497
+ << status << consts::space () << status_message (status)
498
+ << consts::space ()
499
+ ;
500
+ std::string first_line = first_line_stream.str ();
501
+ buffers.push_back (asio::buffer (first_line));
502
+ asio::async_write (
503
+ socket ()
504
+ , buffers
505
+ , callback);
349
506
}
350
507
351
- void handle_write_headers (boost::system::error_code const & ec) {
352
- if (ec) {
353
- // TODO signal somehow that there was an error so that subsequent
354
- // calls to write would throw an exception
355
- return ;
508
+ void write_headers_only (boost::function<void ()> callback) {
509
+ write_first_line (
510
+ strand.wrap (
511
+ boost::bind (
512
+ &async_connection<Tag,Handler>::handle_first_line_written
513
+ , async_connection<Tag,Handler>::shared_from_this ()
514
+ , callback
515
+ , asio::placeholders::error
516
+ , asio::placeholders::bytes_transferred)));
517
+ }
518
+
519
+ void handle_first_line_written (boost::function<void ()> callback, boost::system::error_code const & ec, std::size_t bytes_transferred) {
520
+ lock_guard lock (headers_mutex);
521
+ if (!ec) {
522
+ asio::async_write (
523
+ socket ()
524
+ , headers_buffer
525
+ , strand.wrap (
526
+ boost::bind (
527
+ &async_connection<Tag,Handler>::handle_write_headers
528
+ , async_connection<Tag,Handler>::shared_from_this ()
529
+ , callback
530
+ , asio::placeholders::error
531
+ , asio::placeholders::bytes_transferred)));
532
+ } else {
533
+ error_encountered = in_place<boost::system::system_error>(ec);
534
+ }
535
+ }
536
+
537
+ void handle_write_headers (boost::function<void ()> callback, boost::system::error_code const & ec, std::size_t bytes_transferred) {
538
+ lock_guard lock (headers_mutex);
539
+ if (!ec) {
540
+ headers_buffer.consume (headers_buffer.size ());
541
+ headers_already_sent = true ;
542
+ callback ();
543
+ } else {
544
+ error_encountered = in_place<boost::system::system_error>(ec);
356
545
}
357
- headers_already_sent = true ;
358
546
}
359
547
360
548
void handle_write (
361
549
boost::function<void (boost::system::error_code const &)> callback
362
550
, shared_array_list temporaries
363
551
, shared_buffers buffers
364
552
, boost::system::error_code const & ec
553
+ , std::size_t bytes_transferred
365
554
) {
366
555
// we want to forget the temporaries and buffers
367
556
thread_pool ().post (boost::bind (callback, ec));
368
557
}
369
558
370
- template <class Range , class Callback >
371
- void write_impl (Range range, Callback callback) {
559
+ template <class Range >
560
+ void write_impl (Range range, boost::function< void (boost::system::error_code)> callback) {
372
561
if (!headers_already_sent) {
373
- write_headers (range, callback);
562
+ boost::function<void (boost::system::error_code)> callback_function =
563
+ callback;
564
+
565
+ write_headers_only (
566
+ boost::bind (
567
+ &async_connection<Tag,Handler>::continue_write<Range>
568
+ , async_connection<Tag,Handler>::shared_from_this ()
569
+ , range, callback_function
570
+ ));
374
571
return ;
375
572
}
376
573
@@ -431,6 +628,7 @@ namespace boost { namespace network { namespace http {
431
628
, temporaries
432
629
, buffers // keep these alive until the handler is called!
433
630
, boost::asio::placeholders::error
631
+ , boost::asio::placeholders::bytes_transferred
434
632
)
435
633
)
436
634
);
0 commit comments