10
10
#include < boost/scope_exit.hpp>
11
11
#include < boost/network/protocol/http/algorithms/linearize.hpp>
12
12
#include < boost/network/utils/thread_pool.hpp>
13
+ #include < boost/range/adaptor/sliced.hpp>
13
14
#include < boost/range/algorithm/transform.hpp>
15
+ #include < boost/range/algorithm/copy.hpp>
14
16
#include < boost/asio/ip/tcp.hpp>
15
17
#include < boost/asio/streambuf.hpp>
16
18
#include < boost/asio/strand.hpp>
17
19
#include < boost/asio/buffer.hpp>
20
+ #include < boost/make_shared.hpp>
21
+ #include < list>
22
+ #include < vector>
18
23
#include < iterator>
19
24
20
25
#ifndef BOOST_NETWORK_HTTP_SERVER_CONNECTION_HEADER_BUFFER_MAX_SIZE
@@ -53,6 +58,8 @@ namespace boost { namespace network { namespace http {
53
58
, service_unavailable = 503
54
59
};
55
60
61
+ typedef typename string<Tag>::type string_type;
62
+
56
63
async_connection (
57
64
asio::io_service & io_service
58
65
, Handler & handler
@@ -76,7 +83,7 @@ namespace boost { namespace network { namespace http {
76
83
* then sent as soon as the first call to `write` or `flush` commences.
77
84
*/
78
85
template <class Range >
79
- void set_headers (Range headers) {
86
+ void set_headers (Range headers, bool immediate = true ) {
80
87
if (headers_already_sent)
81
88
boost::throw_exception (std::logic_error (" Headers have already been sent." ));
82
89
@@ -99,6 +106,8 @@ namespace boost { namespace network { namespace http {
99
106
stream << consts::crlf ();
100
107
}
101
108
stream << consts::crlf ();
109
+ if (immediate) write_headers_only ();
110
+
102
111
commit = true ;
103
112
}
104
113
@@ -107,19 +116,34 @@ namespace boost { namespace network { namespace http {
107
116
}
108
117
109
118
template <class Range >
110
- void write (Range) {
111
- if (!headers_already_sent) {
112
- // TODO write out the headers that are already
113
- // linearized to the headers_buffer.
114
- }
115
- // linearize the range into a shared array
116
- // schedule a stranded asynchronous write
119
+ void write (Range const & range) {
120
+ write_impl (
121
+ boost::make_iterator_range (range)
122
+ , boost::bind (
123
+ &async_connection<Tag,Handler>::default_error
124
+ , async_connection<Tag,Handler>::shared_from_this ()
125
+ , _1
126
+ )
127
+ );
128
+ }
129
+
130
+ template <class Range , class Callback >
131
+ void write (Range const & range, Callback const & callback) {
132
+ write_impl (
133
+ boost::make_iterator_range (range)
134
+ , callback
135
+ );
117
136
}
118
137
119
138
asio::ip::tcp::socket & socket () { return socket_; }
120
139
utils::thread_pool & thread_pool () { return thread_pool_; }
121
140
122
141
private:
142
+
143
+ void default_error (boost::system::error_code const & ec) {
144
+ // TODO implement a sane default here, for now ignore the error
145
+ }
146
+
123
147
asio::ip::tcp::socket socket_;
124
148
asio::io_service::strand strand;
125
149
Handler & handler;
@@ -129,6 +153,11 @@ namespace boost { namespace network { namespace http {
129
153
130
154
typedef boost::array<char , BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE>
131
155
buffer_type;
156
+ typedef boost::array<char , BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE>
157
+ array;
158
+ typedef std::list<shared_ptr<array> > array_list;
159
+ typedef boost::shared_ptr<array_list> shared_array_list;
160
+ typedef boost::shared_ptr<std::vector<asio::const_buffer> > shared_buffers;
132
161
buffer_type read_buffer_;
133
162
boost::uint16_t status;
134
163
@@ -157,6 +186,106 @@ namespace boost { namespace network { namespace http {
157
186
// FIXME -- damn all that work got wiped out because Jeni tripped on the power. :(
158
187
}
159
188
189
+ template <class Range , class Callback >
190
+ void write_headers (Range range, Callback callback) {
191
+ // TODO send out the headers, then once that's done
192
+ // call the write again on the range and callback
193
+ }
194
+
195
+ void write_headers_only () {
196
+ }
197
+
198
+ void handle_write_headers (boost::system::error_code const & ec) {
199
+ if (ec) {
200
+ // TODO signal somehow that there was an error so that subsequent
201
+ // calls to write would throw an exception
202
+ return ;
203
+ }
204
+ headers_already_sent = true ;
205
+ }
206
+
207
+ void handle_write (
208
+ boost::function<void (boost::system::error_code const &)> callback
209
+ , shared_array_list temporaries
210
+ , shared_buffers buffers
211
+ , boost::system::error_code const & ec
212
+ ) {
213
+ // we want to forget the temporaries and buffers
214
+ thread_pool ().post (boost::bind (callback, ec));
215
+ }
216
+
217
+ template <class Range , class Callback >
218
+ void write_impl (Range range, Callback callback) {
219
+ if (!headers_already_sent) {
220
+ write_headers (range, callback);
221
+ return ;
222
+ }
223
+
224
+ // linearize the whole range into a vector
225
+ // of fixed-sized buffers, then schedule an asynchronous
226
+ // write of these buffers -- make sure they are live
227
+ // by making these linearized buffers shared and made
228
+ // part of the completion handler.
229
+ //
230
+ // once the range has been linearized and sent, schedule
231
+ // a wrapper to be called in the io_service's thread, that
232
+ // will re-schedule the given callback into the thread pool
233
+ // referred to here so that the io_service's thread can concentrate
234
+ // on doing I/O.
235
+ //
236
+
237
+ shared_array_list temporaries =
238
+ boost::make_shared<array_list>();
239
+ shared_buffers buffers =
240
+ boost::make_shared<std::vector<asio::const_buffer> >(0 );
241
+
242
+ std::size_t range_size = boost::distance (range);
243
+ buffers->resize (
244
+ (range_size / BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE)
245
+ + (range_size % BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE)
246
+ );
247
+ std::size_t slice_size =
248
+ std::min (
249
+ range_size,
250
+ BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE
251
+ );
252
+ typename boost::range_iterator<Range>::type
253
+ start = boost::begin (range)
254
+ , end = boost::end (range);
255
+ while (slice_size != 0 ) {
256
+ using boost::adaptors::sliced;
257
+ shared_ptr<array> new_array = make_shared<array>();
258
+ boost::copy (
259
+ range | sliced (0 ,slice_size)
260
+ , new_array->begin ()
261
+ );
262
+ temporaries->push_back (new_array);
263
+ buffers->push_back (asio::buffer (new_array->data (), slice_size));
264
+ std::advance (start, slice_size);
265
+ range = boost::make_iterator_range (start, end);
266
+ range_size = boost::distance (range);
267
+ slice_size = std::min (range_size, BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE);
268
+ }
269
+
270
+ if (!buffers->empty ()) {
271
+ boost::function<void (boost::system::error_code const &)> f = callback;
272
+ asio::async_write (
273
+ socket_
274
+ , *buffers
275
+ , strand.wrap (
276
+ boost::bind (
277
+ &async_connection<Tag,Handler>::handle_write
278
+ , async_connection<Tag,Handler>::shared_from_this ()
279
+ , f
280
+ , temporaries
281
+ , buffers // keep these alive until the handler is called!
282
+ , boost::asio::placeholders::error
283
+ )
284
+ )
285
+ );
286
+ }
287
+ }
288
+
160
289
};
161
290
162
291
} /* http */
0 commit comments