Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: temporary logs to analyse filter issue. some can be interesting #3138

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =

return

info "disconnecting from peer", peerId = peerId, reason = reason
debug "disconnecting from peer", peerId = shortLog(peerId), reason = reason
asyncSpawn(pm.switch.disconnect(peerId))
pm.wakuPeerStore.delete(peerId)

Expand Down Expand Up @@ -673,7 +673,8 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
# pm.colocationLimit == 0 disables the ip colocation limit
if pm.colocationLimit != 0 and peersBehindIp.len > pm.colocationLimit:
for peerId in peersBehindIp[0 ..< (peersBehindIp.len - pm.colocationLimit)]:
debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
debug "Pruning connection due to ip colocation",
peerId = shortLog(peerId), ip = ip
asyncSpawn(pm.switch.disconnect(peerId))
pm.wakuPeerStore.delete(peerId)
of Left:
Expand Down Expand Up @@ -889,6 +890,7 @@ proc prunePeerStore*(pm: PeerManager) =
peersToPrune.incl(peer)

for peer in peersToPrune:
debug "deleting peer", peerId = shortLog(peer)
pm.wakuPeerStore.delete(peer)

let afterNumPeers = pm.wakuPeerStore[AddressBook].book.len
Expand Down
20 changes: 15 additions & 5 deletions waku/waku_filter_v2/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ type WakuFilter* = ref object of LPProtocol
peerRequestRateLimiter*: PerPeerRateLimiter

proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
trace "pinging subscriber", peerId = peerId
debug "pinging subscriber", peerId = shortLog(peerId)

if not wf.subscriptions.isSubscribed(peerId):
debug "pinging peer has no subscriptions", peerId = peerId
debug "pinging peer has no subscriptions", peerId = shortLog(peerId)
return err(FilterSubscribeError.notFound())

wf.subscriptions.refreshSubscription(peerId)
Expand Down Expand Up @@ -125,13 +125,20 @@ proc handleSubscribeRequest*(
## Handle subscribe request
case request.filterSubscribeType
of FilterSubscribeType.SUBSCRIBER_PING:
debug "debugging filter ping", peerId
subscribeResult = wf.pingSubscriber(peerId)
of FilterSubscribeType.SUBSCRIBE:
debug "debugging filter subscribe",
peerId, pubsubTopic = $(request.pubsubTopic), cTopics = $(request.contentTopics)
subscribeResult = wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE:
debug "debugging filter unsibscribe",
peerId, pubsubTopic = $(request.pubsubTopic), cTopics = $(request.contentTopics)
subscribeResult =
wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE_ALL:
debug "debugging filter unsubscribe all",
peerId, pubsubTopic = $(request.pubsubTopic), cTopics = $(request.contentTopics)
subscribeResult = wf.unsubscribeAll(peerId)

let
Expand All @@ -143,16 +150,18 @@ proc handleSubscribeRequest*(
)

if subscribeResult.isErr():
debug "debugging filter is err", error = $(subscribeResult.error)
return FilterSubscribeResponse(
requestId: request.requestId,
statusCode: subscribeResult.error.kind.uint32,
statusDesc: some($subscribeResult.error),
)
else:
debug "debugging filter ok"
return FilterSubscribeResponse.ok(request.requestId)

proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
trace "pushing message to subscribed peer", peer_id = shortLog(peer)
debug "pushing message to subscribed peer", peer_id = shortLog(peer)

if not wf.peerManager.wakuPeerStore.hasPeer(peer, WakuFilterPushCodec):
# Check that peer has not been removed from peer store
Expand All @@ -175,6 +184,7 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
proc pushToPeers(
wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush
) {.async.} =
debug "debugging filter pushToPeers"
let targetPeerIds = peers.mapIt(shortLog(it))
let msgHash =
messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex()
Expand Down Expand Up @@ -202,7 +212,7 @@ proc pushToPeers(
await allFutures(pushFuts)

proc maintainSubscriptions*(wf: WakuFilter) =
trace "maintaining subscriptions"
debug "maintaining subscriptions"

## Remove subscriptions for peers that have been removed from peer store
var peersToRemove: seq[PeerId]
Expand All @@ -227,7 +237,7 @@ proc handleMessage*(
) {.async.} =
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()

trace "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash
debug "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash

let handleMessageStartTime = Moment.now()

Expand Down
Loading