Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

import logging 

 

import tornado.curl_httpclient 

import tornado.simple_httpclient 

 

 

# Module-level logger for this HTTP client; used for debugging only (the
# pause/resume transitions below are traced at DEBUG level).
log = logging.getLogger('http-client')

 

 

# Class lifted directly from 

# safeview/service/src/surrogate-router/resource_req.py 

class CurlAsyncHTTPClientWithFlowControl(
        tornado.curl_httpclient.CurlAsyncHTTPClient):
    """Like CurlAsyncHTTPClient but with flow control support.

    Specifically, we add support for pausing and resuming the response
    flow. While paused, the socket's kernel buffer is not dequeued, thus
    triggering rate limiting of the sender via TCP's flow-control
    mechanism.

    Caveat: this class will not work correctly unless AsyncHTTPClient is
    created with force_instance=True. Without this, pause() would pause all
    transfers on the multi object.
    """

    def initialize(self, *args, **kwargs):
        """Parent override.

        Forwards all arguments to the parent initializer unchanged and
        starts the client in the un-paused state.
        """
        super(CurlAsyncHTTPClientWithFlowControl, self).initialize(
            *args, **kwargs)
        self._paused = False

    def _handle_socket(self, event, fd, multi, data):
        """Parent override.

        Like the parent method but makes sure that fds are not added to the
        event loop while paused.

        Caveat: this assumes that self._fds correspond only to the fds for
        one connection. This is not true if force_instance=False.
        """
        super(CurlAsyncHTTPClientWithFlowControl, self)._handle_socket(
            event, fd, multi, data)
        if not self._paused:
            return
        # We're paused, so remove any handlers that were installed by the
        # parent; or else we'll start dequeuing from the socket.
        # (Loop variable deliberately does not reuse the name of the `fd`
        # parameter.)
        for tracked_fd in self._fds:
            self.io_loop.remove_handler(tracked_fd)

    def pause(self):
        """Stop dequeuing incoming data from the kernel buffer.

        Any data received while paused will be queued in the kernel receive
        buffer, and if that overflows, TCP's flow control mechanism will
        signal to the sender that it should rate-limit itself.

        Calling pause() while already paused is a no-op.

        Caveat: this will pause all connections managed by the underlying
        multi object. Thus, for intended operation, you should use this with
        force_instance=True.
        """
        if self._paused:
            return
        log.debug('pause: %s', self)
        # Stop the periodic curl socket check, or else we'll keep dequeuing
        # from kernel buffer even when we are supposed to be paused.
        self._force_timeout_callback.stop()
        self._paused = True
        for tracked_fd in self._fds:
            self.io_loop.remove_handler(tracked_fd)

    def resume(self):
        """Resume dequeuing data from the kernel buffer.

        This enables TCP's flow control mechanism to signal to the sender
        that there is additional capacity and that it should increase its
        sending rate.

        Calling resume() while not paused is a no-op.
        """
        if not self._paused:
            return
        log.debug('resume: %s', self)
        self._force_timeout_callback.start()
        self._paused = False
        # Re-register every tracked fd using the value stored for it in
        # self._fds as the handler's event argument.
        for tracked_fd, events in self._fds.items():
            self.io_loop.add_handler(tracked_fd, self._handle_events, events)

    def fetch(self, *args, **kwargs):
        """Parent override.

        Be sure to unpause the flow on new fetches.

        Returns:
            Whatever the parent ``fetch`` returns (in Tornado, the Future
            resolving to the response). Propagating it is required so that
            callers can yield/await the request or attach callbacks to it.
        """
        self.resume()
        return super(CurlAsyncHTTPClientWithFlowControl, self).fetch(
            *args, **kwargs)

    def close(self):
        """Parent override.

        Closes the client via the parent implementation and then drops the
        reference to the underlying multi object.
        """
        super(CurlAsyncHTTPClientWithFlowControl, self).close()
        # This is needed to prevent the curl handle from leaking. See
        # https://goo.gl/xvs9Dw. Exact root cause is not perfectly understood,
        # but probably has to do with the c code holding references to
        # CurlAsyncHTTPClient via _multi, preventing garbage
        # collection. Explicitly deleting _multi breaks the cycle.
        del self._multi