summary refs log tree commit diff
path: root/synapse/storage/databases/main/pusher.py
diff options
context:
space:
mode:
authorNick Mills-Barrett <nick@beeper.com>2023-01-04 11:49:26 +0000
committerGitHub <noreply@github.com>2023-01-04 11:49:26 +0000
commitdb1cfe9c80a707995fcad8f3faa839acb247068a (patch)
tree691c711006765e770056d97db624043d5b87b781 /synapse/storage/databases/main/pusher.py
parentAdd experimental support for MSC3391: deleting account data (#14714) (diff)
downloadsynapse-db1cfe9c80a707995fcad8f3faa839acb247068a.tar.xz
Update all stream IDs after processing replication rows (#14723)
This creates a new store method, `process_replication_position` that
is called after `process_replication_rows`. By moving stream ID advances
here this guarantees any relevant cache invalidations will have been
applied before the stream is advanced.

This avoids race conditions where Python switches between threads mid
way through processing the `process_replication_rows` method where stream
IDs may be advanced before caches are invalidated due to class resolution
ordering.

See this comment/issue for further discussion:
	https://github.com/matrix-org/synapse/issues/14158#issuecomment-1344048703
Diffstat (limited to 'synapse/storage/databases/main/pusher.py')
-rw-r--r--synapse/storage/databases/main/pusher.py6
1 files changed, 3 insertions, 3 deletions
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 40fd781a6a..7f24a3b6ec 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -111,12 +111,12 @@ class PusherWorkerStore(SQLBaseStore):
     def get_pushers_stream_token(self) -> int:
         return self._pushers_id_gen.get_current_token()
 
-    def process_replication_rows(
-        self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
+    def process_replication_position(
+        self, stream_name: str, instance_name: str, token: int
     ) -> None:
         if stream_name == PushersStream.NAME:
             self._pushers_id_gen.advance(instance_name, token)
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
+        super().process_replication_position(stream_name, instance_name, token)
 
     async def get_pushers_by_app_id_and_pushkey(
         self, app_id: str, pushkey: str