Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/stream_chat/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@

✅ Added

- Added support for predefined filters for `QueryChannels` on `StreamChatClient` (`StreamChatClient.queryChannels` and `StreamChatClient.queryChannelsWithResult`).
- Added `ChatPersistenceClient.queryChannelStates` and `ChatPersistenceClient.saveChannelQueries` as the unified read/write methods for channel-query persistence. Both accept standard and predefined-filter parameters and internally dispatch.
- Added `StreamChatClient.pauseReconnect` / `resumeReconnect` to suspend the WebSocket's auto-retry loop without tearing down the user session.

⚠️ Deprecated

- Deprecated `ChatPersistenceClient.getChannelStates` and `ChatPersistenceClient.updateChannelQueries` in favor of the unified `queryChannelStates` / `saveChannelQueries`. The deprecated methods stay overridable so downstream subclasses keep working unchanged.

🐞 Fixed

- Fixed an unhandled `WebSocketChannelException` surfacing when a reconnect attempt failed (e.g. DNS lookup failed in background); the duplicate signal on `sink.done` is now ignored since the stream's `onError` already handles it.
Expand Down
192 changes: 172 additions & 20 deletions packages/stream_chat/lib/src/client/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import 'package:meta/meta.dart';
import 'package:rxdart/rxdart.dart';
import 'package:stream_chat/src/client/channel.dart';
import 'package:stream_chat/src/client/channel_delivery_reporter.dart';
import 'package:stream_chat/src/client/query_channels_result.dart';
import 'package:stream_chat/src/client/retry_policy.dart';
import 'package:stream_chat/src/core/api/attachment_file_uploader.dart';
import 'package:stream_chat/src/core/api/requests.dart';
Expand Down Expand Up @@ -646,12 +647,58 @@ class StreamChatClient {
});
}

final _queryChannelsCache = InFlightCache<String, List<Channel>>();
final _queryChannelsCache = InFlightCache<String, QueryChannelsResult>();

/// Requests channels with a given query.
///
/// Either an inline [filter]/[channelStateSort] pair or a [predefinedFilter]
/// identifier (optionally interpolated with [filterValues] and [sortValues])
/// can be supplied.
///
/// Use [queryChannelsWithResult] if you also need the server-resolved
/// [PredefinedFilter] spec.
Stream<List<Channel>> queryChannels({
Filter? filter,
SortOrder<ChannelState>? channelStateSort,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
bool state = true,
bool watch = true,
bool presence = false,
int? memberLimit,
int? messageLimit,
PaginationParams paginationParams = const PaginationParams(),
bool waitForConnect = true,
}) =>
queryChannelsWithResult(
filter: filter,
channelStateSort: channelStateSort,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
state: state,
watch: watch,
presence: presence,
memberLimit: memberLimit,
messageLimit: messageLimit,
paginationParams: paginationParams,
waitForConnect: waitForConnect,
).map((result) => result.channels);

/// Requests channels with a given query, yielding a [QueryChannelsResult]
/// that carries both the live channel list and the server-resolved
/// [PredefinedFilter] spec (when one is associated with the query).
///
/// Yields the offline-cached result first (when available), followed by
/// the online result. Concurrent identical online queries are coalesced
/// via [_queryChannelsCache].
Stream<QueryChannelsResult> queryChannelsWithResult({
Filter? filter,
SortOrder<ChannelState>? channelStateSort,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
bool state = true,
bool watch = true,
bool presence = false,
Expand All @@ -668,6 +715,9 @@ class StreamChatClient {
final hash = generateHash([
filter,
channelStateSort,
predefinedFilter,
filterValues,
sortValues,
state,
watch,
presence,
Expand All @@ -677,15 +727,18 @@ class StreamChatClient {
]);

// Per-caller offline emit — local persistence, not coalesced.
var offlineChannels = <Channel>[];
QueryChannelsResult? offlineResult;
try {
offlineChannels = await queryChannelsOffline(
offlineResult = await _queryChannelsOfflineImpl(
filter: filter,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
channelStateSort: channelStateSort,
paginationParams: paginationParams,
);

if (offlineChannels.isNotEmpty) yield offlineChannels;
if (offlineResult.channels.isNotEmpty) yield offlineResult;
} catch (e, stk) {
logger.warning('Error querying channels offline', e, stk);
// Continue to online query even if offline fails
Expand All @@ -697,9 +750,12 @@ class StreamChatClient {
// the lifecycle details.
final result = await _queryChannelsCache.run(
hash,
() => queryChannelsOnline(
() => _queryChannelsOnlineImpl(
filter: filter,
sort: channelStateSort,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
state: state,
watch: watch,
presence: presence,
Expand All @@ -719,7 +775,7 @@ class StreamChatClient {
} catch (e, stk) {
logger.severe('Error querying channels online', e, stk);
// Only rethrow if we have no channels to show the user
if (offlineChannels.isEmpty) rethrow;
if (offlineResult == null || offlineResult.channels.isEmpty) rethrow;
}
}

Expand Down Expand Up @@ -748,6 +804,40 @@ class StreamChatClient {
Future<List<Channel>> queryChannelsOnline({
Filter? filter,
SortOrder<ChannelState>? sort,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
bool state = true,
bool watch = true,
bool presence = false,
int? memberLimit,
int? messageLimit,
bool waitForConnect = true,
PaginationParams paginationParams = const PaginationParams(),
}) async {
final result = await _queryChannelsOnlineImpl(
filter: filter,
sort: sort,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
state: state,
watch: watch,
presence: presence,
memberLimit: memberLimit,
messageLimit: messageLimit,
waitForConnect: waitForConnect,
paginationParams: paginationParams,
);
return result.channels;
}

Future<QueryChannelsResult> _queryChannelsOnlineImpl({
Filter? filter,
SortOrder<ChannelState>? sort,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
bool state = true,
bool watch = true,
bool presence = false,
Expand Down Expand Up @@ -778,6 +868,9 @@ class StreamChatClient {
final res = await _chatApi.channel.queryChannels(
filter: filter,
sort: sort,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
state: state,
watch: watch,
presence: presence,
Expand All @@ -792,7 +885,10 @@ class StreamChatClient {
Please make sure to take a look at the Flutter tutorial: https://getstream.io/chat/flutter/tutorial
If your application already has users and channels, you might need to adjust your query channel as explained in the docs https://getstream.io/chat/docs/query_channels/?language=dart
''');
return <Channel>[];
return QueryChannelsResult(
channels: const [],
predefinedFilter: res.predefinedFilter,
);
}

final channels = res.channels;
Expand All @@ -810,32 +906,88 @@ class StreamChatClient {
// Submit delivery report for the channels fetched in this query.
await channelDeliveryReporter.submitForDelivery(updateData.value);

await chatPersistenceClient?.updateChannelQueries(
filter,
channels.map((c) => c.channel!.cid).toList(),
// Clear the query cache if we are refreshing.
clearQueryCache: (paginationParams.offset ?? 0) == 0,
final cachedCids = channels.map((c) => c.channel!.cid).toList();
// Clear the query cache if we are refreshing.
final clearQueryCache = (paginationParams.offset ?? 0) == 0;

Filter? resolvedFilter;
SortOrder<ChannelState>? resolvedSort;
if (res.predefinedFilter case final resolvedPredefinedFilter?) {
resolvedFilter = resolvedPredefinedFilter.filter;
resolvedSort = resolvedPredefinedFilter.effectiveSort;
}

await chatPersistenceClient?.saveChannelQueries(
cids: cachedCids,
filter: filter,
sort: sort,
predefinedFilter: predefinedFilter,
resolvedFilter: resolvedFilter,
resolvedSort: resolvedSort,
filterValues: filterValues,
sortValues: sortValues,
clearQueryCache: clearQueryCache,
);

this.state.addChannels(updateData.key);
return updateData.value;
return QueryChannelsResult(
channels: updateData.value,
predefinedFilter: res.predefinedFilter,
);
}

/// Requests channels with a given query from the Persistence client.
Future<List<Channel>> queryChannelsOffline({
Filter? filter,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
SortOrder<ChannelState>? channelStateSort,
PaginationParams paginationParams = const PaginationParams(),
}) async {
final result = await _queryChannelsOfflineImpl(
filter: filter,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
channelStateSort: channelStateSort,
paginationParams: paginationParams,
);
return result.channels;
}

Future<QueryChannelsResult> _queryChannelsOfflineImpl({
Filter? filter,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
SortOrder<ChannelState>? channelStateSort,
PaginationParams paginationParams = const PaginationParams(),
}) async {
final offlineChannels = (await chatPersistenceClient?.getChannelStates(
final res = await chatPersistenceClient?.queryChannelStates(
filter: filter,
channelStateSort: channelStateSort,
sort: channelStateSort,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
paginationParams: paginationParams,
)) ??
[];
final updatedData = _mapChannelStateToChannel(offlineChannels);
state.addChannels(updatedData.key);
return updatedData.value;
) ??
(QueryChannelsResponse()..channels = const []);

if (res.channels.isEmpty) {
logger.info('No channels found in offline storage for the given query');
return QueryChannelsResult(
channels: const [],
predefinedFilter: res.predefinedFilter,
);
}

final updateData = _mapChannelStateToChannel(res.channels);
state.addChannels(updateData.key);
return QueryChannelsResult(
channels: updateData.value,
predefinedFilter: res.predefinedFilter,
);
}

MapEntry<Map<String, Channel>, List<Channel>> _mapChannelStateToChannel(
Expand Down
19 changes: 19 additions & 0 deletions packages/stream_chat/lib/src/client/query_channels_result.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import 'package:stream_chat/src/client/channel.dart';
import 'package:stream_chat/src/core/models/predefined_filter.dart';

/// The result of a `queryChannelsWithResult` call on [StreamChatClient].
///
/// Carries the live [Channel] instances matching the query alongside the
/// server-resolved [PredefinedFilter] spec (when one is associated with the
/// query).
class QueryChannelsResult {
/// Creates a new [QueryChannelsResult].
const QueryChannelsResult({required this.channels, this.predefinedFilter});

/// The live [Channel] instances matching the query.
final List<Channel> channels;

/// The server-resolved predefined-filter spec, or null when the query did
/// not use a `predefinedFilter`.
final PredefinedFilter? predefinedFilter;
}
11 changes: 11 additions & 0 deletions packages/stream_chat/lib/src/core/api/channel_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,17 @@ class ChannelApi {
}

/// Requests channels with a given query from the API.
///
/// Either an inline [filter]/[sort] pair or a [predefinedFilter] identifier
/// (with optional [filterValues] / [sortValues]) may be provided. When a
/// predefined filter is used, the server resolves it and echoes the
/// materialized filter/sort on [QueryChannelsResponse.predefinedFilter].
Future<QueryChannelsResponse> queryChannels({
Filter? filter,
SortOrder<ChannelState>? sort,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
int? memberLimit,
int? messageLimit,
bool state = true,
Expand All @@ -72,6 +80,9 @@ class ChannelApi {
// passed options
if (sort != null) 'sort': sort,
if (filter != null) 'filter_conditions': filter,
if (predefinedFilter != null) 'predefined_filter': predefinedFilter,
if (filterValues != null) 'filter_values': filterValues,
if (sortValues != null) 'sort_values': sortValues,
if (memberLimit != null) 'member_limit': memberLimit,
if (messageLimit != null) 'message_limit': messageLimit,

Expand Down
8 changes: 8 additions & 0 deletions packages/stream_chat/lib/src/core/api/responses.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import 'package:stream_chat/src/core/models/message_reminder.dart';
import 'package:stream_chat/src/core/models/poll.dart';
import 'package:stream_chat/src/core/models/poll_option.dart';
import 'package:stream_chat/src/core/models/poll_vote.dart';
import 'package:stream_chat/src/core/models/predefined_filter.dart';
import 'package:stream_chat/src/core/models/push_preference.dart';
import 'package:stream_chat/src/core/models/reaction.dart';
import 'package:stream_chat/src/core/models/read.dart';
Expand Down Expand Up @@ -78,6 +79,13 @@ class QueryChannelsResponse extends _BaseResponse {
@JsonKey(defaultValue: [])
late List<ChannelState> channels;

/// Predefined filter spec as resolved by the server.
///
/// Populated when the request used `predefined_filter`. Contains the
/// preset name and the materialized `filter`/`sort` that were applied.
@JsonKey(name: 'predefined_filter')
PredefinedFilter? predefinedFilter;

/// Create a new instance from a json
static QueryChannelsResponse fromJson(Map<String, dynamic> json) =>
_$QueryChannelsResponseFromJson(json);
Expand Down
Loading
Loading