2727
2828logger = logging .getLogger ("TestFramework.mininode" )
2929
30- # Keep our own socket map for asyncore, so that we can track disconnects
31- # ourselves (to workaround an issue with closing an asyncore socket when
32- # using select)
33- mininode_socket_map = dict ()
34-
35- # One lock for synchronizing all data access between the networking thread (see
36- # NetworkThread below) and the thread running the test logic. For simplicity,
37- # NodeConn acquires this lock whenever delivering a message to a NodeConnCB,
38- # and whenever adding anything to the send buffer (in send_message()). This
39- # lock should be acquired in the thread running the test logic to synchronize
40- # access to any data shared with the NodeConnCB or NodeConn.
41- mininode_lock = RLock ()
30+ MESSAGEMAP = {
31+ b"addr" : msg_addr ,
32+ b"block" : msg_block ,
33+ b"blocktxn" : msg_blocktxn ,
34+ b"cmpctblock" : msg_cmpctblock ,
35+ b"feefilter" : msg_feefilter ,
36+ b"getaddr" : msg_getaddr ,
37+ b"getblocks" : msg_getblocks ,
38+ b"getblocktxn" : msg_getblocktxn ,
39+ b"getdata" : msg_getdata ,
40+ b"getheaders" : msg_getheaders ,
41+ b"headers" : msg_headers ,
42+ b"inv" : msg_inv ,
43+ b"mempool" : msg_mempool ,
44+ b"ping" : msg_ping ,
45+ b"pong" : msg_pong ,
46+ b"reject" : msg_reject ,
47+ b"sendcmpct" : msg_sendcmpct ,
48+ b"sendheaders" : msg_sendheaders ,
49+ b"tx" : msg_tx ,
50+ b"verack" : msg_verack ,
51+ b"version" : msg_version ,
52+ }
53+
54+ MAGIC_BYTES = {
55+ "mainnet" : b"\xf9 \xbe \xb4 \xd9 " , # mainnet
56+ "testnet3" : b"\x0b \x11 \x09 \x07 " , # testnet3
57+ "regtest" : b"\xfa \xbf \xb5 \xda " , # regtest
58+ }
4259
4360class NodeConnCB ():
4461 """Callback and helper functions for P2P connection to a bitcoind node.
@@ -183,34 +200,6 @@ class NodeConn(asyncore.dispatcher):
183200 """The actual NodeConn class
184201
185202 This class provides an interface for a p2p connection to a specified node."""
186- messagemap = {
187- b"version" : msg_version ,
188- b"verack" : msg_verack ,
189- b"addr" : msg_addr ,
190- b"inv" : msg_inv ,
191- b"getdata" : msg_getdata ,
192- b"getblocks" : msg_getblocks ,
193- b"tx" : msg_tx ,
194- b"block" : msg_block ,
195- b"getaddr" : msg_getaddr ,
196- b"ping" : msg_ping ,
197- b"pong" : msg_pong ,
198- b"headers" : msg_headers ,
199- b"getheaders" : msg_getheaders ,
200- b"reject" : msg_reject ,
201- b"mempool" : msg_mempool ,
202- b"feefilter" : msg_feefilter ,
203- b"sendheaders" : msg_sendheaders ,
204- b"sendcmpct" : msg_sendcmpct ,
205- b"cmpctblock" : msg_cmpctblock ,
206- b"getblocktxn" : msg_getblocktxn ,
207- b"blocktxn" : msg_blocktxn
208- }
209- MAGIC_BYTES = {
210- "mainnet" : b"\xf9 \xbe \xb4 \xd9 " , # mainnet
211- "testnet3" : b"\x0b \x11 \x09 \x07 " , # testnet3
212- "regtest" : b"\xfa \xbf \xb5 \xda " , # regtest
213- }
214203
215204 def __init__ (self , dstaddr , dstport , rpc , callback , net = "regtest" , services = NODE_NETWORK | NODE_WITNESS , send_version = True ):
216205 asyncore .dispatcher .__init__ (self , map = mininode_socket_map )
@@ -247,6 +236,8 @@ def __init__(self, dstaddr, dstport, rpc, callback, net="regtest", services=NODE
247236 self .handle_close ()
248237 self .rpc = rpc
249238
239+ # Connection and disconnection methods
240+
250241 def handle_connect (self ):
251242 if self .state != "connected" :
252243 logger .debug ("Connected & Listening: %s:%d" % (self .dstaddr , self .dstport ))
@@ -264,44 +255,30 @@ def handle_close(self):
264255 pass
265256 self .cb .on_close (self )
266257
258+ def disconnect_node (self ):
259+ """ Disconnect the p2p connection.
260+
261+ Called by the test logic thread. Causes the p2p connection
262+ to be disconnected on the next iteration of the asyncore loop."""
263+ self .disconnect = True
264+
265+ # Socket read methods
266+
267+ def readable (self ):
268+ return True
269+
267270 def handle_read (self ):
268271 t = self .recv (8192 )
269272 if len (t ) > 0 :
270273 self .recvbuf += t
271274 self .got_data ()
272275
273- def readable (self ):
274- return True
275-
276- def writable (self ):
277- with mininode_lock :
278- pre_connection = self .state == "connecting"
279- length = len (self .sendbuf )
280- return (length > 0 or pre_connection )
281-
282- def handle_write (self ):
283- with mininode_lock :
284- # asyncore does not expose socket connection, only the first read/write
285- # event, thus we must check connection manually here to know when we
286- # actually connect
287- if self .state == "connecting" :
288- self .handle_connect ()
289- if not self .writable ():
290- return
291-
292- try :
293- sent = self .send (self .sendbuf )
294- except :
295- self .handle_close ()
296- return
297- self .sendbuf = self .sendbuf [sent :]
298-
299276 def got_data (self ):
300277 try :
301278 while True :
302279 if len (self .recvbuf ) < 4 :
303280 return
304- if self .recvbuf [:4 ] != self . MAGIC_BYTES [self .network ]:
281+ if self .recvbuf [:4 ] != MAGIC_BYTES [self .network ]:
305282 raise ValueError ("got garbage %s" % repr (self .recvbuf ))
306283 if len (self .recvbuf ) < 4 + 12 + 4 + 4 :
307284 return
@@ -316,23 +293,54 @@ def got_data(self):
316293 if checksum != h [:4 ]:
317294 raise ValueError ("got bad checksum " + repr (self .recvbuf ))
318295 self .recvbuf = self .recvbuf [4 + 12 + 4 + 4 + msglen :]
319- if command not in self . messagemap :
296+ if command not in MESSAGEMAP :
320297 raise ValueError ("Received unknown command from %s:%d: '%s' %s" % (self .dstaddr , self .dstport , command , repr (msg )))
321298 f = BytesIO (msg )
322- t = self . messagemap [command ]()
299+ t = MESSAGEMAP [command ]()
323300 t .deserialize (f )
324301 self .got_message (t )
325302 except Exception as e :
326303 logger .exception ('Error reading message:' , repr (e ))
327304 raise
328305
306+ def got_message (self , message ):
307+ if self .last_sent + 30 * 60 < time .time ():
308+ self .send_message (MESSAGEMAP [b'ping' ]())
309+ self ._log_message ("receive" , message )
310+ self .cb .deliver (self , message )
311+
312+ # Socket write methods
313+
314+ def writable (self ):
315+ with mininode_lock :
316+ pre_connection = self .state == "connecting"
317+ length = len (self .sendbuf )
318+ return (length > 0 or pre_connection )
319+
320+ def handle_write (self ):
321+ with mininode_lock :
322+ # asyncore does not expose socket connection, only the first read/write
323+ # event, thus we must check connection manually here to know when we
324+ # actually connect
325+ if self .state == "connecting" :
326+ self .handle_connect ()
327+ if not self .writable ():
328+ return
329+
330+ try :
331+ sent = self .send (self .sendbuf )
332+ except :
333+ self .handle_close ()
334+ return
335+ self .sendbuf = self .sendbuf [sent :]
336+
329337 def send_message (self , message , pushbuf = False ):
330338 if self .state != "connected" and not pushbuf :
331339 raise IOError ('Not connected, no pushbuf' )
332340 self ._log_message ("send" , message )
333341 command = message .command
334342 data = message .serialize ()
335- tmsg = self . MAGIC_BYTES [self .network ]
343+ tmsg = MAGIC_BYTES [self .network ]
336344 tmsg += command
337345 tmsg += b"\x00 " * (12 - len (command ))
338346 tmsg += struct .pack ("<I" , len (data ))
@@ -351,11 +359,7 @@ def send_message(self, message, pushbuf=False):
351359 self .sendbuf += tmsg
352360 self .last_sent = time .time ()
353361
354- def got_message (self , message ):
355- if self .last_sent + 30 * 60 < time .time ():
356- self .send_message (self .messagemap [b'ping' ]())
357- self ._log_message ("receive" , message )
358- self .cb .deliver (self , message )
362+ # Class utility methods
359363
360364 def _log_message (self , direction , msg ):
361365 if direction == "send" :
@@ -367,9 +371,19 @@ def _log_message(self, direction, msg):
367371 log_message += "... (msg truncated)"
368372 logger .debug (log_message )
369373
370- def disconnect_node (self ):
371- self .disconnect = True
372374
375+ # Keep our own socket map for asyncore, so that we can track disconnects
376+ # ourselves (to workaround an issue with closing an asyncore socket when
377+ # using select)
378+ mininode_socket_map = dict ()
379+
380+ # One lock for synchronizing all data access between the networking thread (see
381+ # NetworkThread below) and the thread running the test logic. For simplicity,
382+ # NodeConn acquires this lock whenever delivering a message to a NodeConnCB,
383+ # and whenever adding anything to the send buffer (in send_message()). This
384+ # lock should be acquired in the thread running the test logic to synchronize
385+ # access to any data shared with the NodeConnCB or NodeConn.
386+ mininode_lock = RLock ()
373387
374388class NetworkThread (Thread ):
375389 def run (self ):
@@ -381,6 +395,6 @@ def run(self):
381395 for fd , obj in mininode_socket_map .items ():
382396 if obj .disconnect :
383397 disconnected .append (obj )
384- [ obj .handle_close () for obj in disconnected ]
398+ [obj .handle_close () for obj in disconnected ]
385399 asyncore .loop (0.1 , use_poll = True , map = mininode_socket_map , count = 1 )
386400 logger .debug ("Network thread closing" )
0 commit comments