@@ -77,14 +77,20 @@ def __init__(self):
7777
7878 super ().__init__ (map = mininode_socket_map )
7979
80+ self ._conn_open = False
81+
82+ @property
83+ def is_connected (self ):
84+ return self ._conn_open
85+
8086 def peer_connect (self , dstaddr , dstport , net = "regtest" ):
8187 self .dstaddr = dstaddr
8288 self .dstport = dstport
8389 self .create_socket (socket .AF_INET , socket .SOCK_STREAM )
8490 self .socket .setsockopt (socket .IPPROTO_TCP , socket .TCP_NODELAY , 1 )
8591 self .sendbuf = b""
8692 self .recvbuf = b""
87- self .state = "connecting"
93+ self ._asyncore_pre_connection = True
8894 self .network = net
8995 self .disconnect = False
9096
@@ -97,22 +103,23 @@ def peer_connect(self, dstaddr, dstport, net="regtest"):
97103
98104 def peer_disconnect (self ):
99105 # Connection could have already been closed by other end.
100- if self .state == "connected" :
101- self .disconnect_node ()
106+ if self .is_connected :
107+ self .disconnect = True # Signal asyncore to disconnect
102108
103109 # Connection and disconnection methods
104110
105111 def handle_connect (self ):
106112 """asyncore callback when a connection is opened."""
107- if self .state != "connected" :
113+ if not self .is_connected :
108114 logger .debug ("Connected & Listening: %s:%d" % (self .dstaddr , self .dstport ))
109- self .state = "connected"
115+ self ._conn_open = True
116+ self ._asyncore_pre_connection = False
110117 self .on_open ()
111118
112119 def handle_close (self ):
113120 """asyncore callback when a connection is closed."""
114121 logger .debug ("Closing connection to: %s:%d" % (self .dstaddr , self .dstport ))
115- self .state = "closed"
122+ self ._conn_open = False
116123 self .recvbuf = b""
117124 self .sendbuf = b""
118125 try :
@@ -121,13 +128,6 @@ def handle_close(self):
121128 pass
122129 self .on_close ()
123130
124- def disconnect_node (self ):
125- """Disconnect the p2p connection.
126-
127- Called by the test logic thread. Causes the p2p connection
128- to be disconnected on the next iteration of the asyncore loop."""
129- self .disconnect = True
130-
131131 # Socket read methods
132132
133133 def handle_read (self ):
@@ -182,17 +182,16 @@ def on_message(self, message):
182182 def writable (self ):
183183 """asyncore method to determine whether the handle_write() callback should be called on the next loop."""
184184 with mininode_lock :
185- pre_connection = self .state == "connecting"
186185 length = len (self .sendbuf )
187- return ( length > 0 or pre_connection )
186+ return length > 0 or self . _asyncore_pre_connection
188187
189188 def handle_write (self ):
190189 """asyncore callback when data should be written to the socket."""
191190 with mininode_lock :
192191 # asyncore does not expose socket connection, only the first read/write
193192 # event, thus we must check connection manually here to know when we
194193 # actually connect
195- if self .state == "connecting" :
194+ if self ._asyncore_pre_connection :
196195 self .handle_connect ()
197196 if not self .writable ():
198197 return
@@ -204,26 +203,17 @@ def handle_write(self):
204203 return
205204 self .sendbuf = self .sendbuf [sent :]
206205
207- def send_message (self , message , pushbuf = False ):
206+ def send_message (self , message ):
208207 """Send a P2P message over the socket.
209208
210209 This method takes a P2P payload, builds the P2P header and adds
211210 the message to the send buffer to be sent over the socket."""
212- if self . state != "connected" and not pushbuf :
213- raise IOError ('Not connected, no pushbuf ' )
211+ if not self . is_connected :
212+ raise IOError ('Not connected' )
214213 self ._log_message ("send" , message )
215- command = message .command
216- data = message .serialize ()
217- tmsg = MAGIC_BYTES [self .network ]
218- tmsg += command
219- tmsg += b"\x00 " * (12 - len (command ))
220- tmsg += struct .pack ("<I" , len (data ))
221- th = sha256 (data )
222- h = sha256 (th )
223- tmsg += h [:4 ]
224- tmsg += data
214+ tmsg = self ._build_message (message )
225215 with mininode_lock :
226- if ( len (self .sendbuf ) == 0 and not pushbuf ) :
216+ if len (self .sendbuf ) == 0 :
227217 try :
228218 sent = self .send (tmsg )
229219 self .sendbuf = tmsg [sent :]
@@ -234,6 +224,20 @@ def send_message(self, message, pushbuf=False):
234224
235225 # Class utility methods
236226
227+ def _build_message (self , message ):
228+ """Build a serialized P2P message"""
229+ command = message .command
230+ data = message .serialize ()
231+ tmsg = MAGIC_BYTES [self .network ]
232+ tmsg += command
233+ tmsg += b"\x00 " * (12 - len (command ))
234+ tmsg += struct .pack ("<I" , len (data ))
235+ th = sha256 (data )
236+ h = sha256 (th )
237+ tmsg += h [:4 ]
238+ tmsg += data
239+ return tmsg
240+
237241 def _log_message (self , direction , msg ):
238242 """Logs a message being sent or received over the connection."""
239243 if direction == "send" :
@@ -280,7 +284,7 @@ def peer_connect(self, *args, services=NODE_NETWORK|NODE_WITNESS, send_version=T
280284 vt .addrTo .port = self .dstport
281285 vt .addrFrom .ip = "0.0.0.0"
282286 vt .addrFrom .port = 0
283- self .send_message (vt , True )
287+ self .sendbuf = self . _build_message (vt ) # Will be sent right after handle_connect
284288
285289 # Message receiving methods
286290
@@ -348,7 +352,7 @@ def on_version(self, message):
348352 # Connection helper methods
349353
350354 def wait_for_disconnect (self , timeout = 60 ):
351- test_function = lambda : self .state != "connected"
355+ test_function = lambda : not self .is_connected
352356 wait_until (test_function , timeout = timeout , lock = mininode_lock )
353357
354358 # Message receiving helper methods
0 commit comments