diff --git a/docs-website/docs/docs/README.md b/docs-website/docs/docs/README.md index dfa8e6fde..88abb032a 100644 --- a/docs-website/docs/docs/README.md +++ b/docs-website/docs/docs/README.md @@ -204,6 +204,12 @@ The result is fully reactive! Whenever a post or comment is added, changed, or r
+ + ezypack + + +
+ _Does your company or app use 🍉? Open a pull request and add your logo/icon with link here!_ ## Contributing diff --git a/examples/typescript/yarn.lock b/examples/typescript/yarn.lock index fb4b4d795..928d63320 100644 --- a/examples/typescript/yarn.lock +++ b/examples/typescript/yarn.lock @@ -977,10 +977,10 @@ log-symbols@^2.0.0: dependencies: chalk "^2.0.1" -"lokijs@npm:@nozbe/lokijs@1.5.12-wmelon6": - version "1.5.12-wmelon6" - resolved "https://registry.yarnpkg.com/@nozbe/lokijs/-/lokijs-1.5.12-wmelon6.tgz#e457d934d614d5df80105c86314252a6e614df9b" - integrity sha512-GXsaqY8qTJ6xdCrGyno2t+ON2aj6PrUDdvhbrkxK/0Fp12C4FGvDg1wS+voLU9BANYHEnr7KRWfItDZnQkjoAg== +"lokijs@npm:@nozbe/lokijs@1.5.12-wmelon8": + version "1.5.12-wmelon8" + resolved "https://registry.yarnpkg.com/@nozbe/lokijs/-/lokijs-1.5.12-wmelon8.tgz#38ad7884d9cfd574a645c8201ad0cdbc93076dd6" + integrity sha512-WnqtKrWDh48FvuxnFv2LKurxeSAp8Q3TtQ4akwKFC7CkBaZYgn2P7F3YuBXguj4AgXhSEogxJmJWN8xQq7zPRQ== loud-rejection@^1.0.0: version "1.6.0" diff --git a/src/Collection/index.js b/src/Collection/index.js index 5b8405e51..f780c4807 100644 --- a/src/Collection/index.js +++ b/src/Collection/index.js @@ -116,7 +116,7 @@ export default class Collection { }) } - /*:: query: ArrayOrSpreadFn> */ + query: ArrayOrSpreadFn> /** * Returns a `Query` with conditions given. * diff --git a/src/Database/index.js b/src/Database/index.js index 6175192e5..14f4fcfa8 100644 --- a/src/Database/index.js +++ b/src/Database/index.js @@ -84,7 +84,7 @@ export default class Database { return this._localStorage } - /*:: batch: ArrayOrSpreadFn> */ + batch: ArrayOrSpreadFn> /** * Executes multiple prepared operations * diff --git a/src/Query/index.js b/src/Query/index.js index 2d8135d74..22bd71c18 100644 --- a/src/Query/index.js +++ b/src/Query/index.js @@ -81,7 +81,7 @@ export default class Query { this.description = Q.queryWithoutDeleted(this._rawDescription) } - /*:: extend: ArrayOrSpreadFn> */ + extend: ArrayOrSpreadFn> /** * Returns a new Query that contains all clauses (conditions, sorting, etc.) from this Query * as well as the ones passed as arguments. diff --git a/src/observation/subscribeToQueryWithColumns/index.js b/src/observation/subscribeToQueryWithColumns/index.js index 20d2f1531..4447a9c89 100644 --- a/src/observation/subscribeToQueryWithColumns/index.js +++ b/src/observation/subscribeToQueryWithColumns/index.js @@ -126,42 +126,42 @@ export default function subscribeToQueryWithColumns( // Observe the source records list (list of records matching a query) // eslint-disable-next-line prefer-arrow-callback - const sourceUnsubscribe = subscribeToSource(function observeWithColumnsSourceChanged( - recordsOrStatus, - ): void { - // $FlowFixMe - if (recordsOrStatus === false) { - sourceIsFetching = true - return - } - sourceIsFetching = false - - // Emit changes if one of observed columns changed OR list of matching records changed - const records: Record[] = recordsOrStatus - const shouldEmit = - firstEmission || hasPendingColumnChanges || !identicalArrays(records, observedRecords) - - hasPendingColumnChanges = false - firstEmission = false - - // Find changes, and save current list for comparison on next emission - const arrayDifference = require('../../utils/fp/arrayDifference').default - const { added, removed } = arrayDifference(observedRecords, records) - observedRecords = records - - // Unsubscribe from records removed from list - removed.forEach((record) => { - recordStates.delete(record.id) - }) - - // Save current record state for later comparison - added.forEach((newRecord) => { - recordStates.set(newRecord.id, getRecordState(newRecord, columnNames)) - }) - - // Emit - shouldEmit && emitCopy(records) - }) + const sourceUnsubscribe = subscribeToSource( + function observeWithColumnsSourceChanged(recordsOrStatus): void { + // $FlowFixMe + if (recordsOrStatus === false) { + sourceIsFetching = true + return + } + sourceIsFetching = false + + // Emit changes if one of observed columns changed OR list of matching records changed + const records: Record[] = recordsOrStatus + const shouldEmit = + firstEmission || hasPendingColumnChanges || !identicalArrays(records, observedRecords) + + hasPendingColumnChanges = false + firstEmission = false + + // Find changes, and save current list for comparison on next emission + const arrayDifference = require('../../utils/fp/arrayDifference').default + const { added, removed } = arrayDifference(observedRecords, records) + observedRecords = records + + // Unsubscribe from records removed from list + removed.forEach((record) => { + recordStates.delete(record.id) + }) + + // Save current record state for later comparison + added.forEach((newRecord) => { + recordStates.set(newRecord.id, getRecordState(newRecord, columnNames)) + }) + + // Emit + shouldEmit && emitCopy(records) + }, + ) return () => { unsubscribed = true diff --git a/src/sync/impl/__tests__/synchronize.test.js b/src/sync/impl/__tests__/synchronize.test.js index 475a111a9..3959beea3 100644 --- a/src/sync/impl/__tests__/synchronize.test.js +++ b/src/sync/impl/__tests__/synchronize.test.js @@ -180,6 +180,129 @@ describe('synchronize', () => { await expectSyncedAndMatches(projects, 'pSynced', { name: 'remote' }) await expectDoesNotExist(tasks, 'tSynced') }) + it('can pull changes in pages', async () => { + const { database, projects } = makeDatabase() + + const pullChanges = jest + .fn() + .mockReturnValueOnce( + Promise.resolve({ + changes: makeChangeSet({ + mock_projects: { + created: [{ id: 'p1', name: 'remote 1' }], + }, + }), + timestamp: 1500, + experimentalHasMore: true, + }), + ) + .mockReturnValueOnce( + Promise.resolve({ + changes: makeChangeSet({ + mock_projects: { + created: [{ id: 'p2', name: 'remote 2' }], + }, + }), + timestamp: 1600, + experimentalHasMore: false, + }), + ) + + await synchronize({ database, pullChanges, pushChanges: jest.fn() }) + + expect(pullChanges).toHaveBeenCalledTimes(2) + expect(pullChanges).toHaveBeenNthCalledWith(1, { + lastPulledAt: null, + schemaVersion: 1, + migration: null, + }) + expect(pullChanges).toHaveBeenNthCalledWith(2, { + lastPulledAt: 1500, + schemaVersion: 1, + migration: null, + }) + + expect(await getLastPulledAt(database)).toBe(1600) + await expectSyncedAndMatches(projects, 'p1', { name: 'remote 1' }) + await expectSyncedAndMatches(projects, 'p2', { name: 'remote 2' }) + }) + it('calls sync hooks for every page', async () => { + const { database } = makeDatabase() + + const onWillApplyRemoteChanges = jest.fn() + const onDidPullChanges = jest.fn() + const pullChanges = jest + .fn() + .mockReturnValueOnce( + Promise.resolve({ + changes: makeChangeSet({ + mock_projects: { created: [{ id: 'p1' }] }, + }), + timestamp: 1500, + experimentalHasMore: true, + extra: 'page 1', + }), + ) + .mockReturnValueOnce( + Promise.resolve({ + changes: makeChangeSet({ + mock_projects: { created: [{ id: 'p2' }] }, + }), + timestamp: 1600, + experimentalHasMore: false, + extra: 'page 2', + }), + ) + + await synchronize({ + database, + pullChanges, + onWillApplyRemoteChanges, + onDidPullChanges, + }) + + expect(onWillApplyRemoteChanges).toHaveBeenCalledTimes(2) + expect(onWillApplyRemoteChanges).toHaveBeenNthCalledWith(1, { remoteChangeCount: 1 }) + expect(onWillApplyRemoteChanges).toHaveBeenNthCalledWith(2, { remoteChangeCount: 1 }) + + expect(onDidPullChanges).toHaveBeenCalledTimes(2) + expect(onDidPullChanges).toHaveBeenNthCalledWith(1, { + extra: 'page 1', + timestamp: 1500, + experimentalHasMore: true, + }) + expect(onDidPullChanges).toHaveBeenNthCalledWith(2, { + extra: 'page 2', + timestamp: 1600, + experimentalHasMore: false, + }) + }) + it('does not update lastPulledAt if paginated sync fails midway', async () => { + const { database, projects } = makeDatabase() + + const pullChanges = jest + .fn() + .mockReturnValueOnce( + Promise.resolve({ + changes: makeChangeSet({ + mock_projects: { created: [{ id: 'p1' }] }, + }), + timestamp: 1500, + experimentalHasMore: true, + }), + ) + .mockReturnValueOnce(Promise.reject(new Error('page 2 fail'))) + + await expect(synchronize({ database, pullChanges, pushChanges: jest.fn() })).rejects.toThrow( + 'page 2 fail', + ) + + // Data from first page should be there + await expectSyncedAndMatches(projects, 'p1', {}) + + // BUT timestamp should still be null + expect(await getLastPulledAt(database)).toBe(null) + }) it('can synchronize changes with conflicts', async () => { const { database, projects, tasks, comments } = makeDatabase() diff --git a/src/sync/impl/synchronize.js b/src/sync/impl/synchronize.js index eef1f6e81..512fbfaad 100644 --- a/src/sync/impl/synchronize.js +++ b/src/sync/impl/synchronize.js @@ -32,9 +32,11 @@ export default async function synchronize({ log && (log.phase = 'starting') // TODO: Wrap the three computionally intensive phases in `requestIdleCallback` + log && (log.remoteChangeCount = 0) // pull phase - const lastPulledAt = await getLastPulledAt(database) + const initialLastPulledAt = await getLastPulledAt(database) + let lastPulledAt = initialLastPulledAt log && (log.lastPulledAt = lastPulledAt) const { schemaVersion, migration, shouldSaveSchemaVersion } = await getMigrationInfo( @@ -44,80 +46,84 @@ export default async function synchronize({ migrationsEnabledAtVersion, ) log && (log.phase = 'ready to pull') + let experimentalHasMore = true + while (experimentalHasMore) { + // $FlowFixMe + const pullResult = await pullChanges({ lastPulledAt, schemaVersion, migration }) + log && (log.phase = 'pulled') + + let newLastPulledAt: Timestamp = (pullResult: any).timestamp + const remoteChangeCount = pullResult.changes ? changeSetCount(pullResult.changes) : NaN + const hasMore = !!pullResult.experimentalHasMore + + if (onWillApplyRemoteChanges) { + await onWillApplyRemoteChanges({ remoteChangeCount }) + } - // $FlowFixMe - const pullResult = await pullChanges({ - lastPulledAt, - schemaVersion, - migration, - }) - log && (log.phase = 'pulled') - - let newLastPulledAt: Timestamp = (pullResult: any).timestamp - const remoteChangeCount = pullResult.changes ? changeSetCount(pullResult.changes) : NaN - - if (onWillApplyRemoteChanges) { - await onWillApplyRemoteChanges({ remoteChangeCount }) - } - - await database.write(async () => { - ensureSameDatabase(database, resetCount) - invariant( - lastPulledAt === (await getLastPulledAt(database)), - '[Sync] Concurrent synchronization is not allowed. More than one synchronize() call was running at the same time, and the later one was aborted before committing results to local database.', - ) - - if (unsafeTurbo) { + await database.write(async () => { + ensureSameDatabase(database, resetCount) invariant( - !_unsafeBatchPerCollection, - 'unsafeTurbo must not be used with _unsafeBatchPerCollection', + initialLastPulledAt === (await getLastPulledAt(database)), + '[Sync] Concurrent synchronization is not allowed. More than one synchronize() call was running at the same time, and the later one was aborted before committing results to local database.', ) + + if (unsafeTurbo) { + invariant( + !_unsafeBatchPerCollection, + 'unsafeTurbo must not be used with _unsafeBatchPerCollection', + ) + invariant( + 'syncJson' in pullResult || 'syncJsonId' in pullResult, + 'missing syncJson/syncJsonId', + ) + invariant(lastPulledAt === null, 'unsafeTurbo can only be used as the first sync') + + const syncJsonId = pullResult.syncJsonId || Math.floor(Math.random() * 1000000000) + + if (pullResult.syncJson) { + await database.adapter.provideSyncJson(syncJsonId, pullResult.syncJson) + } + + const resultRest = await database.adapter.unsafeLoadFromSync(syncJsonId) + newLastPulledAt = resultRest.timestamp + onDidPullChanges && onDidPullChanges(resultRest) + } + + log && (log.newLastPulledAt = newLastPulledAt) invariant( - 'syncJson' in pullResult || 'syncJsonId' in pullResult, - 'missing syncJson/syncJsonId', + typeof newLastPulledAt === 'number' && newLastPulledAt > 0, + `pullChanges() returned invalid timestamp ${newLastPulledAt}. timestamp must be a non-zero number`, ) - invariant(lastPulledAt === null, 'unsafeTurbo can only be used as the first sync') - - const syncJsonId = pullResult.syncJsonId || Math.floor(Math.random() * 1000000000) - if (pullResult.syncJson) { - await database.adapter.provideSyncJson(syncJsonId, pullResult.syncJson) + if (!unsafeTurbo) { + // $FlowFixMe + const { changes: remoteChanges, ...resultRest } = pullResult + log && (log.remoteChangeCount += remoteChangeCount) + // $FlowFixMe + await applyRemoteChanges(remoteChanges, { + db: database, + strategy: ((pullResult: any).experimentalStrategy: ?SyncPullStrategy), + sendCreatedAsUpdated, + log, + conflictResolver, + _unsafeBatchPerCollection, + }) + onDidPullChanges && onDidPullChanges(resultRest) } - const resultRest = await database.adapter.unsafeLoadFromSync(syncJsonId) - newLastPulledAt = resultRest.timestamp - onDidPullChanges && onDidPullChanges(resultRest) - } + log && (log.phase = 'applied remote changes') - log && (log.newLastPulledAt = newLastPulledAt) - invariant( - typeof newLastPulledAt === 'number' && newLastPulledAt > 0, - `pullChanges() returned invalid timestamp ${newLastPulledAt}. timestamp must be a non-zero number`, - ) - - if (!unsafeTurbo) { - // $FlowFixMe - const { changes: remoteChanges, ...resultRest } = pullResult - log && (log.remoteChangeCount = remoteChangeCount) - // $FlowFixMe - await applyRemoteChanges(remoteChanges, { - db: database, - strategy: ((pullResult: any).experimentalStrategy: ?SyncPullStrategy), - sendCreatedAsUpdated, - log, - conflictResolver, - _unsafeBatchPerCollection, - }) - onDidPullChanges && onDidPullChanges(resultRest) - } - - log && (log.phase = 'applied remote changes') - await setLastPulledAt(database, newLastPulledAt) + if (!hasMore) { + await setLastPulledAt(database, newLastPulledAt) + if (shouldSaveSchemaVersion) { + await setLastPulledSchemaVersion(database, schemaVersion) + } + } + }, 'sync-synchronize-apply') - if (shouldSaveSchemaVersion) { - await setLastPulledSchemaVersion(database, schemaVersion) - } - }, 'sync-synchronize-apply') + experimentalHasMore = hasMore + lastPulledAt = newLastPulledAt + } // push phase if (pushChanges) { @@ -130,8 +136,7 @@ export default async function synchronize({ ensureSameDatabase(database, resetCount) if (!isChangeSetEmpty(localChanges.changes)) { log && (log.phase = 'ready to push') - const pushResult = - (await pushChanges({ changes: localChanges.changes, lastPulledAt: newLastPulledAt })) || {} + const pushResult = (await pushChanges({ changes: localChanges.changes, lastPulledAt })) || {} log && (log.phase = 'pushed') log && (log.rejectedIds = pushResult.experimentalRejectedIds) diff --git a/src/sync/index.d.ts b/src/sync/index.d.ts index 3b34a158f..ce716fadd 100644 --- a/src/sync/index.d.ts +++ b/src/sync/index.d.ts @@ -23,7 +23,12 @@ export type SyncPullArgs = $Exact<{ migration: MigrationSyncChanges }> export type SyncPullResult = - | $Exact<{ changes: SyncDatabaseChangeSet; timestamp: Timestamp }> + | $Exact<{ + changes: SyncDatabaseChangeSet + timestamp: Timestamp + experimentalStrategy?: any + experimentalHasMore?: boolean + }> | $Exact<{ syncJson: string }> | $Exact<{ syncJsonId: number }> diff --git a/src/sync/index.js b/src/sync/index.js index fcce9b80d..13fa34c21 100644 --- a/src/sync/index.js +++ b/src/sync/index.js @@ -49,6 +49,7 @@ export type SyncPullResult = changes: SyncDatabaseChangeSet, timestamp: Timestamp, experimentalStrategy?: SyncPullStrategy, + experimentalHasMore?: boolean, }> | $Exact<{ syncJson: string }> | $Exact<{ syncJsonId: number }> diff --git a/src/utils/rx/__wmelonRxShim/index.d.ts b/src/utils/rx/__wmelonRxShim/index.d.ts index c663a6bc8..5e8eba8a7 100644 --- a/src/utils/rx/__wmelonRxShim/index.d.ts +++ b/src/utils/rx/__wmelonRxShim/index.d.ts @@ -24,6 +24,6 @@ export { switchMap, throttleTime, startWith, - catchError + catchError, } from 'rxjs/operators' export type { ConnectableObservable } from 'rxjs' diff --git a/src/utils/rx/index.d.ts b/src/utils/rx/index.d.ts index 10cc30878..dbf355fdc 100644 --- a/src/utils/rx/index.d.ts +++ b/src/utils/rx/index.d.ts @@ -22,6 +22,6 @@ export { switchMap, throttleTime, startWith, - catchError + catchError, } from './__wmelonRxShim' export type { ConnectableObservable } from './__wmelonRxShim'