Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/stream_chat/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@

✅ Added

- Added support for predefined filters for `QueryChannels` on `StreamChatClient` (`StreamChatClient.queryChannels` and `StreamChatClient.queryChannelsWithResult`).
- Added `ChatPersistenceClient.queryChannelStates` and `ChatPersistenceClient.saveChannelQueries` as the unified read/write methods for channel-query persistence. Both accept standard and predefined-filter parameters and internally dispatch.
- Added `StreamChatClient.pauseReconnect` / `resumeReconnect` to suspend the WebSocket's auto-retry loop without tearing down the user session.

⚠️ Deprecated

- Deprecated `ChatPersistenceClient.getChannelStates` and `ChatPersistenceClient.updateChannelQueries` in favor of the unified `queryChannelStates` / `saveChannelQueries`. The deprecated methods stay overridable so downstream subclasses keep working unchanged.

🐞 Fixed

- Fixed an unhandled `WebSocketChannelException` surfacing when a reconnect attempt failed (e.g. DNS lookup failed in background); the duplicate signal on `sink.done` is now ignored since the stream's `onError` already handles it.
Expand Down
192 changes: 172 additions & 20 deletions packages/stream_chat/lib/src/client/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import 'package:meta/meta.dart';
import 'package:rxdart/rxdart.dart';
import 'package:stream_chat/src/client/channel.dart';
import 'package:stream_chat/src/client/channel_delivery_reporter.dart';
import 'package:stream_chat/src/client/query_channels_result.dart';
import 'package:stream_chat/src/client/retry_policy.dart';
import 'package:stream_chat/src/core/api/attachment_file_uploader.dart';
import 'package:stream_chat/src/core/api/requests.dart';
Expand Down Expand Up @@ -646,12 +647,58 @@ class StreamChatClient {
});
}

final _queryChannelsCache = InFlightCache<String, List<Channel>>();
final _queryChannelsCache = InFlightCache<String, QueryChannelsResult>();

/// Requests channels with a given query.
///
/// Either an inline [filter]/[channelStateSort] pair or a [predefinedFilter]
/// identifier (optionally interpolated with [filterValues] and [sortValues])
/// can be supplied.
///
/// Use [queryChannelsWithResult] if you also need the server-resolved
/// [PredefinedFilter] spec.
Stream<List<Channel>> queryChannels({
Filter? filter,
SortOrder<ChannelState>? channelStateSort,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
bool state = true,
bool watch = true,
bool presence = false,
int? memberLimit,
int? messageLimit,
PaginationParams paginationParams = const PaginationParams(),
bool waitForConnect = true,
}) =>
queryChannelsWithResult(
filter: filter,
channelStateSort: channelStateSort,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
state: state,
watch: watch,
presence: presence,
memberLimit: memberLimit,
messageLimit: messageLimit,
paginationParams: paginationParams,
waitForConnect: waitForConnect,
).map((result) => result.channels);

/// Requests channels with a given query, yielding a [QueryChannelsResult]
/// that carries both the live channel list and the server-resolved
/// [PredefinedFilter] spec (when one is associated with the query).
///
/// Yields the offline-cached result first (when available), followed by
/// the online result. Concurrent identical online queries are coalesced
/// via [_queryChannelsCache].
Stream<QueryChannelsResult> queryChannelsWithResult({
Filter? filter,
SortOrder<ChannelState>? channelStateSort,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
bool state = true,
bool watch = true,
bool presence = false,
Expand All @@ -668,6 +715,9 @@ class StreamChatClient {
final hash = generateHash([
filter,
channelStateSort,
predefinedFilter,
filterValues,
sortValues,
state,
watch,
presence,
Expand All @@ -677,15 +727,18 @@ class StreamChatClient {
]);

// Per-caller offline emit — local persistence, not coalesced.
var offlineChannels = <Channel>[];
QueryChannelsResult? offlineResult;
try {
offlineChannels = await queryChannelsOffline(
offlineResult = await _queryChannelsOfflineImpl(
filter: filter,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
channelStateSort: channelStateSort,
paginationParams: paginationParams,
);

if (offlineChannels.isNotEmpty) yield offlineChannels;
if (offlineResult.channels.isNotEmpty) yield offlineResult;
} catch (e, stk) {
logger.warning('Error querying channels offline', e, stk);
// Continue to online query even if offline fails
Expand All @@ -697,9 +750,12 @@ class StreamChatClient {
// the lifecycle details.
final result = await _queryChannelsCache.run(
hash,
() => queryChannelsOnline(
() => _queryChannelsOnlineImpl(
filter: filter,
sort: channelStateSort,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
state: state,
watch: watch,
presence: presence,
Expand All @@ -719,7 +775,7 @@ class StreamChatClient {
} catch (e, stk) {
logger.severe('Error querying channels online', e, stk);
// Only rethrow if we have no channels to show the user
if (offlineChannels.isEmpty) rethrow;
if (offlineResult == null || offlineResult.channels.isEmpty) rethrow;
}
}

Expand Down Expand Up @@ -748,6 +804,40 @@ class StreamChatClient {
Future<List<Channel>> queryChannelsOnline({
Filter? filter,
SortOrder<ChannelState>? sort,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
bool state = true,
bool watch = true,
bool presence = false,
int? memberLimit,
int? messageLimit,
bool waitForConnect = true,
PaginationParams paginationParams = const PaginationParams(),
}) async {
final result = await _queryChannelsOnlineImpl(
filter: filter,
sort: sort,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
state: state,
watch: watch,
presence: presence,
memberLimit: memberLimit,
messageLimit: messageLimit,
waitForConnect: waitForConnect,
paginationParams: paginationParams,
);
return result.channels;
}

Future<QueryChannelsResult> _queryChannelsOnlineImpl({
Filter? filter,
SortOrder<ChannelState>? sort,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
bool state = true,
bool watch = true,
bool presence = false,
Expand Down Expand Up @@ -778,6 +868,9 @@ class StreamChatClient {
final res = await _chatApi.channel.queryChannels(
filter: filter,
sort: sort,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
state: state,
watch: watch,
presence: presence,
Expand All @@ -792,7 +885,10 @@ class StreamChatClient {
Please make sure to take a look at the Flutter tutorial: https://getstream.io/chat/flutter/tutorial
If your application already has users and channels, you might need to adjust your query channel as explained in the docs https://getstream.io/chat/docs/query_channels/?language=dart
''');
return <Channel>[];
return QueryChannelsResult(
channels: const [],
predefinedFilter: res.predefinedFilter,
);
}

final channels = res.channels;
Expand All @@ -810,32 +906,88 @@ class StreamChatClient {
// Submit delivery report for the channels fetched in this query.
await channelDeliveryReporter.submitForDelivery(updateData.value);

await chatPersistenceClient?.updateChannelQueries(
filter,
channels.map((c) => c.channel!.cid).toList(),
// Clear the query cache if we are refreshing.
clearQueryCache: (paginationParams.offset ?? 0) == 0,
final cachedCids = channels.map((c) => c.channel!.cid).toList();
// Clear the query cache if we are refreshing.
final clearQueryCache = (paginationParams.offset ?? 0) == 0;

Filter? resolvedFilter;
SortOrder<ChannelState>? resolvedSort;
if (res.predefinedFilter case final resolvedPredefinedFilter?) {
resolvedFilter = resolvedPredefinedFilter.filter;
resolvedSort = resolvedPredefinedFilter.effectiveSort;
}

await chatPersistenceClient?.saveChannelQueries(
cids: cachedCids,
filter: filter,
sort: sort,
predefinedFilter: predefinedFilter,
resolvedFilter: resolvedFilter,
resolvedSort: resolvedSort,
filterValues: filterValues,
sortValues: sortValues,
clearQueryCache: clearQueryCache,
);

this.state.addChannels(updateData.key);
return updateData.value;
return QueryChannelsResult(
channels: updateData.value,
predefinedFilter: res.predefinedFilter,
);
}

/// Requests channels with a given query from the Persistence client.
Future<List<Channel>> queryChannelsOffline({
Filter? filter,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
SortOrder<ChannelState>? channelStateSort,
PaginationParams paginationParams = const PaginationParams(),
}) async {
final result = await _queryChannelsOfflineImpl(
filter: filter,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
channelStateSort: channelStateSort,
paginationParams: paginationParams,
);
return result.channels;
}

Future<QueryChannelsResult> _queryChannelsOfflineImpl({
Filter? filter,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
SortOrder<ChannelState>? channelStateSort,
PaginationParams paginationParams = const PaginationParams(),
}) async {
final offlineChannels = (await chatPersistenceClient?.getChannelStates(
final res = await chatPersistenceClient?.queryChannelStates(
filter: filter,
channelStateSort: channelStateSort,
sort: channelStateSort,
predefinedFilter: predefinedFilter,
filterValues: filterValues,
sortValues: sortValues,
paginationParams: paginationParams,
)) ??
[];
final updatedData = _mapChannelStateToChannel(offlineChannels);
state.addChannels(updatedData.key);
return updatedData.value;
) ??
(QueryChannelsResponse()..channels = const []);

if (res.channels.isEmpty) {
logger.info('No channels found in offline storage for the given query');
return QueryChannelsResult(
channels: const [],
predefinedFilter: res.predefinedFilter,
);
}

final updateData = _mapChannelStateToChannel(res.channels);
state.addChannels(updateData.key);
return QueryChannelsResult(
channels: updateData.value,
predefinedFilter: res.predefinedFilter,
);
}

MapEntry<Map<String, Channel>, List<Channel>> _mapChannelStateToChannel(
Expand Down
19 changes: 19 additions & 0 deletions packages/stream_chat/lib/src/client/query_channels_result.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import 'package:stream_chat/src/client/channel.dart';
import 'package:stream_chat/src/core/models/predefined_filter.dart';

/// The result of a `queryChannelsWithResult` call on [StreamChatClient].
///
/// Carries the live [Channel] instances matching the query alongside the
/// server-resolved [PredefinedFilter] spec (when one is associated with the
/// query).
class QueryChannelsResult {
/// Creates a new [QueryChannelsResult].
const QueryChannelsResult({required this.channels, this.predefinedFilter});

/// The live [Channel] instances matching the query.
final List<Channel> channels;

/// The server-resolved predefined-filter spec, or null when the query did
/// not use a `predefinedFilter`.
final PredefinedFilter? predefinedFilter;
}
11 changes: 11 additions & 0 deletions packages/stream_chat/lib/src/core/api/channel_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,17 @@ class ChannelApi {
}

/// Requests channels with a given query from the API.
///
/// Either an inline [filter]/[sort] pair or a [predefinedFilter] identifier
/// (with optional [filterValues] / [sortValues]) may be provided. When a
/// predefined filter is used, the server resolves it and echoes the
/// materialized filter/sort on [QueryChannelsResponse.predefinedFilter].
Future<QueryChannelsResponse> queryChannels({
Filter? filter,
SortOrder<ChannelState>? sort,
String? predefinedFilter,
Map<String, Object?>? filterValues,
Map<String, Object?>? sortValues,
int? memberLimit,
int? messageLimit,
bool state = true,
Expand All @@ -72,6 +80,9 @@ class ChannelApi {
// passed options
if (sort != null) 'sort': sort,
if (filter != null) 'filter_conditions': filter,
if (predefinedFilter != null) 'predefined_filter': predefinedFilter,
if (filterValues != null) 'filter_values': filterValues,
if (sortValues != null) 'sort_values': sortValues,
if (memberLimit != null) 'member_limit': memberLimit,
if (messageLimit != null) 'message_limit': messageLimit,

Expand Down
8 changes: 8 additions & 0 deletions packages/stream_chat/lib/src/core/api/responses.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import 'package:stream_chat/src/core/models/message_reminder.dart';
import 'package:stream_chat/src/core/models/poll.dart';
import 'package:stream_chat/src/core/models/poll_option.dart';
import 'package:stream_chat/src/core/models/poll_vote.dart';
import 'package:stream_chat/src/core/models/predefined_filter.dart';
import 'package:stream_chat/src/core/models/push_preference.dart';
import 'package:stream_chat/src/core/models/reaction.dart';
import 'package:stream_chat/src/core/models/read.dart';
Expand Down Expand Up @@ -78,6 +79,13 @@ class QueryChannelsResponse extends _BaseResponse {
@JsonKey(defaultValue: [])
late List<ChannelState> channels;

/// Predefined filter spec as resolved by the server.
///
/// Populated when the request used `predefined_filter`. Contains the
/// preset name and the materialized `filter`/`sort` that were applied.
@JsonKey(name: 'predefined_filter')
PredefinedFilter? predefinedFilter;

/// Create a new instance from a json
static QueryChannelsResponse fromJson(Map<String, dynamic> json) =>
_$QueryChannelsResponseFromJson(json);
Expand Down
Loading
Loading