Skip to content

Commit

Permalink
Fix SyncQueue initialization procedure.
Browse files Browse the repository at this point in the history
Remove usage of `awaitne`.
Add cancellation support.
Remove unneeded `sleepAsync()` if peer's head is older than needed.
Add `direction` field to all logs.
Fix syncmanager wedge issue.
Add proper resource cleaning procedure on backward sync finish.
  • Loading branch information
cheatfate committed Jan 17, 2022
1 parent 5ed18b4 commit 24bde37
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 77 deletions.
219 changes: 160 additions & 59 deletions beacon_chain/sync/sync_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,24 @@ proc speed*(start, finish: SyncMoment): float {.inline.} =
slots / dur

proc initQueue[A, B](man: SyncManager[A, B]) =
  ## (Re)creates `man.queue` according to the manager's sync direction.
  ##
  ## * `Forward`: the queue spans `getFirstSlot()` .. `getLastSlot()`.
  ## * `Backward`: the queue starts one slot below `getFirstSlot()` and runs
  ##   to `getLastSlot()`; when both bounds coincide the start slot is left
  ##   unchanged.
  case man.direction
  of SyncQueueKind.Forward:
    man.queue = SyncQueue.init(A, man.direction, man.getFirstSlot(),
                               man.getLastSlot(), man.chunkSize,
                               man.getSafeSlot, man.blockVerifier, 1)
  of SyncQueueKind.Backward:
    let
      firstSlot = man.getFirstSlot()
      lastSlot = man.getLastSlot()
      startSlot =
        if firstSlot == lastSlot:
          # This should never happen in practice, because `needsBackfill()`
          # is checked beforehand. Keeping `firstSlot` here also avoids
          # decrementing below `lastSlot`.
          firstSlot
        else:
          Slot(firstSlot - 1'u64)
    # NOTE(review): `startSlot` was previously computed but never used, and
    # the queue was initialised from `firstSlot`. Passing `startSlot` matches
    # the evident intent of the computation above - confirm against
    # `SyncQueue.init` semantics for backward queues.
    man.queue = SyncQueue.init(A, man.direction, startSlot, lastSlot,
                               man.chunkSize, man.getSafeSlot,
                               man.blockVerifier, 1)

proc newSyncManager*[A, B](pool: PeerPool[A, B],
direction: SyncQueueKind,
Expand All @@ -110,9 +125,10 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
of SyncQueueKind.Forward:
(getLocalHeadSlotCb, getLocalWallSlotCb, getFinalizedSlotCb)
of SyncQueueKind.Backward:
(getBackfillSlotCb, GetSlotCallback(proc(): Slot = Slot(0)), getBackfillSlotCb)
(getBackfillSlotCb, GetSlotCallback(proc(): Slot = Slot(0)),
getBackfillSlotCb)

result = SyncManager[A, B](
var res = SyncManager[A, B](
pool: pool,
maxStatusAge: maxStatusAge,
getLocalHeadSlot: getLocalHeadSlotCb,
Expand All @@ -127,7 +143,8 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
notInSyncEvent: newAsyncEvent(),
direction: direction
)
result.initQueue()
res.initQueue()
res

proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
req: SyncRequest): Future[BeaconBlocksRes] {.async.} =
Expand All @@ -138,35 +155,56 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman"
if peer.useSyncV2():
var workFut = awaitne beaconBlocksByRange_v2(peer, req.slot, req.count, req.step)
if workFut.failed():
debug "Error, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
errMsg = workFut.readError().msg, peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman"
else:
let res = workFut.read()
if res.isErr:
debug "Error, while reading getBlocks response",
let res =
try:
await beaconBlocksByRange_v2(peer, req.slot, req.count, req.step)
except CancelledError:
debug "Interrupt, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
return
except CatchableError as exc:
debug "Error, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman"
return
if res.isErr():
debug "Error, while reading getBlocks response",
peer = peer, slot = req.slot, count = req.count,
step = req.step, peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman", error = $res.error()
result = res
direction = man.direction, topics = "syncman",
error = $res.error()
return
return res
else:
var workFut = awaitne beaconBlocksByRange(peer, req.slot, req.count, req.step)
if workFut.failed():
debug "Error, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
errMsg = workFut.readError().msg, peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman"
else:
let res = workFut.read()
if res.isErr:
debug "Error, while reading getBlocks response",
peer = peer, slot = req.slot, count = req.count,
step = req.step, peer_speed = peer.netKbps(),
topics = "syncman", error = $res.error()
result = res.map() do (blcks: seq[phase0.SignedBeaconBlock]) -> auto: blcks.mapIt(ForkedSignedBeaconBlock.init(it))
let res =
try:
await beaconBlocksByRange(peer, req.slot, req.count, req.step)
except CancelledError:
debug "Interrupt, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
return
except CatchableError as exc:
debug "Error, while waiting getBlocks response", peer = peer,
slot = req.slot, slot_count = req.count, step = req.step,
errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman"
return
if res.isErr():
debug "Error, while reading getBlocks response",
peer = peer, slot = req.slot, count = req.count,
step = req.step, peer_speed = peer.netKbps(),
direction = man.direction, error = $res.error(),
topics = "syncman"
return
let forked =
res.map() do (blcks: seq[phase0.SignedBeaconBlock]) -> auto:
blcks.mapIt(ForkedSignedBeaconBlock.init(it))
return forked

proc remainingSlots(man: SyncManager): uint64 =
if man.direction == SyncQueueKind.Forward:
Expand Down Expand Up @@ -303,10 +341,6 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
peer_speed = peer.netKbps(), index = index,
direction = man.direction, topics = "syncman"
peer.updateScore(PeerScoreUseless)

# Give the peer time to do some syncing
await sleepAsync(StatusExpirationTime div 2)

return

if man.direction == SyncQueueKind.Forward:
Expand Down Expand Up @@ -345,7 +379,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =

try:
let blocks = await man.getBlocks(peer, req)
if blocks.isOk:
if blocks.isOk():
let data = blocks.get()
let smap = getShortMap(req, data)
debug "Received blocks on request", blocks_count = len(data),
Expand Down Expand Up @@ -381,26 +415,47 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =

except CatchableError as exc:
debug "Unexpected exception while receiving blocks",
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer, index = index,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
errMsg = exc.msg, direction = man.direction, topics = "syncman"
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer, index = index,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
errName = exc.name, errMsg = exc.msg, direction = man.direction,
topics = "syncman"
return

proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} =
  ## Long-running worker loop for the worker slot `index`.
  ##
  ## Each iteration waits for `man.notInSyncEvent`, acquires a peer from
  ## `man.pool`, runs a single `syncStep` with it and releases the peer
  ## again. The loop terminates on cancellation or on any other
  ## `CatchableError`; in both cases a peer that is still held is returned
  ## to the pool before leaving.
  mixin getKey, getScore, getHeadSlot

  debug "Starting syncing worker", index = index, direction = man.direction,
        topics = "syncman"

  while true:
    var peer: A = nil
    let doBreak =
      try:
        man.workers[index].status = SyncWorkerStatus.Sleeping
        # The event stays set while the node is not in sync with the network.
        await man.notInSyncEvent.wait()
        man.workers[index].status = SyncWorkerStatus.WaitingPeer
        peer = await man.pool.acquire()
        await man.syncStep(index, peer)
        man.pool.release(peer)
        # Mark the peer as returned so the handlers below never release it
        # a second time.
        peer = nil
        false
      except CancelledError:
        if not(isNil(peer)):
          man.pool.release(peer)
        true
      except CatchableError as exc:
        if isNil(peer):
          # The failure happened before a peer was acquired, so there is no
          # peer to describe or release; touching it would dereference nil.
          debug "Unexpected exception in sync worker",
                index = index, errName = exc.name, errMsg = exc.msg,
                direction = man.direction, topics = "syncman"
        else:
          debug "Unexpected exception in sync worker",
                peer = peer, index = index,
                peer_score = peer.getScore(), peer_speed = peer.netKbps(),
                errName = exc.name, errMsg = exc.msg,
                direction = man.direction, topics = "syncman"
          # Return the peer to the pool, otherwise it stays acquired forever.
          man.pool.release(peer)
        true
    if doBreak:
      break

  debug "Sync worker stopped", index = index, direction = man.direction,
        topics = "syncman"

proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string,
sleeping: int,
Expand Down Expand Up @@ -450,7 +505,8 @@ proc guardTask[A, B](man: SyncManager[A, B]) {.async.} =
let index = pending.find(failFuture)
if failFuture.failed():
warn "Synchronization worker stopped working unexpectedly with an error",
index = index, errMsg = failFuture.error.msg, direction = man.direction
index = index, errMsg = failFuture.error.msg,
direction = man.direction
else:
warn "Synchronization worker stopped working unexpectedly without error",
index = index, direction = man.direction
Expand Down Expand Up @@ -489,9 +545,10 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
mixin getKey, getScore
var pauseTime = 0

asyncSpawn man.guardTask()
var guardTaskFut = man.guardTask()

debug "Synchronization loop started", topics = "syncman"
debug "Synchronization loop started", topics = "syncman",
direction = man.direction

proc averageSpeedTask() {.async.} =
while true:
Expand Down Expand Up @@ -523,7 +580,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =

stamp = newStamp

asyncSpawn averageSpeedTask()
var averageSpeedTaskFut = averageSpeedTask()

while true:
let wallSlot = man.getLocalWallSlot()
Expand Down Expand Up @@ -567,20 +624,64 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
sleeping_workers_count = sleeping,
waiting_workers_count = waiting, pending_workers_count = pending,
direction = man.direction, topics = "syncman"
# We already synced, so we should reset all the pending workers from
# any state they have.
man.queue.clearAndWakeup()
man.inProgress = true
else:
debug "Synchronization loop sleeping", wall_head_slot = wallSlot,
local_head_slot = headSlot, difference = (wallSlot - headSlot),
max_head_age = man.maxHeadAge, direction = man.direction,
topics = "syncman"
man.inProgress = false
case man.direction
of SyncQueueKind.Forward:
if man.inProgress:
man.inProgress = false
debug "Forward synchronization process finished, sleeping",
wall_head_slot = wallSlot, local_head_slot = headSlot,
difference = (wallSlot - headSlot),
max_head_age = man.maxHeadAge, direction = man.direction,
topics = "syncman"
else:
debug "Synchronization loop sleeping", wall_head_slot = wallSlot,
local_head_slot = headSlot,
difference = (wallSlot - headSlot),
max_head_age = man.maxHeadAge, direction = man.direction,
topics = "syncman"
of SyncQueueKind.Backward:
# Backward syncing is executed only once, so we exit the loop and stop
# all pending tasks that belong to this instance (sync workers, the
# guard task and the speed calculation task).
# We first need to cancel and wait for the guard task, because otherwise
# it would be able to restore the cancelled workers.
guardTaskFut.cancel()
averageSpeedTaskFut.cancel()
await allFutures(guardTaskFut, averageSpeedTaskFut)
let pendingTasks =
block:
var res: seq[Future[void]]
for worker in man.workers:
# Because `pending == 0` there should be no active workers.
doAssert(worker.status in {Sleeping, WaitingPeer})
worker.future.cancel()
res.add(worker.future)
res
await allFutures(pendingTasks)
debug "Backward synchronization process finished, exiting",
wall_head_slot = wallSlot, local_head_slot = headSlot,
backfill_slot = man.getLastSlot(),
max_head_age = man.maxHeadAge, direction = man.direction,
topics = "syncman"
break
else:
if not(man.notInSyncEvent.isSet()):
# We get here only if we lost sync for more than the `maxHeadAge` period.
if pending == 0:
man.initQueue()
man.notInSyncEvent.fire()
man.inProgress = true
debug "Node lost sync for more then preset period",
period = man.maxHeadAge, wall_head_slot = wallSlot,
local_head_slot = headSlot,
missing_slots = man.remainingSlots(),
progress = float(man.queue.progress()),
topics = "syncman"
else:
man.notInSyncEvent.fire()
man.inProgress = true
Expand Down
Loading

0 comments on commit 24bde37

Please sign in to comment.