Skip to content

Commit

Permalink
Backfiller (#3263)
Browse files Browse the repository at this point in the history
Backfilling is the process of downloading historical blocks via P2P that
are required to fulfill `GetBlocksByRange` duties - this happens during
both trusted node and finalized checkpoint syncs.

In particular, backfilling happens after syncing to head, such that
attestation work can start as soon as possible.

* Fix SyncQueue initialization procedure.
* Remove usage of `awaitne`.
* Add cancellation support.
* Remove unneeded `sleepAsync()` if peer's head is older than needed.
* Add `direction` field to all logs.
* Fix syncmanager wedge issue.
* Add proper resource cleaning procedure on backward sync finish.

Co-authored-by: cheatfate <[email protected]>
  • Loading branch information
arnetheduck and cheatfate authored Jan 20, 2022
1 parent e493953 commit 570379d
Show file tree
Hide file tree
Showing 6 changed files with 341 additions and 214 deletions.
1 change: 1 addition & 0 deletions beacon_chain/beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type
vcProcess*: Process
requestManager*: RequestManager
syncManager*: SyncManager[Peer, PeerID]
backfiller*: SyncManager[Peer, PeerID]
genesisSnapshotContent*: string
actionTracker*: ActionTracker
processor*: ref Eth2Processor
Expand Down
2 changes: 0 additions & 2 deletions beacon_chain/conf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,10 @@ type
name: "weak-subjectivity-checkpoint" }: Option[Checkpoint]

finalizedCheckpointState* {.
hidden # TODO unhide when backfilling is done
desc: "SSZ file specifying a recent finalized state"
name: "finalized-checkpoint-state" }: Option[InputFile]

finalizedCheckpointBlock* {.
hidden # TODO unhide when backfilling is done
desc: "SSZ file specifying a recent finalized block"
name: "finalized-checkpoint-block" }: Option[InputFile]

Expand Down
3 changes: 3 additions & 0 deletions beacon_chain/consensus_object_pools/blockchain_dag.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1664,3 +1664,6 @@ proc getBlockSSZ*(dag: ChainDAGRef, id: BlockId, bytes: var seq[byte]): bool =
dag.db.getAltairBlockSSZ(id.root, bytes)
of BeaconBlockFork.Bellatrix:
dag.db.getMergeBlockSSZ(id.root, bytes)

func needsBackfill*(dag: ChainDAGRef): bool =
  ## Returns `true` while the slot recorded in `dag.backfill` is still later
  ## than the genesis slot, and `false` once the two coincide.
  ##
  ## NOTE(review): presumably this means historical blocks between genesis
  ## and the backfill point are still missing - confirm against the
  ## definition of `dag.backfill`.
  dag.genesis.slot < dag.backfill.slot
42 changes: 29 additions & 13 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,13 @@ proc init*(T: type BeaconNode,
validatorPool, syncCommitteeMsgPool, quarantine, rng, getBeaconTime,
taskpool)
syncManager = newSyncManager[Peer, PeerID](
network.peerPool, SyncQueueKind.Forward, getLocalHeadSlot, getLocalWallSlot,
getFirstSlotAtFinalizedEpoch, getBackfillSlot, dag.tail.slot,
blockVerifier)
network.peerPool, SyncQueueKind.Forward, getLocalHeadSlot,
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
dag.tail.slot, blockVerifier)
backfiller = newSyncManager[Peer, PeerID](
network.peerPool, SyncQueueKind.Backward, getLocalHeadSlot,
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
dag.backfill.slot, blockVerifier, maxHeadAge = 0)

let stateTtlCache = if config.restCacheSize > 0:
StateTtlCache.init(
Expand Down Expand Up @@ -500,6 +504,7 @@ proc init*(T: type BeaconNode,
eventBus: eventBus,
requestManager: RequestManager.init(network, blockVerifier),
syncManager: syncManager,
backfiller: backfiller,
actionTracker: ActionTracker.init(rng, config.subscribeAllSubnets),
processor: processor,
blockProcessor: blockProcessor,
Expand Down Expand Up @@ -917,6 +922,11 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
# above, this will be done just before the next slot starts
await node.updateGossipStatus(slot + 1)

proc syncStatus(node: BeaconNode): string =
  ## Produces a short human-readable description of the node's sync state.
  ##
  ## The head sync manager takes precedence: while it is in progress, its own
  ## status string is returned verbatim. Otherwise, if the backfiller is in
  ## progress, its status string is returned with a `"backfill: "` prefix.
  ## When neither is in progress the result is `"synced"`.
  let forward = node.syncManager
  if forward.inProgress:
    return forward.syncStatus

  # The backfiller is only consulted once head sync reports not in progress
  let backward = node.backfiller
  if backward.inProgress:
    return "backfill: " & backward.syncStatus

  return "synced"

proc onSlotStart(
node: BeaconNode, wallTime: BeaconTime, lastSlot: Slot) {.async.} =
## Called at the beginning of a slot - usually every slot, but sometimes might
Expand All @@ -936,9 +946,7 @@ proc onSlotStart(
info "Slot start",
slot = shortLog(wallSlot),
epoch = shortLog(wallSlot.epoch),
sync =
if node.syncManager.inProgress: node.syncManager.syncStatus
else: "synced",
sync = node.syncStatus(),
peers = len(node.network.peerPool),
head = shortLog(node.dag.head),
finalized = shortLog(getStateField(
Expand Down Expand Up @@ -1127,6 +1135,18 @@ proc stop(node: BeaconNode) =
node.db.close()
notice "Databases closed"

proc startBackfillTask(node: BeaconNode) {.async.} =
  ## Waits for head sync to stop being in progress, then starts the
  ## backfiller once and finishes.
  ##
  ## The head sync manager is polled every 2 seconds for as long as the DAG
  ## reports that it needs backfill. If the DAG stops needing backfill while
  ## waiting, the task ends without starting the backfiller.
  while node.dag.needsBackfill:
    if node.syncManager.inProgress:
      # Head sync is still running - look again after a short pause
      await sleepAsync(chronos.seconds(2))
    else:
      # Only start the backfiller if it's needed _and_ head sync has
      # completed - if we lose sync after having synced head, we could stop
      # the backfiller, but this should be a fringe case - might as well keep
      # the logic simple for now
      node.backfiller.start()
      break

proc run(node: BeaconNode) {.raises: [Defect, CatchableError].} =
bnStatus = BeaconNodeStatus.Running

Expand All @@ -1150,6 +1170,8 @@ proc run(node: BeaconNode) {.raises: [Defect, CatchableError].} =
node.requestManager.start()
node.syncManager.start()

if node.dag.needsBackfill(): asyncSpawn node.startBackfillTask()

waitFor node.updateGossipStatus(wallSlot)

asyncSpawn runSlotLoop(node, wallTime, onSlotStart)
Expand Down Expand Up @@ -1327,13 +1349,7 @@ proc initStatusBar(node: BeaconNode) {.raises: [Defect, ValueError].} =
formatGwei(node.attachedValidatorBalanceTotal)

of "sync_status":
if isNil(node.syncManager):
"pending"
else:
if node.syncManager.inProgress:
node.syncManager.syncStatus
else:
"synced"
node.syncStatus()
else:
# We ignore typos for now and just render the expression
# as it was written. TODO: come up with a good way to show
Expand Down
Loading

0 comments on commit 570379d

Please sign in to comment.