summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-25 10:51:46 +0000
committerErik Johnston <erik@matrix.org>2020-03-25 10:51:46 +0000
commit5473f1806aebfb120391924d1339856f69fa5076 (patch)
tree9e5fadbfcf7f6d63cab4425362ff366a83011d44 /synapse/replication
parentAdd instance name to command (diff)
downloadsynapse-5473f1806aebfb120391924d1339856f69fa5076.tar.xz
Change stream_positions to include instance name
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/slave/storage/_base.py2
-rw-r--r--synapse/replication/slave/storage/account_data.py4
-rw-r--r--synapse/replication/slave/storage/deviceinbox.py2
-rw-r--r--synapse/replication/slave/storage/devices.py4
-rw-r--r--synapse/replication/slave/storage/events.py4
-rw-r--r--synapse/replication/slave/storage/groups.py2
-rw-r--r--synapse/replication/slave/storage/presence.py2
-rw-r--r--synapse/replication/slave/storage/push_rule.py4
-rw-r--r--synapse/replication/slave/storage/pushers.py2
-rw-r--r--synapse/replication/slave/storage/receipts.py2
-rw-r--r--synapse/replication/slave/storage/room.py4
-rw-r--r--synapse/replication/tcp/handler.py12
12 files changed, 21 insertions, 23 deletions
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py

index 751c799d94..c2202e753b 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py
@@ -59,7 +59,7 @@ class BaseSlavedStore(CacheInvalidationWorkerStore): """ pos = {} if self._cache_id_gen: - pos["caches"] = self._cache_id_gen.get_current_token() + pos["caches"] = {"master": self._cache_id_gen.get_current_token()} return pos def get_cache_stream_token(self): diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index ebe94909cb..87c94a3ee9 100644 --- a/synapse/replication/slave/storage/account_data.py +++ b/synapse/replication/slave/storage/account_data.py
@@ -35,9 +35,7 @@ class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlaved def stream_positions(self): result = super(SlavedAccountDataStore, self).stream_positions() position = self._account_data_id_gen.get_current_token() - result["user_account_data"] = position - result["room_account_data"] = position - result["tag_account_data"] = position + result["account_data"] = {"master": position} return result def process_replication_rows(self, stream_name, token, rows): diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 0c237c6e0f..f77ba97d35 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -45,7 +45,7 @@ class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore): def stream_positions(self): result = super(SlavedDeviceInboxStore, self).stream_positions() - result["to_device"] = self._device_inbox_id_gen.get_current_token() + result["to_device"] = {"master": self._device_inbox_id_gen.get_current_token()} return result def process_replication_rows(self, stream_name, token, rows): diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 23b1650e41..095a85bb39 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py
@@ -54,8 +54,8 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto # device list stream, so set them both to the device list ID # generator's current token. current_token = self._device_list_id_gen.get_current_token() - result[DeviceListsStream.NAME] = current_token - result[UserSignatureStream.NAME] = current_token + result[DeviceListsStream.NAME] = {"master": current_token} + result[UserSignatureStream.NAME] = {"master": current_token} return result def process_replication_rows(self, stream_name, token, rows): diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index e73342c657..2f636889b1 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py
@@ -95,8 +95,8 @@ class SlavedEventStore( def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() - result["events"] = self._stream_id_gen.get_current_token() - result["backfill"] = -self._backfill_id_gen.get_current_token() + result["events"] = {"master": self._stream_id_gen.get_current_token()} + result["backfill"] = {"master": -self._backfill_id_gen.get_current_token()} return result def process_replication_rows(self, stream_name, token, rows): diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
index 2d4fd08cf5..38d03fd9ee 100644 --- a/synapse/replication/slave/storage/groups.py +++ b/synapse/replication/slave/storage/groups.py
@@ -39,7 +39,7 @@ class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore): def stream_positions(self): result = super(SlavedGroupServerStore, self).stream_positions() - result["groups"] = self._group_updates_id_gen.get_current_token() + result["groups"] = {"master": self._group_updates_id_gen.get_current_token()} return result def process_replication_rows(self, stream_name, token, rows): diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index ad8f0c15a9..d8baa0e365 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py
@@ -46,7 +46,7 @@ class SlavedPresenceStore(BaseSlavedStore): if self.hs.config.use_presence: position = self._presence_id_gen.get_current_token() - result["presence"] = position + result["presence"] = {"master": position} return result diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index eebd5a1fb6..a2f1522e73 100644 --- a/synapse/replication/slave/storage/push_rule.py +++ b/synapse/replication/slave/storage/push_rule.py
@@ -39,7 +39,9 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore): def stream_positions(self): result = super(SlavedPushRuleStore, self).stream_positions() - result["push_rules"] = self._push_rules_stream_id_gen.get_current_token() + result["push_rules"] = { + "master": self._push_rules_stream_id_gen.get_current_token() + } return result def process_replication_rows(self, stream_name, token, rows): diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index bce8a3d115..d2bef05280 100644 --- a/synapse/replication/slave/storage/pushers.py +++ b/synapse/replication/slave/storage/pushers.py
@@ -30,7 +30,7 @@ class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore): def stream_positions(self): result = super(SlavedPusherStore, self).stream_positions() - result["pushers"] = self._pushers_id_gen.get_current_token() + result["pushers"] = {"master": self._pushers_id_gen.get_current_token()} return result def get_pushers_stream_token(self): diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index d40dc6e1f5..0c4d26ff85 100644 --- a/synapse/replication/slave/storage/receipts.py +++ b/synapse/replication/slave/storage/receipts.py
@@ -44,7 +44,7 @@ class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore): def stream_positions(self): result = super(SlavedReceiptsStore, self).stream_positions() - result["receipts"] = self._receipts_id_gen.get_current_token() + result["receipts"] = {"master": self._receipts_id_gen.get_current_token()} return result def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index 3a20f45316..12ca3b64b6 100644 --- a/synapse/replication/slave/storage/room.py +++ b/synapse/replication/slave/storage/room.py
@@ -32,7 +32,9 @@ class RoomStore(RoomWorkerStore, BaseSlavedStore): def stream_positions(self): result = super(RoomStore, self).stream_positions() - result["public_rooms"] = self._public_room_id_gen.get_current_token() + result["public_rooms"] = { + "master": self._public_room_id_gen.get_current_token() + } return result def process_replication_rows(self, stream_name, token, rows): diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index ffc6e7d398..401e4c1d4d 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -228,8 +228,10 @@ class ReplicationClientHandler: return # Find where we previously streamed up to. - current_token = self.replication_data_handler.get_streams_to_replicate().get( - cmd.stream_name + current_token = ( + self.replication_data_handler.get_streams_to_replicate() + .get(cmd.stream_name, {}) + .get(cmd.instance_name) ) if current_token is None: logger.debug( @@ -370,12 +372,6 @@ class ReplicationDataHandler: if self.slaved_store: args = self.store.stream_positions() - user_account_data = args.pop("user_account_data", None) - room_account_data = args.pop("room_account_data", None) - if user_account_data: - args["account_data"] = user_account_data - elif room_account_data: - args["account_data"] = room_account_data if self.slaved_typing: args.update(self.typing_handler.stream_positions())