summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-25 10:51:46 +0000
committerErik Johnston <erik@matrix.org>2020-03-25 10:51:46 +0000
commit5473f1806aebfb120391924d1339856f69fa5076 (patch)
tree9e5fadbfcf7f6d63cab4425362ff366a83011d44
parentAdd instance name to command (diff)
downloadsynapse-5473f1806aebfb120391924d1339856f69fa5076.tar.xz
Change stream_positions to include instance name
-rw-r--r--synapse/app/generic_worker.py2
-rw-r--r--synapse/handlers/typing.py2
-rw-r--r--synapse/replication/slave/storage/_base.py2
-rw-r--r--synapse/replication/slave/storage/account_data.py4
-rw-r--r--synapse/replication/slave/storage/deviceinbox.py2
-rw-r--r--synapse/replication/slave/storage/devices.py4
-rw-r--r--synapse/replication/slave/storage/events.py4
-rw-r--r--synapse/replication/slave/storage/groups.py2
-rw-r--r--synapse/replication/slave/storage/presence.py2
-rw-r--r--synapse/replication/slave/storage/push_rule.py4
-rw-r--r--synapse/replication/slave/storage/pushers.py2
-rw-r--r--synapse/replication/slave/storage/receipts.py2
-rw-r--r--synapse/replication/slave/storage/room.py4
-rw-r--r--synapse/replication/tcp/handler.py12
14 files changed, 23 insertions, 25 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index fb16059fac..c48606e1bb 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -743,7 +743,7 @@ class FederationSenderHandler(object):
         self.federation_sender.wake_destination(server)
 
     def stream_positions(self):
-        return {"federation": self.federation_position}
+        return {"federation": {"master": self.federation_position}}
 
     def process_replication_rows(self, stream_name, token, rows):
         # The federation stream contains things that we want to send out, e.g.
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 9291b54ef6..f8a347fa85 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -309,7 +309,7 @@ class TypingSlaveHandler(object):
         # We must update this typing token from the response of the previous
         # sync. In particular, the stream id may "reset" back to zero/a low
         # value which we *must* use for the next replication request.
-        return {"typing": self._latest_room_serial}
+        return {"typing": {"master": self._latest_room_serial}}
 
     def process_replication_rows(self, stream_name, token, rows):
         if stream_name != TypingStream.NAME:
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 751c799d94..c2202e753b 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -59,7 +59,7 @@ class BaseSlavedStore(CacheInvalidationWorkerStore):
         """
         pos = {}
         if self._cache_id_gen:
-            pos["caches"] = self._cache_id_gen.get_current_token()
+            pos["caches"] = {"master": self._cache_id_gen.get_current_token()}
         return pos
 
     def get_cache_stream_token(self):
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index ebe94909cb..87c94a3ee9 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -35,9 +35,7 @@ class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlaved
     def stream_positions(self):
         result = super(SlavedAccountDataStore, self).stream_positions()
         position = self._account_data_id_gen.get_current_token()
-        result["user_account_data"] = position
-        result["room_account_data"] = position
-        result["tag_account_data"] = position
+        result["account_data"] = {"master": position}
         return result
 
     def process_replication_rows(self, stream_name, token, rows):
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 0c237c6e0f..f77ba97d35 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -45,7 +45,7 @@ class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore):
 
     def stream_positions(self):
         result = super(SlavedDeviceInboxStore, self).stream_positions()
-        result["to_device"] = self._device_inbox_id_gen.get_current_token()
+        result["to_device"] = {"master": self._device_inbox_id_gen.get_current_token()}
         return result
 
     def process_replication_rows(self, stream_name, token, rows):
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 23b1650e41..095a85bb39 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -54,8 +54,8 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto
         # device list stream, so set them both to the device list ID
         # generator's current token.
         current_token = self._device_list_id_gen.get_current_token()
-        result[DeviceListsStream.NAME] = current_token
-        result[UserSignatureStream.NAME] = current_token
+        result[DeviceListsStream.NAME] = {"master": current_token}
+        result[UserSignatureStream.NAME] = {"master": current_token}
         return result
 
     def process_replication_rows(self, stream_name, token, rows):
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index e73342c657..2f636889b1 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -95,8 +95,8 @@ class SlavedEventStore(
 
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()
-        result["events"] = self._stream_id_gen.get_current_token()
-        result["backfill"] = -self._backfill_id_gen.get_current_token()
+        result["events"] = {"master": self._stream_id_gen.get_current_token()}
+        result["backfill"] = {"master": -self._backfill_id_gen.get_current_token()}
         return result
 
     def process_replication_rows(self, stream_name, token, rows):
diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
index 2d4fd08cf5..38d03fd9ee 100644
--- a/synapse/replication/slave/storage/groups.py
+++ b/synapse/replication/slave/storage/groups.py
@@ -39,7 +39,7 @@ class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore):
 
     def stream_positions(self):
         result = super(SlavedGroupServerStore, self).stream_positions()
-        result["groups"] = self._group_updates_id_gen.get_current_token()
+        result["groups"] = {"master": self._group_updates_id_gen.get_current_token()}
         return result
 
     def process_replication_rows(self, stream_name, token, rows):
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index ad8f0c15a9..d8baa0e365 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -46,7 +46,7 @@ class SlavedPresenceStore(BaseSlavedStore):
 
         if self.hs.config.use_presence:
             position = self._presence_id_gen.get_current_token()
-            result["presence"] = position
+            result["presence"] = {"master": position}
 
         return result
 
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index eebd5a1fb6..a2f1522e73 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -39,7 +39,9 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
 
     def stream_positions(self):
         result = super(SlavedPushRuleStore, self).stream_positions()
-        result["push_rules"] = self._push_rules_stream_id_gen.get_current_token()
+        result["push_rules"] = {
+            "master": self._push_rules_stream_id_gen.get_current_token()
+        }
         return result
 
     def process_replication_rows(self, stream_name, token, rows):
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index bce8a3d115..d2bef05280 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -30,7 +30,7 @@ class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
 
     def stream_positions(self):
         result = super(SlavedPusherStore, self).stream_positions()
-        result["pushers"] = self._pushers_id_gen.get_current_token()
+        result["pushers"] = {"master": self._pushers_id_gen.get_current_token()}
         return result
 
     def get_pushers_stream_token(self):
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index d40dc6e1f5..0c4d26ff85 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -44,7 +44,7 @@ class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
 
     def stream_positions(self):
         result = super(SlavedReceiptsStore, self).stream_positions()
-        result["receipts"] = self._receipts_id_gen.get_current_token()
+        result["receipts"] = {"master": self._receipts_id_gen.get_current_token()}
         return result
 
     def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index 3a20f45316..12ca3b64b6 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -32,7 +32,9 @@ class RoomStore(RoomWorkerStore, BaseSlavedStore):
 
     def stream_positions(self):
         result = super(RoomStore, self).stream_positions()
-        result["public_rooms"] = self._public_room_id_gen.get_current_token()
+        result["public_rooms"] = {
+            "master": self._public_room_id_gen.get_current_token()
+        }
         return result
 
     def process_replication_rows(self, stream_name, token, rows):
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index ffc6e7d398..401e4c1d4d 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -228,8 +228,10 @@ class ReplicationClientHandler:
             return
 
         # Find where we previously streamed up to.
-        current_token = self.replication_data_handler.get_streams_to_replicate().get(
-            cmd.stream_name
+        current_token = (
+            self.replication_data_handler.get_streams_to_replicate()
+            .get(cmd.stream_name, {})
+            .get(cmd.instance_name)
         )
         if current_token is None:
             logger.debug(
@@ -370,12 +372,6 @@ class ReplicationDataHandler:
 
         if self.slaved_store:
             args = self.store.stream_positions()
-            user_account_data = args.pop("user_account_data", None)
-            room_account_data = args.pop("room_account_data", None)
-            if user_account_data:
-                args["account_data"] = user_account_data
-            elif room_account_data:
-                args["account_data"] = room_account_data
 
         if self.slaved_typing:
             args.update(self.typing_handler.stream_positions())