From 1e8f577104fc2dd03fcd3238542fd19948183075 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Mon, 23 Oct 2023 08:37:28 +0200 Subject: [PATCH] chore(cbindings): avoid using global var in libwaku.nim (#2118) * libwaku: Avoid global variable and changing callback signature * Better signature for the callback. Two new parameters have been added: one aimed to allow passing the caller result code; the other param is to pass an optional userData pointer that might need to be linked locally with the Context object. For example, this is needed in Rust to make the passed closures live as long as the Context. * waku_example.c: adaptation to the latest changes * libwaku.h: removing 'waku_set_user_data' function * libwaku.nim: renaming parameter in WakuCallBack (isOk -> callerRet) --- examples/cbindings/waku_example.c | 77 ++++++++----- library/libwaku.h | 66 ++++++++---- library/libwaku.nim | 161 +++++++++++++++++++--------- library/waku_thread/waku_thread.nim | 14 +-- 4 files changed, 213 insertions(+), 105 deletions(-) diff --git a/examples/cbindings/waku_example.c b/examples/cbindings/waku_example.c index b58dbc9a61..ec1b8f9d32 100644 --- a/examples/cbindings/waku_example.c +++ b/examples/cbindings/waku_example.c @@ -30,6 +30,12 @@ struct ConfigNode { char peers[2048]; }; +// libwaku Context +void* ctx; + +// For the case of C language we don't need to store a particular userData +void* userData = NULL; + // Arguments parsing static char doc[] = "\nC example that shows how to use the waku library."; static char args_doc[] = ""; @@ -78,8 +84,18 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { static struct argp argp = { options, parse_opt, args_doc, doc, 0, 0, 0 }; +void event_handler(int callerRet, const char* msg, size_t len) { + if (callerRet == RET_ERR) { + printf("Error: %s\n", msg); + exit(1); + } + else if (callerRet == RET_OK) { + printf("Receiving message %s\n", msg); + } +} + char* contentTopic = NULL; -void handle_content_topic(const char* msg, size_t len) { +void handle_content_topic(int callerRet, const char* msg, size_t len) { if (contentTopic != NULL) { free(contentTopic); } @@ -89,7 +105,7 @@ void handle_content_topic(const char* msg, size_t len) { } char* publishResponse = NULL; -void handle_publish_ok(const char* msg, size_t len) { +void handle_publish_ok(int callerRet, const char* msg, size_t len) { printf("Publish Ok: %s %lu\n", msg, len); if (publishResponse != NULL) { @@ -100,22 +116,19 @@ void handle_publish_ok(const char* msg, size_t len) { strcpy(publishResponse, msg); } -void handle_error(const char* msg, size_t len) { - printf("Error: %s\n", msg); - exit(1); -} - #define MAX_MSG_SIZE 65535 void publish_message(char* pubsubTopic, const char* msg) { char jsonWakuMsg[MAX_MSG_SIZE]; char *msgPayload = b64_encode(msg, strlen(msg)); - WAKU_CALL( waku_content_topic("appName", + WAKU_CALL( waku_content_topic(RET_OK, + "appName", 1, "contentTopicName", "encoding", - handle_content_topic) ); + handle_content_topic, + userData) ); snprintf(jsonWakuMsg, MAX_MSG_SIZE, @@ -124,10 +137,12 @@ void publish_message(char* pubsubTopic, const char* msg) { free(msgPayload); - WAKU_CALL( waku_relay_publish(pubsubTopic, + WAKU_CALL( waku_relay_publish(&ctx, + pubsubTopic, jsonWakuMsg, 10000 /*timeout ms*/, - handle_error) ); + event_handler, + userData) ); printf("waku relay response [%s]\n", publishResponse); } @@ -137,15 +152,11 @@ void show_help_and_exit() { exit(1); } -void event_handler(const char* msg, size_t len) { - printf("Receiving message %s\n", msg); -} - -void print_default_pubsub_topic(const char* msg, size_t len) { +void print_default_pubsub_topic(int callerRet, const char* msg, size_t len) { printf("Default pubsub topic: %s\n", msg); } -void print_waku_version(const char* msg, size_t len) { +void print_waku_version(int callerRet, const char* msg, size_t len) { printf("Git Version: %s\n", msg); } @@ -186,8 +197,10 @@ void handle_user_input() { char pubsubTopic[128]; scanf("%127s", pubsubTopic); - WAKU_CALL( waku_relay_subscribe(pubsubTopic, - handle_error) ); + WAKU_CALL( waku_relay_subscribe(&ctx, + pubsubTopic, + event_handler, + userData) ); printf("The subscription went well\n"); show_main_menu(); @@ -199,7 +212,7 @@ void handle_user_input() { printf("e.g.: /ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN\n"); char peerAddr[512]; scanf("%511s", peerAddr); - WAKU_CALL(waku_connect(peerAddr, 10000 /* timeoutMs */, handle_error)); + WAKU_CALL(waku_connect(&ctx, peerAddr, 10000 /* timeoutMs */, event_handler, userData)); show_main_menu(); break; @@ -239,6 +252,8 @@ int main(int argc, char** argv) { show_help_and_exit(); } + ctx = waku_init(event_handler, userData); + char jsonConfig[1024]; snprintf(jsonConfig, 1024, "{ \ \"host\": \"%s\", \ @@ -250,25 +265,29 @@ int main(int argc, char** argv) { cfgNode.key, cfgNode.relay ? "true":"false"); - WAKU_CALL( waku_default_pubsub_topic(print_default_pubsub_topic) ); - WAKU_CALL( waku_version(print_waku_version) ); + WAKU_CALL( waku_default_pubsub_topic(&ctx, print_default_pubsub_topic, userData) ); + WAKU_CALL( waku_version(&ctx, print_waku_version, userData) ); printf("Bind addr: %s:%u\n", cfgNode.host, cfgNode.port); printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO"); - WAKU_CALL( waku_new(jsonConfig, handle_error) ); + WAKU_CALL( waku_new(&ctx, jsonConfig, event_handler, userData) ); - waku_set_event_callback(event_handler); - waku_start(); + waku_set_event_callback(event_handler, userData); + waku_start(&ctx, event_handler, userData); printf("Establishing connection with: %s\n", cfgNode.peers); - WAKU_CALL( waku_connect(cfgNode.peers, + WAKU_CALL( waku_connect(&ctx, + cfgNode.peers, 10000 /* timeoutMs */, - handle_error) ); + event_handler, + userData) ); - WAKU_CALL( waku_relay_subscribe("/waku/2/default-waku/proto", - handle_error) ); + WAKU_CALL( waku_relay_subscribe(&ctx, + "/waku/2/default-waku/proto", + event_handler, + userData) ); show_main_menu(); while(1) { handle_user_input(); diff --git a/library/libwaku.h b/library/libwaku.h index d2fe5cd1e4..bbd7f35155 100644 --- a/library/libwaku.h +++ b/library/libwaku.h @@ -15,45 +15,73 @@ extern "C" { #endif -typedef void (*WakuCallBack) (const char* msg, size_t len_0); +typedef void (*WakuCallBack) (int callerRet, const char* msg, size_t len); + +// Initializes the waku library and returns a pointer to the Context. +void* waku_init(WakuCallBack callback, + void* userData); // Creates a new instance of the waku node. // Sets up the waku node from the given configuration. -int waku_new(const char* configJson, WakuCallBack onErrCb); +int waku_new(void* ctx, + const char* configJson, + WakuCallBack callback, + void* userData); -void waku_start(void); +int waku_start(void* ctx, + WakuCallBack callback, + void* userData); -void waku_stop(void); +int waku_stop(void* ctx, + WakuCallBack callback, + void* userData); -int waku_version(WakuCallBack onOkCb); +int waku_version(void* ctx, + WakuCallBack callback, + void* userData); -void waku_set_event_callback(WakuCallBack callback); +void waku_set_event_callback(WakuCallBack callback, + void* userData); -int waku_content_topic(const char* appName, +int waku_content_topic(void* ctx, + const char* appName, unsigned int appVersion, const char* contentTopicName, const char* encoding, - WakuCallBack onOkCb); + WakuCallBack callback, + void* userData); -int waku_pubsub_topic(const char* topicName, - WakuCallBack onOkCb); +int waku_pubsub_topic(void* ctx, + const char* topicName, + WakuCallBack callback, + void* userData); -int waku_default_pubsub_topic(WakuCallBack onOkCb); +int waku_default_pubsub_topic(void* ctx, + WakuCallBack callback, + void* userData); -int waku_relay_publish(const char* pubSubTopic, +int waku_relay_publish(void* ctx, + const char* pubSubTopic, const char* jsonWakuMessage, unsigned int timeoutMs, - WakuCallBack onErrCb); + WakuCallBack callback, + void* userData); -int waku_relay_subscribe(const char* pubSubTopic, - WakuCallBack onErrCb); +int waku_relay_subscribe(void* ctx, + const char* pubSubTopic, + WakuCallBack callback, + void* userData); -int waku_relay_unsubscribe(const char* pubSubTopic, - WakuCallBack onErrCb); +int waku_relay_unsubscribe(void* ctx, + const char* pubSubTopic, + WakuCallBack callback, + void* userData); -int waku_connect(const char* peerMultiAddr, +int waku_connect(void* ctx, + const char* peerMultiAddr, unsigned int timeoutMs, - WakuCallBack onErrCb); + WakuCallBack callback, + void* userData); #ifdef __cplusplus } diff --git a/library/libwaku.nim b/library/libwaku.nim index 1757de7210..9f6449d535 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -4,8 +4,7 @@ {.passc: "-fPIC".} import - std/[json,sequtils,times,strformat,options,atomics,strutils], - strutils + std/[json,sequtils,times,strformat,options,atomics,strutils] import chronicles, chronos @@ -34,7 +33,9 @@ const RET_ERR: cint = 1 const RET_MISSING_CALLBACK: cint = 2 type - WakuCallBack* = proc(msg: ptr cchar, len: csize_t) {.cdecl, gcsafe.} + WakuCallBack* = proc(callerRet: cint, + msg: ptr cchar, + len: csize_t) {.cdecl, gcsafe.} ### End of exported types ################################################################################ @@ -51,10 +52,11 @@ proc relayEventCallback(pubsubTopic: PubsubTopic, if not isNil(extEventCallback): try: let event = $JsonMessageEvent.new(pubsubTopic, msg) - extEventCallback(unsafeAddr event[0], cast[csize_t](len(event))) + extEventCallback(RET_OK, unsafeAddr event[0], cast[csize_t](len(event))) except Exception,CatchableError: - error "Exception when calling 'eventCallBack': " & - getCurrentExceptionMsg() + let msg = "Exception when calling 'eventCallBack': " & + getCurrentExceptionMsg() + extEventCallback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) else: error "extEventCallback is nil" @@ -64,53 +66,72 @@ proc relayEventCallback(pubsubTopic: PubsubTopic, ################################################################################ ### Exported procs -proc waku_new(configJson: cstring, - onErrCb: WakuCallback): cint - {.dynlib, exportc, cdecl.} = - # Creates a new instance of the WakuNode. - # Notice that the ConfigNode type is also exported and available for users. - - if isNil(onErrCb): - return RET_MISSING_CALLBACK +proc waku_init(callback: WakuCallback): pointer {.dynlib, exportc, cdecl.} = + ## Initializes the waku library. ## Create the Waku thread that will keep waiting for req from the main thread. - waku_thread.createWakuThread().isOkOr: + var ctx = waku_thread.createWakuThread().valueOr: let msg = "Error in createWakuThread: " & $error - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_ERR + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) + return nil + + return ctx + +proc waku_new(ctx: ptr ptr Context, + configJson: cstring, + callback: WakuCallback, + userData: pointer): cint + {.dynlib, exportc, cdecl.} = + ## Creates a new instance of the WakuNode. + ## Notice that the ConfigNode type is also exported and available for users. + + ctx[][].userData = userData + + if isNil(callback): + return RET_MISSING_CALLBACK let sendReqRes = waku_thread.sendRequestToWakuThread( + ctx[], RequestType.LIFECYCLE, NodeLifecycleRequest.createShared( NodeLifecycleMsgType.CREATE_NODE, configJson)) if sendReqRes.isErr(): let msg = $sendReqRes.error - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR return RET_OK -proc waku_version(onOkCb: WakuCallBack): cint {.dynlib, exportc.} = - if isNil(onOkCb): +proc waku_version(ctx: ptr ptr Context, + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc.} = + + ctx[][].userData = userData + + if isNil(callback): return RET_MISSING_CALLBACK - onOkCb(cast[ptr cchar](WakuNodeVersionString), - cast[csize_t](len(WakuNodeVersionString))) + callback(RET_OK, cast[ptr cchar](WakuNodeVersionString), + cast[csize_t](len(WakuNodeVersionString))) return RET_OK proc waku_set_event_callback(callback: WakuCallBack) {.dynlib, exportc.} = extEventCallback = callback -proc waku_content_topic(appName: cstring, +proc waku_content_topic(ctx: ptr ptr Context, + appName: cstring, appVersion: cuint, contentTopicName: cstring, encoding: cstring, - onOkCb: WakuCallBack): cint {.dynlib, exportc.} = + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc.} = # https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding - if isNil(onOkCb): + ctx[][].userData = userData + + if isNil(callback): return RET_MISSING_CALLBACK let appStr = appName.alloc() @@ -118,7 +139,7 @@ proc waku_content_topic(appName: cstring, let encodingStr = encoding.alloc() let contentTopic = fmt"/{$appStr}/{appVersion}/{$ctnStr}/{$encodingStr}" - onOkCb(unsafeAddr contentTopic[0], cast[csize_t](len(contentTopic))) + callback(RET_OK, unsafeAddr contentTopic[0], cast[csize_t](len(contentTopic))) deallocShared(appStr) deallocShared(ctnStr) @@ -126,40 +147,53 @@ proc waku_content_topic(appName: cstring, return RET_OK -proc waku_pubsub_topic(topicName: cstring, - onOkCb: WakuCallBack): cint {.dynlib, exportc, cdecl.} = - if isNil(onOkCb): +proc waku_pubsub_topic(ctx: ptr ptr Context, + topicName: cstring, + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc, cdecl.} = + # https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding + + ctx[][].userData = userData + + if isNil(callback): return RET_MISSING_CALLBACK let topicNameStr = topicName.alloc() - # https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding let outPubsubTopic = fmt"/waku/2/{$topicNameStr}" - onOkCb(unsafeAddr outPubsubTopic[0], cast[csize_t](len(outPubsubTopic))) + callback(RET_OK, unsafeAddr outPubsubTopic[0], cast[csize_t](len(outPubsubTopic))) deallocShared(topicNameStr) return RET_OK -proc waku_default_pubsub_topic(onOkCb: WakuCallBack): cint {.dynlib, exportc.} = +proc waku_default_pubsub_topic(ctx: ptr ptr Context, + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc.} = # https://rfc.vac.dev/spec/36/#extern-char-waku_default_pubsub_topic - if isNil(onOkCb): + + ctx[][].userData = userData + + if isNil(callback): return RET_MISSING_CALLBACK - onOkCb(cast[ptr cchar](DefaultPubsubTopic), - cast[csize_t](len(DefaultPubsubTopic))) + callback(RET_OK, cast[ptr cchar](DefaultPubsubTopic), cast[csize_t](len(DefaultPubsubTopic))) return RET_OK -proc waku_relay_publish(pubSubTopic: cstring, +proc waku_relay_publish(ctx: ptr ptr Context, + pubSubTopic: cstring, jsonWakuMessage: cstring, timeoutMs: cuint, - onErrCb: WakuCallBack): cint + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc, cdecl.} = # https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms - if isNil(onErrCb): + ctx[][].userData = userData + + if isNil(callback): return RET_MISSING_CALLBACK let jwm = jsonWakuMessage.alloc() @@ -169,7 +203,7 @@ proc waku_relay_publish(pubSubTopic: cstring, except JsonParsingError: deallocShared(jwm) let msg = fmt"Error parsing json message: {getCurrentExceptionMsg()}" - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR deallocShared(jwm) @@ -190,7 +224,7 @@ proc waku_relay_publish(pubSubTopic: cstring, ) except KeyError: let msg = fmt"Problem building the WakuMessage: {getCurrentExceptionMsg()}" - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR let pst = pubSubTopic.alloc() @@ -201,6 +235,7 @@ proc waku_relay_publish(pubSubTopic: cstring, $pst let sendReqRes = waku_thread.sendRequestToWakuThread( + ctx[], RequestType.RELAY, RelayRequest.createShared(RelayMsgType.PUBLISH, PubsubTopic($pst), @@ -210,31 +245,47 @@ proc waku_relay_publish(pubSubTopic: cstring, if sendReqRes.isErr(): let msg = $sendReqRes.error - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR return RET_OK -proc waku_start() {.dynlib, exportc.} = +proc waku_start(ctx: ptr ptr Context, + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc.} = + + ctx[][].userData = userData + ## TODO: handle the error discard waku_thread.sendRequestToWakuThread( + ctx[], RequestType.LIFECYCLE, NodeLifecycleRequest.createShared( NodeLifecycleMsgType.START_NODE)) -proc waku_stop() {.dynlib, exportc.} = +proc waku_stop(ctx: ptr ptr Context, + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc.} = + ctx[][].userData = userData + ## TODO: handle the error discard waku_thread.sendRequestToWakuThread( + ctx[], RequestType.LIFECYCLE, NodeLifecycleRequest.createShared( NodeLifecycleMsgType.STOP_NODE)) proc waku_relay_subscribe( + ctx: ptr ptr Context, pubSubTopic: cstring, - onErrCb: WakuCallBack): cint + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc.} = + ctx[][].userData = userData + let pst = pubSubTopic.alloc() let sendReqRes = waku_thread.sendRequestToWakuThread( + ctx[], RequestType.RELAY, RelayRequest.createShared(RelayMsgType.SUBSCRIBE, PubsubTopic($pst), @@ -243,19 +294,24 @@ proc waku_relay_subscribe( if sendReqRes.isErr(): let msg = $sendReqRes.error - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR return RET_OK proc waku_relay_unsubscribe( + ctx: ptr ptr Context, pubSubTopic: cstring, - onErrCb: WakuCallBack): cint + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc.} = + ctx[][].userData = userData + let pst = pubSubTopic.alloc() let sendReqRes = waku_thread.sendRequestToWakuThread( + ctx[], RequestType.RELAY, RelayRequest.createShared(RelayMsgType.SUBSCRIBE, PubsubTopic($pst), @@ -264,17 +320,22 @@ proc waku_relay_unsubscribe( if sendReqRes.isErr(): let msg = $sendReqRes.error - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR return RET_OK -proc waku_connect(peerMultiAddr: cstring, +proc waku_connect(ctx: ptr ptr Context, + peerMultiAddr: cstring, timeoutMs: cuint, - onErrCb: WakuCallBack): cint + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc.} = + ctx[][].userData = userData + let connRes = waku_thread.sendRequestToWakuThread( + ctx[], RequestType.PEER_MANAGER, PeerManagementRequest.createShared( PeerManagementMsgType.CONNECT_TO, @@ -282,7 +343,7 @@ proc waku_connect(peerMultiAddr: cstring, chronos.milliseconds(timeoutMs))) if connRes.isErr(): let msg = $connRes.error - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR return RET_OK diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index a2965e4dd2..807fb1fe72 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -25,8 +25,7 @@ type reqSignal: ThreadSignalPtr respChannel: ChannelSPSCSingle[ptr InterThreadResponse] respSignal: ThreadSignalPtr - -var ctx {.threadvar.}: ptr Context + userData*: pointer # To control when the thread is running var running: Atomic[bool] @@ -70,13 +69,13 @@ proc run(ctx: ptr Context) {.thread.} = tearDownForeignThreadGc() -proc createWakuThread*(): Result[void, string] = +proc createWakuThread*(): Result[ptr Context, string] = ## This proc is called from the main thread and it creates ## the Waku working thread. waku_init() - ctx = createShared(Context, 1) + var ctx = createShared(Context, 1) ctx.reqSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create reqSignal ThreadSignalPtr") ctx.respSignal = ThreadSignalPtr.new().valueOr: @@ -92,16 +91,17 @@ proc createWakuThread*(): Result[void, string] = return err("failed to create the Waku thread: " & getCurrentExceptionMsg()) - return ok() + return ok(ctx) -proc stopWakuNodeThread*() = +proc stopWakuNodeThread*(ctx: ptr Context) = running.store(false) joinThread(ctx.thread) discard ctx.reqSignal.close() discard ctx.respSignal.close() freeShared(ctx) -proc sendRequestToWakuThread*(reqType: RequestType, +proc sendRequestToWakuThread*(ctx: ptr Context, + reqType: RequestType, reqContent: pointer): Result[string, string] = let req = InterThreadRequest.createShared(reqType, reqContent)