Skip to content

Refactored HTTP client v2. #476

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Jan 2, 2015
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added some comments; fixed tests so that they compile for Boost.Optio…
…nal in 1.56.
  • Loading branch information
glynos committed Sep 20, 2014
commit ad273ec1cacf4c56f0a93957a32c34f948715a24
52 changes: 34 additions & 18 deletions http/src/http/v2/client/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ std::future<response> client::impl::execute(std::shared_ptr<request_context> con
context->request_.append_header("User-Agent", options_.user_agent());
}

// Get the host and port from the request and resolve
auto url = context->request_.url();
auto host = url.host()?
uri::string_type(std::begin(*url.host()), std::end(*url.host())) : uri::string_type();
Expand All @@ -164,23 +165,25 @@ void client::impl::connect(const boost::system::error_code &ec,
return;
}

// make a connection to an endpoint
auto host = context->request_.url().host();
tcp::endpoint endpoint(*endpoint_iterator);
context->connection_->async_connect(endpoint,
std::string(std::begin(*host), std::end(*host)),
strand_.wrap([=] (const boost::system::error_code &ec) {
if (ec && endpoint_iterator != tcp::resolver::iterator()) {
// copy iterator because it is const after the lambda
// capture
auto it = endpoint_iterator;
boost::system::error_code ignore;
connect(ignore, ++it, context);
return;
}

write_request(ec, context);
}));
}
std::string(std::begin(*host), std::end(*host)),
strand_.wrap([=] (const boost::system::error_code &ec) {
// If there is no connection, try again on another endpoint
if (ec && endpoint_iterator != tcp::resolver::iterator()) {
// copy iterator because it is const after the lambda
// capture
auto it = endpoint_iterator;
boost::system::error_code ignore;
connect(ignore, ++it, context);
return;
}

write_request(ec, context);
}));
}

void client::impl::write_request(const boost::system::error_code &ec,
std::shared_ptr<request_context> context) {
Expand All @@ -189,6 +192,7 @@ void client::impl::write_request(const boost::system::error_code &ec,
return;
}

// write the request to an I/O stream.
std::ostream request_stream(&context->request_buffer_);
request_stream << context->request_;
if (!request_stream) {
Expand All @@ -210,11 +214,13 @@ void client::impl::write_body(const boost::system::error_code &ec,
return;
}

// update progress
context->total_bytes_written_ += bytes_written;
if (auto progress = context->options_.progress()) {
progress(client_message::transfer_direction::bytes_written, context->total_bytes_written_);
}

// write the body to an I/O stream
std::ostream request_stream(&context->request_buffer_);
// TODO write payload to request_buffer_
if (!request_stream) {
Expand All @@ -236,11 +242,13 @@ void client::impl::read_response(const boost::system::error_code &ec,
return;
}

// update progress.
context->total_bytes_written_ += bytes_written;
if (auto progress = context->options_.progress()) {
progress(client_message::transfer_direction::bytes_written, context->total_bytes_written_);
}

// Create a response object and fill it with the status from the server.
std::shared_ptr<response> res(new response{});
context->connection_->async_read_until(context->response_buffer_,
"\r\n",
Expand All @@ -259,6 +267,7 @@ void client::impl::read_response_status(const boost::system::error_code &ec,
return;
}

// Update the reponse status.
std::istream is(&context->response_buffer_);
string_type version;
is >> version;
Expand All @@ -271,6 +280,7 @@ void client::impl::read_response_status(const boost::system::error_code &ec,
res->set_status(network::http::v2::status::code(status));
res->set_status_message(boost::trim_copy(message));

// Read the response headers.
context->connection_->async_read_until(context->response_buffer_,
"\r\n\r\n",
strand_.wrap([=] (const boost::system::error_code &ec,
Expand Down Expand Up @@ -299,6 +309,7 @@ void client::impl::read_response_headers(const boost::system::error_code &ec,
res->add_header(key, value);
}

// read the response body.
context->connection_->async_read(context->response_buffer_,
strand_.wrap([=] (const boost::system::error_code &ec,
std::size_t bytes_read) {
Expand All @@ -307,6 +318,8 @@ void client::impl::read_response_headers(const boost::system::error_code &ec,
}

namespace {
// I don't want to to delimit with newlines when using the input
// stream operator, so that's why I wrote this function.
std::istream &getline_with_newline(std::istream &is, std::string &line) {
line.clear();

Expand All @@ -332,11 +345,13 @@ void client::impl::read_response_body(const boost::system::error_code &ec,
std::size_t bytes_read,
std::shared_ptr<request_context> context,
std::shared_ptr<response> res) {
// update progress.
context->total_bytes_read_ += bytes_read;
if (auto progress = context->options_.progress()) {
progress(client_message::transfer_direction::bytes_read, context->total_bytes_read_);
}

// If there's no data else to read, then set the response and exit.
if (bytes_read == 0) {
context->response_promise_.set_value(*res);
return;
Expand All @@ -348,11 +363,12 @@ void client::impl::read_response_body(const boost::system::error_code &ec,
res->append_body(line);
}

// Keep reading the response body until we have nothing else to read.
context->connection_->async_read(context->response_buffer_,
strand_.wrap([=] (const boost::system::error_code &ec,
std::size_t bytes_read) {
read_response_body(ec, bytes_read, context, res);
}));
strand_.wrap([=] (const boost::system::error_code &ec,
std::size_t bytes_read) {
read_response_body(ec, bytes_read, context, res);
}));
}

client::client(client_options options)
Expand Down
150 changes: 75 additions & 75 deletions http/src/network/http/v2/client/connection/tcp_resolver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,81 +24,81 @@
#include <network/http/v2/client/connection/endpoint_cache.hpp>

namespace network {
namespace http {
inline namespace v2 {
namespace client_connection {
/**
* \class tcp_resolver network/http/v2/client/connection/tcp_resolver.hpp
* \brief Resolves and maintains a cache of hosts.
*/
class tcp_resolver : public async_resolver {

tcp_resolver(const tcp_resolver &) = delete;
tcp_resolver &operator = (const tcp_resolver &) = delete;

public:

using async_resolver::resolver;
using async_resolver::resolver_iterator;
using async_resolver::resolve_callback;

/**
* \brief Constructor.
*/
tcp_resolver(boost::asio::io_service &service, bool cache_resolved = false)
: resolver_(service)
, cache_resolved_(cache_resolved) {

}

/**
* \brief Destructor.
*/
virtual ~tcp_resolver() noexcept {

}

virtual void async_resolve(const std::string &host, std::uint16_t port,
resolve_callback handler) {
if (cache_resolved_) {
auto it = endpoint_cache_.find(host);
if (it != endpoint_cache_.end()) {
boost::system::error_code ec;
handler(ec, it->second);
return;
}
}

resolver::query query(host, std::to_string(port));
resolver_.async_resolve(query,
[host, handler, this](const boost::system::error_code &ec,
resolver_iterator endpoint_iterator) {
if (ec) {
handler(ec, resolver_iterator());
}
else {
if (cache_resolved_) {
endpoint_cache_.insert(host, endpoint_iterator);
}
handler(ec, endpoint_iterator);
}
});
}

virtual void clear_resolved_cache() {
endpoint_cache_.clear();
}

private:

resolver resolver_;
bool cache_resolved_;
endpoint_cache endpoint_cache_;

};
} // namespace client_connection
} // namespace v2
} // namespace http
namespace http {
inline namespace v2 {
namespace client_connection {
/**
* \class tcp_resolver network/http/v2/client/connection/tcp_resolver.hpp
* \brief Resolves and maintains a cache of hosts.
*/
class tcp_resolver : public async_resolver {

tcp_resolver(const tcp_resolver &) = delete;
tcp_resolver &operator = (const tcp_resolver &) = delete;

public:

using async_resolver::resolver;
using async_resolver::resolver_iterator;
using async_resolver::resolve_callback;

/**
* \brief Constructor.
*/
tcp_resolver(boost::asio::io_service &service, bool cache_resolved = false)
: resolver_(service)
, cache_resolved_(cache_resolved) {

}

/**
* \brief Destructor.
*/
virtual ~tcp_resolver() noexcept {

}

virtual void async_resolve(const std::string &host, std::uint16_t port,
resolve_callback handler) {
if (cache_resolved_) {
auto it = endpoint_cache_.find(host);
if (it != endpoint_cache_.end()) {
boost::system::error_code ec;
handler(ec, it->second);
return;
}
}

resolver::query query(host, std::to_string(port));
resolver_.async_resolve(query,
[host, handler, this](const boost::system::error_code &ec,
resolver_iterator endpoint_iterator) {
if (ec) {
handler(ec, resolver_iterator());
}
else {
if (cache_resolved_) {
endpoint_cache_.insert(host, endpoint_iterator);
}
handler(ec, endpoint_iterator);
}
});
}

virtual void clear_resolved_cache() {
endpoint_cache_.clear();
}

private:

resolver resolver_;
bool cache_resolved_;
endpoint_cache endpoint_cache_;

};
} // namespace client_connection
} // namespace v2
} // namespace http
} // namespace network

#endif // NETWORK_HTTP_V2_CLIENT_CONNECTION_TCP_RESOLVER_INC
2 changes: 1 addition & 1 deletion http/test/v2/client/units/request_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ TEST(request_test, get_header) {
;

auto header = instance.header("User-Agent");
ASSERT_TRUE(header);
ASSERT_TRUE(static_cast<bool>(header));
ASSERT_EQ("request_test", *header);
}

Expand Down
2 changes: 1 addition & 1 deletion uri
Submodule uri updated from 56143a to 20aad3