Skip to content

Added example to store large streaming data directly to filesystem #468

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions libs/network/example/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ if (OPENSSL_FOUND)
include_directories(${OPENSSL_INCLUDE_DIR})
endif (OPENSSL_FOUND)

include(CheckCXXCompilerFlag)
CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)

add_executable(http_client http_client.cpp)
add_executable(simple_wget simple_wget.cpp)
add_executable(atom_reader atom/atom.cpp atom/main.cpp)
Expand All @@ -19,13 +22,19 @@ add_executable(hello_world_async_server_with_work_queue http/hello_world_async_s
add_executable(trivial_google trivial_google.cpp)
if (UNIX)
add_executable(fileserver http/fileserver.cpp)
if (COMPILER_SUPPORTS_CXX11)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
add_executable(async_server_file_upload http/async_server_file_upload.cpp)
endif()
endif (UNIX)

add_dependencies(http_client cppnetlib-uri cppnetlib-client-connections)
add_dependencies(simple_wget cppnetlib-uri cppnetlib-client-connections)
add_dependencies(atom_reader cppnetlib-uri cppnetlib-client-connections)
add_dependencies(rss_reader cppnetlib-uri cppnetlib-client-connections)
add_dependencies(twitter_search cppnetlib-uri cppnetlib-client-connections)
add_dependencies(trivial_google cppnetlib-uri cppnetlib-client-connections)

set(BOOST_CLIENT_LIBS
${Boost_PROGRAM_OPTIONS_LIBRARY}
${Boost_THREAD_LIBRARY}
Expand Down Expand Up @@ -139,11 +148,25 @@ if (UNIX)
${Boost_FILESYSTEM_LIBRARY}
${CMAKE_THREAD_LIBS_INIT}
cppnetlib-server-parsers)

if (COMPILER_SUPPORTS_CXX11)
target_link_libraries(async_server_file_upload
${BOOST_CLIENT_LIBS}
${CMAKE_THREAD_LIBS_INIT}
cppnetlib-server-parsers)
endif()

if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
target_link_libraries(fileserver rt)
if (COMPILER_SUPPORTS_CXX11)
target_link_libraries(async_server_file_upload rt)
endif()
endif()
if (OPENSSL_FOUND)
target_link_libraries(fileserver ${OPENSSL_LIBRARIES})
if (COMPILER_SUPPORTS_CXX11)
target_link_libraries(async_server_file_upload ${OPENSSL_LIBRARIES})
endif()
endif(OPENSSL_FOUND)
endif (UNIX)

Expand All @@ -159,4 +182,7 @@ set_target_properties(hello_world_async_server_with_work_queue PROPERTIES RUNTIM

if (UNIX)
set_target_properties(fileserver PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CPP-NETLIB_BINARY_DIR}/example)
if (COMPILER_SUPPORTS_CXX11)
set_target_properties(async_server_file_upload PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CPP-NETLIB_BINARY_DIR}/example)
endif()
endif (UNIX)
246 changes: 246 additions & 0 deletions libs/network/example/http/async_server_file_upload.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
//
// Copyright 2014 (c) Arun Chandrasekaran <visionofarun@gmail.com>
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
//

//
// Example for performing streaming file upload operations directly to
// filesystem using async server
//
// If you use wget, do the following at the client side:
//
// wget localhost:9190/upload?filename=Earth.mp4
// --post-file=$HOME/Videos/Earth-From-Space.mp4
//
#include <boost/shared_ptr.hpp>
#include <boost/network/protocol/http/server.hpp>
#include <boost/network/protocol/http/server/async_server.hpp>
#include <boost/network/utils/thread_pool.hpp>

#include <condition_variable>
#include <mutex>
#include <map>

struct connection_handler;

typedef boost::network::http::async_server<connection_handler> server;

///
/// Custom exception type
///
struct file_uploader_exception : public std::runtime_error {
file_uploader_exception(const std::string err) :
std::runtime_error(err) {
}
};

///
/// Encapsulates request & connection
///
struct file_uploader : boost::enable_shared_from_this<file_uploader> {
const server::request& req;
server::connection_ptr conn;

std::mutex mtx;
std::condition_variable condvar;

FILE* fp = NULL;

public:
file_uploader(const server::request& req, const server::connection_ptr& conn) :
req(req),
conn(conn) {
const std::string dest = destination(req);

if (dest.find("/upload") != std::string::npos) {
auto queries = get_queries(dest);
auto fname = queries.find("filename");
if (fname != queries.end()) {
fp = ::fopen(fname->second.c_str(), "w");
if (!fp) {
throw file_uploader_exception("Failed to open file to write");
}
} else {
throw file_uploader_exception("'filename' cannot be empty");
}
}
}

~file_uploader() {
if (fp) {
::fflush(fp);
::fclose(fp);
}
}

///
/// Non blocking call to initiate the data transfer
///
void startRecv() {
std::size_t content_length = 0;
auto const& headers = req.headers;
for (auto item : headers) {
if (boost::to_lower_copy(item.name) == "content-length") {
content_length = boost::lexical_cast<std::size_t>(item.value);
break;
}
}

read_chunk(conn, content_length);
}

///
/// The client shall wait by calling this until the transfer is done by
/// the IO threadpool
///
void waitForCompletion() {
std::unique_lock<std::mutex> _(mtx);
condvar.wait(_);
}

private:
///
/// Parses the string and gets the query as a key-value pair
///
/// @param [in] dest String containing the path and the queries, without the fragment,
/// of the form "/path?key1=value1&key2=value2"
///
std::map<std::string, std::string> get_queries(const std::string dest) {

std::map<std::string, std::string> queries;

std::size_t pos = dest.find_first_of("?");

if (pos != std::string::npos) {

std::string query_string = dest.substr(pos + 1);

// Replace '&' with space
for (pos = 0; pos < query_string.size(); pos++) {
if (query_string[pos] == '&') {
query_string[pos] = ' ';
}
}

std::istringstream sin(query_string);
while (sin >> query_string) {

pos = query_string.find_first_of("=");

if (pos != std::string::npos) {
const std::string key = query_string.substr(0, pos);
const std::string value = query_string.substr(pos + 1);
queries[key] = value;
}
}
}

return queries;
}

///
/// Reads a chunk of data
///
/// @param [in] conn Connection to read from
/// @param [in] left2read Size to read
///
void read_chunk(server::connection_ptr conn, std::size_t left2read) {
conn->read(boost::bind(&file_uploader::on_data_ready,
file_uploader::shared_from_this(),
_1, _2, _3, conn, left2read));
}

///
/// Callback that gets called when the data is ready to be consumed
///
void on_data_ready(server::connection::input_range range,
boost::system::error_code error,
std::size_t size,
server::connection_ptr conn,
std::size_t left2read) {
if (!error) {
::fwrite(boost::begin(range), size, 1, fp);
std::size_t left = left2read - size;
if (left > 0)
read_chunk(conn, left);
else
wakeup();
}
}

///
/// Wakesup the waiting thread
///
void wakeup() {
std::unique_lock<std::mutex> _(mtx);
condvar.notify_one();
}
};

///
/// Functor that gets executed whenever there is a packet on the HTTP port
///
struct connection_handler {
///
/// Gets executed whenever there is a packet on the HTTP port.
///
/// @param [in] req Request object that holds the protobuf data
/// @param [in] conn Connection object
///
void operator()(server::request const& req, const server::connection_ptr& conn) {
static server::response_header headers[] = {
{"Connection","close"},
{"Content-Type", "text/plain"}
};

if (method(req) == "POST") {
try {
// Create a file uploader
boost::shared_ptr<file_uploader> uploader(new file_uploader(req, conn));
// On success to create, start receiving the data
uploader->startRecv();
// Wait until the data transfer is done by the IO threads
uploader->waitForCompletion();

// Respond to the client
conn->set_status(server::connection::ok);
conn->set_headers(boost::make_iterator_range(headers, headers+2));
conn->write("Success to upload");
} catch (const file_uploader_exception& e) {
conn->set_status(server::connection::bad_request);
conn->set_headers(boost::make_iterator_range(headers, headers+2));
const std::string err = e.what();
conn->write(err);
}
}
}
};

int main(int ac, const char *av[])
{
if (ac != 2) {
std::cerr << "Usage: " << av[0] << " <listener-port>" << std::endl;
return EXIT_SUCCESS;
}

// Setup the threadpool
boost::shared_ptr<boost::network::utils::thread_pool>
tp(new boost::network::utils::thread_pool(2));

// Create a connection handler
connection_handler handler;

// Setup the async server
server local_server(server::options(handler)
.address("0.0.0.0")
.port(av[1])
.reuse_address(true)
.thread_pool(tp));

// Start the server eventloop
local_server.run();

return EXIT_SUCCESS;
}