Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backfiller #3263

Merged
merged 4 commits into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon_chain/beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type
vcProcess*: Process
requestManager*: RequestManager
syncManager*: SyncManager[Peer, PeerID]
backfiller*: SyncManager[Peer, PeerID]
genesisSnapshotContent*: string
actionTracker*: ActionTracker
processor*: ref Eth2Processor
Expand Down
2 changes: 0 additions & 2 deletions beacon_chain/conf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,10 @@ type
name: "weak-subjectivity-checkpoint" }: Option[Checkpoint]

finalizedCheckpointState* {.
hidden # TODO unhide when backfilling is done
desc: "SSZ file specifying a recent finalized state"
name: "finalized-checkpoint-state" }: Option[InputFile]

finalizedCheckpointBlock* {.
hidden # TODO unhide when backfilling is done
desc: "SSZ file specifying a recent finalized block"
name: "finalized-checkpoint-block" }: Option[InputFile]

Expand Down
3 changes: 3 additions & 0 deletions beacon_chain/consensus_object_pools/blockchain_dag.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1664,3 +1664,6 @@ proc getBlockSSZ*(dag: ChainDAGRef, id: BlockId, bytes: var seq[byte]): bool =
dag.db.getAltairBlockSSZ(id.root, bytes)
of BeaconBlockFork.Bellatrix:
dag.db.getMergeBlockSSZ(id.root, bytes)

func needsBackfill*(dag: ChainDAGRef): bool =
dag.backfill.slot > dag.genesis.slot
42 changes: 29 additions & 13 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,13 @@ proc init*(T: type BeaconNode,
validatorPool, syncCommitteeMsgPool, quarantine, rng, getBeaconTime,
taskpool)
syncManager = newSyncManager[Peer, PeerID](
network.peerPool, SyncQueueKind.Forward, getLocalHeadSlot, getLocalWallSlot,
getFirstSlotAtFinalizedEpoch, getBackfillSlot, dag.tail.slot,
blockVerifier)
network.peerPool, SyncQueueKind.Forward, getLocalHeadSlot,
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
dag.tail.slot, blockVerifier)
backfiller = newSyncManager[Peer, PeerID](
network.peerPool, SyncQueueKind.Backward, getLocalHeadSlot,
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
dag.backfill.slot, blockVerifier, maxHeadAge = 0)

let stateTtlCache = if config.restCacheSize > 0:
StateTtlCache.init(
Expand Down Expand Up @@ -500,6 +504,7 @@ proc init*(T: type BeaconNode,
eventBus: eventBus,
requestManager: RequestManager.init(network, blockVerifier),
syncManager: syncManager,
backfiller: backfiller,
actionTracker: ActionTracker.init(rng, config.subscribeAllSubnets),
processor: processor,
blockProcessor: blockProcessor,
Expand Down Expand Up @@ -917,6 +922,11 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
# above, this will be done just before the next slot starts
await node.updateGossipStatus(slot + 1)

proc syncStatus(node: BeaconNode): string =
if node.syncManager.inProgress: node.syncManager.syncStatus
elif node.backfiller.inProgress: "backfill: " & node.backfiller.syncStatus
else: "synced"

proc onSlotStart(
node: BeaconNode, wallTime: BeaconTime, lastSlot: Slot) {.async.} =
## Called at the beginning of a slot - usually every slot, but sometimes might
Expand All @@ -936,9 +946,7 @@ proc onSlotStart(
info "Slot start",
slot = shortLog(wallSlot),
epoch = shortLog(wallSlot.epoch),
sync =
if node.syncManager.inProgress: node.syncManager.syncStatus
else: "synced",
sync = node.syncStatus(),
peers = len(node.network.peerPool),
head = shortLog(node.dag.head),
finalized = shortLog(getStateField(
Expand Down Expand Up @@ -1127,6 +1135,18 @@ proc stop(node: BeaconNode) =
node.db.close()
notice "Databases closed"

proc startBackfillTask(node: BeaconNode) {.async.} =
while node.dag.needsBackfill:
if not node.syncManager.inProgress:
# Only start the backfiller if it's needed _and_ head sync has completed -
# if we lose sync after having synced head, we could stop the backfilller,
# but this should be a fringe case - might as well keep the logic simple for
# now
node.backfiller.start()
return

await sleepAsync(chronos.seconds(2))

proc run(node: BeaconNode) {.raises: [Defect, CatchableError].} =
bnStatus = BeaconNodeStatus.Running

Expand All @@ -1150,6 +1170,8 @@ proc run(node: BeaconNode) {.raises: [Defect, CatchableError].} =
node.requestManager.start()
node.syncManager.start()

if node.dag.needsBackfill(): asyncSpawn node.startBackfillTask()

waitFor node.updateGossipStatus(wallSlot)

asyncSpawn runSlotLoop(node, wallTime, onSlotStart)
Expand Down Expand Up @@ -1327,13 +1349,7 @@ proc initStatusBar(node: BeaconNode) {.raises: [Defect, ValueError].} =
formatGwei(node.attachedValidatorBalanceTotal)

of "sync_status":
if isNil(node.syncManager):
"pending"
else:
if node.syncManager.inProgress:
node.syncManager.syncStatus
else:
"synced"
node.syncStatus()
else:
# We ignore typos for now and just render the expression
# as it was written. TODO: come up with a good way to show
Expand Down
Loading