From 9f50ef3359e12b6e98ba94bf6c50f6ffeec0e7a6 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 21 Nov 2024 12:18:14 -0400 Subject: [PATCH 1/2] refactor(libwaku): async --- library/callback.nim | 13 -- library/ffi_types.nim | 30 +++ library/libwaku.nim | 181 +++++++++++------- .../waku_thread_request.nim | 74 ++++++- .../waku_thread_response.nim | 48 ----- library/waku_thread/waku_thread.nim | 49 ++--- 6 files changed, 220 insertions(+), 175 deletions(-) delete mode 100644 library/callback.nim create mode 100644 library/ffi_types.nim delete mode 100644 library/waku_thread/inter_thread_communication/waku_thread_response.nim diff --git a/library/callback.nim b/library/callback.nim deleted file mode 100644 index 8a8522600c..0000000000 --- a/library/callback.nim +++ /dev/null @@ -1,13 +0,0 @@ -import ./waku_thread/waku_thread - -type WakuCallBack* = proc( - callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer -) {.cdecl, gcsafe, raises: [].} - -template checkLibwakuParams*( - ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer -) = - ctx[].userData = userData - - if isNil(callback): - return RET_MISSING_CALLBACK diff --git a/library/ffi_types.nim b/library/ffi_types.nim new file mode 100644 index 0000000000..a5eeb97115 --- /dev/null +++ b/library/ffi_types.nim @@ -0,0 +1,30 @@ +################################################################################ +### Exported types + +type WakuCallBack* = proc( + callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} + +const RET_OK*: cint = 0 +const RET_ERR*: cint = 1 +const RET_MISSING_CALLBACK*: cint = 2 + +### End of exported types +################################################################################ + +################################################################################ +### FFI utils + +template foreignThreadGc*(body: untyped) = + when declared(setupForeignThreadGc): + setupForeignThreadGc() + + body + + when declared(tearDownForeignThreadGc): + tearDownForeignThreadGc() + +type onDone* = proc() + +### End of FFI utils +################################################################################ diff --git a/library/libwaku.nim b/library/libwaku.nim index 36286a3869..f54f245c53 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -6,7 +6,7 @@ when defined(linux): {.passl: "-Wl,-soname,libwaku.so".} import std/[json, atomics, strformat, options, atomics] -import chronicles, chronos +import chronicles, chronos, chronos/threadsync import waku/common/base64, waku/waku_core/message/message, @@ -25,52 +25,34 @@ import ./waku_thread/inter_thread_communication/requests/ping_request, ./waku_thread/inter_thread_communication/waku_thread_request, ./alloc, - ./callback + ./ffi_types ################################################################################ ### Wrapper around the waku node ################################################################################ -################################################################################ -### Exported types - -const RET_OK: cint = 0 -const RET_ERR: cint = 1 -const RET_MISSING_CALLBACK: cint = 2 - -### End of exported types -################################################################################ - ################################################################################ ### Not-exported components -template foreignThreadGc(body: untyped) = - when declared(setupForeignThreadGc): - setupForeignThreadGc() - - body +template checkLibwakuParams*( + ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer +) = + ctx[].userData = userData - when declared(tearDownForeignThreadGc): - tearDownForeignThreadGc() + if isNil(callback): + return RET_MISSING_CALLBACK -proc handleRes[T: string | void]( - res: Result[T, string], callback: WakuCallBack, userData: pointer +proc handleSentToChannelRes( + res: Result[onDone, string], callback: WakuCallBack, userData: pointer ): cint = - ## Handles the Result responses, which can either be Result[string, string] or - ## Result[void, string]. Notice that in case of Result[void, string], it is enough to - ## just return RET_OK and not provide any additional feedback through the callback. - if res.isErr(): + ## Handles failures when sending a request to the waku thread. + let onDone = res.valueOr: foreignThreadGc: let msg = "libwaku error: " & $res.error callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR - foreignThreadGc: - var msg: cstring = "" - when T is string: - msg = res.get().cstring() - callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) - return RET_OK + onDone() proc relayEventCallback(ctx: ptr WakuContext): WakuRelayHandler = return proc( @@ -151,15 +133,15 @@ proc waku_new( ctx.userData = userData - waku_thread.sendRequestToWakuThread( - ctx, - RequestType.LIFECYCLE, - NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CREATE_NODE, configJson), - ).isOkOr: - foreignThreadGc: - let msg = $error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) - return nil + discard waku_thread + .sendRequestToWakuThread( + ctx, + RequestType.LIFECYCLE, + NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CREATE_NODE, configJson), + callback, + userData, + ) + .handleSentToChannelRes(callback, userData) return ctx @@ -168,7 +150,12 @@ proc waku_destroy( ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread.destroyWakuThread(ctx).handleRes(callback, userData) + let res = waku_thread.destroyWakuThread(ctx) + if res.isErr(): + foreignThreadGc: + let msg = "libwaku error: " & $res.error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR proc waku_version( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer @@ -303,8 +290,10 @@ proc waku_relay_publish( WakuRelayHandler(relayEventCallback(ctx)), wakuMessage, ), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_start( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer @@ -316,8 +305,10 @@ proc waku_start( ctx, RequestType.LIFECYCLE, NodeLifecycleRequest.createShared(NodeLifecycleMsgType.START_NODE), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_stop( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer @@ -329,8 +320,10 @@ proc waku_stop( ctx, RequestType.LIFECYCLE, NodeLifecycleRequest.createShared(NodeLifecycleMsgType.STOP_NODE), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_relay_subscribe( ctx: ptr WakuContext, @@ -352,8 +345,10 @@ proc waku_relay_subscribe( RelayRequest.createShared( RelayMsgType.SUBSCRIBE, PubsubTopic($pst), WakuRelayHandler(cb) ), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_relay_unsubscribe( ctx: ptr WakuContext, @@ -376,8 +371,10 @@ proc waku_relay_unsubscribe( PubsubTopic($pst), WakuRelayHandler(relayEventCallback(ctx)), ), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_relay_get_num_connected_peers( ctx: ptr WakuContext, @@ -396,8 +393,10 @@ proc waku_relay_get_num_connected_peers( ctx, RequestType.RELAY, RelayRequest.createShared(RelayMsgType.LIST_CONNECTED_PEERS, PubsubTopic($pst)), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_relay_get_num_peers_in_mesh( ctx: ptr WakuContext, @@ -416,8 +415,10 @@ proc waku_relay_get_num_peers_in_mesh( ctx, RequestType.RELAY, RelayRequest.createShared(RelayMsgType.LIST_MESH_PEERS, PubsubTopic($pst)), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_lightpush_publish( ctx: ptr WakuContext, @@ -461,8 +462,10 @@ proc waku_lightpush_publish( LightpushRequest.createShared( LightpushMsgType.PUBLISH, PubsubTopic($pst), wakuMessage ), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_connect( ctx: ptr WakuContext, @@ -480,8 +483,10 @@ proc waku_connect( PeerManagementRequest.createShared( PeerManagementMsgType.CONNECT_TO, $peerMultiAddr, chronos.milliseconds(timeoutMs) ), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_disconnect_peer_by_id( ctx: ptr WakuContext, peerId: cstring, callback: WakuCallBack, userData: pointer @@ -495,8 +500,10 @@ proc waku_disconnect_peer_by_id( PeerManagementRequest.createShared( op = PeerManagementMsgType.DISCONNECT_PEER_BY_ID, peerId = $peerId ), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_dial_peer( ctx: ptr WakuContext, @@ -517,8 +524,10 @@ proc waku_dial_peer( peerMultiAddr = $peerMultiAddr, protocol = $protocol, ), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_dial_peer_by_id( ctx: ptr WakuContext, @@ -537,8 +546,10 @@ proc waku_dial_peer_by_id( PeerManagementRequest.createShared( op = PeerManagementMsgType.DIAL_PEER_BY_ID, peerId = $peerId, protocol = $protocol ), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_get_peerids_from_peerstore( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer @@ -550,8 +561,10 @@ proc waku_get_peerids_from_peerstore( ctx, RequestType.PEER_MANAGER, PeerManagementRequest.createShared(PeerManagementMsgType.GET_ALL_PEER_IDS), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_get_connected_peers( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer @@ -563,8 +576,10 @@ proc waku_get_connected_peers( ctx, RequestType.PEER_MANAGER, PeerManagementRequest.createShared(PeerManagementMsgType.GET_CONNECTED_PEERS), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_get_peerids_by_protocol( ctx: ptr WakuContext, protocol: cstring, callback: WakuCallBack, userData: pointer @@ -578,8 +593,10 @@ proc waku_get_peerids_by_protocol( PeerManagementRequest.createShared( op = PeerManagementMsgType.GET_PEER_IDS_BY_PROTOCOL, protocol = $protocol ), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_store_query( ctx: ptr WakuContext, @@ -596,8 +613,10 @@ proc waku_store_query( ctx, RequestType.STORE, JsonStoreQueryRequest.createShared(jsonQuery, peerAddr, timeoutMs), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_listen_addresses( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer @@ -609,8 +628,10 @@ proc waku_listen_addresses( ctx, RequestType.DEBUG, DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_LISTENING_ADDRESSES), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_dns_discovery( ctx: ptr WakuContext, @@ -629,8 +650,10 @@ proc waku_dns_discovery( DiscoveryRequest.createRetrieveBootstrapNodesRequest( DiscoveryMsgType.GET_BOOTSTRAP_NODES, entTreeUrl, nameDnsServer, timeoutMs ), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_discv5_update_bootnodes( ctx: ptr WakuContext, bootnodes: cstring, callback: WakuCallBack, userData: pointer @@ -646,8 +669,10 @@ proc waku_discv5_update_bootnodes( DiscoveryRequest.createUpdateBootstrapNodesRequest( DiscoveryMsgType.UPDATE_DISCV5_BOOTSTRAP_NODES, bootnodes ), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_get_my_enr( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer @@ -659,8 +684,10 @@ proc waku_get_my_enr( ctx, RequestType.DEBUG, DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_MY_ENR), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_get_my_peerid( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer @@ -672,8 +699,10 @@ proc waku_get_my_peerid( ctx, RequestType.DEBUG, DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_MY_PEER_ID), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_start_discv5( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer @@ -682,9 +711,13 @@ proc waku_start_discv5( waku_thread .sendRequestToWakuThread( - ctx, RequestType.DISCOVERY, DiscoveryRequest.createDiscV5StartRequest() + ctx, + RequestType.DISCOVERY, + DiscoveryRequest.createDiscV5StartRequest(), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_stop_discv5( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer @@ -693,9 +726,13 @@ proc waku_stop_discv5( waku_thread .sendRequestToWakuThread( - ctx, RequestType.DISCOVERY, DiscoveryRequest.createDiscV5StopRequest() + ctx, + RequestType.DISCOVERY, + DiscoveryRequest.createDiscV5StopRequest(), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_peer_exchange_request( ctx: ptr WakuContext, numPeers: uint64, callback: WakuCallBack, userData: pointer @@ -704,9 +741,13 @@ proc waku_peer_exchange_request( waku_thread .sendRequestToWakuThread( - ctx, RequestType.DISCOVERY, DiscoveryRequest.createPeerExchangeRequest(numPeers) + ctx, + RequestType.DISCOVERY, + DiscoveryRequest.createPeerExchangeRequest(numPeers), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) proc waku_ping_peer( ctx: ptr WakuContext, @@ -722,8 +763,10 @@ proc waku_ping_peer( ctx, RequestType.PING, PingRequest.createShared(peerAddr, chronos.milliseconds(timeoutMs)), + callback, + userData, ) - .handleRes(callback, userData) + .handleSentToChannelRes(callback, userData) ### End of exported procs ################################################################################ diff --git a/library/waku_thread/inter_thread_communication/waku_thread_request.nim b/library/waku_thread/inter_thread_communication/waku_thread_request.nim index 63dacc06d8..85bceb1f92 100644 --- a/library/waku_thread/inter_thread_communication/waku_thread_request.nim +++ b/library/waku_thread/inter_thread_communication/waku_thread_request.nim @@ -3,9 +3,10 @@ ## the Waku Thread. import std/json, results -import chronos +import chronos, chronos/threadsync import ../../../waku/factory/waku, + ../../ffi_types, ./requests/node_lifecycle_request, ./requests/peer_manager_request, ./requests/protocols/relay_request, @@ -28,22 +29,79 @@ type RequestType* {.pure.} = enum type InterThreadRequest* = object reqType: RequestType reqContent: pointer + callback: WakuCallBack + userData: pointer + doneSignal: ThreadSignalPtr proc createShared*( - T: type InterThreadRequest, reqType: RequestType, reqContent: pointer + T: type InterThreadRequest, + reqType: RequestType, + reqContent: pointer, + callback: WakuCallBack, + userData: pointer, ): ptr type T = var ret = createShared(T) ret[].reqType = reqType ret[].reqContent = reqContent + ret[].callback = callback + ret[].userData = userData + ret[].doneSignal = ThreadSignalPtr.new().tryGet() return ret -proc process*( - T: type InterThreadRequest, request: ptr InterThreadRequest, waku: ptr Waku -): Future[Result[string, string]] {.async.} = - ## Processes the request and deallocates its memory +proc handleRes[T: string | void]( + res: Result[T, string], request: ptr InterThreadRequest +) = + ## Handles the Result responses, which can either be Result[string, string] or + ## Result[void, string]. + defer: - deallocShared(request) + let fireSyncRes = request[].doneSignal.fireSync() + if fireSyncRes.isErr(): + let msg = "libwaku error: " & $fireSyncRes.error + request[].callback( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + if fireSyncRes.get() == false: + let msg = "libwaku error: couldn't fireSync in time" + request[].callback( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + + if res.isErr(): + foreignThreadGc: + let msg = "libwaku error: " & $res.error + request[].callback( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + + foreignThreadGc: + var msg: cstring = "" + when T is string: + msg = res.get().cstring() + request[].callback( + RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return +proc deallocOnDone*(T: type InterThreadRequest, request: ptr InterThreadRequest) = + let res = waitSync(request[].doneSignal) + if res.isErr(): + foreignThreadGc: + let msg = "libwaku error: " & $res.error + request[].callback( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + + discard request[].doneSignal.close() + deallocShared(request) + +proc process*( + T: type InterThreadRequest, request: ptr InterThreadRequest, waku: ptr Waku +) {.async.} = echo "Request received: " & $request[].reqType let retFut = @@ -65,7 +123,7 @@ proc process*( of LIGHTPUSH: cast[ptr LightpushRequest](request[].reqContent).process(waku) - return await retFut + handleRes(await retFut, request) proc `$`*(self: InterThreadRequest): string = return $self.reqType diff --git a/library/waku_thread/inter_thread_communication/waku_thread_response.nim b/library/waku_thread/inter_thread_communication/waku_thread_response.nim deleted file mode 100644 index e44e2d49f8..0000000000 --- a/library/waku_thread/inter_thread_communication/waku_thread_response.nim +++ /dev/null @@ -1,48 +0,0 @@ -## This file contains the base message response type that will be handled. -## The response will be created from the Waku Thread and processed in -## the main thread. - -import std/json, results -import ../../alloc - -type ResponseType {.pure.} = enum - OK - ERR - -type InterThreadResponse* = object - respType: ResponseType - content: cstring - -proc createShared*( - T: type InterThreadResponse, res: Result[string, string] -): ptr type T = - ## Converts a `Result[string, string]` into a `ptr InterThreadResponse` - ## so that it can be transfered to another thread in a safe way. - - var ret = createShared(T) - if res.isOk(): - let value = res.get() - ret[].respType = ResponseType.OK - ret[].content = value.alloc() - else: - let error = res.error - ret[].respType = ResponseType.ERR - ret[].content = res.error.alloc() - return ret - -proc process*( - T: type InterThreadResponse, resp: ptr InterThreadResponse -): Result[string, string] = - ## Converts the received `ptr InterThreadResponse` into a - ## `Result[string, string]`. Notice that the response is expected to be - ## allocated from the Waku Thread and deallocated by the main thread. - - defer: - deallocShared(resp[].content) - deallocShared(resp) - - case resp[].respType - of OK: - return ok($resp[].content) - of ERR: - return err($resp[].content) diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index 5babbf3801..a28a9205c1 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -4,17 +4,12 @@ import std/[options, atomics, os, net] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results -import - waku/factory/waku, - ./inter_thread_communication/waku_thread_request, - ./inter_thread_communication/waku_thread_response +import waku/factory/waku, ./inter_thread_communication/waku_thread_request, ../ffi_types type WakuContext* = object thread: Thread[(ptr WakuContext)] reqChannel: ChannelSPSCSingle[ptr InterThreadRequest] reqSignal: ThreadSignalPtr - respChannel: ChannelSPSCSingle[ptr InterThreadResponse] - respSignal: ThreadSignalPtr userData*: pointer eventCallback*: pointer eventUserdata*: pointer @@ -43,20 +38,7 @@ proc runWaku(ctx: ptr WakuContext) {.async.} = continue ## Handle the request - let resultResponse = waitFor InterThreadRequest.process(request, addr waku) - - ## Converting a `Result` into a thread-safe transferable response type - let threadSafeResp = InterThreadResponse.createShared(resultResponse) - - ## Send the response back to the thread that sent the request - let sentOk = ctx.respChannel.trySend(threadSafeResp) - if not sentOk: - error "could not send a request to the requester thread", - original_request = $request[] - - let fireRes = ctx.respSignal.fireSync() - if fireRes.isErr(): - error "could not fireSync back to requester thread", error = fireRes.error + asyncSpawn InterThreadRequest.process(request, addr waku) proc run(ctx: ptr WakuContext) {.thread.} = ## Launch waku worker @@ -68,8 +50,6 @@ proc createWakuThread*(): Result[ptr WakuContext, string] = var ctx = createShared(WakuContext, 1) ctx.reqSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create reqSignal ThreadSignalPtr") - ctx.respSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create respSignal ThreadSignalPtr") ctx.running.store(true) @@ -93,15 +73,18 @@ proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] = joinThread(ctx.thread) ?ctx.reqSignal.close() - ?ctx.respSignal.close() freeShared(ctx) return ok() proc sendRequestToWakuThread*( - ctx: ptr WakuContext, reqType: RequestType, reqContent: pointer -): Result[string, string] = - let req = InterThreadRequest.createShared(reqType, reqContent) + ctx: ptr WakuContext, + reqType: RequestType, + reqContent: pointer, + callback: WakuCallBack, + userData: pointer, +): Result[onDone, string] = + let req = InterThreadRequest.createShared(reqType, reqContent, callback, userData) ## Sending the request let sentOk = ctx.reqChannel.trySend(req) if not sentOk: @@ -114,15 +97,7 @@ proc sendRequestToWakuThread*( if fireSyncRes.get() == false: return err("Couldn't fireSync in time") - # Waiting for the response - let res = waitSync(ctx.respSignal) - if res.isErr(): - return err("Couldnt receive response signal") - - var response: ptr InterThreadResponse - var recvOk = ctx.respChannel.tryRecv(response) - if recvOk == false: - return err("Couldn't receive response from the waku thread: " & $req[]) + let onDoneProc = proc() = + InterThreadRequest.deallocOnDone(req) - ## Converting the thread-safe response into a managed/CG'ed `Result` - return InterThreadResponse.process(response) + ok(onDoneProc) From b3b48b6cdd05e3458bb6fe78849d1b478169b2bf Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 22 Nov 2024 09:31:10 -0400 Subject: [PATCH 2/2] fix: code review --- library/libwaku.nim | 145 ++++++------------ .../waku_thread_request.nim | 31 ++-- library/waku_thread/waku_thread.nim | 10 +- 3 files changed, 74 insertions(+), 112 deletions(-) diff --git a/library/libwaku.nim b/library/libwaku.nim index f54f245c53..d0ac7f8520 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -42,18 +42,25 @@ template checkLibwakuParams*( if isNil(callback): return RET_MISSING_CALLBACK -proc handleSentToChannelRes( - res: Result[onDone, string], callback: WakuCallBack, userData: pointer +proc handleLifecycleRequest( + ctx: ptr WakuContext, + requestType: RequestType, + content: pointer, + callback: WakuCallBack, + userData: pointer, ): cint = - ## Handles failures when sending a request to the waku thread. - let onDone = res.valueOr: + let onDone = waku_thread.sendRequestToWakuThread( + ctx, requestType, content, callback, userData + ).valueOr: foreignThreadGc: - let msg = "libwaku error: " & $res.error + let msg = "libwaku error: " & $error callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR onDone() + return RET_OK + proc relayEventCallback(ctx: ptr WakuContext): WakuRelayHandler = return proc( pubsubTopic: PubsubTopic, msg: WakuMessage @@ -133,15 +140,16 @@ proc waku_new( ctx.userData = userData - discard waku_thread - .sendRequestToWakuThread( - ctx, - RequestType.LIFECYCLE, - NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CREATE_NODE, configJson), - callback, - userData, - ) - .handleSentToChannelRes(callback, userData) + let retCode = handleLifecycleRequest( + ctx, + RequestType.LIFECYCLE, + NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CREATE_NODE, configJson), + callback, + userData, + ) + + if retCode == RET_ERR: + return nil return ctx @@ -150,13 +158,14 @@ proc waku_destroy( ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - let res = waku_thread.destroyWakuThread(ctx) - if res.isErr(): + waku_thread.destroyWakuThread(ctx).isOkOr: foreignThreadGc: - let msg = "libwaku error: " & $res.error + let msg = "libwaku error: " & $error callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR + return RET_OK + proc waku_version( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = @@ -280,8 +289,7 @@ proc waku_relay_publish( else: $pst - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.RELAY, RelayRequest.createShared( @@ -293,37 +301,30 @@ proc waku_relay_publish( callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_start( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.LIFECYCLE, NodeLifecycleRequest.createShared(NodeLifecycleMsgType.START_NODE), callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_stop( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.LIFECYCLE, NodeLifecycleRequest.createShared(NodeLifecycleMsgType.STOP_NODE), callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_relay_subscribe( ctx: ptr WakuContext, @@ -338,8 +339,7 @@ proc waku_relay_subscribe( deallocShared(pst) var cb = relayEventCallback(ctx) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.RELAY, RelayRequest.createShared( @@ -348,7 +348,6 @@ proc waku_relay_subscribe( callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_relay_unsubscribe( ctx: ptr WakuContext, @@ -362,8 +361,7 @@ proc waku_relay_unsubscribe( defer: deallocShared(pst) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.RELAY, RelayRequest.createShared( @@ -374,7 +372,6 @@ proc waku_relay_unsubscribe( callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_relay_get_num_connected_peers( ctx: ptr WakuContext, @@ -388,15 +385,13 @@ proc waku_relay_get_num_connected_peers( defer: deallocShared(pst) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.RELAY, RelayRequest.createShared(RelayMsgType.LIST_CONNECTED_PEERS, PubsubTopic($pst)), callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_relay_get_num_peers_in_mesh( ctx: ptr WakuContext, @@ -410,15 +405,13 @@ proc waku_relay_get_num_peers_in_mesh( defer: deallocShared(pst) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.RELAY, RelayRequest.createShared(RelayMsgType.LIST_MESH_PEERS, PubsubTopic($pst)), callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_lightpush_publish( ctx: ptr WakuContext, @@ -455,8 +448,7 @@ proc waku_lightpush_publish( else: $pst - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.LIGHTPUSH, LightpushRequest.createShared( @@ -465,7 +457,6 @@ proc waku_lightpush_publish( callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_connect( ctx: ptr WakuContext, @@ -476,8 +467,7 @@ proc waku_connect( ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.PEER_MANAGER, PeerManagementRequest.createShared( @@ -486,15 +476,13 @@ proc waku_connect( callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_disconnect_peer_by_id( ctx: ptr WakuContext, peerId: cstring, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.PEER_MANAGER, PeerManagementRequest.createShared( @@ -503,7 +491,6 @@ proc waku_disconnect_peer_by_id( callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_dial_peer( ctx: ptr WakuContext, @@ -515,8 +502,7 @@ proc waku_dial_peer( ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.PEER_MANAGER, PeerManagementRequest.createShared( @@ -527,7 +513,6 @@ proc waku_dial_peer( callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_dial_peer_by_id( ctx: ptr WakuContext, @@ -539,8 +524,7 @@ proc waku_dial_peer_by_id( ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.PEER_MANAGER, PeerManagementRequest.createShared( @@ -549,45 +533,39 @@ proc waku_dial_peer_by_id( callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_get_peerids_from_peerstore( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.PEER_MANAGER, PeerManagementRequest.createShared(PeerManagementMsgType.GET_ALL_PEER_IDS), callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_get_connected_peers( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.PEER_MANAGER, PeerManagementRequest.createShared(PeerManagementMsgType.GET_CONNECTED_PEERS), callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_get_peerids_by_protocol( ctx: ptr WakuContext, protocol: cstring, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.PEER_MANAGER, PeerManagementRequest.createShared( @@ -596,7 +574,6 @@ proc waku_get_peerids_by_protocol( callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_store_query( ctx: ptr WakuContext, @@ -608,30 +585,26 @@ proc waku_store_query( ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.STORE, JsonStoreQueryRequest.createShared(jsonQuery, peerAddr, timeoutMs), callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_listen_addresses( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.DEBUG, DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_LISTENING_ADDRESSES), callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_dns_discovery( ctx: ptr WakuContext, @@ -643,8 +616,7 @@ proc waku_dns_discovery( ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.DISCOVERY, DiscoveryRequest.createRetrieveBootstrapNodesRequest( @@ -653,7 +625,6 @@ proc waku_dns_discovery( callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_discv5_update_bootnodes( ctx: ptr WakuContext, bootnodes: cstring, callback: WakuCallBack, userData: pointer @@ -662,8 +633,7 @@ proc waku_discv5_update_bootnodes( ## bootnodes - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]` checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.DISCOVERY, DiscoveryRequest.createUpdateBootstrapNodesRequest( @@ -672,82 +642,71 @@ proc waku_discv5_update_bootnodes( callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_get_my_enr( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.DEBUG, DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_MY_ENR), callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_get_my_peerid( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.DEBUG, DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_MY_PEER_ID), callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_start_discv5( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.DISCOVERY, DiscoveryRequest.createDiscV5StartRequest(), callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_stop_discv5( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.DISCOVERY, DiscoveryRequest.createDiscV5StopRequest(), callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_peer_exchange_request( ctx: ptr WakuContext, numPeers: uint64, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.DISCOVERY, DiscoveryRequest.createPeerExchangeRequest(numPeers), callback, userData, ) - .handleSentToChannelRes(callback, userData) proc waku_ping_peer( ctx: ptr WakuContext, @@ -758,15 +717,13 @@ proc waku_ping_peer( ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleLifecycleRequest( ctx, RequestType.PING, PingRequest.createShared(peerAddr, chronos.milliseconds(timeoutMs)), callback, userData, ) - .handleSentToChannelRes(callback, userData) ### End of exported procs ################################################################################ diff --git a/library/waku_thread/inter_thread_communication/waku_thread_request.nim b/library/waku_thread/inter_thread_communication/waku_thread_request.nim index 85bceb1f92..748b42b459 100644 --- a/library/waku_thread/inter_thread_communication/waku_thread_request.nim +++ b/library/waku_thread/inter_thread_communication/waku_thread_request.nim @@ -26,7 +26,7 @@ type RequestType* {.pure.} = enum DISCOVERY LIGHTPUSH -type InterThreadRequest* = object +type WakuThreadRequest* = object reqType: RequestType reqContent: pointer callback: WakuCallBack @@ -34,7 +34,7 @@ type InterThreadRequest* = object doneSignal: ThreadSignalPtr proc createShared*( - T: type InterThreadRequest, + T: type WakuThreadRequest, reqType: RequestType, reqContent: pointer, callback: WakuCallBack, @@ -49,7 +49,7 @@ proc createShared*( return ret proc handleRes[T: string | void]( - res: Result[T, string], request: ptr InterThreadRequest + res: Result[T, string], request: ptr WakuThreadRequest ) = ## Handles the Result responses, which can either be Result[string, string] or ## Result[void, string]. @@ -57,13 +57,13 @@ proc handleRes[T: string | void]( defer: let fireSyncRes = request[].doneSignal.fireSync() if fireSyncRes.isErr(): - let msg = "libwaku error: " & $fireSyncRes.error + let msg = "libwaku error: handleRes fireSyncRes error: " & $fireSyncRes.error request[].callback( RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData ) return if fireSyncRes.get() == false: - let msg = "libwaku error: couldn't fireSync in time" + let msg = "libwaku error: handleRes fireSyncRes error: couldn't fireSync in time" request[].callback( RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData ) @@ -71,7 +71,7 @@ proc handleRes[T: string | void]( if res.isErr(): foreignThreadGc: - let msg = "libwaku error: " & $res.error + let msg = "libwaku error: handleRes fireSyncRes error: " & $res.error request[].callback( RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData ) @@ -86,7 +86,10 @@ proc handleRes[T: string | void]( ) return -proc deallocOnDone*(T: type InterThreadRequest, request: ptr InterThreadRequest) = +proc deallocOnDone*(T: type WakuThreadRequest, request: ptr WakuThreadRequest) = + defer: + deallocShared(request) + let res = waitSync(request[].doneSignal) if res.isErr(): foreignThreadGc: @@ -96,14 +99,16 @@ proc deallocOnDone*(T: type InterThreadRequest, request: ptr InterThreadRequest) ) return - discard request[].doneSignal.close() - deallocShared(request) + request[].doneSignal.close().isOkOr: + foreignThreadGc: + let msg = "libwaku error: doneSignal close error: " & $res.error + request[].callback( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) proc process*( - T: type InterThreadRequest, request: ptr InterThreadRequest, waku: ptr Waku + T: type WakuThreadRequest, request: ptr WakuThreadRequest, waku: ptr Waku ) {.async.} = - echo "Request received: " & $request[].reqType - let retFut = case request[].reqType of LIFECYCLE: @@ -125,5 +130,5 @@ proc process*( handleRes(await retFut, request) -proc `$`*(self: InterThreadRequest): string = +proc `$`*(self: WakuThreadRequest): string = return $self.reqType diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index a28a9205c1..0108580c13 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -8,7 +8,7 @@ import waku/factory/waku, ./inter_thread_communication/waku_thread_request, ../f type WakuContext* = object thread: Thread[(ptr WakuContext)] - reqChannel: ChannelSPSCSingle[ptr InterThreadRequest] + reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest] reqSignal: ThreadSignalPtr userData*: pointer eventCallback*: pointer @@ -31,14 +31,14 @@ proc runWaku(ctx: ptr WakuContext) {.async.} = break ## Trying to get a request from the libwaku requestor thread - var request: ptr InterThreadRequest + var request: ptr WakuThreadRequest let recvOk = ctx.reqChannel.tryRecv(request) if not recvOk: error "waku thread could not receive a request" continue ## Handle the request - asyncSpawn InterThreadRequest.process(request, addr waku) + asyncSpawn WakuThreadRequest.process(request, addr waku) proc run(ctx: ptr WakuContext) {.thread.} = ## Launch waku worker @@ -84,7 +84,7 @@ proc sendRequestToWakuThread*( callback: WakuCallBack, userData: pointer, ): Result[onDone, string] = - let req = InterThreadRequest.createShared(reqType, reqContent, callback, userData) + let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData) ## Sending the request let sentOk = ctx.reqChannel.trySend(req) if not sentOk: @@ -98,6 +98,6 @@ proc sendRequestToWakuThread*( return err("Couldn't fireSync in time") let onDoneProc = proc() = - InterThreadRequest.deallocOnDone(req) + WakuThreadRequest.deallocOnDone(req) ok(onDoneProc)