2525{.push raises : [].}
2626
2727import
28- std/ [algorithm, deques, options, os, sequtils, strutils, typetraits],
28+ std/ [algorithm, deques, os, sequtils, strutils, typetraits],
2929 stew/ byteutils,
3030 stew/ shims/ macros,
3131 chronicles,
@@ -145,12 +145,12 @@ chronicles.formatIt(Opt[uint64]):
145145include p2p_backends_helpers
146146
147147proc requestResolver [MsgType](msg: pointer , future: FutureBase ) {.gcsafe .} =
148- var f = Future [Option [MsgType ]](future)
148+ var f = Future [Opt [MsgType ]](future)
149149 if not f.finished:
150150 if msg != nil :
151- f.complete some (cast [ptr MsgType ](msg)[])
151+ f.complete Opt . some (cast [ptr MsgType ](msg)[])
152152 else :
153- f.complete none (MsgType )
153+ f.complete Opt . none (MsgType )
154154
155155proc linkSendFailureToReqFuture [S, R](sendFut: Future [S], resFut: Future [R]) =
156156 sendFut.addCallback do (arg: pointer ):
@@ -186,7 +186,7 @@ proc disconnectAndRaise(
186186 await peer.disconnect (reason)
187187 raisePeerDisconnected (msg, reason)
188188
189- proc handshakeImpl [T](
189+ proc handshakeImpl * [T](
190190 peer: Peer ,
191191 sendFut: Future [void ],
192192 responseFut: auto , # Future[T].Raising([CancelledError, EthP2PError]),
@@ -276,7 +276,7 @@ proc getMsgName*(peer: Peer, msgId: uint64): string =
276276# Protocol info objects
277277#
278278
279- proc initProtocol (
279+ proc initProtocol * (
280280 name: string ,
281281 version: uint64 ,
282282 peerInit: PeerStateInitializer ,
@@ -289,7 +289,7 @@ proc initProtocol(
289289 networkStateInitializer: networkInit,
290290 )
291291
292- proc setEventHandlers (
292+ proc setEventHandlers * (
293293 p: ProtocolInfo ,
294294 onPeerConnected: OnPeerConnectedHandler ,
295295 onPeerDisconnected: OnPeerDisconnectedHandler ,
@@ -344,7 +344,7 @@ proc registerMsg(
344344# Message composition and encryption
345345#
346346
347- proc perPeerMsgIdImpl (peer: Peer , proto: ProtocolInfo , msgId: uint64 ): uint64 =
347+ proc perPeerMsgIdImpl * (peer: Peer , proto: ProtocolInfo , msgId: uint64 ): uint64 =
348348 result = msgId
349349 if not peer.dispatcher.isNil:
350350 result += peer.dispatcher.protocolOffsets[proto.index].value
@@ -634,7 +634,7 @@ proc checkedRlpRead(
634634
635635 raise e
636636
637- proc nextMsg * (
637+ proc nextMsg (
638638 peer: Peer , MsgType: type
639639): Future [MsgType ] {.async : (raises: [CancelledError , EthP2PError ], raw: true ).} =
640640 # # This procs awaits a specific RLPx message.
@@ -1402,3 +1402,224 @@ proc rlpxAccept*(
14021402 rlpx_accept_success.inc ()
14031403
14041404 return peer
1405+
1406+ # ------------------------------------------------------------------------------
1407+ # Mini Protocol DSL
1408+ # ------------------------------------------------------------------------------
1409+
1410+ type
1411+ Responder * = object
1412+ peer* : Peer
1413+ reqId* : uint64
1414+
1415+ proc `$` * (r: Responder ): string =
1416+ $ r.peer & " : " & $ r.reqId
1417+
1418+ template msgIdImpl (PROTO: type ; peer: Peer , methId: uint64 ): uint64 =
1419+ mixin protocolInfo
1420+ perPeerMsgIdImpl (peer, PROTO .protocolInfo, methId)
1421+
1422+ macro countArgs (args: untyped ): untyped =
1423+ var count = 0
1424+ for arg in args:
1425+ let arg = if arg.kind == nnkHiddenStdConv: arg[1 ]
1426+ else : arg
1427+ if arg.kind == nnkArgList:
1428+ for _ in arg:
1429+ inc count
1430+ else :
1431+ inc count
1432+ result = newLit (count)
1433+
1434+ macro appendArgs (writer: untyped , args: untyped ): untyped =
1435+ result = newStmtList ()
1436+ for arg in args:
1437+ let arg = if arg.kind == nnkHiddenStdConv: arg[1 ]
1438+ else : arg
1439+ if arg.kind == nnkArgList:
1440+ for subarg in arg:
1441+ result .add quote do :
1442+ append (`writer`, `subarg`)
1443+ else :
1444+ result .add quote do :
1445+ append (`writer`, `arg`)
1446+
1447+ template rlpxSendMessage * (PROTO: type , peer: Peer , msgId: static [uint64 ], params: varargs [untyped ]): auto =
1448+ let perPeerMsgId = msgIdImpl (PROTO , peer, msgId)
1449+ var writer = initRlpWriter ()
1450+ const paramsLen = countArgs ([params])
1451+ when paramsLen > 1 :
1452+ startList (writer, paramsLen)
1453+ appendArgs (writer, [params])
1454+ let msgBytes = finish (writer)
1455+ sendMsg (peer, perPeerMsgId, msgBytes)
1456+
1457+ template rlpxSendMessage * (PROTO: type , responder: Responder , msgId: static [uint64 ], params: varargs [untyped ]): auto =
1458+ let perPeerMsgId = msgIdImpl (PROTO , responder.peer, msgId)
1459+ var writer = initRlpWriter ()
1460+ const paramsLen = countArgs ([params])
1461+ when paramsLen > 0 :
1462+ startList (writer, paramsLen + 1 )
1463+ append (writer, responder.reqId)
1464+ appendArgs (writer, [params])
1465+ let msgBytes = finish (writer)
1466+ sendMsg (responder.peer, perPeerMsgId, msgBytes)
1467+
1468+ template rlpxSendRequest * (PROTO: type , peer: Peer , msgId: static [uint64 ], params: varargs [untyped ]) =
1469+ let perPeerMsgId = msgIdImpl (PROTO , peer, msgId)
1470+ var writer = initRlpWriter ()
1471+ const paramsLen = countArgs ([params])
1472+ if paramsLen > 0 :
1473+ startList (writer, paramsLen + 1 )
1474+ initFuture result
1475+ let reqId = registerRequest (peer, timeout, result , perPeerMsgId + 1 )
1476+ append (writer, reqId)
1477+ appendArgs (writer, [params])
1478+ let msgBytes = finish (writer)
1479+ linkSendFailureToReqFuture (sendMsg (peer, perPeerMsgId, msgBytes), result )
1480+
1481+ macro checkedRlpFields (peer; rlp; packet; fields): untyped =
1482+ result = newStmtList ()
1483+ for field in fields:
1484+ result .add quote do :
1485+ `packet`.`field` = checkedRlpRead (`peer`, `rlp`, typeof (`packet`.`field`))
1486+
1487+ macro countFields (fields): untyped =
1488+ var count = 0
1489+ for _ in fields:
1490+ inc count
1491+ result = newLit (count)
1492+
1493+ template wrapRlpxWithPacketException (MSGTYPE: type , peer: Peer , body): untyped =
1494+ const
1495+ msgName = astToStr (MSGTYPE )
1496+
1497+ try :
1498+ body
1499+ except rlp.RlpError as exc:
1500+ discard
1501+ warn " TODO: RLP decoding failed for incoming message" ,
1502+ msg = msgName, remote = peer.remote,
1503+ clientId = peer.clientId, err = exc.msg
1504+ await peer.disconnectAndRaise (BreachOfProtocol ,
1505+ " Invalid RLP in parameter list for " & msgName)
1506+
1507+ template rlpxWithPacketHandler * (PROTO: distinct type ;
1508+ MSGTYPE: distinct type ;
1509+ peer: Peer ;
1510+ data: Rlp ,
1511+ fields: untyped ;
1512+ body): untyped =
1513+ const
1514+ numFields = countFields (fields)
1515+
1516+ wrapRlpxWithPacketException (MSGTYPE , peer):
1517+ var
1518+ rlp = data
1519+ packet {.inject .}: MSGTYPE
1520+
1521+ when numFields > 1 :
1522+ tryEnterList (rlp)
1523+
1524+ checkedRlpFields (peer, rlp, packet, fields)
1525+ body
1526+
1527+ template rlpxWithPacketResponder * (PROTO: distinct type ;
1528+ MSGTYPE: distinct type ;
1529+ peer: Peer ;
1530+ data: Rlp ,
1531+ body): untyped =
1532+ wrapRlpxWithPacketException (MSGTYPE , peer):
1533+ var rlp = data
1534+ tryEnterList (rlp)
1535+ let reqId = read (rlp, uint64 )
1536+ var
1537+ response {.inject .} = initResponder (peer, reqId)
1538+ packet {.inject .} = checkedRlpRead (peer, rlp, MSGTYPE )
1539+ body
1540+
1541+ template rlpxWithFutureHandler * (PROTO: distinct type ;
1542+ MSGTYPE: distinct type ;
1543+ msgId: static [uint64 ];
1544+ peer: Peer ;
1545+ data: Rlp ,
1546+ fields: untyped ): untyped =
1547+ wrapRlpxWithPacketException (MSGTYPE , peer):
1548+ var
1549+ rlp = data
1550+ packet: MSGTYPE
1551+
1552+ tryEnterList (rlp)
1553+ let
1554+ reqId = read (rlp, uint64 )
1555+ perPeerMsgId = msgIdImpl (PROTO , peer, msgId)
1556+ checkedRlpFields (peer, rlp, packet, fields)
1557+ resolveResponseFuture (peer,
1558+ perPeerMsgId, addr (packet), reqId)
1559+
1560+
1561+ proc nextMsg * (PROTO: distinct type ,
1562+ peer: Peer ,
1563+ MsgType: distinct type ,
1564+ msgId: static [uint64 ]): Future [MsgType ]
1565+ {.async : (raises: [CancelledError , EthP2PError ], raw: true ).} =
1566+ # # This procs awaits a specific RLPx message.
1567+ # # Any messages received while waiting will be dispatched to their
1568+ # # respective handlers. The designated message handler will also run
1569+ # # to completion before the future returned by `nextMsg` is resolved.
1570+ let wantedId = msgIdImpl (PROTO , peer, msgId)
1571+ let f = peer.awaitedMessages[wantedId]
1572+ if not f.isNil:
1573+ return Future [MsgType ].Raising ([CancelledError , EthP2PError ])(f)
1574+
1575+ initFuture result
1576+ peer.awaitedMessages[wantedId] = result
1577+
1578+ template registerMsg * (protocol: ProtocolInfo ,
1579+ msgId: static [uint64 ],
1580+ msgName: static [string ],
1581+ msgThunk: untyped ,
1582+ MsgType: type ) =
1583+ registerMsg (protocol,
1584+ msgId,
1585+ msgName,
1586+ msgThunk,
1587+ messagePrinter[MsgType ],
1588+ requestResolver[MsgType ],
1589+ nextMsgResolver[MsgType ],
1590+ failResolver[MsgType ])
1591+
1592+ func initResponder * (peer: Peer , reqId: uint64 ): Responder =
1593+ Responder (peer: peer, reqId: reqId)
1594+
1595+ template state * (response: Responder , PROTO: type ): auto =
1596+ state (response.peer, PROTO )
1597+
1598+ template networkState * (response: Responder , PROTO: type ): auto =
1599+ networkState (response.peer, PROTO )
1600+
1601+ template defineProtocol * (PROTO: untyped ,
1602+ version: static [int ],
1603+ rlpxName: static [string ],
1604+ peerState: distinct type ,
1605+ networkState: distinct type ) =
1606+ type
1607+ PROTO * = object
1608+
1609+ const
1610+ PROTOIndex = getProtocolIndex ()
1611+
1612+ template protocolInfo * (_: type PROTO ): auto =
1613+ getProtocol (PROTOIndex )
1614+
1615+ template State * (_: type PROTO ): type =
1616+ peerState
1617+
1618+ template NetworkState * (_: type PROTO ): type =
1619+ networkState
1620+
1621+ func initProtocol * (_: type PROTO ): auto =
1622+ initProtocol (rlpxName,
1623+ version,
1624+ createPeerState[Peer , peerState],
1625+ createNetworkState[EthereumNode , networkState])
0 commit comments