Skip to content

Commit 150ffaa

Browse files
authored
Beacon sync reimplement stand by mode (#3875)
* Prevent scheduler race condition when fetching `alien` peers why Other peer descriptors must not be used when the peer is about to terminate. This will be important when accessed by `snap` sync for occasionally fetching data via `eth`. * Update conditional trace logging in sync scheduler details Just leaving it as trace. There are not too many instances when running sort of normal. * Implement stand-by-mode direcly into the sync scheduler why This removes complexity from the beacon sync app state machine. The stand-by mode as provided by the scheduler holds back running the daemon task and the peer tasks while start()/stop() works normal. So stand-by mode is not to be seen a variation of the beacon sync app `idle` state. * Update beacon sync app for new stand-by mode implementation * Code cosmetics, update logging
1 parent a4e9ac3 commit 150ffaa

File tree

16 files changed

+167
-136
lines changed

16 files changed

+167
-136
lines changed

execution_chain/sync/beacon.nim

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -119,29 +119,18 @@ proc configTarget*(desc: BeaconSyncRef; hex: string; isFinal: bool): bool =
119119

120120
# -----------------
121121

122-
proc activate*(desc: BeaconSyncRef) =
123-
## Clear stand-by mode (if any)
124-
doAssert not desc.ctx.isNil
125-
if desc.ctx.pool.syncState == SyncState.standByMode:
126-
desc.ctx.pool.syncState = SyncState.idle
127-
128-
proc start*(desc: BeaconSyncRef; standByMode = false): bool =
122+
proc start*(desc: BeaconSyncRef; standBy = false): bool =
129123
## This function returns `true` exactly if the run state could be changed.
130124
## The following expressions are equivalent:
131125
## * desc.start(true)
132126
## * desc.start(false) and desc.start(true)
133127
##
134128
doAssert not desc.ctx.isNil
135-
if desc.isRunning:
136-
# Correct state to stand-by mode if possible
137-
if standByMode and desc.ctx.pool.syncState == SyncState.idle:
138-
desc.ctx.pool.syncState = SyncState.standByMode
139-
return true
140-
else:
141-
if standByMode:
142-
desc.ctx.pool.syncState = SyncState.standByMode
143-
if desc.startSync():
144-
return true
129+
let save = desc.ctx.pool.standByMode
130+
desc.ctx.pool.standByMode = standBy # the ticker sees this when starting
131+
if desc.startSync(standBy):
132+
return true
133+
desc.ctx.pool.standByMode = save
145134
# false
146135

147136
proc stop*(desc: BeaconSyncRef) {.async.} =

execution_chain/sync/beacon/worker.nim

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ proc release*(ctx: BeaconCtxRef; info: static[string]) =
3636
proc start*(buddy: BeaconPeerRef; info: static[string]): bool =
3737
## Initialise worker peer
3838
let
39-
peer = buddy.peer
39+
peer {.inject,used.} = $buddy.peer # logging only
4040
ctx = buddy.ctx
4141

4242
if not ctx.pool.seenData and buddy.peerID in ctx.pool.failedPeers:
@@ -53,8 +53,8 @@ proc start*(buddy: BeaconPeerRef; info: static[string]): bool =
5353

5454
proc stop*(buddy: BeaconPeerRef; info: static[string]) =
5555
## Clean up this peer
56-
if not buddy.ctx.hibernate: debug info & ": release peer", peer=buddy.peer,
57-
thPut=buddy.only.thPutStats.toMeanVar.toStr,
56+
if not buddy.ctx.hibernate: debug info & ": release peer",
57+
peer=buddy.peer, thPut=buddy.only.thPutStats.toMeanVar.toStr,
5858
nSyncPeers=(buddy.ctx.nSyncPeers()-1), state=($buddy.syncState)
5959
buddy.stopSyncPeer()
6060

@@ -146,8 +146,7 @@ template runPeer*(
146146
if buddy.somethingToCollectOrUnstage():
147147

148148
trace info & ": start processing", peer=buddy.peer,
149-
thPut=buddy.only.thPutStats.toMeanVar.toStr,
150-
rankInfo=($rank.assessed),
149+
thPut=buddy.only.thPutStats.toMeanVar.toStr, rankInfo=($rank.assessed),
151150
rank=(if rank.ranking < 0: "n/a" else: $rank.ranking),
152151
nSyncPeers=buddy.ctx.nSyncPeers(), state=($buddy.syncState)
153152

@@ -203,10 +202,6 @@ template runPeer*(
203202
bodyRc = workerIdleLongWaitInterval
204203
break body
205204

206-
elif buddy.ctx.pool.syncState == SyncState.standByMode:
207-
bodyRc = workerIdleLongWaitInterval
208-
break body
209-
210205
# Idle sleep unless there is something to do
211206
if not buddy.somethingToCollectOrUnstage():
212207
bodyRc = workerIdleWaitInterval

execution_chain/sync/beacon/worker/blocks.nim

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ template blocksCollect*(
5757
##
5858
let
5959
ctx = buddy.ctx
60-
peer = buddy.peer
60+
peer = $buddy.peer # logging only
6161

6262
block body:
6363
if ctx.blocksUnprocIsEmpty():
@@ -234,7 +234,7 @@ template blocksUnstage*(
234234
break body # return false => switch peer
235235

236236
var
237-
peer {.inject.} = buddy.peer
237+
peer {.inject,used.} = $buddy.peer # logging only
238238
nImported {.inject.} = 0u64 # statistics
239239
nUnstaged {.inject.} = 0 # ditto
240240
importedOK = false # imported some blocks

execution_chain/sync/beacon/worker/blocks/blocks_blocks.nim

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ template blocksFetchCheckImpl(
5757
let
5858
ctx = buddy.ctx
5959
iv {.inject,used.} = iv
60-
peer {.inject,used.} = buddy.peer
60+
peer {.inject,used.} = $buddy.peer # logging only
6161

6262
# Preset headers to be completed with bodies. Also collect block
6363
# hashes for fetching missing blocks.
@@ -197,7 +197,7 @@ template blocksImport*(
197197
block body:
198198
let
199199
ctx = buddy.ctx
200-
peer = buddy.peer
200+
peer {.inject,used.} = $buddy.peer # logging only
201201
iv {.inject.} =
202202
BnRange.new(blocks[0].header.number, blocks[^1].header.number)
203203
doAssert iv.len == blocks.len.uint64

execution_chain/sync/beacon/worker/blocks/blocks_fetch.nim

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,19 @@ proc maybeSlowPeerError(
4040

4141
# false
4242

43+
func errStr(rc: Result[FetchBodiesData,BeaconError]): string =
44+
if rc.isErr:
45+
result = $rc.error.excp
46+
if 0 < rc.error.name.len:
47+
result &= "(" & rc.error.name & ")"
48+
else:
49+
result = "n/a"
4350

44-
proc getBlockBodies*(
51+
# ------------------------------------------------------------------------------
52+
# Private function(s)
53+
# ------------------------------------------------------------------------------
54+
55+
proc getBlockBodies(
4556
buddy: BeaconPeerRef;
4657
req: BlockBodiesRequest;
4758
): Future[Result[FetchBodiesData,BeaconError]]
@@ -73,7 +84,7 @@ proc getBlockBodies*(
7384
return ok((move resp, Moment.now()-start))
7485

7586
# ------------------------------------------------------------------------------
76-
# Public functions
87+
# Public function(s)
7788
# ------------------------------------------------------------------------------
7889

7990
template fetchBodies*(
@@ -90,7 +101,7 @@ template fetchBodies*(
90101
sendInfo = trEthSendSendingGetBlockBodies
91102
recvInfo = trEthRecvReceivedBlockBodies
92103
let
93-
peer {.inject,used.} = buddy.peer
104+
peer {.inject,used.} = $buddy.peer # logging only
94105
nReq {.inject,used.} = request.blockHashes.len
95106

96107
if request.blockHashes.len == 0:
@@ -123,14 +134,12 @@ template fetchBodies*(
123134
of EAlreadyTriedAndFailed:
124135
trace recvInfo & " error", peer, startHash=startHash.short, nReq,
125136
ela=rc.error.elapsed.toStr, state=($buddy.syncState),
126-
error=rc.error.excp, nErrors=buddy.nErrors.fetch.bdy
137+
error=rc.errStr, nErrors=buddy.nErrors.fetch.bdy
127138
break body # return err()
128139

129140
# Debug message for other errors
130141
debug recvInfo & " error", peer, startHash=startHash.short, nReq,
131-
ela=elapsed.toStr, state=($buddy.syncState),
132-
error=($rc.error.excp & (if rc.error.name.len == 0: ""
133-
else: "(" & rc.error.name & ")")),
142+
ela=elapsed.toStr, state=($buddy.syncState), error=rc.errStr,
134143
msg=rc.error.msg, nErrors=buddy.nErrors.fetch.bdy
135144
break body # return err()
136145

@@ -140,8 +149,7 @@ template fetchBodies*(
140149
buddy.bdyFetchRegisterError()
141150
trace recvInfo & " error", peer, startHash=startHash.short, nReq,
142151
ela=elapsed.toStr, state=($buddy.syncState),
143-
error=(if rc.isErr: $rc.error.excp else: "n/a"),
144-
nErrors=buddy.nErrors.fetch.bdy
152+
error=rc.errStr, nErrors=buddy.nErrors.fetch.bdy
145153
break body # return err()
146154

147155
# Verify the correct number of block bodies received

execution_chain/sync/beacon/worker/blocks/blocks_import.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ proc importBlock*(
3030
let
3131
start = Moment.now()
3232
ctx = buddy.ctx
33-
peer = buddy.peer
33+
peer {.inject,used.} = $buddy.peer # logging only
3434

3535
if blk.header.number <= ctx.chain.baseNumber:
3636
trace "Ignoring block less eq. base", peer, blk=blk.header.number,

execution_chain/sync/beacon/worker/headers.nim

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ template headersCollect*(buddy: BeaconPeerRef; info: static[string]) =
5757
block body:
5858
let
5959
ctx = buddy.ctx
60-
peer {.inject,used.} = buddy.peer
60+
peer {.inject,used.} = $buddy.peer # logging only
6161

6262
if ctx.headersUnprocIsEmpty() or
6363
ctx.hdrCache.state != collecting:
@@ -229,7 +229,7 @@ proc headersUnstage*(buddy: BeaconPeerRef; info: static[string]): bool =
229229
##
230230
let
231231
ctx = buddy.ctx
232-
peer = buddy.peer
232+
peer {.inject,used.} = $buddy.peer # logging only
233233

234234
if ctx.hdr.staged.len == 0:
235235
return false # switch peer

execution_chain/sync/beacon/worker/headers/headers_fetch.nim

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,19 @@ proc maybeSlowPeerError(
4040

4141
# false
4242

43+
func errStr(rc: Result[FetchHeadersData,BeaconError]): string =
44+
if rc.isErr:
45+
result = $rc.error.excp
46+
if 0 < rc.error.name.len:
47+
result &= "(" & rc.error.name & ")"
48+
else:
49+
result = "n/a"
50+
4351
# ------------------------------------------------------------------------------
44-
# Public handler
52+
# Private function(s)
4553
# ------------------------------------------------------------------------------
4654

47-
proc getBlockHeaders*(
55+
proc getBlockHeaders(
4856
buddy: BeaconPeerRef;
4957
req: BlockHeadersRequest;
5058
bn: BlockNumber;
@@ -77,7 +85,7 @@ proc getBlockHeaders*(
7785
return ok((move resp, Moment.now()-start))
7886

7987
# ------------------------------------------------------------------------------
80-
# Public function
88+
# Public function(s)
8189
# ------------------------------------------------------------------------------
8290

8391
template fetchHeadersReversed*(
@@ -96,7 +104,7 @@ template fetchHeadersReversed*(
96104
sendInfo = trEthSendSendingGetBlockHeaders
97105
recvInfo = trEthRecvReceivedBlockHeaders
98106
let
99-
peer {.inject,used.} = buddy.peer
107+
peer {.inject,used.} = $buddy.peer # logging only
100108
req = block:
101109
if topHash != emptyRoot:
102110
BlockHeadersRequest(
@@ -137,15 +145,13 @@ template fetchHeadersReversed*(
137145
of EAlreadyTriedAndFailed:
138146
trace recvInfo & " error", peer, req=ivReq, nReq=req.maxResults,
139147
hash=topHash.toStr, ela=elapsed.toStr, state=($buddy.syncState),
140-
error=rc.error.excp, nErrors=buddy.nErrors.fetch.hdr
148+
error=rc.errStr, nErrors=buddy.nErrors.fetch.hdr
141149
break body # return err()
142150

143151
# Debug message for other errors
144152
debug recvInfo & " error", peer, req=ivReq, nReq=req.maxResults,
145153
hash=topHash.toStr, ela=elapsed.toStr, state=($buddy.syncState),
146-
error=($rc.error.excp & (if rc.error.name.len == 0: ""
147-
else: "(" & rc.error.name & ")")),
148-
msg=rc.error.msg, nErrors=buddy.nErrors.fetch.hdr
154+
error=rc.errStr, msg=rc.error.msg, nErrors=buddy.nErrors.fetch.hdr
149155
break body # return err()
150156

151157
# Evaluate result
@@ -154,8 +160,7 @@ template fetchHeadersReversed*(
154160
buddy.hdrFetchRegisterError()
155161
trace recvInfo & " error", peer, nReq=req.maxResults, hash=topHash.toStr,
156162
nResp=0, ela=elapsed.toStr, state=($buddy.syncState),
157-
error=(if rc.isErr: $rc.error.excp else: "n/a"),
158-
nErrors=buddy.nErrors.fetch.hdr
163+
error=rc.errStr, nErrors=buddy.nErrors.fetch.hdr
159164
break body # return err()
160165

161166
# Verify the correct number of block headers received

execution_chain/sync/beacon/worker/headers/headers_headers.nim

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ template headersFetch*(
3737
##
3838
let
3939
ctx = buddy.ctx
40-
peer = buddy.peer
40+
peer {.inject,used.} = $buddy.peer # logging only
4141

4242
var bodyRc = Opt[seq[Header]].err()
4343
block body:
@@ -95,7 +95,7 @@ proc headersStashOnDisk*(
9595
## failure, this function returns the number of headers stored.
9696
let
9797
ctx = buddy.ctx
98-
peer = buddy.peer
98+
peer {.inject,used.} = $buddy.peer # logging only
9999
dTop = ctx.hdrCache.antecedent.number # current antecedent
100100
rc = ctx.hdrCache.put(revHdrs) # verify and save headers
101101

execution_chain/sync/beacon/worker/headers/headers_target.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ template headersTargetActivate*(
6363
break body # return
6464

6565
let
66-
peer {.inject.} = buddy.peer
66+
peer {.inject,used.} = $buddy.peer # logging only
6767
trg = ctx.pool.initTarget.unsafeGet
6868

6969
# Require minimum of sync peers

0 commit comments

Comments
 (0)