diff options
author | Nick Mills-Barrett <nick@beeper.com> | 2023-01-04 11:49:26 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-04 11:49:26 +0000 |
commit | db1cfe9c80a707995fcad8f3faa839acb247068a (patch) | |
tree | 691c711006765e770056d97db624043d5b87b781 /synapse/storage/databases/main/devices.py | |
parent | Add experimental support for MSC3391: deleting account data (#14714) (diff) | |
download | synapse-db1cfe9c80a707995fcad8f3faa839acb247068a.tar.xz |
Update all stream IDs after processing replication rows (#14723)
This creates a new store method, `process_replication_position` that is called after `process_replication_rows`. By moving stream ID advances here this guarantees any relevant cache invalidations will have been applied before the stream is advanced. This avoids race conditions where Python switches between threads mid way through processing the `process_replication_rows` method where stream IDs may be advanced before caches are invalidated due to class resolution ordering. See this comment/issue for further discussion: https://github.com/matrix-org/synapse/issues/14158#issuecomment-1344048703
Diffstat (limited to 'synapse/storage/databases/main/devices.py')
-rw-r--r-- | synapse/storage/databases/main/devices.py | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index a5bb4d404e..db877e3f13 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -162,14 +162,21 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] ) -> None: if stream_name == DeviceListsStream.NAME: - self._device_list_id_gen.advance(instance_name, token) self._invalidate_caches_for_devices(token, rows) elif stream_name == UserSignatureStream.NAME: - self._device_list_id_gen.advance(instance_name, token) for row in rows: self._user_signature_stream_cache.entity_has_changed(row.user_id, token) return super().process_replication_rows(stream_name, instance_name, token, rows) + def process_replication_position( + self, stream_name: str, instance_name: str, token: int + ) -> None: + if stream_name == DeviceListsStream.NAME: + self._device_list_id_gen.advance(instance_name, token) + elif stream_name == UserSignatureStream.NAME: + self._device_list_id_gen.advance(instance_name, token) + super().process_replication_position(stream_name, instance_name, token) + def _invalidate_caches_for_devices( self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow] ) -> None: |