Skip to content

Commit 29d7544

Browse files
committed
S3 read and write use socket-level throttling
1 parent 50d0fbb commit 29d7544

File tree

13 files changed

+249
-31
lines changed

13 files changed

+249
-31
lines changed

base/poco/Net/include/Poco/Net/HTTPSession.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "Poco/Net/Net.h"
2828
#include "Poco/Net/StreamSocket.h"
2929
#include "Poco/Timespan.h"
30+
#include "Poco/Net/Throttler.h"
3031

3132

3233
namespace Poco
@@ -84,6 +85,12 @@ namespace Net
8485
setTimeout(const Poco::Timespan & connectionTimeout, const Poco::Timespan & sendTimeout, const Poco::Timespan & receiveTimeout);
8586
/// Sets different timeouts for the HTTP session.
8687

88+
void setReceiveThrottler(const ThrottlerPtr & throttler = {});
89+
/// Sets the throttler that is used to limit the speed of data received through the socket.
90+
91+
void setSendThrottler(const ThrottlerPtr & throttler = {});
92+
/// Sets the throttler that is used to limit the speed of data sent through the socket.
93+
8794
Poco::Timespan getTimeout() const;
8895
/// Returns the timeout for the HTTP session.
8996

@@ -237,6 +244,8 @@ namespace Net
237244
Poco::Timespan _connectionTimeout;
238245
Poco::Timespan _receiveTimeout;
239246
Poco::Timespan _sendTimeout;
247+
ThrottlerPtr _receiveThrottler;
248+
ThrottlerPtr _sendThrottler;
240249
Poco::Exception * _pException;
241250
Poco::Any _data;
242251

base/poco/Net/include/Poco/Net/Socket.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,18 @@ namespace Net
187187
/// timeout previously set with getReceiveTimeout(),
188188
/// as the system is free to adjust the value.
189189

190+
void setSendThrottler(const Poco::Net::ThrottlerPtr & throttler = {});
191+
/// Sets the throttler that is used to limit the speed of data sent through the socket.
192+
193+
Poco::Net::ThrottlerPtr getSendThrottler() const;
194+
/// Returns the throttler that is used to limit the speed of data sent through the socket.
195+
196+
void setReceiveThrottler(const Poco::Net::ThrottlerPtr & throttler = {});
197+
/// Sets the throttler that is used to limit the speed of data received through the socket.
198+
199+
Poco::Net::ThrottlerPtr getReceiveThrottler() const;
200+
/// Returns the throttler that is used to limit the speed of data received through the socket.
201+
190202
void setOption(int level, int option, int value);
191203
/// Sets the socket option specified by level and option
192204
/// to the given integer value.
@@ -454,6 +466,29 @@ namespace Net
454466
}
455467

456468

469+
inline void Socket::setSendThrottler(const Poco::Net::ThrottlerPtr & throttler)
470+
{
471+
_pImpl->setSendThrottler(throttler);
472+
}
473+
474+
475+
inline Poco::Net::ThrottlerPtr Socket::getSendThrottler() const
476+
{
477+
return _pImpl->getSendThrottler();
478+
}
479+
480+
481+
inline void Socket::setReceiveThrottler(const Poco::Net::ThrottlerPtr & throttler)
482+
{
483+
_pImpl->setReceiveThrottler(throttler);
484+
}
485+
486+
487+
inline Poco::Net::ThrottlerPtr Socket::getReceiveThrottler() const
488+
{
489+
return _pImpl->getReceiveThrottler();
490+
}
491+
457492
inline void Socket::setOption(int level, int option, int value)
458493
{
459494
_pImpl->setOption(level, option, value);

base/poco/Net/include/Poco/Net/WebSocketImpl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ namespace Net
7272
virtual Poco::Timespan getSendTimeout();
7373
virtual void setReceiveTimeout(const Poco::Timespan & timeout);
7474
virtual Poco::Timespan getReceiveTimeout();
75+
virtual void setSendThrottler(const Poco::Net::ThrottlerPtr & throttler);
76+
virtual Poco::Net::ThrottlerPtr getSendThrottler() const;
77+
virtual void setReceiveThrottler(const Poco::Net::ThrottlerPtr & throttler);
78+
virtual Poco::Net::ThrottlerPtr getReceiveThrottler() const;
7579

7680
// Internal
7781
int frameFlags() const;

base/poco/Net/src/HTTPSession.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,18 @@ void HTTPSession::setTimeout(const Poco::Timespan& connectionTimeout, const Poco
124124
}
125125

126126

127+
void HTTPSession::setReceiveThrottler(const ThrottlerPtr& throttler)
128+
{
129+
_receiveThrottler = throttler;
130+
_socket.setReceiveThrottler(throttler);
131+
}
132+
133+
void HTTPSession::setSendThrottler(const ThrottlerPtr& throttler)
134+
{
135+
_sendThrottler = throttler;
136+
_socket.setSendThrottler(throttler);
137+
}
138+
127139
int HTTPSession::get()
128140
{
129141
if (_pCurrent == _pEnd)
@@ -227,6 +239,8 @@ void HTTPSession::connect(const SocketAddress& address)
227239
_socket.connect(address, _connectionTimeout);
228240
_socket.setReceiveTimeout(_receiveTimeout);
229241
_socket.setSendTimeout(_sendTimeout);
242+
_socket.setReceiveThrottler(_receiveThrottler);
243+
_socket.setSendThrottler(_sendThrottler);
230244
_socket.setNoDelay(true);
231245
// There may be leftover data from a previous (failed) request in the buffer,
232246
// so we clear it.

base/poco/Net/src/WebSocketImpl.cpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,30 @@ Poco::Timespan WebSocketImpl::getReceiveTimeout()
387387
}
388388

389389

390+
void WebSocketImpl::setSendThrottler(const Poco::Net::ThrottlerPtr& throttler)
391+
{
392+
_pStreamSocketImpl->setSendThrottler(throttler);
393+
}
394+
395+
396+
Poco::Net::ThrottlerPtr WebSocketImpl::getSendThrottler() const
397+
{
398+
return _pStreamSocketImpl->getSendThrottler();
399+
}
400+
401+
402+
void WebSocketImpl::setReceiveThrottler(const Poco::Net::ThrottlerPtr& throttler)
403+
{
404+
_pStreamSocketImpl->setReceiveThrottler(throttler);
405+
}
406+
407+
408+
Poco::Net::ThrottlerPtr WebSocketImpl::getReceiveThrottler() const
409+
{
410+
return _pStreamSocketImpl->getReceiveThrottler();
411+
}
412+
413+
390414
int WebSocketImpl::available()
391415
{
392416
int n = static_cast<int>(_buffer.size()) - _bufferOffset;

base/poco/NetSSL_OpenSSL/include/Poco/Net/SecureStreamSocketImpl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ namespace Net
4343
void setSendTimeout(const Poco::Timespan & timeout);
4444
void setReceiveTimeout(const Poco::Timespan & timeout);
4545

46+
void setSendThrottler(const Poco::Net::ThrottlerPtr & throttler = {});
47+
void setReceiveThrottler(const Poco::Net::ThrottlerPtr & throttler = {});
48+
4649
SocketImpl * acceptConnection(SocketAddress & clientAddr);
4750
/// Not supported by a SecureStreamSocket.
4851
///

base/poco/NetSSL_OpenSSL/src/SecureStreamSocketImpl.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,18 @@ void SecureStreamSocketImpl::setReceiveTimeout(const Poco::Timespan& timeout)
6363
_recvTimeout = underlying_socket->getReceiveTimeout();
6464
}
6565

66+
void SecureStreamSocketImpl::setSendThrottler(const Poco::Net::ThrottlerPtr& throttler)
67+
{
68+
_sndThrottler = throttler;
69+
underlying_socket->setSendThrottler(throttler);
70+
}
71+
72+
void SecureStreamSocketImpl::setReceiveThrottler(const Poco::Net::ThrottlerPtr& throttler)
73+
{
74+
_recvThrottler = throttler;
75+
underlying_socket->setReceiveThrottler(throttler);
76+
}
77+
6678
SocketImpl* SecureStreamSocketImpl::acceptConnection(SocketAddress& clientAddr)
6779
{
6880
throw Poco::InvalidAccessException("Cannot acceptConnection() on a SecureStreamSocketImpl");

src/Common/CurrentThread.cpp

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,56 @@ ResourceLink CurrentThread::getWriteResourceLink()
163163
return current_thread->write_resource_link;
164164
}
165165

166+
void CurrentThread::attachReadThrottler(const ThrottlerPtr & throttler)
167+
{
168+
if (unlikely(!current_thread))
169+
return;
170+
if (current_thread->read_throttler)
171+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has been already attached to read throttler", std::to_string(getThreadId()));
172+
current_thread->read_throttler = throttler;
173+
}
174+
175+
void CurrentThread::detachReadThrottler()
176+
{
177+
if (unlikely(!current_thread))
178+
return;
179+
if (!current_thread->read_throttler)
180+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has not been attached to read throttler", std::to_string(getThreadId()));
181+
current_thread->read_throttler.reset();
182+
}
183+
184+
ThrottlerPtr CurrentThread::getReadThrottler()
185+
{
186+
if (unlikely(!current_thread))
187+
return {};
188+
return current_thread->read_throttler;
189+
}
190+
191+
void CurrentThread::attachWriteThrottler(const ThrottlerPtr & throttler)
192+
{
193+
if (unlikely(!current_thread))
194+
return;
195+
if (current_thread->write_throttler)
196+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has been already attached to write throttler", std::to_string(getThreadId()));
197+
current_thread->write_throttler = throttler;
198+
}
199+
200+
void CurrentThread::detachWriteThrottler()
201+
{
202+
if (unlikely(!current_thread))
203+
return;
204+
if (!current_thread->write_throttler)
205+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread #{} has not been attached to write throttler", std::to_string(getThreadId()));
206+
current_thread->write_throttler.reset();
207+
}
208+
209+
ThrottlerPtr CurrentThread::getWriteThrottler()
210+
{
211+
if (unlikely(!current_thread))
212+
return {};
213+
return current_thread->write_throttler;
214+
}
215+
166216
MemoryTracker * CurrentThread::getUserMemoryTracker()
167217
{
168218
if (unlikely(!current_thread))

src/Common/CurrentThread.h

Lines changed: 82 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,14 @@ class CurrentThread
9999
static void detachWriteResource();
100100
static ResourceLink getWriteResourceLink();
101101

102+
// For IO Throttling
103+
static void attachReadThrottler(const ThrottlerPtr & throttler);
104+
static void detachReadThrottler();
105+
static ThrottlerPtr getReadThrottler();
106+
static void attachWriteThrottler(const ThrottlerPtr & throttler);
107+
static void detachWriteThrottler();
108+
static ThrottlerPtr getWriteThrottler();
109+
102110
/// Initializes query with current thread as master thread in constructor, and detaches it in destructor
103111
struct QueryScope : private boost::noncopyable
104112
{
@@ -111,37 +119,93 @@ class CurrentThread
111119
};
112120

113121
/// Scoped attach/detach of IO resource links
114-
struct IOScope : private boost::noncopyable
122+
struct IOSchedulingScope : private boost::noncopyable
115123
{
116-
explicit IOScope(ResourceLink read_resource_link, ResourceLink write_resource_link)
124+
IOSchedulingScope(ResourceLink read_resource_link, ResourceLink write_resource_link)
125+
{
126+
readResource(read_resource_link);
127+
writeResource(write_resource_link);
128+
}
129+
130+
explicit IOSchedulingScope(const IOSchedulingSettings & settings)
131+
: IOSchedulingScope(settings.read_resource_link, settings.write_resource_link)
132+
{}
133+
134+
~IOSchedulingScope()
135+
{
136+
if (read_resource_attached)
137+
detachReadResource();
138+
if (write_resource_attached)
139+
detachWriteResource();
140+
}
141+
142+
private:
143+
void readResource(ResourceLink link)
117144
{
118-
if (read_resource_link)
145+
if (link)
119146
{
120-
attachReadResource(read_resource_link);
121-
read_attached = true;
147+
attachReadResource(link);
148+
read_resource_attached = true;
122149
}
123-
if (write_resource_link)
150+
}
151+
152+
void writeResource(ResourceLink link)
153+
{
154+
if (link)
124155
{
125-
attachWriteResource(write_resource_link);
126-
write_attached = true;
156+
attachWriteResource(link);
157+
write_resource_attached = true;
127158
}
128159
}
129160

130-
explicit IOScope(const IOSchedulingSettings & settings)
131-
: IOScope(settings.read_resource_link, settings.write_resource_link)
132-
{}
161+
bool read_resource_attached = false;
162+
bool write_resource_attached = false;
163+
};
133164

134-
~IOScope()
165+
/// Scoped attach/detach of read throttler
166+
struct ReadThrottlingScope : private boost::noncopyable
167+
{
168+
explicit ReadThrottlingScope(const ThrottlerPtr & read_throttler_)
135169
{
136-
if (read_attached)
137-
detachReadResource();
138-
if (write_attached)
139-
detachWriteResource();
170+
if (read_throttler_)
171+
{
172+
attachReadThrottler(read_throttler_);
173+
read_throttler_attached = true;
174+
}
175+
}
176+
177+
~ReadThrottlingScope()
178+
{
179+
if (read_throttler_attached)
180+
detachReadThrottler();
181+
}
182+
183+
private:
184+
bool read_throttler_attached = false;
185+
};
186+
187+
/// Scoped attach/detach of write throttler
188+
struct WriteThrottlingScope : private boost::noncopyable
189+
{
190+
explicit WriteThrottlingScope(const ThrottlerPtr & write_throttler_)
191+
{
192+
if (write_throttler_)
193+
{
194+
attachWriteThrottler(write_throttler_);
195+
write_throttler_attached = true;
196+
}
140197
}
141198

142-
bool read_attached = false;
143-
bool write_attached = false;
199+
~WriteThrottlingScope()
200+
{
201+
if (write_throttler_attached)
202+
detachWriteThrottler();
203+
}
204+
205+
private:
206+
bool write_throttler_attached = false;
144207
};
208+
145209
};
146210

147211
}

src/Common/HTTPConnectionPool.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,10 @@ class EndpointConnectionPool : public std::enable_shared_from_this<EndpointConne
397397
Session::setReceiveDataHooks(std::make_shared<ResourceGuardSessionDataHooks>(link, ResourceGuard::Metrics::getIORead(), log, request.getMethod(), request.getURI()));
398398
if (ResourceLink link = CurrentThread::getWriteResourceLink())
399399
Session::setSendDataHooks(std::make_shared<ResourceGuardSessionDataHooks>(link, ResourceGuard::Metrics::getIOWrite(), log, request.getMethod(), request.getURI()));
400+
if (auto throttler = CurrentThread::getReadThrottler())
401+
Session::setReceiveThrottler(throttler);
402+
if (auto throttler = CurrentThread::getWriteThrottler())
403+
Session::setSendThrottler(throttler);
400404

401405
std::ostream & result = Session::sendRequest(request, connect_time, first_byte_time);
402406
result.exceptions(std::ios::badbit);
@@ -456,6 +460,8 @@ class EndpointConnectionPool : public std::enable_shared_from_this<EndpointConne
456460
response_stream = nullptr;
457461
Session::setSendDataHooks();
458462
Session::setReceiveDataHooks();
463+
Session::setSendThrottler();
464+
Session::setReceiveThrottler();
459465

460466
group->atConnectionDestroy();
461467

0 commit comments

Comments
 (0)