diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 751c799d94..c2202e753b 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -59,7 +59,7 @@ class BaseSlavedStore(CacheInvalidationWorkerStore):
"""
pos = {}
if self._cache_id_gen:
- pos["caches"] = self._cache_id_gen.get_current_token()
+ pos["caches"] = {"master": self._cache_id_gen.get_current_token()}
return pos
def get_cache_stream_token(self):
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index ebe94909cb..87c94a3ee9 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -35,9 +35,7 @@ class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlaved
def stream_positions(self):
result = super(SlavedAccountDataStore, self).stream_positions()
position = self._account_data_id_gen.get_current_token()
- result["user_account_data"] = position
- result["room_account_data"] = position
- result["tag_account_data"] = position
+ result["account_data"] = {"master": position}
return result
def process_replication_rows(self, stream_name, token, rows):
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 0c237c6e0f..f77ba97d35 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -45,7 +45,7 @@ class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore):
def stream_positions(self):
result = super(SlavedDeviceInboxStore, self).stream_positions()
- result["to_device"] = self._device_inbox_id_gen.get_current_token()
+ result["to_device"] = {"master": self._device_inbox_id_gen.get_current_token()}
return result
def process_replication_rows(self, stream_name, token, rows):
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 23b1650e41..095a85bb39 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -54,8 +54,8 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto
# device list stream, so set them both to the device list ID
# generator's current token.
current_token = self._device_list_id_gen.get_current_token()
- result[DeviceListsStream.NAME] = current_token
- result[UserSignatureStream.NAME] = current_token
+ result[DeviceListsStream.NAME] = {"master": current_token}
+ result[UserSignatureStream.NAME] = {"master": current_token}
return result
def process_replication_rows(self, stream_name, token, rows):
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index e73342c657..2f636889b1 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -95,8 +95,8 @@ class SlavedEventStore(
def stream_positions(self):
result = super(SlavedEventStore, self).stream_positions()
- result["events"] = self._stream_id_gen.get_current_token()
- result["backfill"] = -self._backfill_id_gen.get_current_token()
+ result["events"] = {"master": self._stream_id_gen.get_current_token()}
+ result["backfill"] = {"master": -self._backfill_id_gen.get_current_token()}
return result
def process_replication_rows(self, stream_name, token, rows):
diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
index 2d4fd08cf5..38d03fd9ee 100644
--- a/synapse/replication/slave/storage/groups.py
+++ b/synapse/replication/slave/storage/groups.py
@@ -39,7 +39,7 @@ class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore):
def stream_positions(self):
result = super(SlavedGroupServerStore, self).stream_positions()
- result["groups"] = self._group_updates_id_gen.get_current_token()
+ result["groups"] = {"master": self._group_updates_id_gen.get_current_token()}
return result
def process_replication_rows(self, stream_name, token, rows):
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index ad8f0c15a9..d8baa0e365 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -46,7 +46,7 @@ class SlavedPresenceStore(BaseSlavedStore):
if self.hs.config.use_presence:
position = self._presence_id_gen.get_current_token()
- result["presence"] = position
+ result["presence"] = {"master": position}
return result
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index eebd5a1fb6..a2f1522e73 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -39,7 +39,9 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
def stream_positions(self):
result = super(SlavedPushRuleStore, self).stream_positions()
- result["push_rules"] = self._push_rules_stream_id_gen.get_current_token()
+ result["push_rules"] = {
+ "master": self._push_rules_stream_id_gen.get_current_token()
+ }
return result
def process_replication_rows(self, stream_name, token, rows):
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index bce8a3d115..d2bef05280 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -30,7 +30,7 @@ class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
def stream_positions(self):
result = super(SlavedPusherStore, self).stream_positions()
- result["pushers"] = self._pushers_id_gen.get_current_token()
+ result["pushers"] = {"master": self._pushers_id_gen.get_current_token()}
return result
def get_pushers_stream_token(self):
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index d40dc6e1f5..0c4d26ff85 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -44,7 +44,7 @@ class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
def stream_positions(self):
result = super(SlavedReceiptsStore, self).stream_positions()
- result["receipts"] = self._receipts_id_gen.get_current_token()
+ result["receipts"] = {"master": self._receipts_id_gen.get_current_token()}
return result
def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index 3a20f45316..12ca3b64b6 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -32,7 +32,9 @@ class RoomStore(RoomWorkerStore, BaseSlavedStore):
def stream_positions(self):
result = super(RoomStore, self).stream_positions()
- result["public_rooms"] = self._public_room_id_gen.get_current_token()
+ result["public_rooms"] = {
+ "master": self._public_room_id_gen.get_current_token()
+ }
return result
def process_replication_rows(self, stream_name, token, rows):
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index ffc6e7d398..401e4c1d4d 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -228,8 +228,10 @@ class ReplicationClientHandler:
return
# Find where we previously streamed up to.
- current_token = self.replication_data_handler.get_streams_to_replicate().get(
- cmd.stream_name
+ current_token = (
+ self.replication_data_handler.get_streams_to_replicate()
+ .get(cmd.stream_name, {})
+ .get(cmd.instance_name)
)
if current_token is None:
logger.debug(
@@ -370,12 +372,6 @@ class ReplicationDataHandler:
if self.slaved_store:
args = self.store.stream_positions()
- user_account_data = args.pop("user_account_data", None)
- room_account_data = args.pop("room_account_data", None)
- if user_account_data:
- args["account_data"] = user_account_data
- elif room_account_data:
- args["account_data"] = room_account_data
if self.slaved_typing:
args.update(self.typing_handler.stream_positions())
|