Compare commits

...

3 Commits

Author SHA1 Message Date
Matiss Janis Aboltins
2bbb7ecd0f Enhance applyMessages function with tracking for added items and special handling for synced preferences. Improve clock persistence logic during syncing. 2026-02-23 21:51:24 +00:00
Matiss Janis Aboltins
54defb155a Refactor applyMessages function to improve code readability 2026-02-23 21:49:58 +00:00
Matiss Janis Aboltins
8690616f41 [AI] Chunked sync message application and progress UX for mobile
- Apply sync messages in batches (APPLY_MESSAGES_BATCH_SIZE) when count > 5000 to avoid blocking mobile
- Emit sync-event progress (applied/total) during batched apply; client shows 'Applying sync... X%'
- Add SyncEvent type 'progress'; handle in sync-events.ts and set loadingText
- Add batched-apply and progress-emission tests in sync.test.ts
- Fixes mobile browsers getting stuck on the 'Downloading...' screen during large syncs (e.g. #6904)

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-23 21:47:56 +00:00
5 changed files with 166 additions and 13 deletions

View File

@@ -4,7 +4,7 @@ import { t } from 'i18next';
import { listen, send } from 'loot-core/platform/client/connection';
import { accountQueries } from './accounts';
import { resetSync, sync } from './app/appSlice';
import { resetSync, setAppState, sync } from './app/appSlice';
import { categoryQueries } from './budget';
import {
closeAndDownloadBudget,
@@ -19,6 +19,19 @@ import type { AppStore } from './redux/store';
import { signOut } from './users/usersSlice';
export function listenForSyncEvent(store: AppStore, queryClient: QueryClient) {
const unlistenProgress = listen('sync-event', event => {
if (event.type !== 'progress') {
return;
}
const percent = Math.round((event.applied / event.total) * 100);
store.dispatch(
setAppState({
loadingText: t('Applying sync... {{percent}}%', { percent }),
}),
);
});
// TODO: Should this run on mobile too?
const unlistenUnauthorized = listen('sync-event', async ({ type }) => {
if (type === 'unauthorized') {
@@ -387,6 +400,7 @@ export function listenForSyncEvent(store: AppStore, queryClient: QueryClient) {
});
return () => {
unlistenProgress();
unlistenUnauthorized();
unlistenSuccess();
};

View File

@@ -34,6 +34,12 @@ export { resetSync } from './reset';
export { repairSync } from './repair';
const FULL_SYNC_DELAY = 1000;
// When applying more than this many sync messages, use batched transactions
// to avoid blocking the event loop (e.g. on mobile). Smaller syncs use a
// single transaction for consistency and performance.
export const APPLY_MESSAGES_BATCH_SIZE = 5000;
let SYNCING_MODE = 'enabled';
type SyncingMode = 'enabled' | 'offline' | 'disabled' | 'import';
@@ -324,17 +330,11 @@ export const applyMessages = sequential(async (messages: Message[]) => {
sheet.get().startCacheBarrier();
}
// Now that we have all of the data, go through and apply the
// messages carefully. This transaction is **crucial**: it
// guarantees that everything is atomically committed to the
// database, and if any part of it fails everything aborts and
// nothing is changed. This is critical to maintain consistency. We
// also avoid any side effects to in-memory objects, and apply them
// after this succeeds.
db.transaction(() => {
const added = new Set();
const useBatching = messages.length > APPLY_MESSAGES_BATCH_SIZE;
const added = new Set<string>();
for (const msg of messages) {
function applyChunk(chunk: Message[]) {
for (const msg of chunk) {
const { dataset, row, column, timestamp, value } = msg;
if (!msg.old) {
@@ -365,7 +365,9 @@ export const applyMessages = sequential(async (messages: Message[]) => {
void setBudgetType(value);
}
}
}
function persistClockIfSyncing() {
if (checkSyncingMode('enabled')) {
currentMerkle = merkle.prune(currentMerkle);
@@ -378,7 +380,38 @@ export const applyMessages = sequential(async (messages: Message[]) => {
[serializeClock({ ...clock, merkle: currentMerkle })],
);
}
});
}
if (!useBatching) {
// Single transaction for small syncs
db.transaction(() => {
applyChunk(messages);
persistClockIfSyncing();
});
} else {
// Batched transactions for large syncs (e.g. initial sync on mobile)
const chunks: Message[][] = [];
for (let i = 0; i < messages.length; i += APPLY_MESSAGES_BATCH_SIZE) {
chunks.push(messages.slice(i, i + APPLY_MESSAGES_BATCH_SIZE));
}
let appliedSoFar = 0;
for (const chunk of chunks) {
db.transaction(() => {
applyChunk(chunk);
persistClockIfSyncing();
});
appliedSoFar += chunk.length;
connection.send('sync-event', {
type: 'progress',
applied: appliedSoFar,
total: messages.length,
});
await new Promise<void>(resolve => setTimeout(resolve, 0));
}
}
if (checkSyncingMode('enabled')) {
// The transaction succeeded, so we can update in-memory objects

View File

@@ -1,6 +1,8 @@
// @ts-strict-ignore
import { getClock, Timestamp } from '@actual-app/crdt';
import { vi } from 'vitest';
import * as connection from '../../platform/server/connection';
import * as db from '../db';
import * as prefs from '../prefs';
import * as sheet from '../sheet';
@@ -9,7 +11,13 @@ import * as mockSyncServer from '../tests/mockSyncServer';
import * as encoder from './encoder';
import { isError } from './utils';
import { applyMessages, fullSync, sendMessages, setSyncingMode } from './index';
import {
APPLY_MESSAGES_BATCH_SIZE,
applyMessages,
fullSync,
sendMessages,
setSyncingMode,
} from './index';
beforeEach(() => {
mockSyncServer.reset();
@@ -150,6 +158,93 @@ describe('Sync', () => {
expect(result.messages.length).toBe(2);
expect(mockSyncServer.getMessages().length).toBe(3);
});
it('should apply a large number of messages in batches (same result as single transaction)', async () => {
await prefs.loadPrefs();
await prefs.savePrefs({ groupId: 'group' });
const messageCount = APPLY_MESSAGES_BATCH_SIZE * 2 + 100;
const messages = [];
for (let i = 0; i < messageCount; i++) {
global.stepForwardInTime();
messages.push({
dataset: 'transactions',
row: 'tx-' + i,
column: 'amount',
value: 1000 + i,
timestamp: Timestamp.send(),
});
}
await applyMessages(messages);
const crdtRows = await db.all<db.DbCrdtMessage>(
'SELECT * FROM messages_crdt ORDER BY timestamp',
);
expect(crdtRows.length).toBe(messageCount);
const clockRows = await db.all<db.DbClockMessage>(
'SELECT * FROM messages_clock',
);
expect(clockRows.length).toBe(1);
expect(getClock().merkle).toBeDefined();
});
it('should emit progress sync-events when applying a large number of messages', async () => {
await prefs.loadPrefs();
await prefs.savePrefs({ groupId: 'group' });
const sendSpy = vi.spyOn(connection, 'send');
const messageCount = APPLY_MESSAGES_BATCH_SIZE * 2 + 100;
const messages = [];
for (let i = 0; i < messageCount; i++) {
global.stepForwardInTime();
messages.push({
dataset: 'transactions',
row: 'tx-progress-' + i,
column: 'amount',
value: 2000 + i,
timestamp: Timestamp.send(),
});
}
await applyMessages(messages);
const progressCalls = sendSpy.mock.calls.filter(
([name, payload]) =>
name === 'sync-event' &&
payload &&
typeof payload === 'object' &&
'type' in payload &&
payload.type === 'progress',
);
expect(progressCalls.length).toBeGreaterThan(0);
const appliedValues = progressCalls
.map(([, payload]) => (payload as { applied: number }).applied)
.sort((a, b) => a - b);
for (let i = 0; i < appliedValues.length; i++) {
expect(appliedValues[i]).toBeGreaterThan(0);
expect(appliedValues[i]).toBeLessThanOrEqual(messageCount);
}
for (const [, payload] of progressCalls) {
const p = payload as { applied: number; total: number };
expect(p.total).toBe(messageCount);
}
for (let i = 1; i < appliedValues.length; i++) {
expect(appliedValues[i]).toBeGreaterThanOrEqual(appliedValues[i - 1]);
}
sendSpy.mockRestore();
});
});
function registerBudgetMonths(months) {

View File

@@ -38,6 +38,11 @@ type SyncEvent = {
| {
type: 'start';
}
| {
type: 'progress';
applied: number;
total: number;
}
| {
type: 'unauthorized';
}

View File

@@ -0,0 +1,6 @@
---
category: Enhancements
authors: [MatissJanis]
---
Performance: apply sync messages in batches during large syncs (such as the initial sync), with progress feedback, so mobile browsers no longer block or appear stuck