Skip to content

Commit

Permalink
core/room: add channel manager, fix bug where channel options not app…
Browse files Browse the repository at this point in the history
…lied

This change fixes the bug described in #411 whereby channel options are overwritten by successive uses of the same
channel, rather than merged together.

In doing this, we also fix the bug in relation to newly introduce spec point CHA-RC3. We were previously depending on the
soft-deprecated behaviour in RTS3c which would allow you to update channel options via setOptions.

The change is achieved by adding a channel manager, whose job it is to register all the options for a given channel
prior to the first channels.get() call, and call channels.get with the amalgamated options.

Closes #411
  • Loading branch information
AndyTWF committed Nov 27, 2024
1 parent bef2c15 commit 8050f7b
Show file tree
Hide file tree
Showing 10 changed files with 306 additions and 63 deletions.
46 changes: 46 additions & 0 deletions src/core/channel-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as Ably from 'ably';

import { Logger } from './logger.js';
import { DEFAULT_CHANNEL_OPTIONS } from './version.js';

export type ChannelOptionsMerger = (options: Ably.ChannelOptions) => Ably.ChannelOptions;

export class ChannelManager {
private readonly _realtime: Ably.Realtime;
private readonly _logger: Logger;
private readonly _registeredOptions = new Map<string, Ably.ChannelOptions>();
private readonly _requestedChannels = new Set<string>();

constructor(realtime: Ably.Realtime, logger: Logger) {
logger.trace('ChannelManager();');
this._realtime = realtime;
this._logger = logger;
}

mergeOptions(channelName: string, merger: ChannelOptionsMerger): void {
this._logger.trace('ChannelManager.registerOptions();', { channelName });
if (this._requestedChannels.has(channelName)) {
this._logger.error('channel options cannot be modified after the channel has been requested', { channelName });
throw new Ably.ErrorInfo('channel options cannot be modified after the channel has been requested', 40000, 400);
}

const currentOpts = this._registeredOptions.get(channelName) ?? DEFAULT_CHANNEL_OPTIONS;
this._registeredOptions.set(channelName, merger(currentOpts));
}

get(channelName: string): Ably.RealtimeChannel {
this._logger.trace('ChannelManager.get();', { channelName });
this._requestedChannels.add(channelName);
return this._realtime.channels.get(
channelName,
this._registeredOptions.get(channelName) ?? DEFAULT_CHANNEL_OPTIONS,
);
}

release(channelName: string): void {
this._logger.trace('ChannelManager.release();', { channelName });
this._requestedChannels.delete(channelName);
this._registeredOptions.delete(channelName);
this._realtime.channels.release(channelName);
}
}
16 changes: 0 additions & 16 deletions src/core/channel.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
import * as Ably from 'ably';

import { DEFAULT_CHANNEL_OPTIONS } from './version.js';

export const getChannel = (name: string, realtime: Ably.Realtime, opts?: Ably.ChannelOptions): Ably.RealtimeChannel => {
const resolvedOptions = {
...opts,
params: {
...opts?.params,
...DEFAULT_CHANNEL_OPTIONS.params,
},
};

return realtime.channels.get(name, resolvedOptions);
};

/**
* Get the channel name for the chat messages channel.
* @param roomId The room ID.
Expand Down
13 changes: 7 additions & 6 deletions src/core/messages.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as Ably from 'ably';

import { getChannel, messagesChannelName } from './channel.js';
import { messagesChannelName } from './channel.js';
import { ChannelManager } from './channel-manager.js';
import { ChatApi } from './chat-api.js';
import {
DiscontinuityEmitter,
Expand Down Expand Up @@ -300,16 +301,16 @@ export class DefaultMessages
/**
* Constructs a new `DefaultMessages` instance.
* @param roomId The unique identifier of the room.
* @param realtime An instance of the Ably Realtime client.
* @param channelManager An instance of the ChannelManager.
* @param chatApi An instance of the ChatApi.
* @param clientId The client ID of the user.
* @param logger An instance of the Logger.
*/
constructor(roomId: string, realtime: Ably.Realtime, chatApi: ChatApi, clientId: string, logger: Logger) {
constructor(roomId: string, channelManager: ChannelManager, chatApi: ChatApi, clientId: string, logger: Logger) {
super();
this._roomId = roomId;

this._channel = this._makeChannel(roomId, realtime);
this._channel = this._makeChannel(roomId, channelManager);

this._chatApi = chatApi;
this._clientId = clientId;
Expand All @@ -320,8 +321,8 @@ export class DefaultMessages
/**
* Creates the realtime channel for messages.
*/
private _makeChannel(roomId: string, realtime: Ably.Realtime): Ably.RealtimeChannel {
const channel = getChannel(messagesChannelName(roomId), realtime);
private _makeChannel(roomId: string, channelManager: ChannelManager): Ably.RealtimeChannel {
const channel = channelManager.get(messagesChannelName(roomId));

addListenerToChannelWithoutAttach({
listener: this._processEvent.bind(this),
Expand Down
33 changes: 27 additions & 6 deletions src/core/occupancy.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as Ably from 'ably';

import { getChannel, messagesChannelName } from './channel.js';
import { messagesChannelName } from './channel.js';
import { ChannelManager, ChannelOptionsMerger } from './channel-manager.js';
import { ChatApi } from './chat-api.js';
import {
DiscontinuityEmitter,
Expand Down Expand Up @@ -106,24 +107,24 @@ export class DefaultOccupancy
/**
* Constructs a new `DefaultOccupancy` instance.
* @param roomId The unique identifier of the room.
* @param realtime An instance of the Ably Realtime client.
* @param channelManager An instance of the ChannelManager.
* @param chatApi An instance of the ChatApi.
* @param logger An instance of the Logger.
*/
constructor(roomId: string, realtime: Ably.Realtime, chatApi: ChatApi, logger: Logger) {
constructor(roomId: string, channelManager: ChannelManager, chatApi: ChatApi, logger: Logger) {
super();

this._roomId = roomId;
this._channel = this._makeChannel(roomId, realtime);
this._channel = this._makeChannel(roomId, channelManager);
this._chatApi = chatApi;
this._logger = logger;
}

/**
* Creates the realtime channel for occupancy.
*/
private _makeChannel(roomId: string, realtime: Ably.Realtime): Ably.RealtimeChannel {
const channel = getChannel(messagesChannelName(roomId), realtime, { params: { occupancy: 'metrics' } });
private _makeChannel(roomId: string, channelManager: ChannelManager): Ably.RealtimeChannel {
const channel = channelManager.get(DefaultOccupancy.channelName(roomId));
addListenerToChannelWithoutAttach({
listener: this._internalOccupancyListener.bind(this),
events: ['[meta]occupancy'],
Expand Down Expand Up @@ -244,4 +245,24 @@ export class DefaultOccupancy
get detachmentErrorCode(): ErrorCodes {
return ErrorCodes.OccupancyDetachmentFailed;
}

/**
* Merges the channel options for the room with the ones required for presence.
*
* @param roomOptions The room options to merge for.
* @returns A function that merges the channel options for the room with the ones required for presence.
*/
static channelOptionMerger(): ChannelOptionsMerger {
return (options) => ({ ...options, params: { ...options.params, occupancy: 'metrics' } });
}

/**
* Returns the channel name for the presence channel.
*
* @param roomId The unique identifier of the room.
* @returns The channel name for the presence channel.
*/
static channelName(roomId: string): string {
return messagesChannelName(roomId);
}
}
55 changes: 38 additions & 17 deletions src/core/presence.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as Ably from 'ably';

import { getChannel, messagesChannelName } from './channel.js';
import { messagesChannelName } from './channel.js';
import { ChannelManager, ChannelOptionsMerger } from './channel-manager.js';
import {
DiscontinuityEmitter,
DiscontinuityListener,
Expand Down Expand Up @@ -198,35 +199,24 @@ export class DefaultPresence
/**
* Constructs a new `DefaultPresence` instance.
* @param roomId The unique identifier of the room.
* @param roomOptions The room options for presence.
* @param realtime An instance of the Ably Realtime client.
* @param channelManager The channel manager to use for creating the presence channel.
* @param clientId The client ID, attached to presences messages as an identifier of the sender.
* A channel can have multiple connections using the same clientId.
* @param logger An instance of the Logger.
*/
constructor(roomId: string, roomOptions: RoomOptions, realtime: Ably.Realtime, clientId: string, logger: Logger) {
constructor(roomId: string, channelManager: ChannelManager, clientId: string, logger: Logger) {
super();

this._channel = this._makeChannel(roomId, roomOptions, realtime);
this._channel = this._makeChannel(roomId, channelManager);
this._clientId = clientId;
this._logger = logger;
}

/**
* Creates the realtime channel for presence.
*/
private _makeChannel(roomId: string, roomOptions: RoomOptions, realtime: Ably.Realtime): Ably.RealtimeChannel {
// Set our channel modes based on the room options
const channelModes = ['PUBLISH', 'SUBSCRIBE'] as Ably.ChannelMode[];
if (roomOptions.presence?.enter === undefined || roomOptions.presence.enter) {
channelModes.push('PRESENCE');
}

if (roomOptions.presence?.subscribe === undefined || roomOptions.presence.subscribe) {
channelModes.push('PRESENCE_SUBSCRIBE');
}

const channel = getChannel(messagesChannelName(roomId), realtime, { modes: channelModes });
private _makeChannel(roomId: string, channelManager: ChannelManager): Ably.RealtimeChannel {
const channel = channelManager.get(DefaultPresence.channelName(roomId));

addListenerToChannelPresenceWithoutAttach({
listener: this.subscribeToEvents.bind(this),
Expand Down Expand Up @@ -418,4 +408,35 @@ export class DefaultPresence
get detachmentErrorCode(): ErrorCodes {
return ErrorCodes.PresenceDetachmentFailed;
}

/**
* Merges the channel options for the room with the ones required for presence.
*
* @param roomOptions The room options to merge for.
* @returns A function that merges the channel options for the room with the ones required for presence.
*/
static channelOptionMerger(roomOptions: RoomOptions): ChannelOptionsMerger {
return (options) => {
const channelModes = ['PUBLISH', 'SUBSCRIBE'] as Ably.ChannelMode[];
if (roomOptions.presence?.enter === undefined || roomOptions.presence.enter) {
channelModes.push('PRESENCE');
}

if (roomOptions.presence?.subscribe === undefined || roomOptions.presence.subscribe) {
channelModes.push('PRESENCE_SUBSCRIBE');
}

return { ...options, modes: channelModes };
};
}

/**
* Returns the channel name for the presence channel.
*
* @param roomId The unique identifier of the room.
* @returns The channel name for the presence channel.
*/
static channelName(roomId: string): string {
return messagesChannelName(roomId);
}
}
12 changes: 6 additions & 6 deletions src/core/room-reactions.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as Ably from 'ably';

import { getChannel } from './channel.js';
import { ChannelManager } from './channel-manager.js';
import {
DiscontinuityEmitter,
DiscontinuityListener,
Expand Down Expand Up @@ -143,23 +143,23 @@ export class DefaultRoomReactions
/**
* Constructs a new `DefaultRoomReactions` instance.
* @param roomId The unique identifier of the room.
* @param realtime An instance of the Ably Realtime client.
* @param channelManager The ChannelManager instance.
* @param clientId The client ID of the user.
* @param logger An instance of the Logger.
*/
constructor(roomId: string, realtime: Ably.Realtime, clientId: string, logger: Logger) {
constructor(roomId: string, channelManager: ChannelManager, clientId: string, logger: Logger) {
super();

this._channel = this._makeChannel(roomId, realtime);
this._channel = this._makeChannel(roomId, channelManager);
this._clientId = clientId;
this._logger = logger;
}

/**
* Creates the realtime channel for room reactions.
*/
private _makeChannel(roomId: string, realtime: Ably.Realtime): Ably.RealtimeChannel {
const channel = getChannel(`${roomId}::$chat::$reactions`, realtime);
private _makeChannel(roomId: string, channelManager: ChannelManager): Ably.RealtimeChannel {
const channel = channelManager.get(`${roomId}::$chat::$reactions`);
addListenerToChannelWithoutAttach({
listener: this._forwarder.bind(this),
events: [RoomReactionEvents.Reaction],
Expand Down
36 changes: 30 additions & 6 deletions src/core/room.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as Ably from 'ably';
import cloneDeep from 'lodash.clonedeep';

import { ChannelManager } from './channel-manager.js';
import { ChatApi } from './chat-api.js';
import { Logger } from './logger.js';
import { DefaultMessages, Messages } from './messages.js';
Expand Down Expand Up @@ -167,32 +168,34 @@ export class DefaultRoom implements Room {
this._logger = logger;
this._lifecycle = new DefaultRoomLifecycle(roomId, logger);

const channelManager = this._getChannelManager(options, realtime, logger);

// Setup features
this._messages = new DefaultMessages(roomId, realtime, this._chatApi, realtime.auth.clientId, logger);
this._messages = new DefaultMessages(roomId, channelManager, this._chatApi, realtime.auth.clientId, logger);

const features: ContributesToRoomLifecycle[] = [this._messages];

if (options.presence) {
this._logger.debug('enabling presence on room', { roomId });
this._presence = new DefaultPresence(roomId, options, realtime, realtime.auth.clientId, logger);
this._presence = new DefaultPresence(roomId, channelManager, realtime.auth.clientId, logger);
features.push(this._presence);
}

if (options.typing) {
this._logger.debug('enabling typing on room', { roomId });
this._typing = new DefaultTyping(roomId, options.typing, realtime, realtime.auth.clientId, logger);
this._typing = new DefaultTyping(roomId, options.typing, channelManager, realtime.auth.clientId, logger);
features.push(this._typing);
}

if (options.reactions) {
this._logger.debug('enabling reactions on room', { roomId });
this._reactions = new DefaultRoomReactions(roomId, realtime, realtime.auth.clientId, logger);
this._reactions = new DefaultRoomReactions(roomId, channelManager, realtime.auth.clientId, logger);
features.push(this._reactions);
}

if (options.occupancy) {
this._logger.debug('enabling occupancy on room', { roomId });
this._occupancy = new DefaultOccupancy(roomId, realtime, this._chatApi, logger);
this._occupancy = new DefaultOccupancy(roomId, channelManager, this._chatApi, logger);
features.push(this._occupancy);
}

Expand All @@ -210,13 +213,34 @@ export class DefaultRoom implements Room {
await this._lifecycleManager.release();

for (const feature of features) {
realtime.channels.release(feature.channel.name);
channelManager.release(feature.channel.name);
}

finalized = true;
};
}

/**
* Gets the channel manager for the room, which handles merging channel options together and creating channels.
*
* @param options The room options.
* @param realtime An instance of the Ably Realtime client.
* @param logger An instance of the Logger.
*/
private _getChannelManager(options: RoomOptions, realtime: Ably.Realtime, logger: Logger): ChannelManager {
const manager = new ChannelManager(realtime, logger);

if (options.occupancy) {
manager.mergeOptions(DefaultOccupancy.channelName(this._roomId), DefaultOccupancy.channelOptionMerger());
}

if (options.presence) {
manager.mergeOptions(DefaultPresence.channelName(this._roomId), DefaultPresence.channelOptionMerger(options));
}

return manager;
}

/**
* @inheritdoc Room
*/
Expand Down
Loading

0 comments on commit 8050f7b

Please sign in to comment.