Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(client-presence): disabled (failing) attendees unit tests #23419

Merged
merged 9 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 74 additions & 37 deletions packages/framework/presence/src/systemWorkspace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import type {
} from "./presence.js";
import { SessionClientStatus } from "./presence.js";
import type { PresenceStatesInternal } from "./presenceStates.js";
import { TimerManager } from "./timerManager.js";
import type { PresenceStates, PresenceStatesSchema } from "./types.js";

/**
Expand All @@ -37,17 +38,12 @@ class SessionClient implements ISessionClient {
*/
public order: number = 0;

private connectionStatus: SessionClientStatus;
private connectionStatus: SessionClientStatus = SessionClientStatus.Disconnected;

public constructor(
public readonly sessionId: ClientSessionId,
private connectionId: ClientConnectionId | undefined = undefined,
) {
this.connectionStatus =
connectionId === undefined
? SessionClientStatus.Disconnected
: SessionClientStatus.Connected;
}
public connectionId: ClientConnectionId | undefined = undefined,
) {}

public getConnectionId(): ClientConnectionId {
if (this.connectionId === undefined) {
Expand All @@ -60,8 +56,7 @@ class SessionClient implements ISessionClient {
return this.connectionStatus;
}

public setConnectionId(connectionId: ClientConnectionId): void {
this.connectionId = connectionId;
public setConnected(): void {
this.connectionStatus = SessionClientStatus.Connected;
}

Expand Down Expand Up @@ -103,6 +98,12 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
*/
private readonly attendees = new Map<ClientConnectionId | ClientSessionId, SessionClient>();

// When local client disconnects, we lose the connectivity status updates for remote attendees in the session.
// Upon reconnect, we mark all other attendees connections as stale and update their status to disconnected after 30 seconds of inactivity.
private readonly staleConnectionClients = new Set<SessionClient>();

private readonly staleConnectionTimer = new TimerManager();

public constructor(
clientSessionId: ClientSessionId,
private readonly datastore: SystemWorkspaceDatastore,
Expand Down Expand Up @@ -137,34 +138,23 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
): void {
const postUpdateActions: (() => void)[] = [];
const audienceMembers = this.audience.getMembers();
const connectedAttendees = new Set<SessionClient>();
const joiningAttendees = new Set<SessionClient>();
jason-ha marked this conversation as resolved.
Show resolved Hide resolved
for (const [clientConnectionId, value] of Object.entries(
remoteDatastore.clientToSessionId,
)) {
const clientSessionId = value.value;
const { attendee, isNew } = this.ensureAttendee(
const { attendee, isJoining } = this.ensureAttendee(
clientSessionId,
clientConnectionId,
/* order */ value.rev,
// If the attendee is present in audience OR if the attendee update is from the sending remote client itself,
// then the attendee is considered connected.
/* isConnected */ senderConnectionId === clientConnectionId ||
audienceMembers.has(clientConnectionId),
);

// Check new attendee against audience to see if they're currently connected
const isAttendeeConnected = audienceMembers.has(clientConnectionId);

if (isAttendeeConnected) {
connectedAttendees.add(attendee);
if (attendee.getConnectionStatus() === SessionClientStatus.Disconnected) {
attendee.setConnectionId(clientConnectionId);
}
if (isNew) {
// If the attendee is both new and in audience (i.e. currently connected), emit an attendeeJoined event.
postUpdateActions.push(() => this.events.emit("attendeeJoined", attendee));
}
}

// If the attendee is not in the audience, they are considered disconnected.
if (!connectedAttendees.has(attendee)) {
attendee.setDisconnected();
// If the attendee is joining the session, add them to the list of joining attendees to be announced later.
if (isJoining) {
joiningAttendees.add(attendee);
}

const knownSessionId: InternalTypes.ValueRequiredState<ClientSessionId> | undefined =
Expand All @@ -176,6 +166,10 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
}
}

for (const announcedAttendee of joiningAttendees) {
postUpdateActions.push(() => this.events.emit("attendeeJoined", announcedAttendee));
}

// TODO: reorganize processUpdate and caller to process actions after all updates are processed.
jason-ha marked this conversation as resolved.
Show resolved Hide resolved
for (const action of postUpdateActions) {
action();
Expand All @@ -189,8 +183,32 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
value: this.selfAttendee.sessionId,
};

this.selfAttendee.setConnectionId(clientConnectionId);
// Clear the stale connection timer when the local client reconnects
WillieHabi marked this conversation as resolved.
Show resolved Hide resolved
this.staleConnectionTimer.clearTimeout();

// Mark 'Connected' remote attendees connections as stale
WillieHabi marked this conversation as resolved.
Show resolved Hide resolved
for (const staleConnecionClient of this.attendees.values()) {
WillieHabi marked this conversation as resolved.
Show resolved Hide resolved
if (staleConnecionClient.getConnectionStatus() === SessionClientStatus.Connected) {
this.staleConnectionClients.add(staleConnecionClient);
}
}

// Update the self attendee
this.selfAttendee.connectionId = clientConnectionId;
this.selfAttendee.setConnected();
this.attendees.set(clientConnectionId, this.selfAttendee);

// Start the stale connection timer
this.staleConnectionTimer.setTimeout(() => {
for (const client of this.staleConnectionClients) {
// Mark the client as disconnected and remove from the stale connection set
if (client.getConnectionStatus() === SessionClientStatus.Connected) {
client.setDisconnected();
this.events.emit("attendeeDisconnected", client);
jason-ha marked this conversation as resolved.
Show resolved Hide resolved
}
this.staleConnectionClients.delete(client);
WillieHabi marked this conversation as resolved.
Show resolved Hide resolved
}
}, 30_000);
}

public removeClientConnectionId(clientConnectionId: ClientConnectionId): void {
Expand All @@ -199,6 +217,11 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
return;
}

if (attendee === this.selfAttendee) {
// If the local connection is being removed, clear the stale connection timer
jason-ha marked this conversation as resolved.
Show resolved Hide resolved
this.staleConnectionTimer.clearTimeout();
}

// If the last known connectionID is different from the connection ID being removed, the attendee has reconnected,
// therefore we should not change the attendee connection status or emit a disconnect event.
const attendeeReconnected = attendee.getConnectionId() !== clientConnectionId;
Expand Down Expand Up @@ -239,27 +262,41 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
clientSessionId: ClientSessionId,
clientConnectionId: ClientConnectionId,
order: number,
): { attendee: SessionClient; isNew: boolean } {
isConnected: boolean,
): { attendee: SessionClient; isJoining: boolean } {
let attendee = this.attendees.get(clientSessionId);
let isNew = false;
let isJoining = false;

if (attendee === undefined) {
// New attendee. Create SessionClient and add session ID based
// entry to map.
attendee = new SessionClient(clientSessionId, clientConnectionId);
this.attendees.set(clientSessionId, attendee);
isNew = true;
if (isConnected) {
attendee.setConnected();
isJoining = true;
}
} else if (order > attendee.order) {
// The given association is newer than the one we have.
// Update the order and current connection ID.
attendee.order = order;
attendee.setConnectionId(clientConnectionId);
isNew = true;
// Known attendee is joining the session if they are currently disconnected
if (attendee.getConnectionStatus() === SessionClientStatus.Disconnected && isConnected) {
attendee.setConnected();
isJoining = true;
}
attendee.connectionId = clientConnectionId;
}

if (this.staleConnectionClients.has(attendee) && isConnected) {
// If the attendee is connected, remove them from the stale connection set
this.staleConnectionClients.delete(attendee);
WillieHabi marked this conversation as resolved.
Show resolved Hide resolved
}

// Always update entry for the connection ID. (Okay if already set.)
this.attendees.set(clientConnectionId, attendee);

return { attendee, isNew };
return { attendee, isJoining };
}
}

Expand Down
40 changes: 21 additions & 19 deletions packages/framework/presence/src/test/presenceManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ describe("Presence", () => {
const afterCleanUp: (() => void)[] = [];

beforeEach(() => {
presence = prepareConnectedPresence(runtime, "seassionId-2", "client2", clock, logger);
presence = prepareConnectedPresence(runtime, "sessionId-2", "client2", clock, logger);
});

afterEach(() => {
Expand Down Expand Up @@ -203,7 +203,7 @@ describe("Presence", () => {
verifyAttendee(joinedAttendees[0], rejoinAttendeeConnectionId, attendeeSessionId);
});

it.skip('second time is announced once via `attendeeJoined` with status "Connected" when prior is still connected', () => {
it('second time is announced once via `attendeeJoined` with status "Connected" when prior is still connected', () => {
// Act - simulate join message from client
const joinedAttendees = processJoinSignals([rejoinAttendeeSignal]);

Expand All @@ -217,7 +217,7 @@ describe("Presence", () => {
verifyAttendee(joinedAttendees[0], rejoinAttendeeConnectionId, attendeeSessionId);
});

it.skip('first time is announced via `attendeeJoined` with status "Connected" even if unknown to audience', () => {
it('first time is announced via `attendeeJoined` with status "Connected" even if unknown to audience', () => {
// Setup - remove connection from audience
runtime.removeMember(initialAttendeeConnectionId);

Expand Down Expand Up @@ -283,7 +283,7 @@ describe("Presence", () => {
verifyAttendee(joinedAttendees[0], rejoinAttendeeConnectionId, attendeeSessionId);
});

it.skip("as collateral with old connection info and connected is NOT announced via `attendeeJoined`", () => {
it("as collateral with old connection info and connected is NOT announced via `attendeeJoined`", () => {
// Setup - generate signals

// Both connection Id's unkonwn to audience
Expand Down Expand Up @@ -384,7 +384,7 @@ describe("Presence", () => {
// To retain symmetry across Joined and Disconnected events, do not announce
// attendeeJoined when the attendee is already connected and we only see
// a connection id update. This can happen when audience removal is late.
it.skip('is not announced via `attendeeJoined` when already "Connected"', () => {
it('is not announced via `attendeeJoined` when already "Connected"', () => {
// Setup
afterCleanUp.push(
presence.events.on("attendeeJoined", () => {
Expand Down Expand Up @@ -448,19 +448,21 @@ describe("Presence", () => {
// (e.g. being in audience, sending an update, or (re)joining the session) before their connection status set to "Disconnected".
// If an attendee with a stale connection becomes active, their "stale" status is removed.
describe("and then local client disconnects", () => {
let disconnectedAttendees: ISessionClient[];
let remoteDisconnectedAttendees: ISessionClient[];
beforeEach(() => {
// Setup
assert(knownAttendee !== undefined, "No attendee was set in beforeEach");
disconnectedAttendees = [];
remoteDisconnectedAttendees = [];
afterCleanUp.push(
presence.events.on("attendeeDisconnected", (attendee) => {
disconnectedAttendees.push(attendee);
if (attendee !== presence.getMyself()) {
remoteDisconnectedAttendees.push(attendee);
Comment on lines 457 to +459
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, we announce ourselves when disconnecting. Do we have a test for that? I don't think we do, but we should. Task for after.

}
}),
);
});

it.skip("updates status of attendee with stale connection after 30s delay upon local reconnection", () => {
it("updates status of attendee with stale connection after 30s delay upon local reconnection", () => {
assert(knownAttendee !== undefined, "No attendee was set in beforeEach");

// Act - disconnect & reconnect local client
Expand All @@ -484,13 +486,13 @@ describe("Presence", () => {
"Attendee with stale connection should be 'Disconnected' 30s after reconnection",
);
assert.strictEqual(
disconnectedAttendees.length,
remoteDisconnectedAttendees.length,
1,
"Exactly one attendee should be announced as disconnected",
);
});

it.skip("does not update status of attendee with stale connection if local client does not reconnect", () => {
it("does not update status of attendee with stale connection if local client does not reconnect", () => {
assert(knownAttendee !== undefined, "No attendee was set in beforeEach");

// Act - disconnect local client and advance timer
Expand All @@ -505,7 +507,7 @@ describe("Presence", () => {
);
});

it.skip("does not update status of attendee with stale connection if local client reconnection lasts less than 30s", () => {
it("does not update status of attendee with stale connection if local client reconnection lasts less than 30s", () => {
assert(knownAttendee !== undefined, "No attendee was set in beforeEach");

// Act - disconnect, reconnect for 15 second, disconnect local client again, then advance timer
Expand All @@ -524,7 +526,7 @@ describe("Presence", () => {
);
});

it.skip("does not update status of attendee with stale connection if attendee rejoins", () => {
it("does not update status of attendee with stale connection if attendee rejoins", () => {
assert(knownAttendee !== undefined, "No attendee was set in beforeEach");

// Setup - fail if attendee joined is announced
Expand Down Expand Up @@ -552,7 +554,7 @@ describe("Presence", () => {
);
});

it.skip("does not update status of attendee with stale connection if attendee sends datastore update", () => {
it("does not update status of attendee with stale connection if attendee sends datastore update", () => {
assert(knownAttendee !== undefined, "No attendee was set in beforeEach");

// Setup - fail if attendee joined is announced
Expand Down Expand Up @@ -598,7 +600,7 @@ describe("Presence", () => {
);
});

it.skip("announces `attendeeDisconnected` once when remote client disconnects after local client reconnects", () => {
it("announces `attendeeDisconnected` once when remote client disconnects after local client reconnects", () => {
assert(knownAttendee !== undefined, "No attendee was set in beforeEach");

// Setup - initial attendee joins before local client disconnects
Expand All @@ -612,7 +614,7 @@ describe("Presence", () => {
runtime.audience.removeMember(initialAttendeeConnectionId); // Remove remote client connection before 30s timeout
// Confirm that `attendeeDisconnected` is announced for when active attendee disconnects
assert.strictEqual(
disconnectedAttendees.length,
remoteDisconnectedAttendees.length,
1,
"Exactly one attendee should be announced as disconnected",
);
Expand All @@ -625,13 +627,13 @@ describe("Presence", () => {
"Attendee should be 'Disconnected'",
);
assert.strictEqual(
disconnectedAttendees.length,
remoteDisconnectedAttendees.length,
1,
"Exactly one attendee should be announced as disconnected",
);
});

it.skip("updates status of attendee with stale connection only 30s after most recent local reconnection", () => {
it("updates status of attendee with stale connection only 30s after most recent local reconnection", () => {
// Setup
assert(knownAttendee !== undefined, "No attendee was set in beforeEach");
assert.strictEqual(
Expand Down Expand Up @@ -667,7 +669,7 @@ describe("Presence", () => {
"Attendee with stale connection has wrong status",
);
assert.strictEqual(
disconnectedAttendees.length,
remoteDisconnectedAttendees.length,
1,
"Exactly one attendee should be announced as disconnected",
);
Expand Down
Loading