summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/8171.misc1
-rw-r--r--synapse/replication/slave/storage/_slaved_id_tracker.py4
-rw-r--r--synapse/replication/slave/storage/account_data.py4
-rw-r--r--synapse/replication/slave/storage/deviceinbox.py2
-rw-r--r--synapse/replication/slave/storage/devices.py4
-rw-r--r--synapse/replication/slave/storage/groups.py2
-rw-r--r--synapse/replication/slave/storage/presence.py2
-rw-r--r--synapse/replication/slave/storage/push_rule.py2
-rw-r--r--synapse/replication/slave/storage/pushers.py2
-rw-r--r--synapse/replication/slave/storage/receipts.py2
-rw-r--r--synapse/replication/slave/storage/room.py2
-rw-r--r--synapse/storage/databases/main/events_worker.py4
12 files changed, 16 insertions, 15 deletions
diff --git a/changelog.d/8171.misc b/changelog.d/8171.misc
new file mode 100644
index 0000000000..cafbf23d83
--- /dev/null
+++ b/changelog.d/8171.misc
@@ -0,0 +1 @@
+Make `SlavedIdTracker.advance` have the same interface as `MultiWriterIDGenerator`.
diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py
index d43eaf3a29..047f2c50f7 100644
--- a/synapse/replication/slave/storage/_slaved_id_tracker.py
+++ b/synapse/replication/slave/storage/_slaved_id_tracker.py
@@ -21,9 +21,9 @@ class SlavedIdTracker(object):
         self.step = step
         self._current = _load_current_id(db_conn, table, column, step)
         for table, column in extra_tables:
-            self.advance(_load_current_id(db_conn, table, column))
+            self.advance(None, _load_current_id(db_conn, table, column))
 
-    def advance(self, new_id):
+    def advance(self, instance_name, new_id):
         self._current = (max if self.step > 0 else min)(self._current, new_id)
 
     def get_current_token(self):
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index 154f0e687c..bb66ba9b80 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -41,12 +41,12 @@ class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlaved
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == TagAccountDataStream.NAME:
-            self._account_data_id_gen.advance(token)
+            self._account_data_id_gen.advance(instance_name, token)
             for row in rows:
                 self.get_tags_for_user.invalidate((row.user_id,))
                 self._account_data_stream_cache.entity_has_changed(row.user_id, token)
         elif stream_name == AccountDataStream.NAME:
-            self._account_data_id_gen.advance(token)
+            self._account_data_id_gen.advance(instance_name, token)
             for row in rows:
                 if not row.room_id:
                     self.get_global_account_data_by_type_for_user.invalidate(
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index ee7f69a918..533d927701 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -46,7 +46,7 @@ class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore):
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == ToDeviceStream.NAME:
-            self._device_inbox_id_gen.advance(token)
+            self._device_inbox_id_gen.advance(instance_name, token)
             for row in rows:
                 if row.entity.startswith("@"):
                     self._device_inbox_stream_cache.entity_has_changed(
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 722f3745e9..596c72eb92 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -50,10 +50,10 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == DeviceListsStream.NAME:
-            self._device_list_id_gen.advance(token)
+            self._device_list_id_gen.advance(instance_name, token)
             self._invalidate_caches_for_devices(token, rows)
         elif stream_name == UserSignatureStream.NAME:
-            self._device_list_id_gen.advance(token)
+            self._device_list_id_gen.advance(instance_name, token)
             for row in rows:
                 self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
         return super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
index 3291558c7a..567b4a5cc1 100644
--- a/synapse/replication/slave/storage/groups.py
+++ b/synapse/replication/slave/storage/groups.py
@@ -40,7 +40,7 @@ class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore):
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == GroupServerStream.NAME:
-            self._group_updates_id_gen.advance(token)
+            self._group_updates_id_gen.advance(instance_name, token)
             for row in rows:
                 self._group_updates_stream_cache.entity_has_changed(row.user_id, token)
 
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index a912c04360..025f6f6be8 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -44,7 +44,7 @@ class SlavedPresenceStore(BaseSlavedStore):
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == PresenceStream.NAME:
-            self._presence_id_gen.advance(token)
+            self._presence_id_gen.advance(instance_name, token)
             for row in rows:
                 self.presence_stream_cache.entity_has_changed(row.user_id, token)
                 self._get_presence_for_user.invalidate((row.user_id,))
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index 90d90833f9..de904c943c 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -30,7 +30,7 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
         assert isinstance(self._push_rules_stream_id_gen, SlavedIdTracker)
 
         if stream_name == PushRulesStream.NAME:
-            self._push_rules_stream_id_gen.advance(token)
+            self._push_rules_stream_id_gen.advance(instance_name, token)
             for row in rows:
                 self.get_push_rules_for_user.invalidate((row.user_id,))
                 self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index 63300e5da6..9da218bfe8 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -34,5 +34,5 @@ class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == PushersStream.NAME:
-            self._pushers_id_gen.advance(token)
+            self._pushers_id_gen.advance(instance_name, token)
         return super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index 17ba1f22ac..5c2986e050 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -46,7 +46,7 @@ class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == ReceiptsStream.NAME:
-            self._receipts_id_gen.advance(token)
+            self._receipts_id_gen.advance(instance_name, token)
             for row in rows:
                 self.invalidate_caches_for_receipt(
                     row.room_id, row.receipt_type, row.user_id
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index 427c81772b..80ae803ad9 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -33,6 +33,6 @@ class RoomStore(RoomWorkerStore, BaseSlavedStore):
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == PublicRoomsStream.NAME:
-            self._public_room_id_gen.advance(token)
+            self._public_room_id_gen.advance(instance_name, token)
 
         return super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index d59d73938a..e6247d682d 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -113,9 +113,9 @@ class EventsWorkerStore(SQLBaseStore):
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == EventsStream.NAME:
-            self._stream_id_gen.advance(token)
+            self._stream_id_gen.advance(instance_name, token)
         elif stream_name == BackfillStream.NAME:
-            self._backfill_id_gen.advance(-token)
+            self._backfill_id_gen.advance(instance_name, -token)
 
         super().process_replication_rows(stream_name, instance_name, token, rows)