Skip to content

Commit

Permalink
Trusted node sync
Browse files Browse the repository at this point in the history
Trusted node sync, aka checkpoint sync, allows syncing tyhe chain from a
trusted node instead of relying on a full sync from genesis.

Features include:

* sync from any slot, including the latest finalized slot
* backfill blocks either from the REST api (default) or p2p
* resume backfilling either while running the node as normal or offline

Future improvements:

* top up blocks between head in database and some other node - this
makes for an efficient backup tool
* recreate historical state to enable historical queries

Assorted fixes:

* query remote peers for status only once per sync step
* fix peer status check in backward sync mode
* fix several off-by-ones in backward sync
* use common forked block/state reader in REST API
* disable JSON state readed to avoid the risk of stack overflows
  • Loading branch information
arnetheduck committed Dec 21, 2021
1 parent 0d4e49f commit 6597155
Show file tree
Hide file tree
Showing 16 changed files with 682 additions and 253 deletions.
1 change: 1 addition & 0 deletions beacon_chain/beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type
vcProcess*: Process
requestManager*: RequestManager
syncManager*: SyncManager[Peer, PeerID]
backfiller*: SyncManager[Peer, PeerID]
genesisSnapshotContent*: string
actionTracker*: ActionTracker
processor*: ref Eth2Processor
Expand Down
38 changes: 27 additions & 11 deletions beacon_chain/conf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ const
defaultSigningNodeRequestTimeout* = 60

type
BNStartUpCmd* = enum
BNStartUpCmd* {.pure.} = enum
noCommand
createTestnet
deposits
wallets
record
web3
slashingdb
trustedNodeSync

WalletsCmd* {.pure.} = enum
create = "Creates a new EIP-2386 wallet"
Expand Down Expand Up @@ -176,9 +177,9 @@ type

case cmd* {.
command
defaultValue: noCommand }: BNStartUpCmd
defaultValue: BNStartUpCmd.noCommand }: BNStartUpCmd

of noCommand:
of BNStartUpCmd.noCommand:
bootstrapNodes* {.
desc: "Specifies one or more bootstrap nodes to use when connecting to the network"
abbr: "b"
Expand Down Expand Up @@ -231,12 +232,10 @@ type
name: "weak-subjectivity-checkpoint" }: Option[Checkpoint]

finalizedCheckpointState* {.
hidden # TODO unhide when backfilling is done
desc: "SSZ file specifying a recent finalized state"
name: "finalized-checkpoint-state" }: Option[InputFile]

finalizedCheckpointBlock* {.
hidden # TODO unhide when backfilling is done
desc: "SSZ file specifying a recent finalized block"
name: "finalized-checkpoint-block" }: Option[InputFile]

Expand Down Expand Up @@ -386,7 +385,7 @@ type
defaultValue: false
name: "validator-monitor-totals" }: bool

of createTestnet:
of BNStartUpCmd.createTestnet:
testnetDepositsFile* {.
desc: "A LaunchPad deposits file for the genesis state validators"
name: "deposits-file" }: InputFile
Expand Down Expand Up @@ -420,7 +419,7 @@ type
desc: "Output file with list of bootstrap nodes for the network"
name: "output-bootstrap-file" }: OutFile

of wallets:
of BNStartUpCmd.wallets:
case walletsCmd* {.command.}: WalletsCmd
of WalletsCmd.create:
nextAccount* {.
Expand Down Expand Up @@ -453,7 +452,7 @@ type
of WalletsCmd.list:
discard

of deposits:
of BNStartUpCmd.deposits:
case depositsCmd* {.command.}: DepositsCmd
of DepositsCmd.createTestnetDeposits:
totalDeposits* {.
Expand Down Expand Up @@ -512,7 +511,7 @@ type
name: "epoch"
desc: "The desired exit epoch" }: Option[uint64]

of record:
of BNStartUpCmd.record:
case recordCmd* {.command.}: RecordCmd
of RecordCmd.create:
ipExt* {.
Expand Down Expand Up @@ -542,15 +541,15 @@ type
desc: "ENR URI of the record to print"
name: "enr" .}: Record

of web3:
of BNStartUpCmd.web3:
case web3Cmd* {.command.}: Web3Cmd
of Web3Cmd.test:
web3TestUrl* {.
argument
desc: "The web3 provider URL to test"
name: "url" }: Uri

of slashingdb:
of BNStartUpCmd.slashingdb:
case slashingdbCmd* {.command.}: SlashProtCmd
of SlashProtCmd.`import`:
importedInterchangeFile* {.
Expand All @@ -566,6 +565,23 @@ type
desc: "EIP-3076 slashing protection interchange file to export"
argument }: OutFile

of BNStartUpCmd.trustedNodeSync:
trustedNodeUrl* {.
desc: "URL of the REST API to sync from"
defaultValue: "http://localhost:5052"
name: "trusted-node-url"
.}: string

blockId* {.
desc: "Block id to sync to - this can be a block root, slot number, \"finalized\" or \"head\""
defaultValue: "finalized"
.}: string

backfillBlocks* {.
desc: "Backfill blocks directly from REST server instead of fetching via API"
defaultValue: true
name: "backfill"}: bool

ValidatorClientConf* = object
logLevel* {.
desc: "Sets the log level"
Expand Down
3 changes: 3 additions & 0 deletions beacon_chain/consensus_object_pools/blockchain_dag.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1564,3 +1564,6 @@ proc aggregateAll*(
err("aggregate: no attesting keys")
else:
ok(finish(aggregateKey))

func needsBackfill*(dag: ChainDAGRef): bool =
dag.backfill.slot > dag.genesis.slot
64 changes: 44 additions & 20 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import
# Local modules
"."/[
beacon_clock, beacon_chain_db, beacon_node, beacon_node_status,
conf, filepath, interop, nimbus_binary_common, statusbar,
conf, filepath, interop, nimbus_binary_common, statusbar, trusted_node_sync,
version],
./networking/[eth2_discovery, eth2_network, network_metadata],
./gossip_processing/[eth2_processor, block_processor, consensus_manager],
Expand Down Expand Up @@ -184,8 +184,9 @@ proc init(T: type BeaconNode,
try:
# Checkpoint block might come from an earlier fork than the state with
# the state having empty slots processed past the fork epoch.
checkpointBlock = readSszForkedTrustedSignedBeaconBlock(
let tmp = readSszForkedSignedBeaconBlock(
cfg, readAllBytes(checkpointBlockPath).tryGet())
checkpointBlock = tmp.asTrusted()
except SszError as err:
fatal "Invalid checkpoint block", err = err.formatMsg(checkpointBlockPath)
quit 1
Expand Down Expand Up @@ -434,6 +435,10 @@ proc init(T: type BeaconNode,
syncManager = newSyncManager[Peer, PeerID](
network.peerPool, SyncQueueKind.Forward, getLocalHeadSlot, getLocalWallSlot,
getFirstSlotAtFinalizedEpoch, getBackfillSlot, blockVerifier)
backfiller = newSyncManager[Peer, PeerID](
network.peerPool, SyncQueueKind.Backward, getLocalHeadSlot, getLocalWallSlot,
getFirstSlotAtFinalizedEpoch, getBackfillSlot, blockVerifier,
maxHeadAge = 0)

var node = BeaconNode(
nickname: nickname,
Expand All @@ -455,6 +460,7 @@ proc init(T: type BeaconNode,
eventBus: eventBus,
requestManager: RequestManager.init(network, blockVerifier),
syncManager: syncManager,
backfiller: backfiller,
actionTracker: ActionTracker.init(rng, config.subscribeAllSubnets),
processor: processor,
blockProcessor: blockProcessor,
Expand Down Expand Up @@ -890,6 +896,11 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
# above, this will be done just before the next slot starts
await node.updateGossipStatus(slot + 1)

proc syncStatus(node: BeaconNode): string =
if node.syncManager.inProgress: node.syncManager.syncStatus
elif node.backfiller.inProgress: "backfill: " & node.backfiller.syncStatus
else: "synced"

proc onSlotStart(
node: BeaconNode, wallTime: BeaconTime, lastSlot: Slot) {.async.} =
## Called at the beginning of a slot - usually every slot, but sometimes might
Expand All @@ -910,9 +921,7 @@ proc onSlotStart(
info "Slot start",
slot = shortLog(wallSlot),
epoch = shortLog(wallSlot.epoch),
sync =
if node.syncManager.inProgress: node.syncManager.syncStatus
else: "synced",
sync = node.syncStatus(),
peers = len(node.network.peerPool),
head = shortLog(node.dag.head),
finalized = shortLog(getStateField(
Expand Down Expand Up @@ -1103,6 +1112,18 @@ proc stop(node: BeaconNode) =
node.db.close()
notice "Databases closed"

proc startBackfillTask(node: BeaconNode) {.async.} =
while node.dag.needsBackfill:
if not node.syncManager.inProgress:
# Only start the backfiller if it's needed _and_ head sync has completed -
# if we lose sync after having synced head, we could stop the backfilller,
# but this should be a fringe case - might as well keep the logic simple for
# now
node.backfiller.start()
return

await sleepAsync(chronos.seconds(2))

proc run(node: BeaconNode) {.raises: [Defect, CatchableError].} =
bnStatus = BeaconNodeStatus.Running

Expand All @@ -1121,6 +1142,8 @@ proc run(node: BeaconNode) {.raises: [Defect, CatchableError].} =
node.requestManager.start()
node.syncManager.start()

if node.dag.needsBackfill(): asyncSpawn node.startBackfillTask()

waitFor node.updateGossipStatus(wallSlot)

asyncSpawn runSlotLoop(node, wallTime, onSlotStart)
Expand Down Expand Up @@ -1298,13 +1321,7 @@ proc initStatusBar(node: BeaconNode) {.raises: [Defect, ValueError].} =
formatGwei(node.attachedValidatorBalanceTotal)

of "sync_status":
if isNil(node.syncManager):
"pending"
else:
if node.syncManager.inProgress:
node.syncManager.syncStatus
else:
"synced"
node.syncStatus()
else:
# We ignore typos for now and just render the expression
# as it was written. TODO: come up with a good way to show
Expand Down Expand Up @@ -1846,7 +1863,6 @@ proc doSlashingImport(conf: BeaconNodeConf) {.raises: [SerializationError, IOErr
echo "Import finished: '", interchange, "' into '", dir/filetrunc & ".sqlite3", "'"

proc doSlashingInterchange(conf: BeaconNodeConf) {.raises: [Defect, CatchableError].} =
doAssert conf.cmd == slashingdb
case conf.slashingdbCmd
of SlashProtCmd.`export`:
conf.doSlashingExport()
Expand Down Expand Up @@ -1893,10 +1909,18 @@ programMain:
let rng = keys.newRng()

case config.cmd
of createTestnet: doCreateTestnet(config, rng[])
of noCommand: doRunBeaconNode(config, rng)
of deposits: doDeposits(config, rng[])
of wallets: doWallets(config, rng[])
of record: doRecord(config, rng[])
of web3: doWeb3Cmd(config)
of slashingdb: doSlashingInterchange(config)
of BNStartUpCmd.createTestnet: doCreateTestnet(config, rng[])
of BNStartUpCmd.noCommand: doRunBeaconNode(config, rng)
of BNStartUpCmd.deposits: doDeposits(config, rng[])
of BNStartUpCmd.wallets: doWallets(config, rng[])
of BNStartUpCmd.record: doRecord(config, rng[])
of BNStartUpCmd.web3: doWeb3Cmd(config)
of BNStartUpCmd.slashingdb: doSlashingInterchange(config)
of BNStartupCmd.trustedNodeSync:
# TODO use genesis state from metadata
waitFor doTrustedNodeSync(
getRuntimeConfig(config.eth2Network),
config.databaseDir,
config.trustedNodeUrl,
config.blockId,
config.backfillBlocks)
6 changes: 1 addition & 5 deletions beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ type
RestAttestationError |
RestGenericError |
GetBlockV2Response |
GetStateV2Response |
Web3SignerStatusResponse |
Web3SignerKeysResponse |
Web3SignerSignatureResponse |
Expand All @@ -87,11 +86,8 @@ type

SszDecodeTypes* =
GetPhase0StateSszResponse |
GetAltairStateSszResponse |
GetPhase0BlockSszResponse |
GetAltairBlockSszResponse |
GetBlockV2Header |
GetStateV2Header
GetBlockV2Header

{.push raises: [Defect].}

Expand Down
45 changes: 15 additions & 30 deletions beacon_chain/spec/eth2_apis/rest_beacon_calls.nim
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,18 @@ proc getBlockV2Plain*(block_id: BlockIdent): RestPlainResponse {.
## https://ethereum.github.io/beacon-APIs/#/Beacon/getBlockV2

proc getBlockV2*(client: RestClientRef, block_id: BlockIdent,
forks: array[2, Fork],
restAccept = ""): Future[ForkedSignedBeaconBlock] {.
cfg: RuntimeConfig,
restAccept = ""): Future[Option[ForkedSignedBeaconBlock]] {.
async.} =
# Return the asked-for block, or None in case 404 is returned from the server.
# Raises on other errors
let resp =
if len(restAccept) > 0:
await client.getBlockV2Plain(block_id, restAcceptType = restAccept)
else:
await client.getBlockV2Plain(block_id)
let data =

return
case resp.status
of 200:
case resp.contentType
Expand All @@ -170,35 +173,18 @@ proc getBlockV2*(client: RestClientRef, block_id: BlockIdent,
if res.isErr():
raise newException(RestError, $res.error())
res.get()
blck
some blck
of "application/octet-stream":
let header =
block:
let res = decodeBytes(GetBlockV2Header, resp.data, resp.contentType)
if res.isErr():
raise newException(RestError, $res.error())
res.get()
if header.slot.epoch() < forks[1].epoch:
let blck =
block:
let res = decodeBytes(GetPhase0BlockSszResponse, resp.data,
resp.contentType)
if res.isErr():
raise newException(RestError, $res.error())
res.get()
ForkedSignedBeaconBlock.init(blck)
else:
let blck =
block:
let res = decodeBytes(GetAltairBlockSszResponse, resp.data,
resp.contentType)
if res.isErr():
raise newException(RestError, $res.error())
res.get()
ForkedSignedBeaconBlock.init(blck)
try:
some readSszForkedSignedBeaconBlock(cfg, resp.data)
except CatchableError as exc:
raise newException(RestError, exc.msg)
else:
raise newException(RestError, "Unsupported content-type")
of 400, 404, 500:
of 404:
none(ForkedSignedBeaconBlock)

of 400, 500:
let error =
block:
let res = decodeBytes(RestGenericError, resp.data, resp.contentType)
Expand All @@ -212,7 +198,6 @@ proc getBlockV2*(client: RestClientRef, block_id: BlockIdent,
else:
let msg = "Unknown response status error (" & $resp.status & ")"
raise newException(RestError, msg)
return data

proc getBlockRoot*(block_id: BlockIdent): RestResponse[GetBlockRootResponse] {.
rest, endpoint: "/eth/v1/beacon/blocks/{block_id}/root",
Expand Down
Loading

0 comments on commit 6597155

Please sign in to comment.