fix: websocket handling

This commit is contained in:
bwees
2026-02-25 11:32:58 -06:00
parent f6847f1385
commit 4c6134dc97
4 changed files with 58 additions and 52 deletions

View File

@@ -340,60 +340,43 @@ class SyncStreamService {
}
}
Future<void> handleWsAssetEditReadyV1Batch(List<dynamic> batchData) async {
if (batchData.isEmpty) return;
_logger.info('Processing batch of ${batchData.length} AssetEditReadyV1 events');
final List<SyncAssetV1> assets = [];
final List<SyncAssetEditV1> assetEdits = [];
Future<void> handleWsAssetEditReadyV1(dynamic data) async {
_logger.info('Processing AssetEditReadyV1 event');
try {
for (final data in batchData) {
if (data is! Map<String, dynamic>) {
continue;
}
final payload = data;
final assetData = payload['asset'];
final editData = payload['edit'];
if (assetData == null) {
continue;
}
final asset = SyncAssetV1.fromJson(assetData);
if (asset != null) {
assets.add(asset);
}
// Edits are only send on v2.6.0+
if (editData != null) {
final edits = (editData as List<dynamic>)
.map((e) => SyncAssetEditV1.fromJson(e))
.whereType<SyncAssetEditV1>()
.toList();
assetEdits.addAll(edits);
}
if (data is! Map<String, dynamic>) {
throw ArgumentError("Invalid data format for AssetEditReadyV1 event");
}
if (assets.isNotEmpty) {
await _syncStreamRepository.updateAssetsV1(assets, debugLabel: 'websocket-edit');
final payload = data;
// TODO: How do we want to handle this?
// edits that are sent replace previous edits, so we delete existing ones first
// await _syncStreamRepository.deleteAssetEditsV1(
// assets.map((asset) => SyncAssetEditDeleteV1(editId: asset.id)).toList(),
// debugLabel: 'websocket-edit',
// );
await _syncStreamRepository.updateAssetEditsV1(assetEdits, debugLabel: 'websocket-edit');
_logger.info('Successfully processed ${assets.length} edited assets');
if (payload['asset'] == null) {
throw ArgumentError("Missing 'asset' field in AssetEditReadyV1 event data");
}
final asset = SyncAssetV1.fromJson(payload['asset']);
if (asset == null) {
throw ArgumentError("Failed to parse 'asset' field in AssetEditReadyV1 event data");
}
List<SyncAssetEditV1> assetEdits = [];
// Edits are only send on v2.6.0+
if (payload['edit'] != null && payload['edit'] is List<dynamic>) {
assetEdits = (payload['edit'] as List<dynamic>)
.map((e) => SyncAssetEditV1.fromJson(e))
.whereType<SyncAssetEditV1>()
.toList();
}
await _syncStreamRepository.updateAssetsV1([asset], debugLabel: 'websocket-edit');
await _syncStreamRepository.replaceAssetEditsV1(asset.id, assetEdits, debugLabel: 'websocket-edit');
_logger.info(
'Successfully processed AssetEditReadyV1 event for asset ${asset.id} with ${assetEdits.length} edits',
);
} catch (error, stackTrace) {
_logger.severe("Error processing AssetEditReadyV1 websocket batch events", error, stackTrace);
_logger.severe("Error processing AssetEditReadyV1 websocket event", error, stackTrace);
}
}

View File

@@ -196,11 +196,11 @@ class BackgroundSyncManager {
});
}
Future<void> syncWebsocketEditBatch(List<dynamic> batchData) {
Future<void> syncWebsocketEdit(dynamic data) {
if (_syncWebsocketTask != null) {
return _syncWebsocketTask!.future;
}
_syncWebsocketTask = _handleWsAssetEditReadyV1Batch(batchData);
_syncWebsocketTask = _handleWsAssetEditReadyV1(data);
return _syncWebsocketTask!.whenComplete(() {
_syncWebsocketTask = null;
});
@@ -242,7 +242,7 @@ Cancelable<void> _handleWsAssetUploadReadyV1Batch(List<dynamic> batchData) => ru
debugLabel: 'websocket-batch',
);
Cancelable<void> _handleWsAssetEditReadyV1Batch(List<dynamic> batchData) => runInIsolateGentle(
computation: (ref) => ref.read(syncStreamServiceProvider).handleWsAssetEditReadyV1Batch(batchData),
Cancelable<void> _handleWsAssetEditReadyV1(dynamic data) => runInIsolateGentle(
computation: (ref) => ref.read(syncStreamServiceProvider).handleWsAssetEditReadyV1(data),
debugLabel: 'websocket-edit',
);

View File

@@ -346,6 +346,29 @@ class SyncStreamRepository extends DriftDatabaseRepository {
}
}
Future<void> replaceAssetEditsV1(String assetId, Iterable<SyncAssetEditV1> data, {String debugLabel = 'user'}) async {
try {
await _db.batch((batch) {
batch.deleteWhere(_db.assetEditEntity, (row) => row.assetId.equals(assetId));
for (final edit in data) {
final companion = AssetEditEntityCompanion(
id: Value(edit.id),
assetId: Value(edit.assetId),
action: Value(edit.action.toAssetEditAction()),
parameters: Value(edit.parameters as Map<String, Object?>),
sequence: Value(edit.sequence),
);
batch.insert(_db.assetEditEntity, companion);
}
});
} catch (error, stack) {
_logger.severe('Error: replaceAssetEditsV1 - $debugLabel', error, stack);
rethrow;
}
}
Future<void> deleteAssetEditsV1(Iterable<SyncAssetEditDeleteV1> data, {String debugLabel = 'user'}) async {
try {
await _db.batch((batch) {

View File

@@ -319,7 +319,7 @@ class WebsocketNotifier extends StateNotifier<WebsocketState> {
}
void _handleSyncAssetEditReady(dynamic data) {
unawaited(_ref.read(backgroundSyncProvider).syncWebsocketEditBatch([data]));
unawaited(_ref.read(backgroundSyncProvider).syncWebsocketEdit(data));
}
void _processBatchedAssetUploadReady() {