Skip to content

Commit

Permalink
fix(client): an error occurred when dispose a subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
liutaigang committed May 12, 2024
1 parent 902e69f commit 281eccd
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 27 deletions.
2 changes: 1 addition & 1 deletion packages/client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@jsonrpc-rx/client",
"version": "0.2.1",
"version": "0.2.2",
"author": "jsonrpc-rx",
"description": "A tool library for RPC based on JSON-RPC 2.0 and Reactive Programming",
"main": "./dist/index.cjs.js",
Expand Down
21 changes: 0 additions & 21 deletions packages/client/src/jsonrpc-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import {
Observer,
JsonrpcParams,
JsonrpcCostomError,
INNER_ONCALL_FOR_QUERY_MODE,
ExposeMode,
asyncFuncParamsInterceptor,
} from '@jsonrpc-rx/core';
import { MessageSenderCtx } from './message-sender-ctx';
Expand All @@ -54,7 +52,6 @@ export class JsonrpcClient implements IJsonrpcClient {
disposable: IDisposable;
}
> = new Map();
private unifyQueryModeMap = new Map<string, ExposeMode>();

private msgSenderCtx: MessageSenderCtx;
private msgReceiverCtx: MessageReceiverCtx;
Expand Down Expand Up @@ -148,24 +145,6 @@ export class JsonrpcClient implements IJsonrpcClient {
});
};

async _unify(name: string, args: any[]) {
if (toType(name) != 'string') this.throwInvalidParamsError();

let mode: ExposeMode;
if (this.unifyQueryModeMap.has(name)) {
mode = this.unifyQueryModeMap.get(name)!;
} else {
mode = await this.call<ExposeMode>(INNER_ONCALL_FOR_QUERY_MODE, [name]);
this.unifyQueryModeMap.set(name, mode);
}

if (mode === 'subscribe') {
return this[mode](name, args[0], args.slice(1));
} else {
return this[mode](name, args);
}
}

private receiveMessage() {
const receiveHandler = (messageBody: MessageBody) => {
this.receiveMessageForCall(messageBody as JsonrpcResponseBody);
Expand Down
73 changes: 68 additions & 5 deletions packages/client/src/wrap.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,70 @@
import { Callable, HandlerConfig, Notifiable, Observable, Observer, Promisify, PromisifyReturn, Publisher } from '@jsonrpc-rx/core';
import {
Callable,
ExposeMode,
HandlerConfig,
INNER_ONCALL_FOR_QUERY_MODE,
Notifiable,
Observable,
Observer,
Promisify,
PromisifyReturn,
Publisher,
} from '@jsonrpc-rx/core';
import { JsonrpcClient } from './jsonrpc-client';

type To<T> = T;

type ObservableTypeMapper<T extends (args: any) => any> = T extends (publisher: Publisher<infer P>, ...args: infer A) => infer R
? (observer: Observer<P>, ...rest: A) => Promisify<R>
: PromisifyReturn<T>;

type DefaultTypeMapper<T extends { [key: string]: any }> = T extends () => unknown
? PromisifyReturn<T>
: T extends (publisher: Publisher<infer P>, ...args: infer A) => infer R
? (observer: Observer<P>, ...rest: A) => Promisify<R>
: PromisifyReturn<T>;

type NotifiableTypeMapper<T extends (args: any) => any> = To<(...params: Parameters<T>) => void>;

type CallableTypeMapper<T extends (args: any) => any> = To<(...params: Parameters<T>) => Promisify<ReturnType<T>>>;

type HandlersTypeMapper<T extends { [key: string]: any }> = To<{
[K in keyof T]: T[K] extends Notifiable<any>
? NotifiableTypeMapper<T[K]>
: T[K] extends Observable<any>
? ObservableTypeMapper<T[K]>
: T[K] extends Callable<any>
? CallableTypeMapper<T[K]>
: PromisifyReturn<T[K]>;
: DefaultTypeMapper<T[K]>;
}>;

const unifyQueryModeMapKey = Symbol('unifyQueryModeMap');

export const wrap = <T extends HandlerConfig>(jsonrpcClient: JsonrpcClient): HandlersTypeMapper<T> => {
if ((jsonrpcClient as any)[unifyQueryModeMapKey] == null) {
(jsonrpcClient as any)[unifyQueryModeMapKey] = new Map();
}

const proxyHandler: ProxyHandler<object> = {
get: function (target, prop: string) {
return (...params: any[]) => jsonrpcClient._unify(prop, params);
get: function (_, name: string) {
return async (...params: any[]) => {
const unifyQueryModeMap = (jsonrpcClient as any)[unifyQueryModeMapKey] as Map<string, ExposeMode>;

let mode: ExposeMode;
if (unifyQueryModeMap.has(name)) {
mode = unifyQueryModeMap.get(name)!;
} else {
mode = await jsonrpcClient.call<ExposeMode>(INNER_ONCALL_FOR_QUERY_MODE, [name]);
unifyQueryModeMap.set(name, mode);
}

if (mode === 'subscribe') {
const disposable = await jsonrpcClient[mode](name, params[0], params.slice(1));
return disposable.dispose.bind(disposable);
} else {
return jsonrpcClient[mode](name, params);
}
};
},
};
return new Proxy({}, proxyHandler) as any;
Expand All @@ -32,9 +76,12 @@ export const wrap = <T extends HandlerConfig>(jsonrpcClient: JsonrpcClient): Han
// math: asCall(async (calculator: (...nums: number[]) => number, a: number, b: number) => {
// return await calculator(a, b);
// }),
// hello: asNotify(() => {
// hello: asNotify((a: string) => {
// console.log('hello jsonrpc-rx');
// }),
// hello01: () => {
// console.log('hello jsonrpc-rx');
// },
// timer: asSubject((publisher: Publisher<number>, maxSecond: number = 10) => {
// let second = 0;
// const interval = setInterval(() => {
Expand Down Expand Up @@ -63,10 +110,26 @@ export const wrap = <T extends HandlerConfig>(jsonrpcClient: JsonrpcClient): Han
// clearInterval(interval);
// };
// }, 0),
// timer02: (publisher: Publisher<number>, maxSecond: number = 10) => {
// let second = 0;
// const interval = setInterval(() => {
// if (++second > maxSecond) {
// clearInterval(interval);
// publisher.complete();
// return;
// }
// publisher.next(second);
// }, 1000);
// return () => {
// clearInterval(interval);
// };
// },
// };

// type HandlersType = typeof handlers;
// const remote = wrap<HandlersType>({} as any);
// remote.sum;
// remote.hello;
// remote.hello01;
// remote.timer;
// remote.timer02;

0 comments on commit 281eccd

Please sign in to comment.