mirror of
https://github.com/actualbudget/actual.git
synced 2026-03-09 11:42:54 -05:00
Compare commits
3 Commits
v26.3.0
...
matiss/chu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2bbb7ecd0f | ||
|
|
54defb155a | ||
|
|
8690616f41 |
@@ -4,7 +4,7 @@ import { t } from 'i18next';
|
||||
import { listen, send } from 'loot-core/platform/client/connection';
|
||||
|
||||
import { accountQueries } from './accounts';
|
||||
import { resetSync, sync } from './app/appSlice';
|
||||
import { resetSync, setAppState, sync } from './app/appSlice';
|
||||
import { categoryQueries } from './budget';
|
||||
import {
|
||||
closeAndDownloadBudget,
|
||||
@@ -19,6 +19,19 @@ import type { AppStore } from './redux/store';
|
||||
import { signOut } from './users/usersSlice';
|
||||
|
||||
export function listenForSyncEvent(store: AppStore, queryClient: QueryClient) {
|
||||
const unlistenProgress = listen('sync-event', event => {
|
||||
if (event.type !== 'progress') {
|
||||
return;
|
||||
}
|
||||
|
||||
const percent = Math.round((event.applied / event.total) * 100);
|
||||
store.dispatch(
|
||||
setAppState({
|
||||
loadingText: t('Applying sync... {{percent}}%', { percent }),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
// TODO: Should this run on mobile too?
|
||||
const unlistenUnauthorized = listen('sync-event', async ({ type }) => {
|
||||
if (type === 'unauthorized') {
|
||||
@@ -387,6 +400,7 @@ export function listenForSyncEvent(store: AppStore, queryClient: QueryClient) {
|
||||
});
|
||||
|
||||
return () => {
|
||||
unlistenProgress();
|
||||
unlistenUnauthorized();
|
||||
unlistenSuccess();
|
||||
};
|
||||
|
||||
@@ -34,6 +34,12 @@ export { resetSync } from './reset';
|
||||
export { repairSync } from './repair';
|
||||
|
||||
const FULL_SYNC_DELAY = 1000;
|
||||
|
||||
// When applying more than this many sync messages, use batched transactions
|
||||
// to avoid blocking the event loop (e.g. on mobile). Smaller syncs use a
|
||||
// single transaction for consistency and performance.
|
||||
export const APPLY_MESSAGES_BATCH_SIZE = 5000;
|
||||
|
||||
let SYNCING_MODE = 'enabled';
|
||||
type SyncingMode = 'enabled' | 'offline' | 'disabled' | 'import';
|
||||
|
||||
@@ -324,17 +330,11 @@ export const applyMessages = sequential(async (messages: Message[]) => {
|
||||
sheet.get().startCacheBarrier();
|
||||
}
|
||||
|
||||
// Now that we have all of the data, go through and apply the
|
||||
// messages carefully. This transaction is **crucial**: it
|
||||
// guarantees that everything is atomically committed to the
|
||||
// database, and if any part of it fails everything aborts and
|
||||
// nothing is changed. This is critical to maintain consistency. We
|
||||
// also avoid any side effects to in-memory objects, and apply them
|
||||
// after this succeeds.
|
||||
db.transaction(() => {
|
||||
const added = new Set();
|
||||
const useBatching = messages.length > APPLY_MESSAGES_BATCH_SIZE;
|
||||
const added = new Set<string>();
|
||||
|
||||
for (const msg of messages) {
|
||||
function applyChunk(chunk: Message[]) {
|
||||
for (const msg of chunk) {
|
||||
const { dataset, row, column, timestamp, value } = msg;
|
||||
|
||||
if (!msg.old) {
|
||||
@@ -365,7 +365,9 @@ export const applyMessages = sequential(async (messages: Message[]) => {
|
||||
void setBudgetType(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function persistClockIfSyncing() {
|
||||
if (checkSyncingMode('enabled')) {
|
||||
currentMerkle = merkle.prune(currentMerkle);
|
||||
|
||||
@@ -378,7 +380,38 @@ export const applyMessages = sequential(async (messages: Message[]) => {
|
||||
[serializeClock({ ...clock, merkle: currentMerkle })],
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (!useBatching) {
|
||||
// Single transaction for small syncs
|
||||
db.transaction(() => {
|
||||
applyChunk(messages);
|
||||
persistClockIfSyncing();
|
||||
});
|
||||
} else {
|
||||
// Batched transactions for large syncs (e.g. initial sync on mobile)
|
||||
const chunks: Message[][] = [];
|
||||
for (let i = 0; i < messages.length; i += APPLY_MESSAGES_BATCH_SIZE) {
|
||||
chunks.push(messages.slice(i, i + APPLY_MESSAGES_BATCH_SIZE));
|
||||
}
|
||||
|
||||
let appliedSoFar = 0;
|
||||
for (const chunk of chunks) {
|
||||
db.transaction(() => {
|
||||
applyChunk(chunk);
|
||||
persistClockIfSyncing();
|
||||
});
|
||||
|
||||
appliedSoFar += chunk.length;
|
||||
connection.send('sync-event', {
|
||||
type: 'progress',
|
||||
applied: appliedSoFar,
|
||||
total: messages.length,
|
||||
});
|
||||
|
||||
await new Promise<void>(resolve => setTimeout(resolve, 0));
|
||||
}
|
||||
}
|
||||
|
||||
if (checkSyncingMode('enabled')) {
|
||||
// The transaction succeeded, so we can update in-memory objects
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
// @ts-strict-ignore
|
||||
import { getClock, Timestamp } from '@actual-app/crdt';
|
||||
import { vi } from 'vitest';
|
||||
|
||||
import * as connection from '../../platform/server/connection';
|
||||
import * as db from '../db';
|
||||
import * as prefs from '../prefs';
|
||||
import * as sheet from '../sheet';
|
||||
@@ -9,7 +11,13 @@ import * as mockSyncServer from '../tests/mockSyncServer';
|
||||
import * as encoder from './encoder';
|
||||
import { isError } from './utils';
|
||||
|
||||
import { applyMessages, fullSync, sendMessages, setSyncingMode } from './index';
|
||||
import {
|
||||
APPLY_MESSAGES_BATCH_SIZE,
|
||||
applyMessages,
|
||||
fullSync,
|
||||
sendMessages,
|
||||
setSyncingMode,
|
||||
} from './index';
|
||||
|
||||
beforeEach(() => {
|
||||
mockSyncServer.reset();
|
||||
@@ -150,6 +158,93 @@ describe('Sync', () => {
|
||||
expect(result.messages.length).toBe(2);
|
||||
expect(mockSyncServer.getMessages().length).toBe(3);
|
||||
});
|
||||
|
||||
it('should apply a large number of messages in batches (same result as single transaction)', async () => {
|
||||
await prefs.loadPrefs();
|
||||
await prefs.savePrefs({ groupId: 'group' });
|
||||
|
||||
const messageCount = APPLY_MESSAGES_BATCH_SIZE * 2 + 100;
|
||||
const messages = [];
|
||||
|
||||
for (let i = 0; i < messageCount; i++) {
|
||||
global.stepForwardInTime();
|
||||
messages.push({
|
||||
dataset: 'transactions',
|
||||
row: 'tx-' + i,
|
||||
column: 'amount',
|
||||
value: 1000 + i,
|
||||
timestamp: Timestamp.send(),
|
||||
});
|
||||
}
|
||||
|
||||
await applyMessages(messages);
|
||||
|
||||
const crdtRows = await db.all<db.DbCrdtMessage>(
|
||||
'SELECT * FROM messages_crdt ORDER BY timestamp',
|
||||
);
|
||||
expect(crdtRows.length).toBe(messageCount);
|
||||
|
||||
const clockRows = await db.all<db.DbClockMessage>(
|
||||
'SELECT * FROM messages_clock',
|
||||
);
|
||||
expect(clockRows.length).toBe(1);
|
||||
|
||||
expect(getClock().merkle).toBeDefined();
|
||||
});
|
||||
|
||||
it('should emit progress sync-events when applying a large number of messages', async () => {
|
||||
await prefs.loadPrefs();
|
||||
await prefs.savePrefs({ groupId: 'group' });
|
||||
|
||||
const sendSpy = vi.spyOn(connection, 'send');
|
||||
|
||||
const messageCount = APPLY_MESSAGES_BATCH_SIZE * 2 + 100;
|
||||
const messages = [];
|
||||
|
||||
for (let i = 0; i < messageCount; i++) {
|
||||
global.stepForwardInTime();
|
||||
messages.push({
|
||||
dataset: 'transactions',
|
||||
row: 'tx-progress-' + i,
|
||||
column: 'amount',
|
||||
value: 2000 + i,
|
||||
timestamp: Timestamp.send(),
|
||||
});
|
||||
}
|
||||
|
||||
await applyMessages(messages);
|
||||
|
||||
const progressCalls = sendSpy.mock.calls.filter(
|
||||
([name, payload]) =>
|
||||
name === 'sync-event' &&
|
||||
payload &&
|
||||
typeof payload === 'object' &&
|
||||
'type' in payload &&
|
||||
payload.type === 'progress',
|
||||
);
|
||||
|
||||
expect(progressCalls.length).toBeGreaterThan(0);
|
||||
|
||||
const appliedValues = progressCalls
|
||||
.map(([, payload]) => (payload as { applied: number }).applied)
|
||||
.sort((a, b) => a - b);
|
||||
|
||||
for (let i = 0; i < appliedValues.length; i++) {
|
||||
expect(appliedValues[i]).toBeGreaterThan(0);
|
||||
expect(appliedValues[i]).toBeLessThanOrEqual(messageCount);
|
||||
}
|
||||
|
||||
for (const [, payload] of progressCalls) {
|
||||
const p = payload as { applied: number; total: number };
|
||||
expect(p.total).toBe(messageCount);
|
||||
}
|
||||
|
||||
for (let i = 1; i < appliedValues.length; i++) {
|
||||
expect(appliedValues[i]).toBeGreaterThanOrEqual(appliedValues[i - 1]);
|
||||
}
|
||||
|
||||
sendSpy.mockRestore();
|
||||
});
|
||||
});
|
||||
|
||||
function registerBudgetMonths(months) {
|
||||
|
||||
@@ -38,6 +38,11 @@ type SyncEvent = {
|
||||
| {
|
||||
type: 'start';
|
||||
}
|
||||
| {
|
||||
type: 'progress';
|
||||
applied: number;
|
||||
total: number;
|
||||
}
|
||||
| {
|
||||
type: 'unauthorized';
|
||||
}
|
||||
|
||||
6
upcoming-release-notes/7063.md
Normal file
6
upcoming-release-notes/7063.md
Normal file
@@ -0,0 +1,6 @@
|
||||
---
|
||||
category: Enhancements
|
||||
authors: [MatissJanis]
|
||||
---
|
||||
|
||||
Performance: batch sync messages when performing initial sync
|
||||
Reference in New Issue
Block a user