Skip to content
Draft
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
05841d7
added registry change
popuz May 18, 2026
db8c788
collapse StreamerComponent + flip Bridge & Cleanup onto resolver
popuz May 18, 2026
00334bf
removed one-to-many ecs approach
popuz May 18, 2026
1b36f3e
refactor(NearbyVoiceChat ECS): split runtime queries by archetype filter
popuz May 19, 2026
5d4b60c
Merge branch 'dev' into chore/nearby-voice-chat/audio-stream-deduplic…
popuz May 19, 2026
9bf172e
refactor(NearbyVoiceChat): drop bindings HashSet — component is the d…
popuz May 19, 2026
1ef96c4
Merge remote-tracking branch 'origin/chore/nearby-voice-chat/audio-st…
popuz May 19, 2026
5d24072
Merge branch 'dev' into chore/nearby-voice-chat/audio-stream-deduplic…
popuz May 20, 2026
2a3308a
refactor(NearbyVoiceChat): inline bulk remove into cleanup query, pas…
popuz May 20, 2026
f149b6e
Merge remote-tracking branch 'origin/chore/nearby-voice-chat/audio-st…
popuz May 20, 2026
48e130b
Merge branch 'dev' into chore/nearby-voice-chat/audio-stream-deduplic…
popuz May 20, 2026
d5fd015
Merge branch 'dev' into chore/nearby-voice-chat/audio-stream-deduplic…
popuz May 20, 2026
89d78ea
Merge branch 'dev' into chore/nearby-voice-chat/audio-stream-deduplic…
popuz May 20, 2026
88d353b
Merge branch 'dev' into chore/nearby-voice-chat/audio-stream-deduplic…
popuz May 20, 2026
26ab3c3
tests clean-up and docs update
popuz May 21, 2026
44d61b6
2 more comments
popuz May 21, 2026
add84de
pointed to merged livekit and updated to new interface
popuz May 21, 2026
03bc7dd
Merge branch 'dev' into chore/nearby-voice-chat/audio-stream-deduplic…
popuz May 21, 2026
9e47387
updated comment
popuz May 21, 2026
ac735af
Merge remote-tracking branch 'origin/chore/nearby-voice-chat/audio-st…
popuz May 21, 2026
e851ed2
Merge branch 'dev' into chore/nearby-voice-chat/audio-stream-deduplic…
popuz May 21, 2026
2daefb1
changed ref to in
popuz May 21, 2026
8cadb60
Merge remote-tracking branch 'origin/chore/nearby-voice-chat/audio-st…
popuz May 21, 2026
d34bec9
rolled back LogAudioStreams
popuz May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using LiveKit.Rooms.Streaming.Audio;
using LiveKit.Rooms.VideoStreaming;
using RichTypes;
Comment thread
popuz marked this conversation as resolved.
using System;
using System.Collections.Generic;

namespace DCL.Multiplayer.Connections.Rooms.Interior
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using LiveKit.Rooms.Streaming.Audio;
using LiveKit.Rooms.VideoStreaming;
using RichTypes;
using System;
using System.Collections.Generic;

namespace DCL.Multiplayer.Connections.Rooms.Logs
Expand Down Expand Up @@ -66,6 +65,11 @@ public LogVideoStreams(IStreams<IVideoStream, VideoStreamInfo> origin) : base(or

public class LogAudioStreams : LogStreams<AudioStream, AudioStreamInfo>, IAudioStreams
Comment thread
popuz marked this conversation as resolved.
{
public LogAudioStreams(IStreams<AudioStream, AudioStreamInfo> origin) : base(origin, nameof(LogVideoStreams)) { }
private readonly IAudioStreams origin;

public LogAudioStreams(IAudioStreams origin) : base(origin, nameof(LogAudioStreams))
{
this.origin = origin;
}
}
Comment thread
popuz marked this conversation as resolved.
Comment thread
popuz marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using LiveKit.Rooms.Streaming.Audio;
using LiveKit.Rooms.VideoStreaming;
using RichTypes;
using System;
using System.Collections.Generic;

namespace DCL.Multiplayer.Connections.Rooms.Nulls
Expand Down
9 changes: 2 additions & 7 deletions Explorer/Assets/DCL/PluginSystem/Global/VoiceChatPlugin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@
using DCL.VoiceChat.Nearby;
using DCL.VoiceChat.Nearby.Audio;
using DCL.VoiceChat.Nearby.Systems;
using LiveKit.Rooms.Streaming;
using LiveKit.Rooms.Streaming.Audio;
using LiveKit.Rooms;
using System;
using System.Collections.Generic;
using System.Threading;
using DCL.UI;
using DCL.Utilities;
Expand Down Expand Up @@ -78,7 +75,6 @@ public class VoiceChatPlugin : IDCLGlobalPlugin<VoiceChatPlugin.Settings>
private VoiceChatPanelPresenter? voiceChatPanelPresenter;
private VoiceChatDebugContainer? voiceChatDebugContainer;
private NearbyAudioStreamsRegistry? nearbyAudioStreamRegistry;
private HashSet<StreamKey>? nearbyAudioBindings;
private NearbyAudioSourceFactory? nearbyAudioSourceFactory;
private NearbyVoiceChatSuppressor? nearbyVoiceChatSuppressor;
private NearbyMicrophoneHandler? nearbyMicrophoneHandler;
Expand Down Expand Up @@ -156,9 +152,9 @@ public void InjectToWorld(ref ArchSystemsWorldBuilder<Arch.Core.World> builder,

NearbyLivekitBridgeSystem.InjectToWorld(ref builder, nearbyAudioStreamRegistry!);
NearbyAudibleRangeSystem.InjectToWorld(ref builder, voiceChatConfiguration, listenerState);
NearbyAudioBindingSystem.InjectToWorld(ref builder, nearbyAudioStreamRegistry!, nearbyAudioBindings!, userBlockingCache, nearbyStateModel!, nearbyAudioSourceFactory!, RoomMetadataCurrentScene.Instance);
NearbyAudioBindingSystem.InjectToWorld(ref builder, nearbyAudioStreamRegistry!, userBlockingCache, nearbyStateModel!, nearbyAudioSourceFactory!, RoomMetadataCurrentScene.Instance);
NearbyAudioPositionSystem.InjectToWorld(ref builder, nearbyMuteService!, listenerState);
NearbyAudioCleanupSystem.InjectToWorld(ref builder, nearbyAudioStreamRegistry!, nearbyAudioBindings!, userBlockingCache, nearbyStateModel!, nearbyAudioSourceFactory!, RoomMetadataCurrentScene.Instance);
NearbyAudioCleanupSystem.InjectToWorld(ref builder, nearbyAudioStreamRegistry!, userBlockingCache, nearbyStateModel!, nearbyAudioSourceFactory!, RoomMetadataCurrentScene.Instance);
NearbyVoiceChatNametagSystem.InjectToWorld(ref builder, playerEntity, nearbyAudioStreamRegistry!, nearbyStateModel!, nearbyMuteService!);

NearbyVoiceChatDebugSystem.InjectToWorld(ref builder, voiceChatConfiguration, debugContainer, roomHub.IslandRoom(), nearbyStateModel!, nearbyAudioStreamRegistry!, entityParticipantTable);
Expand Down Expand Up @@ -221,7 +217,6 @@ public async UniTask InitializeAsync(Settings settings, CancellationToken ct)
nearbyAudioStreamRegistry = new NearbyAudioStreamsRegistry(islandRoom);
pluginScope.Add(nearbyAudioStreamRegistry);

nearbyAudioBindings = new HashSet<StreamKey>(32);
nearbyAudioSourceFactory = new NearbyAudioSourceFactory(voiceChatConfiguration);

// State model is created in DynamicWorldContainer so analytics can subscribe to it.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
using Arch.Core;
using LiveKit.Rooms.Streaming;
using LiveKit.Rooms.Streaming.Audio;
using UnityEngine;

namespace DCL.VoiceChat
{
/// <summary>
/// Lives on a dedicated audio-source entity. Bound 1:1 to a (participant, sid) pair.
/// </summary>
public struct NearbyAudioSourceComponent
{
public readonly StreamKey Key;
public readonly Entity AvatarEntity;
public readonly LivekitAudioSource LivekitAudioSource;

public uint LastSeenMuteVersion;
public bool LastAppliedMute;
public bool LastInactive;
public Vector3 LastWrittenPos;

public NearbyAudioSourceComponent(StreamKey key, Entity avatarEntity, LivekitAudioSource livekitAudioSource)
public NearbyAudioSourceComponent(StreamKey key, LivekitAudioSource livekitAudioSource)
{
Key = key;
AvatarEntity = avatarEntity;
LivekitAudioSource = livekitAudioSource;

LastSeenMuteVersion = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
using DCL.VoiceChat.Nearby.Audio;

namespace DCL.VoiceChat.Nearby
{
/// <summary>
/// Per-avatar mirror of the registry's audio sids for the avatar's walletId.
/// Per-avatar mirror of the registry's single active audio sid for the avatar's walletId.
/// <para>
/// <see cref="StreamSidsSnapshot"/> is a reference to a registry-owned copy-on-write <c>string[]</c>.
/// Reference identity is the version signal: a different reference ↔ content changed.
/// <b>Never mutate.</b>
/// Never retain across frames longer than the COW guarantees in <see cref="NearbyAudioStreamsRegistry"/>.
/// <see cref="CurrentSid"/> is the resolver's pick (most-recent-frame winner). Never <c>null</c>
/// while the component is attached — <see cref="NearbyLivekitBridgeSystem"/> guarantees the invariant
/// by only attaching on a non-null resolver result and ref-mutating on flips.
/// </para>
/// <para>
/// Remote-only: the local participant (user) is never present in the registry's streaming snapshot.
/// LiveKit does not raise <c>TrackSubscribed</c> for locally-published tracks, so this component is never attached to the local player avatar.
/// LiveKit does not raise <c>TrackSubscribed</c> for locally-published tracks, so this component is never
/// attached to the local player avatar.
/// </para>
/// </summary>
public struct NearbyAudioStreamerComponent
{
public string[] StreamSidsSnapshot;
public string CurrentSid;

public NearbyAudioStreamerComponent(string[] streamSidsSnapshot)
public NearbyAudioStreamerComponent(string currentSid)
{
StreamSidsSnapshot = streamSidsSnapshot;
CurrentSid = currentSid;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,28 @@ public interface INearbyAudioStreamRegistry : IDisposable
/// </summary>
bool HasAudioStream(string walletId);

/// <summary>
/// Raw reference to the registry's copy-on-write sid array. <c>null</c> when the wallet is not indexed.
/// Used by <see cref="LiveKit.Rooms.IRoom"/>-driven systems that need to compare snapshots via
/// <see cref="object.ReferenceEquals(object,object)"/> — a different reference ↔ content changed.
/// <b>Never mutate.</b> Treat the returned array as immutable from the caller's perspective.
/// </summary>
string[]? GetAudioSidsArray(string walletId);

/// <summary>
/// Resolves a stream lazily. Must be called from the main thread — <see cref="AudioStream"/>'s constructor
/// reads Unity audio settings and performs a synchronous FFI request.
/// </summary>
Weak<AudioStream> GetActiveStream(StreamKey key);

bool IsStreamGone(StreamKey key);
/// <summary>
/// <b>Call-site discipline.</b> Main-thread only — the multi-candidate branch performs one FFI hop per sid.
/// The single active sid for an identity. Returns <c>null</c> if the identity has no sids.
/// Single-candidate fast path: returned eagerly without consulting the frame oracle (a lone candidate is active by definition).
/// Multi-candidate: picks the sid that most recently emitted a media frame; returns <c>null</c> if none of the candidates has
/// ever emitted a frame — a transient "not-yet-decided" window that the bridge re-polls next tick and self-heals.
/// </summary>
string? GetActiveSid(string walletId);

/// <summary>
/// <c>true</c> when <paramref name="key"/>.sid is the resolver's current pick for <paramref name="key"/>.identity.
/// Cleanup uses this in place of "sid disappeared from snapshot" — it also reaps demoted ghost sids that
/// still exist in the registry but lost to a fresher candidate.
/// Shares the cost profile and main-thread discipline of <see cref="GetActiveSid"/>: same resolver call underneath.
/// </summary>
bool IsActiveSid(StreamKey key);

/// <summary>
/// Returns <c>true</c> if <paramref name="walletId"/> was present in the latest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@ namespace DCL.VoiceChat.Nearby.Audio
/// <b>Immutability contract.</b> Per-identity <c>string[]</c> arrays are copy-on-write
/// (every mutation publishes a new reference); the dictionary itself is swapped atomically on rehydrate / disconnect.
/// Do <b>not</b> reintroduce in-place mutation (array resize, pooled buffers, dict <c>Clear()</c>+rebuild) —
/// the bridge's <c>ReferenceEquals</c> freshness check and the cleanup system's "stream gone" detection both rely on these invariants.
/// <see cref="GetActiveSid"/> iterates the array assuming snapshot semantics; in-place edits would race the resolver.
/// </para>
/// </summary>
public sealed class NearbyAudioStreamsRegistry : INearbyAudioStreamRegistry
{
private readonly IRoom room;

// delegate is the only injectable shape for tests.
private readonly Func<StreamKey, Option<int>> getLastFrameReceivedAt;

// Immutability contract — see class XML. Swappable via Interlocked.Exchange / Volatile.Read. // IGNORE_LINE_WEBGL_THREAD_SAFETY_FLAG
// concurrencyLevel: 1 — FFI dispatch is serial, only one writer ever; saves the per-instance lock array (default = Environment.ProcessorCount).
private DCLConcurrentDictionary<string, string[]> streamsByIdentity = NewSnapshot();
Expand All @@ -53,8 +56,12 @@ public sealed class NearbyAudioStreamsRegistry : INearbyAudioStreamRegistry
public int RebuildEpoch => DCLVolatile.Read(ref rebuildEpoch);

public NearbyAudioStreamsRegistry(IRoom room)
: this(room, key => room.AudioStreams.GetLastFrameReceivedAt(key)) { }

internal NearbyAudioStreamsRegistry(IRoom room, Func<StreamKey, Option<int>> getLastFrameReceivedAt)
{
this.room = room;
this.getLastFrameReceivedAt = getLastFrameReceivedAt;

room.ConnectionUpdated += OnConnectionUpdated;

Expand Down Expand Up @@ -119,19 +126,43 @@ public bool IsActiveSpeaker(string walletId) =>
public bool HasAudioStream(string walletId) =>
DCLVolatile.Read(ref streamsByIdentity).ContainsKey(walletId);

// ReSharper disable once CanSimplifyDictionaryTryGetValueWithGetValueOrDefault
public string[]? GetAudioSidsArray(string walletId) =>
DCLVolatile.Read(ref streamsByIdentity).TryGetValue(walletId, out string[]? arr) ? arr : null;

public Weak<AudioStream> GetActiveStream(StreamKey key) =>
room.AudioStreams.ActiveStream(key);

public bool IsStreamGone(StreamKey key)
public string? GetActiveSid(string walletId)
{
if (!DCLVolatile.Read(ref streamsByIdentity).TryGetValue(key.identity, out string[]? sids))
return true;
if (!DCLVolatile.Read(ref streamsByIdentity).TryGetValue(walletId, out string[]? sids) || sids.Length == 0)
return null;

// Hot path: a single candidate is the active sid by definition; do not consult the frame oracle.
if (sids.Length == 1) return sids[0];

int bestTick = 0;
string? bestSid = null;

foreach (string sid in sids)
{
Option<int> lastFrame = getLastFrameReceivedAt(new StreamKey(walletId, sid));

return Array.IndexOf(sids, key.sid) < 0;
// Option.None: AudioStream missing or never decoded a frame.
if (!lastFrame.Has) continue;

int t = lastFrame.Value;

if (bestSid is null || unchecked(t - bestTick) > 0)
{
bestTick = t;
bestSid = sid;
}
}

return bestSid;
}

public bool IsActiveSid(StreamKey key)
{
string? active = GetActiveSid(key.identity);
return active is not null && string.Equals(active, key.sid, StringComparison.Ordinal);
}

private void OnConnectionUpdated(IRoom _, ConnectionUpdate update, LKDisconnectReason? __)
Expand Down Expand Up @@ -216,10 +247,9 @@ private static void AddAudioSidTo(DCLConcurrentDictionary<string, string[]> dict
}

// Publishes a NEW filtered array on every successful update; never mutates 'prev'.
// Required by the immutability contract in the class XML: NearbyLivekitBridgeSystem
// uses ReferenceEquals(observed, current) for its per-frame freshness check, so any
// in-place mutation of a published array would silently hide the change from the bridge.
// Single-writer assumption (serial FFI dispatch) — no CAS retry needed.
// Required by the immutability contract in the class XML: GetActiveSid iterates the
// array assuming snapshot semantics — in-place edits would race the resolver mid-pick.
// Single-writer assumption (serial FFI dispatch).
private void RemoveAudioSid(string identity, string sid)
{
DCLConcurrentDictionary<string, string[]> snap = DCLVolatile.Read(ref streamsByIdentity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,29 @@
using LiveKit.Rooms.Streaming;
using LiveKit.Rooms.Streaming.Audio;
using RichTypes;
using System.Collections.Generic;

namespace DCL.VoiceChat.Nearby.Systems
{
/// <summary>
/// Owns the creation part of the Nearby audio-source lifecycle.
/// For every avatar entity (<see cref="Profile"/> + <see cref="AvatarBase"/> + <see cref="NearbyAudioStreamerComponent"/> + <see cref="InAudibleRangeTag"/>)
/// the system iterates the snapshotted sids on the entity itself and materializes an audio-source entity per <c>(walletId, sid)</c> pair that does not yet have one.
/// Throttled to a fixed budget per frame so a crowd ramp-up does not spike a single tick.
/// Creates audio source component for every avatar streamer in audible range
/// the system materializes the audio-source component for the resolver-picked <c>(walletId, CurrentSid)</c> pair when one does not yet exist.
/// </summary>
[UpdateInGroup(typeof(NearbyVoiceChatGroup))]
[UpdateAfter(typeof(NearbyAudibleRangeSystem))]
[UpdateBefore(typeof(NearbyAudioPositionSystem))]
public partial class NearbyAudioBindingSystem : BaseUnityLoopSystem
{
internal const int MAX_CREATIONS_PER_FRAME = 10;

private readonly INearbyAudioStreamRegistry registry;
private readonly HashSet<StreamKey> bindings;
private readonly IUserBlockingCache userBlockingCache;
private readonly NearbyVoiceChatStateModel stateModel;
private readonly INearbyAudioSourceFactory sourceFactory;
private readonly RoomMetadataCurrentScene roomMetadataCurrentScene;


internal NearbyAudioBindingSystem(World world, INearbyAudioStreamRegistry registry, HashSet<StreamKey> bindings, IUserBlockingCache userBlockingCache, NearbyVoiceChatStateModel stateModel, INearbyAudioSourceFactory sourceFactory, RoomMetadataCurrentScene roomMetadataCurrentScene) : base(world)
internal NearbyAudioBindingSystem(World world, INearbyAudioStreamRegistry registry, IUserBlockingCache userBlockingCache, NearbyVoiceChatStateModel stateModel, INearbyAudioSourceFactory sourceFactory, RoomMetadataCurrentScene roomMetadataCurrentScene) : base(world)
{
this.registry = registry;
this.bindings = bindings;
this.userBlockingCache = userBlockingCache;
this.stateModel = stateModel;
this.sourceFactory = sourceFactory;
Expand All @@ -61,8 +55,8 @@ protected override void Update(float t)
}

[Query]
[None(typeof(DeleteEntityIntention))]
[All(typeof(AvatarBase), typeof(NearbyAudioStreamerComponent), typeof(InAudibleRangeTag))]
[None(typeof(NearbyAudioSourceComponent), typeof(DeleteEntityIntention))]
[All(typeof(AvatarBase), typeof(InAudibleRangeTag))]
private void CreateAndBindAudioSourcesToStreamers(Entity avatarEntity, in Profile profile, in NearbyAudioStreamerComponent nearby)
{
string walletId = profile.UserId;
Expand All @@ -71,24 +65,13 @@ private void CreateAndBindAudioSourcesToStreamers(Entity avatarEntity, in Profil
// Skip blocked / scene-banned identities. Cleanup system handles already-bound entities; this filter prevents creation in the first place.
if (userBlockingCache.UserIsBlocked(walletId) || roomMetadataCurrentScene.IsUserBanned(walletId)) return;

// Sids ride on the entity itself — no registry call on the per-avatar hot path.
// Bridge system guarantees SidsSnapshot is non-null for any entity that has the component.
foreach (string sid in nearby.StreamSidsSnapshot)
{
var key = new StreamKey(walletId, sid);

if (!bindings.Contains(key))
{
Weak<AudioStream> stream = registry.GetActiveStream(key);
var key = new StreamKey(walletId, nearby.CurrentSid);
Weak<AudioStream> stream = registry.GetActiveStream(key);

// Track was unsubscribed between collection (snapshot read) and resolve (GetActiveStream); skip to avoid a one-frame ghost source.
if (!stream.Resource.Has) continue;

LivekitAudioSource source = sourceFactory.Create(key, stream);

World.Create(new NearbyAudioSourceComponent(key, avatarEntity, source));
bindings.Add(key);
}
if (stream.Resource.Has)
{
LivekitAudioSource source = sourceFactory.Create(key, stream);
World.Add(avatarEntity, new NearbyAudioSourceComponent(key, source));
}
}
}
Expand Down
Loading
Loading