typescript subscribe_to_update_websocket

This commit is contained in:
mbecker20
2025-02-09 03:19:26 -08:00
parent 1d242039e2
commit b763db7dab
3 changed files with 174 additions and 4 deletions

View File

@@ -9,8 +9,10 @@ import {
AuthRequest,
ExecuteRequest,
ReadRequest,
UpdateListItem,
UserRequest,
WriteRequest,
WsLoginMessage,
} from "./types.js";
export * as Types from "./types.js";
@@ -19,6 +21,16 @@ type InitOptions =
| { type: "jwt"; params: { jwt: string } }
| { type: "api-key"; params: { key: string; secret: string } };
export class CancelToken {
cancelled: boolean;
constructor() {
this.cancelled = false;
}
cancel() {
this.cancelled = true;
}
}
/** Initialize a new client for Komodo */
export function KomodoClient(url: string, options: InitOptions) {
const state = {
@@ -107,7 +119,6 @@ export function KomodoClient(url: string, options: InitOptions) {
UserResponses[Req["type"]]
>("/user", { type, params });
const read = async <
T extends ReadRequest["type"],
Req extends Extract<ReadRequest, { type: T }>
@@ -120,7 +131,6 @@ export function KomodoClient(url: string, options: InitOptions) {
ReadResponses[Req["type"]]
>("/read", { type, params });
const write = async <
T extends WriteRequest["type"],
Req extends Extract<WriteRequest, { type: T }>
@@ -133,7 +143,6 @@ export function KomodoClient(url: string, options: InitOptions) {
WriteResponses[Req["type"]]
>("/write", { type, params });
const execute = async <
T extends ExecuteRequest["type"],
Req extends Extract<ExecuteRequest, { type: T }>
@@ -148,6 +157,77 @@ export function KomodoClient(url: string, options: InitOptions) {
const core_version = () => read("GetVersion", {}).then((res) => res.version);
const subscribe_to_update_websocket = async ({
on_update,
on_login,
on_close,
retry_timeout_ms = 5_000,
cancel = new CancelToken(),
on_cancel,
}: {
on_update: (update: UpdateListItem) => void;
on_login?: () => void;
on_open?: () => void;
on_close?: () => void;
retry_timeout_ms?: number;
cancel?: CancelToken;
on_cancel?: () => void;
}) => {
while (true) {
if (cancel.cancelled) {
on_cancel?.();
return;
}
try {
const ws = new WebSocket(url.replace("http", "ws") + "/ws/update");
// Handle login on websocket open
ws.addEventListener("open", () => {
const login_msg: WsLoginMessage =
options.type === "jwt"
? {
type: "Jwt",
params: {
jwt: options.params.jwt,
},
}
: {
type: "ApiKeys",
params: {
key: options.params.key,
secret: options.params.secret,
},
};
ws.send(JSON.stringify(login_msg));
});
ws.addEventListener("message", ({ data }: MessageEvent) => {
if (data == "LOGGED_IN") return on_login?.();
on_update(JSON.parse(data));
});
if (on_close) {
ws.addEventListener("close", on_close);
}
// This while loop will end when the socket is closed
while (
ws.readyState !== WebSocket.CLOSING &&
ws.readyState !== WebSocket.CLOSED
) {
if (cancel.cancelled) ws.close();
// Sleep for a bit before checking for websocket closed
await new Promise((resolve) => setTimeout(resolve, 500));
}
} catch (error) {
console.error(error);
// Sleep for a bit before retrying, maybe Komodo Core is down temporarily.
await new Promise((resolve) => setTimeout(resolve, retry_timeout_ms));
}
}
};
return {
/**
* Call the `/auth` api.
@@ -212,5 +292,11 @@ export function KomodoClient(url: string, options: InitOptions) {
execute,
/** Returns the version of Komodo Core the client is calling to. */
core_version,
/**
* Subscribes to the update websocket with automatic reconnect loop.
*
* Note. Awaiting this method will never finish.
*/
subscribe_to_update_websocket,
};
}

View File

@@ -1,5 +1,5 @@
import { AuthResponses, ExecuteResponses, ReadResponses, UserResponses, WriteResponses } from "./responses.js";
import { AuthRequest, ExecuteRequest, ReadRequest, UserRequest, WriteRequest } from "./types.js";
import { AuthRequest, ExecuteRequest, ReadRequest, UpdateListItem, UserRequest, WriteRequest } from "./types.js";
export * as Types from "./types.js";
type InitOptions = {
type: "jwt";
@@ -13,6 +13,11 @@ type InitOptions = {
secret: string;
};
};
export declare class CancelToken {
cancelled: boolean;
constructor();
cancel(): void;
}
/** Initialize a new client for Komodo */
export declare function KomodoClient(url: string, options: InitOptions): {
/**
@@ -88,4 +93,18 @@ export declare function KomodoClient(url: string, options: InitOptions): {
}>>(type: T, params: Req["params"]) => Promise<ExecuteResponses[Req["type"]]>;
/** Returns the version of Komodo Core the client is calling to. */
core_version: () => Promise<string>;
/**
* Subscribes to the update websocket with automatic reconnect loop.
*
* Note. Awaiting this method will never finish.
*/
subscribe_to_update_websocket: ({ on_update, on_login, on_close, retry_timeout_ms, cancel, on_cancel, }: {
on_update: (update: UpdateListItem) => void;
on_login?: () => void;
on_open?: () => void;
on_close?: () => void;
retry_timeout_ms?: number;
cancel?: CancelToken;
on_cancel?: () => void;
}) => Promise<void>;
};

View File

@@ -1,4 +1,13 @@
export * as Types from "./types.js";
export class CancelToken {
cancelled;
constructor() {
this.cancelled = false;
}
cancel() {
this.cancelled = true;
}
}
/** Initialize a new client for Komodo */
export function KomodoClient(url, options) {
const state = {
@@ -66,6 +75,56 @@ export function KomodoClient(url, options) {
const write = async (type, params) => await request("/write", { type, params });
const execute = async (type, params) => await request("/execute", { type, params });
const core_version = () => read("GetVersion", {}).then((res) => res.version);
const subscribe_to_update_websocket = async ({ on_update, on_login, on_close, retry_timeout_ms = 3000, cancel = new CancelToken(), on_cancel, }) => {
while (true) {
if (cancel.cancelled) {
on_cancel?.();
return;
}
try {
const ws = new WebSocket(url.replace("http", "ws") + "/ws/update");
// Handle login on websocket open
ws.addEventListener("open", () => {
const login_msg = options.type === "jwt"
? {
type: "Jwt",
params: {
jwt: options.params.jwt,
},
}
: {
type: "ApiKeys",
params: {
key: options.params.key,
secret: options.params.secret,
},
};
ws.send(JSON.stringify(login_msg));
});
ws.addEventListener("message", ({ data }) => {
if (data == "LOGGED_IN")
return on_login?.();
on_update(JSON.parse(data));
});
if (on_close) {
ws.addEventListener("close", on_close);
}
// This while loop will end when the socket is closed
while (ws.readyState !== WebSocket.CLOSING &&
ws.readyState !== WebSocket.CLOSED) {
if (cancel.cancelled)
ws.close();
// Sleep for a bit before checking for websocket closed
await new Promise((resolve) => setTimeout(resolve, 500));
}
}
catch (error) {
console.error(error);
// Sleep for a bit before retrying, maybe Komodo Core is down temporarily.
await new Promise((resolve) => setTimeout(resolve, retry_timeout_ms));
}
}
};
return {
/**
* Call the `/auth` api.
@@ -130,5 +189,11 @@ export function KomodoClient(url, options) {
execute,
/** Returns the version of Komodo Core the client is calling to. */
core_version,
/**
* Subscribes to the update websocket with automatic reconnect loop.
*
* Note. Awaiting this method will never finish.
*/
subscribe_to_update_websocket,
};
}