From 7611f0ac202a45f40a6f4d24b60347dd6f54ca7c Mon Sep 17 00:00:00 2001 From: Ben Brown Date: Fri, 22 Nov 2024 11:54:14 -0600 Subject: [PATCH 1/7] Fix #101 - capture device debug content and log it. Also add some comments to transformer to demystify protocol handling. --- src/utils/transformHandler.ts | 40 +++++++++++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/src/utils/transformHandler.ts b/src/utils/transformHandler.ts index a000bd6..c889d5d 100644 --- a/src/utils/transformHandler.ts +++ b/src/utils/transformHandler.ts @@ -3,51 +3,79 @@ import type { Logger } from "tslog"; import * as Protobuf from "@meshtastic/protobufs"; import * as Types from "../types.ts"; + +// This function takes the raw binary stream from the radio +// and converts it into usable "packets" that are returned to the +// adapter for handling export const transformHandler = ( log: Logger, onReleaseEvent: SimpleEventDispatcher, onDeviceDebugLog: SimpleEventDispatcher, concurrentLogOutput: boolean, ) => { + // byteBuffer contains the data to be processed let byteBuffer = new Uint8Array([]); + + // return the actual transformer return new TransformStream({ transform(chunk: Uint8Array, controller): void { log = log.getSubLogger({ name: "streamTransformer" }); onReleaseEvent.subscribe(() => { controller.terminate(); }); + + // add the latest chunk of data into the array byteBuffer = new Uint8Array([...byteBuffer, ...chunk]); + + // This loop looks for Meshtastic packets in the stream based on the + // protocol definition. byteBuffer may contain 0 or more packets at + // any time. let processingExhausted = false; while (byteBuffer.length !== 0 && !processingExhausted) { + // Look for the magic byte that indicates a packet is starting const framingIndex = byteBuffer.findIndex((byte) => byte === 0x94); + // Check the second confirmation byte const framingByte2 = byteBuffer[framingIndex + 1]; if (framingByte2 === 0xc3) { + // Check to see if there is content in the buffer before the packet starts + // Per the protocol spec, data that is outside of the packet + // is likely to be ascii debugging information from the radio + // This includes formatting escape codes. if (byteBuffer.subarray(0, framingIndex).length) { if (concurrentLogOutput) { + // dispatch the raw data as an event + // the consumer will have to translate the bytes into ascii onDeviceDebugLog.dispatch(byteBuffer.subarray(0, framingIndex)); } else { - log.warn( + // This takes the bytes, translates them into ascii, and logs them + const ascii_debug = Array.from(byteBuffer.subarray(0, framingIndex)).map((code)=>String.fromCharCode(code)).join(''); + log.trace( Types.EmitterScope.SerialConnection, Types.Emitter.Connect, - `⚠️ Found unneccesary message padding, removing: ${byteBuffer - .subarray(0, framingIndex) - .toString()}`, + `Debug from radio:\n ${ ascii_debug }`, ); } + // Remove everything before the magic byte byteBuffer = byteBuffer.subarray(framingIndex); } + // the next two bytes define the length of the packet const msb = byteBuffer[2]; const lsb = byteBuffer[3]; + // If we have a valid length, and the byteBuffer is long enough, + // then we should have a full packet. Let's process it... if ( msb !== undefined && lsb !== undefined && byteBuffer.length >= 4 + (msb << 8) + lsb ) { + // extract just the right amount of bytes const packet = byteBuffer.subarray(4, 4 + (msb << 8) + lsb); + // check to make sure these bytes don't include a new packet start + // this would indicate a malformed packet... const malformedDetectorIndex = packet.findIndex( (byte) => byte === 0x94, ); @@ -64,9 +92,13 @@ export const transformHandler = ( Protobuf.Mesh.LogRecord_Level.WARNING, ); + // prune out the malformed packet byteBuffer = byteBuffer.subarray(malformedDetectorIndex); } else { + // since we have a valid packet, we can remove those bytes... byteBuffer = byteBuffer.subarray(3 + (msb << 8) + lsb + 1); + + // and return the packet to the pipe... controller.enqueue(packet); } } else { From 9b86bdfd9fb166cac7684b8aca162d20e73c84a8 Mon Sep 17 00:00:00 2001 From: Ben Brown Date: Fri, 22 Nov 2024 11:56:21 -0600 Subject: [PATCH 2/7] clarify comment --- src/utils/transformHandler.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/utils/transformHandler.ts b/src/utils/transformHandler.ts index c889d5d..c6b93b3 100644 --- a/src/utils/transformHandler.ts +++ b/src/utils/transformHandler.ts @@ -16,7 +16,8 @@ export const transformHandler = ( // byteBuffer contains the data to be processed let byteBuffer = new Uint8Array([]); - // return the actual transformer + // return the actual transformer that will be called for each + // new chunk of data... return new TransformStream({ transform(chunk: Uint8Array, controller): void { log = log.getSubLogger({ name: "streamTransformer" }); From 981a0117ab41e32d963538b5525ddfba3cb85392 Mon Sep 17 00:00:00 2001 From: Ben Brown Date: Fri, 22 Nov 2024 11:11:07 -0600 Subject: [PATCH 3/7] Fixes #114: Implement heartbeat ping for serial connections --- src/adapters/serialConnection.ts | 19 +++++++++++++++++++ src/meshDevice.ts | 17 +++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/src/adapters/serialConnection.ts b/src/adapters/serialConnection.ts index 96702d5..0609b98 100644 --- a/src/adapters/serialConnection.ts +++ b/src/adapters/serialConnection.ts @@ -25,6 +25,9 @@ export class SerialConnection extends MeshDevice { * through a transform stream (https://stackoverflow.com/questions/71262432) */ private pipePromise?: Promise; + /* Reference for the heartbeat ping interval so it can be canceled on disconnect. */ + private heartbeatInterval?: NodeJS.Timeout | undefined; + /** * Fires when `disconnect()` is called, used to instruct serial port and * readers to release there locks @@ -44,6 +47,7 @@ export class SerialConnection extends MeshDevice { this.transformer = undefined; this.onReleaseEvent = new SimpleEventDispatcher(); this.preventLock = false; + this.heartbeatInterval = undefined; this.log.debug( Types.Emitter[Types.Emitter.Constructor], @@ -125,6 +129,7 @@ export class SerialConnection extends MeshDevice { }); this.preventLock = false; + /** Connect to device */ await this.port .open({ @@ -151,6 +156,14 @@ export class SerialConnection extends MeshDevice { this.configure().catch(() => { // TODO: FIX, workaround for `wantConfigId` not getting acks. }); + + // Set up an interval to send a heartbeat ping once every minute. + // The firmware requires at least one ping per 15 minutes, so this should be more than enough. + this.heartbeatInterval = setInterval(() => { + this.heartbeat().catch((err) => { + console.error('Heartbeat error', err); + }); + }, 60*1000); } else { console.log("not readable or writable"); } @@ -180,6 +193,12 @@ export class SerialConnection extends MeshDevice { if (this.port?.readable) { await this.port?.close(); } + + // stop the interval when disconnecting. + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + this.heartbeatInterval = undefined; + } // ------- this.updateDeviceStatus(Types.DeviceStatusEnum.DeviceDisconnected); this.complete(); diff --git a/src/meshDevice.ts b/src/meshDevice.ts index 1c3a58d..70cbdf1 100755 --- a/src/meshDevice.ts +++ b/src/meshDevice.ts @@ -735,6 +735,23 @@ export abstract class MeshDevice { return this.sendRaw(toBinary(Protobuf.Mesh.ToRadioSchema, toRadio)); } + /** Serial connection requires a heartbeat ping to stay connected, otherwise times out after 15 minutes */ + public heartbeat(): Promise { + this.log.debug( + Types.Emitter[Types.Emitter.Ping], + "❤️ Send heartbeat ping to radio", + ); + + const toRadio = create(Protobuf.Mesh.ToRadioSchema, { + payloadVariant: { + case: "heartbeat", + value: {}, + }, + }); + + return this.sendRaw(toBinary(Protobuf.Mesh.ToRadioSchema, toRadio)); + } + /** Sends a trace route packet to the designated node */ public async traceRoute(destination: number): Promise { const routeDiscovery = create(Protobuf.Mesh.RouteDiscoverySchema, { From 51d30757dc2bf94b250a622e75aba54a57643a08 Mon Sep 17 00:00:00 2001 From: Ben Brown Date: Fri, 22 Nov 2024 11:17:43 -0600 Subject: [PATCH 4/7] Adjust type to avoid node specifics --- src/adapters/serialConnection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/adapters/serialConnection.ts b/src/adapters/serialConnection.ts index 0609b98..9e2b492 100644 --- a/src/adapters/serialConnection.ts +++ b/src/adapters/serialConnection.ts @@ -26,7 +26,7 @@ export class SerialConnection extends MeshDevice { private pipePromise?: Promise; /* Reference for the heartbeat ping interval so it can be canceled on disconnect. */ - private heartbeatInterval?: NodeJS.Timeout | undefined; + private heartbeatInterval?: ReturnType | undefined; /** * Fires when `disconnect()` is called, used to instruct serial port and From 6d27a0daa6df05cfe77103136bc92d6ae63a554a Mon Sep 17 00:00:00 2001 From: Ben Brown Date: Fri, 22 Nov 2024 13:01:16 -0600 Subject: [PATCH 5/7] linter fixes --- src/adapters/serialConnection.ts | 10 +++++----- src/utils/transformHandler.ts | 12 ++++++++---- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/adapters/serialConnection.ts b/src/adapters/serialConnection.ts index 9e2b492..13a402b 100644 --- a/src/adapters/serialConnection.ts +++ b/src/adapters/serialConnection.ts @@ -25,7 +25,7 @@ export class SerialConnection extends MeshDevice { * through a transform stream (https://stackoverflow.com/questions/71262432) */ private pipePromise?: Promise; - /* Reference for the heartbeat ping interval so it can be canceled on disconnect. */ + /* Reference for the heartbeat ping interval so it can be canceled on disconnect. */ private heartbeatInterval?: ReturnType | undefined; /** @@ -161,9 +161,9 @@ export class SerialConnection extends MeshDevice { // The firmware requires at least one ping per 15 minutes, so this should be more than enough. this.heartbeatInterval = setInterval(() => { this.heartbeat().catch((err) => { - console.error('Heartbeat error', err); + console.error("Heartbeat error", err); }); - }, 60*1000); + }, 60 * 1000); } else { console.log("not readable or writable"); } @@ -193,11 +193,11 @@ export class SerialConnection extends MeshDevice { if (this.port?.readable) { await this.port?.close(); } - + // stop the interval when disconnecting. if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval); - this.heartbeatInterval = undefined; + this.heartbeatInterval = undefined; } // ------- this.updateDeviceStatus(Types.DeviceStatusEnum.DeviceDisconnected); diff --git a/src/utils/transformHandler.ts b/src/utils/transformHandler.ts index c6b93b3..5063c21 100644 --- a/src/utils/transformHandler.ts +++ b/src/utils/transformHandler.ts @@ -25,7 +25,7 @@ export const transformHandler = ( controller.terminate(); }); - // add the latest chunk of data into the array + // add the latest chunk of data into the array byteBuffer = new Uint8Array([...byteBuffer, ...chunk]); // This loop looks for Meshtastic packets in the stream based on the @@ -48,12 +48,16 @@ export const transformHandler = ( // the consumer will have to translate the bytes into ascii onDeviceDebugLog.dispatch(byteBuffer.subarray(0, framingIndex)); } else { - // This takes the bytes, translates them into ascii, and logs them - const ascii_debug = Array.from(byteBuffer.subarray(0, framingIndex)).map((code)=>String.fromCharCode(code)).join(''); + // This takes the bytes, translates them into ascii, and logs them + const ascii_debug = Array.from( + byteBuffer.subarray(0, framingIndex) + ) + .map((code)=>String.fromCharCode(code)) + .join(''); log.trace( Types.EmitterScope.SerialConnection, Types.Emitter.Connect, - `Debug from radio:\n ${ ascii_debug }`, + `Debug from radio:\n ${ascii_debug}`, ); } From d04ff907883c5d1708aaf1913547d6a90d962a6a Mon Sep 17 00:00:00 2001 From: Ben Brown Date: Fri, 22 Nov 2024 13:05:24 -0600 Subject: [PATCH 6/7] linter fixes --- src/utils/transformHandler.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/utils/transformHandler.ts b/src/utils/transformHandler.ts index 5063c21..6e172ea 100644 --- a/src/utils/transformHandler.ts +++ b/src/utils/transformHandler.ts @@ -50,10 +50,10 @@ export const transformHandler = ( } else { // This takes the bytes, translates them into ascii, and logs them const ascii_debug = Array.from( - byteBuffer.subarray(0, framingIndex) + byteBuffer.subarray(0, framingIndex), ) - .map((code)=>String.fromCharCode(code)) - .join(''); + .map((code) => String.fromCharCode(code)) + .join(""); log.trace( Types.EmitterScope.SerialConnection, Types.Emitter.Connect, From cf05d6dbc9816b819b6a99b85686f33faa0a8139 Mon Sep 17 00:00:00 2001 From: Ben Brown Date: Fri, 22 Nov 2024 13:07:17 -0600 Subject: [PATCH 7/7] linter fixes --- src/utils/transformHandler.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/utils/transformHandler.ts b/src/utils/transformHandler.ts index 6e172ea..c102cb3 100644 --- a/src/utils/transformHandler.ts +++ b/src/utils/transformHandler.ts @@ -3,7 +3,6 @@ import type { Logger } from "tslog"; import * as Protobuf from "@meshtastic/protobufs"; import * as Types from "../types.ts"; - // This function takes the raw binary stream from the radio // and converts it into usable "packets" that are returned to the // adapter for handling