Skip to content

Commit f1deeda

Browse files
committed
Completes (Untested) Asynchronous Connection
This commit actually completes the asynchronous connection implementation. What remains to be done are: - Complete the tests, make sure the server doesn't fail due to some serious asynchoronous handling issues. - Write a static file server that uses HTTP/1.1 streaming In this commit the whole system builds, but there needs to be a check on whether the thing actually works. In this commit are: * A means for reading and scheduling an asynchronous read handler. Documentation is required on the API of the read handler. * A fully asynchronous event handling and thread-pool dispatching of user supplied handlers. Expect a lot more changes starting this commit so hold your hats folks it's going to be a bumpy ride.
1 parent 89f73f6 commit f1deeda

File tree

1 file changed

+231
-33
lines changed

1 file changed

+231
-33
lines changed

boost/network/protocol/http/server/async_connection.hpp

Lines changed: 231 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
#include <boost/network/protocol/http/server/request_parser.hpp>
2222
#include <boost/range/iterator_range.hpp>
2323
#include <boost/spirit/include/qi.hpp>
24+
#include <boost/optional.hpp>
25+
#include <boost/utility/typed_in_place_factory.hpp>
26+
#include <boost/thread/locks.hpp>
2427
#include <list>
2528
#include <vector>
2629
#include <iterator>
@@ -64,6 +67,55 @@ namespace boost { namespace network { namespace http {
6467

6568
typedef typename string<Tag>::type string_type;
6669
typedef basic_request<Tag> request;
70+
typedef shared_ptr<async_connection> connection_ptr;
71+
72+
private:
73+
static char const * status_message(status_t status) {
74+
static char const
75+
ok_[] = "OK"
76+
, created_[] = "Created"
77+
, accepted_[] = "Accepted"
78+
, no_content_[] = "No Content"
79+
, multiple_choices_[] = "Multiple Choices"
80+
, moved_permanently_[] = "Moved Permanently"
81+
, moved_temporarily_[] = "Moved Temporarily"
82+
, not_modified_[] = "Not Modified"
83+
, bad_request_[] = "Bad Request"
84+
, unauthorized_[] = "Unauthorized"
85+
, forbidden_[] = "Fobidden"
86+
, not_found_[] = "Not Found"
87+
, not_supported_[] = "Not Supported"
88+
, not_acceptable_[] = "Not Acceptable"
89+
, internal_server_error_[] = "Internal Server Error"
90+
, not_implemented_[] = "Not Implemented"
91+
, bad_gateway_[] = "Bad Gateway"
92+
, service_unavailable_[] = "Service Unavailable"
93+
, unknown_[] = "Unknown"
94+
;
95+
switch(status) {
96+
case ok: return ok_;
97+
case created: return created_;
98+
case accepted: return accepted_;
99+
case no_content: return no_content_;
100+
case multiple_choices: return multiple_choices_;
101+
case moved_permanently: return moved_permanently_;
102+
case moved_temporarily: return moved_temporarily_;
103+
case not_modified: return not_modified_;
104+
case bad_request: return bad_request_;
105+
case unauthorized: return unauthorized_;
106+
case forbidden: return forbidden_;
107+
case not_found: return not_found_;
108+
case not_supported: return not_supported_;
109+
case not_acceptable: return not_acceptable_;
110+
case internal_server_error: return internal_server_error_;
111+
case not_implemented: return not_implemented_;
112+
case bad_gateway: return bad_gateway_;
113+
case service_unavailable: return service_unavailable_;
114+
default: return unknown_;
115+
}
116+
}
117+
118+
public:
67119

68120
async_connection(
69121
asio::io_service & io_service
@@ -82,21 +134,26 @@ namespace boost { namespace network { namespace http {
82134

83135
/** Function: template <class Range> set_headers(Range headers)
84136
* Precondition: headers have not been sent yet
85-
* Postcondition: headers have been linearized to a buffer.
137+
* Postcondition: headers have been linearized to a buffer,
138+
* and assumed to have been sent already when the function exits
86139
* Throws: std::logic_error in case the headers have already been sent.
87140
*
88141
* A call to set_headers takes a Range where each element models the
89142
* Header concept. This Range will be linearized onto a buffer, which is
90143
* then sent as soon as the first call to `write` or `flush` commences.
91144
*/
92145
template <class Range>
93-
void set_headers(Range headers, bool immediate = true) {
94-
if (headers_already_sent)
95-
boost::throw_exception(std::logic_error("Headers have already been sent."));
146+
void set_headers(Range headers) {
147+
lock_guard lock(headers_mutex);
148+
149+
if (headers_already_sent) boost::throw_exception(std::logic_error("Headers have already been sent."));
150+
151+
if (error_encountered) boost::throw_exception(boost::system::system_error(*error_encountered));
96152

97153
bool commit = false;
98154
BOOST_SCOPE_EXIT_TPL((&commit)(&headers_already_sent)) {
99155
if (!commit) headers_already_sent = false;
156+
else headers_already_sent = true;
100157
} BOOST_SCOPE_EXIT_END
101158

102159
typedef constants<Tag> consts;
@@ -113,50 +170,94 @@ namespace boost { namespace network { namespace http {
113170
stream << consts::crlf();
114171
}
115172
stream << consts::crlf();
116-
if (immediate) write_headers_only();
173+
174+
write_headers_only(
175+
boost::bind(
176+
&async_connection<Tag,Handler>::do_nothing
177+
, async_connection<Tag,Handler>::shared_from_this()
178+
));
117179

118180
commit = true;
119181
}
120182

121183
void set_status(status_t new_status) {
184+
lock_guard lock(headers_mutex);
185+
if (headers_already_sent) boost::throw_exception(std::logic_error("Headers have already been sent, cannot reset status."));
186+
if (error_encountered) boost::throw_exception(boost::system::system_error(*error_encountered));
187+
122188
status = new_status;
123189
}
124190

125191
template <class Range>
126192
void write(Range const & range) {
127-
write_impl(
128-
boost::make_iterator_range(range)
129-
, boost::bind(
193+
if (error_encountered) boost::throw_exception(boost::system::system_error(*error_encountered));
194+
195+
boost::function<void(boost::system::error_code)> f =
196+
boost::bind(
130197
&async_connection<Tag,Handler>::default_error
131198
, async_connection<Tag,Handler>::shared_from_this()
132-
, _1
133-
)
199+
, _1);
200+
201+
write_impl(
202+
boost::make_iterator_range(range)
203+
, f
134204
);
135205
}
136206

137207
template <class Range, class Callback>
138208
void write(Range const & range, Callback const & callback) {
139-
write_impl(
140-
boost::make_iterator_range(range)
141-
, callback
142-
);
209+
if (error_encountered) boost::throw_exception(boost::system::system_error(*error_encountered));
210+
boost::function<void(boost::system::error_code)> f = callback;
211+
write_impl(boost::make_iterator_range(range), callback);
212+
}
213+
214+
private:
215+
typedef boost::array<char, BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE> buffer_type;
216+
217+
public:
218+
typedef iterator_range<buffer_type::const_iterator> input_range;
219+
typedef boost::function<void(input_range, boost::system::error_code, std::size_t, connection_ptr)> read_callback_function;
220+
221+
void read(read_callback_function callback) {
222+
if (error_encountered) boost::throw_exception(boost::system::system_error(*error_encountered));
223+
socket().async_read_some(
224+
asio::buffer(read_buffer_)
225+
, strand.wrap(
226+
boost::bind(
227+
&async_connection<Tag,Handler>::wrap_read_handler
228+
, async_connection<Tag,Handler>::shared_from_this()
229+
, callback
230+
, asio::placeholders::error, asio::placeholders::bytes_transferred)));
143231
}
144232

145233
asio::ip::tcp::socket & socket() { return socket_; }
146234
utils::thread_pool & thread_pool() { return thread_pool_; }
235+
bool has_error() { return (!!error_encountered); }
236+
optional<boost::system::system_error> error()
237+
{ return error_encountered; }
147238

148239
private:
149240

241+
void wrap_read_handler(read_callback_function callback, boost::system::error_code const & ec, std::size_t bytes_transferred) {
242+
if (ec) error_encountered = in_place<boost::system::system_error>(ec);
243+
thread_pool().post(
244+
boost::bind(
245+
callback
246+
, ec
247+
, bytes_transferred
248+
, async_connection<Tag,Handler>::shared_from_this()));
249+
}
250+
150251
void default_error(boost::system::error_code const & ec) {
151-
// TODO implement a sane default here, for now ignore the error
252+
error_encountered = in_place<boost::system::system_error>(ec);
152253
}
153254

154-
typedef boost::array<char, BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE> buffer_type;
155255
typedef boost::array<char, BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE> array;
156256
typedef std::list<shared_ptr<array> > array_list;
157257
typedef boost::shared_ptr<array_list> shared_array_list;
158258
typedef boost::shared_ptr<std::vector<asio::const_buffer> > shared_buffers;
159259
typedef request_parser<Tag> request_parser_type;
260+
typedef boost::lock_guard<boost::mutex> lock_guard;
160261

161262
asio::ip::tcp::socket socket_;
162263
asio::io_service::strand strand;
@@ -165,12 +266,14 @@ namespace boost { namespace network { namespace http {
165266
bool headers_already_sent;
166267
asio::streambuf headers_buffer;
167268

269+
boost::mutex headers_mutex;
168270
buffer_type read_buffer_;
169-
boost::uint16_t status;
271+
status_t status;
170272
request_parser_type parser;
171273
request request_;
172274
buffer_type::iterator new_start;
173275
string_type partial_parsed;
276+
optional<boost::system::system_error> error_encountered;
174277

175278
template <class, class> friend struct async_server_base;
176279

@@ -317,12 +420,45 @@ namespace boost { namespace network { namespace http {
317420
BOOST_ASSERT(false && "This is a bug, report to the cpp-netlib devel mailing list!");
318421
std::abort();
319422
}
423+
} else {
424+
error_encountered = in_place<boost::system::system_error>(ec);
320425
}
321-
// TODO log the error
322426
}
323427

324428
void client_error() {
325-
//FIXME write out a client request error
429+
status = bad_request;
430+
write_first_line(
431+
strand.wrap(
432+
boost::bind(
433+
&async_connection<Tag,Handler>::client_error_first_line_written
434+
, async_connection<Tag,Handler>::shared_from_this()
435+
, asio::placeholders::error
436+
, asio::placeholders::bytes_transferred)));
437+
}
438+
439+
void client_error_first_line_written(boost::system::error_code const & ec, std::size_t bytes_transferred) {
440+
static char const * bad_request =
441+
"HTTP/1.0 400 Bad Request\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: 12\r\n\r\nBad Request.";
442+
443+
asio::async_write(
444+
socket()
445+
, asio::buffer(bad_request, 115)
446+
, strand.wrap(
447+
boost::bind(
448+
&async_connection<Tag,Handler>::client_error_sent
449+
, async_connection<Tag,Handler>::shared_from_this()
450+
, asio::placeholders::error
451+
, asio::placeholders::bytes_transferred)));
452+
}
453+
454+
void client_error_sent(boost::system::error_code const & ec, std::size_t bytes_transferred) {
455+
if (!ec) {
456+
boost::system::error_code ignored;
457+
socket().shutdown(asio::ip::tcp::socket::shutdown_both, ignored);
458+
socket().close(ignored);
459+
} else {
460+
error_encountered = in_place<boost::system::system_error>(ec);
461+
}
326462
}
327463

328464
void parse_headers(string_type & input, typename request::headers_container_type & container) {
@@ -339,38 +475,99 @@ namespace boost { namespace network { namespace http {
339475
, headers
340476
);
341477
}
342-
template <class Range, class Callback>
343-
void write_headers(Range range, Callback callback) {
344-
// TODO send out the headers, then once that's done
345-
// call the write again on the range and callback
478+
479+
void do_nothing() {}
480+
481+
template <class Range>
482+
void continue_write(Range range, boost::function<void(boost::system::error_code)> callback) {
483+
thread_pool().post(
484+
boost::bind(
485+
&async_connection<Tag,Handler>::write_impl<Range>
486+
, async_connection<Tag,Handler>::shared_from_this()
487+
, range, callback));
346488
}
347489

348-
void write_headers_only() {
490+
template <class Callback>
491+
void write_first_line(Callback callback) {
492+
std::vector<asio::const_buffer> buffers;
493+
typedef constants<Tag> consts;
494+
typename ostringstream<Tag>::type first_line_stream;
495+
first_line_stream
496+
<< consts::http_slash() << 1<< consts::dot() << 1 << consts::space()
497+
<< status << consts::space() << status_message(status)
498+
<< consts::space()
499+
;
500+
std::string first_line = first_line_stream.str();
501+
buffers.push_back(asio::buffer(first_line));
502+
asio::async_write(
503+
socket()
504+
, buffers
505+
, callback);
349506
}
350507

351-
void handle_write_headers(boost::system::error_code const & ec) {
352-
if (ec) {
353-
// TODO signal somehow that there was an error so that subsequent
354-
// calls to write would throw an exception
355-
return;
508+
void write_headers_only(boost::function<void()> callback) {
509+
write_first_line(
510+
strand.wrap(
511+
boost::bind(
512+
&async_connection<Tag,Handler>::handle_first_line_written
513+
, async_connection<Tag,Handler>::shared_from_this()
514+
, callback
515+
, asio::placeholders::error
516+
, asio::placeholders::bytes_transferred)));
517+
}
518+
519+
void handle_first_line_written(boost::function<void()> callback, boost::system::error_code const & ec, std::size_t bytes_transferred) {
520+
lock_guard lock(headers_mutex);
521+
if (!ec) {
522+
asio::async_write(
523+
socket()
524+
, headers_buffer
525+
, strand.wrap(
526+
boost::bind(
527+
&async_connection<Tag,Handler>::handle_write_headers
528+
, async_connection<Tag,Handler>::shared_from_this()
529+
, callback
530+
, asio::placeholders::error
531+
, asio::placeholders::bytes_transferred)));
532+
} else {
533+
error_encountered = in_place<boost::system::system_error>(ec);
534+
}
535+
}
536+
537+
void handle_write_headers(boost::function<void()> callback, boost::system::error_code const & ec, std::size_t bytes_transferred) {
538+
lock_guard lock(headers_mutex);
539+
if (!ec) {
540+
headers_buffer.consume(headers_buffer.size());
541+
headers_already_sent = true;
542+
callback();
543+
} else {
544+
error_encountered = in_place<boost::system::system_error>(ec);
356545
}
357-
headers_already_sent = true;
358546
}
359547

360548
void handle_write(
361549
boost::function<void(boost::system::error_code const &)> callback
362550
, shared_array_list temporaries
363551
, shared_buffers buffers
364552
, boost::system::error_code const & ec
553+
, std::size_t bytes_transferred
365554
) {
366555
// we want to forget the temporaries and buffers
367556
thread_pool().post(boost::bind(callback, ec));
368557
}
369558

370-
template <class Range, class Callback>
371-
void write_impl(Range range, Callback callback) {
559+
template <class Range>
560+
void write_impl(Range range, boost::function<void(boost::system::error_code)> callback) {
372561
if (!headers_already_sent) {
373-
write_headers(range, callback);
562+
boost::function<void(boost::system::error_code)> callback_function =
563+
callback;
564+
565+
write_headers_only(
566+
boost::bind(
567+
&async_connection<Tag,Handler>::continue_write<Range>
568+
, async_connection<Tag,Handler>::shared_from_this()
569+
, range, callback_function
570+
));
374571
return;
375572
}
376573

@@ -431,6 +628,7 @@ namespace boost { namespace network { namespace http {
431628
, temporaries
432629
, buffers // keep these alive until the handler is called!
433630
, boost::asio::placeholders::error
631+
, boost::asio::placeholders::bytes_transferred
434632
)
435633
)
436634
);

0 commit comments

Comments
 (0)