Skip to content

Commit bf9aac3

Browse files
committed
Lorem Ipsum 10k Testing
This commit implements a real asynchronous HTTP/1.1 server. It still doesn't support pipelined requests, but the connection framework is there to be usable. Locally, this implementation has been benchmarked to be able to handle 2800 requests per second, with Apache Bench (ab) with the following parameters: ab -n 100000 -c 100 http://localhost:8000/ This was tested on a Linux machine, while the binary produced was built with clang 2.9 (trunk).
1 parent b7dc852 commit bf9aac3

File tree

2 files changed

+63
-26
lines changed

2 files changed

+63
-26
lines changed

boost/network/protocol/http/server/async_connection.hpp

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <boost/optional.hpp>
2525
#include <boost/utility/typed_in_place_factory.hpp>
2626
#include <boost/thread/locks.hpp>
27+
#include <boost/thread/recursive_mutex.hpp>
2728
#include <list>
2829
#include <vector>
2930
#include <iterator>
@@ -127,11 +128,20 @@ namespace boost { namespace network { namespace http {
127128
, handler(handler)
128129
, thread_pool_(thread_pool)
129130
, headers_already_sent(false)
131+
, first_line_already_sent(false)
132+
, headers_in_progress(false)
133+
, first_line_in_progress(false)
130134
, headers_buffer(BOOST_NETWORK_HTTP_SERVER_CONNECTION_HEADER_BUFFER_MAX_SIZE)
131135
{
132136
new_start = read_buffer_.begin();
133137
}
134138

139+
~async_connection() throw () {
140+
boost::system::error_code ignored;
141+
socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ignored);
142+
socket_.close(ignored);
143+
}
144+
135145
/** Function: template <class Range> set_headers(Range headers)
136146
* Precondition: headers have not been sent yet
137147
* Postcondition: headers have been linearized to a buffer,
@@ -145,16 +155,11 @@ namespace boost { namespace network { namespace http {
145155
template <class Range>
146156
void set_headers(Range headers) {
147157
lock_guard lock(headers_mutex);
158+
if (first_line_in_progress || headers_in_progress || headers_already_sent)
159+
boost::throw_exception(std::logic_error("Headers have already been sent."));
148160

149-
if (headers_already_sent) boost::throw_exception(std::logic_error("Headers have already been sent."));
150-
151-
if (error_encountered) boost::throw_exception(boost::system::system_error(*error_encountered));
152-
153-
bool commit = false;
154-
BOOST_SCOPE_EXIT_TPL((&commit)(&headers_already_sent)) {
155-
if (!commit) headers_already_sent = false;
156-
else headers_already_sent = true;
157-
} BOOST_SCOPE_EXIT_END
161+
if (error_encountered)
162+
boost::throw_exception(boost::system::system_error(*error_encountered));
158163

159164
typedef constants<Tag> consts;
160165
headers_buffer.consume(headers_buffer.size());
@@ -169,14 +174,13 @@ namespace boost { namespace network { namespace http {
169174
stream << consts::crlf();
170175
}
171176
stream << consts::crlf();
177+
stream.flush();
172178

173179
write_headers_only(
174180
boost::bind(
175181
&async_connection<Tag,Handler>::do_nothing
176182
, async_connection<Tag,Handler>::shared_from_this()
177183
));
178-
179-
commit = true;
180184
}
181185

182186
void set_status(status_t new_status) {
@@ -189,6 +193,7 @@ namespace boost { namespace network { namespace http {
189193

190194
template <class Range>
191195
void write(Range const & range) {
196+
lock_guard lock(headers_mutex);
192197
if (error_encountered) boost::throw_exception(boost::system::system_error(*error_encountered));
193198

194199
boost::function<void(boost::system::error_code)> f =
@@ -205,6 +210,7 @@ namespace boost { namespace network { namespace http {
205210

206211
template <class Range, class Callback>
207212
void write(Range const & range, Callback const & callback) {
213+
lock_guard lock(headers_mutex);
208214
if (error_encountered) boost::throw_exception(boost::system::system_error(*error_encountered));
209215
boost::function<void(boost::system::error_code)> f = callback;
210216
write_impl(boost::make_iterator_range(range), callback);
@@ -256,23 +262,25 @@ namespace boost { namespace network { namespace http {
256262
typedef boost::shared_ptr<array_list> shared_array_list;
257263
typedef boost::shared_ptr<std::vector<asio::const_buffer> > shared_buffers;
258264
typedef request_parser<Tag> request_parser_type;
259-
typedef boost::lock_guard<boost::mutex> lock_guard;
265+
typedef boost::lock_guard<boost::recursive_mutex> lock_guard;
266+
typedef std::list<boost::function<void()> > pending_actions_list;
260267

261268
asio::ip::tcp::socket socket_;
262269
asio::io_service::strand strand;
263270
Handler & handler;
264271
utils::thread_pool & thread_pool_;
265-
bool headers_already_sent;
272+
volatile bool headers_already_sent, first_line_already_sent, headers_in_progress, first_line_in_progress;
266273
asio::streambuf headers_buffer;
267274

268-
boost::mutex headers_mutex;
275+
boost::recursive_mutex headers_mutex;
269276
buffer_type read_buffer_;
270277
status_t status;
271278
request_parser_type parser;
272279
request request_;
273280
buffer_type::iterator new_start;
274281
string_type partial_parsed;
275282
optional<boost::system::system_error> error_encountered;
283+
pending_actions_list pending_actions;
276284

277285
template <class, class> friend struct async_server_base;
278286

@@ -281,6 +289,10 @@ namespace boost { namespace network { namespace http {
281289
};
282290

283291
void start() {
292+
typename ostringstream<Tag>::type ip_stream;
293+
ip_stream << socket_.remote_endpoint().address().to_v4().to_string() << ':'
294+
<< socket_.remote_endpoint().port();
295+
request_.source = ip_stream.str();
284296
read_more(method);
285297
}
286298

@@ -462,7 +474,6 @@ namespace boost { namespace network { namespace http {
462474

463475
void parse_headers(string_type & input, typename request::headers_container_type & container) {
464476
using namespace boost::spirit::qi;
465-
std::vector<fusion::tuple<std::string,std::string> > headers;
466477
parse(
467478
input.begin(), input.end(),
468479
*(
@@ -471,7 +482,7 @@ namespace boost { namespace network { namespace http {
471482
>> +(alnum|space|punct)
472483
>> lit("\r\n")
473484
)
474-
, headers
485+
, container
475486
);
476487
}
477488

@@ -488,13 +499,17 @@ namespace boost { namespace network { namespace http {
488499

489500
template <class Callback>
490501
void write_first_line(Callback callback) {
502+
lock_guard lock(headers_mutex);
503+
if (first_line_in_progress) return;
504+
first_line_in_progress = true;
505+
491506
std::vector<asio::const_buffer> buffers;
492507
typedef constants<Tag> consts;
493508
typename ostringstream<Tag>::type first_line_stream;
494509
first_line_stream
495510
<< consts::http_slash() << 1<< consts::dot() << 1 << consts::space()
496511
<< status << consts::space() << status_message(status)
497-
<< consts::space()
512+
<< consts::crlf()
498513
;
499514
std::string first_line = first_line_stream.str();
500515
buffers.push_back(asio::buffer(first_line));
@@ -505,6 +520,9 @@ namespace boost { namespace network { namespace http {
505520
}
506521

507522
void write_headers_only(boost::function<void()> callback) {
523+
if (headers_in_progress) return;
524+
headers_in_progress = true;
525+
508526
write_first_line(
509527
strand.wrap(
510528
boost::bind(
@@ -518,6 +536,7 @@ namespace boost { namespace network { namespace http {
518536
void handle_first_line_written(boost::function<void()> callback, boost::system::error_code const & ec, std::size_t bytes_transferred) {
519537
lock_guard lock(headers_mutex);
520538
if (!ec) {
539+
first_line_already_sent = true;
521540
asio::async_write(
522541
socket()
523542
, headers_buffer
@@ -538,7 +557,13 @@ namespace boost { namespace network { namespace http {
538557
if (!ec) {
539558
headers_buffer.consume(headers_buffer.size());
540559
headers_already_sent = true;
541-
callback();
560+
thread_pool().post(callback);
561+
pending_actions_list::iterator start = pending_actions.begin()
562+
, end = pending_actions.end();
563+
while (start != end) {
564+
thread_pool().post(*start++);
565+
}
566+
pending_actions_list().swap(pending_actions);
542567
} else {
543568
error_encountered = in_place<boost::system::system_error>(ec);
544569
}
@@ -557,17 +582,25 @@ namespace boost { namespace network { namespace http {
557582

558583
template <class Range>
559584
void write_impl(Range range, boost::function<void(boost::system::error_code)> callback) {
560-
if (!headers_already_sent) {
561-
boost::function<void(boost::system::error_code)> callback_function =
562-
callback;
585+
lock_guard lock(headers_mutex);
586+
boost::function<void(boost::system::error_code)> callback_function =
587+
callback;
563588

589+
if (!headers_already_sent && !headers_in_progress) {
564590
write_headers_only(
565591
boost::bind(
566592
&async_connection<Tag,Handler>::continue_write<Range>
567593
, async_connection<Tag,Handler>::shared_from_this()
568594
, range, callback_function
569595
));
570596
return;
597+
} else if (headers_in_progress && !headers_already_sent) {
598+
pending_actions.push_back(
599+
boost::bind(
600+
&async_connection<Tag,Handler>::continue_write<Range>
601+
, async_connection<Tag,Handler>::shared_from_this()
602+
, range, callback_function));
603+
return;
571604
}
572605

573606
// linearize the whole range into a vector
@@ -583,14 +616,15 @@ namespace boost { namespace network { namespace http {
583616
// on doing I/O.
584617
//
585618

586-
static std::size_t const connection_buffer_size = BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE;
619+
static std::size_t const connection_buffer_size =
620+
BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE;
587621
shared_array_list temporaries =
588622
boost::make_shared<array_list>();
589623
shared_buffers buffers =
590624
boost::make_shared<std::vector<asio::const_buffer> >(0);
591625

592626
std::size_t range_size = boost::distance(range);
593-
buffers->resize(
627+
buffers->reserve(
594628
(range_size / connection_buffer_size)
595629
+ ((range_size % connection_buffer_size)?1:0)
596630
);

0 commit comments

Comments
 (0)