diff options
author | Erik Johnston <erik@matrix.org> | 2020-03-25 10:51:46 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2020-03-25 10:51:46 +0000 |
commit | 5473f1806aebfb120391924d1339856f69fa5076 (patch) | |
tree | 9e5fadbfcf7f6d63cab4425362ff366a83011d44 | |
parent | Add instance name to command (diff) | |
download | synapse-5473f1806aebfb120391924d1339856f69fa5076.tar.xz |
Change stream_positions to include instance name
-rw-r--r-- | synapse/app/generic_worker.py | 2 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 2 | ||||
-rw-r--r-- | synapse/replication/slave/storage/_base.py | 2 | ||||
-rw-r--r-- | synapse/replication/slave/storage/account_data.py | 4 | ||||
-rw-r--r-- | synapse/replication/slave/storage/deviceinbox.py | 2 | ||||
-rw-r--r-- | synapse/replication/slave/storage/devices.py | 4 | ||||
-rw-r--r-- | synapse/replication/slave/storage/events.py | 4 | ||||
-rw-r--r-- | synapse/replication/slave/storage/groups.py | 2 | ||||
-rw-r--r-- | synapse/replication/slave/storage/presence.py | 2 | ||||
-rw-r--r-- | synapse/replication/slave/storage/push_rule.py | 4 | ||||
-rw-r--r-- | synapse/replication/slave/storage/pushers.py | 2 | ||||
-rw-r--r-- | synapse/replication/slave/storage/receipts.py | 2 | ||||
-rw-r--r-- | synapse/replication/slave/storage/room.py | 4 | ||||
-rw-r--r-- | synapse/replication/tcp/handler.py | 12 |
14 files changed, 23 insertions, 25 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index fb16059fac..c48606e1bb 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -743,7 +743,7 @@ class FederationSenderHandler(object): self.federation_sender.wake_destination(server) def stream_positions(self): - return {"federation": self.federation_position} + return {"federation": {"master": self.federation_position}} def process_replication_rows(self, stream_name, token, rows): # The federation stream contains things that we want to send out, e.g. diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 9291b54ef6..f8a347fa85 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -309,7 +309,7 @@ class TypingSlaveHandler(object): # We must update this typing token from the response of the previous # sync. In particular, the stream id may "reset" back to zero/a low # value which we *must* use for the next replication request. - return {"typing": self._latest_room_serial} + return {"typing": {"master": self._latest_room_serial}} def process_replication_rows(self, stream_name, token, rows): if stream_name != TypingStream.NAME: diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index 751c799d94..c2202e753b 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -59,7 +59,7 @@ class BaseSlavedStore(CacheInvalidationWorkerStore): """ pos = {} if self._cache_id_gen: - pos["caches"] = self._cache_id_gen.get_current_token() + pos["caches"] = {"master": self._cache_id_gen.get_current_token()} return pos def get_cache_stream_token(self): diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py index ebe94909cb..87c94a3ee9 100644 --- a/synapse/replication/slave/storage/account_data.py +++ b/synapse/replication/slave/storage/account_data.py @@ -35,9 +35,7 @@ class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlaved def stream_positions(self): result = super(SlavedAccountDataStore, self).stream_positions() position = self._account_data_id_gen.get_current_token() - result["user_account_data"] = position - result["room_account_data"] = position - result["tag_account_data"] = position + result["account_data"] = {"master": position} return result def process_replication_rows(self, stream_name, token, rows): diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 0c237c6e0f..f77ba97d35 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -45,7 +45,7 @@ class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore): def stream_positions(self): result = super(SlavedDeviceInboxStore, self).stream_positions() - result["to_device"] = self._device_inbox_id_gen.get_current_token() + result["to_device"] = {"master": self._device_inbox_id_gen.get_current_token()} return result def process_replication_rows(self, stream_name, token, rows): diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index 23b1650e41..095a85bb39 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -54,8 +54,8 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto # device list stream, so set them both to the device list ID # generator's current token. current_token = self._device_list_id_gen.get_current_token() - result[DeviceListsStream.NAME] = current_token - result[UserSignatureStream.NAME] = current_token + result[DeviceListsStream.NAME] = {"master": current_token} + result[UserSignatureStream.NAME] = {"master": current_token} return result def process_replication_rows(self, stream_name, token, rows): diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index e73342c657..2f636889b1 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -95,8 +95,8 @@ class SlavedEventStore( def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() - result["events"] = self._stream_id_gen.get_current_token() - result["backfill"] = -self._backfill_id_gen.get_current_token() + result["events"] = {"master": self._stream_id_gen.get_current_token()} + result["backfill"] = {"master": -self._backfill_id_gen.get_current_token()} return result def process_replication_rows(self, stream_name, token, rows): diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py index 2d4fd08cf5..38d03fd9ee 100644 --- a/synapse/replication/slave/storage/groups.py +++ b/synapse/replication/slave/storage/groups.py @@ -39,7 +39,7 @@ class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore): def stream_positions(self): result = super(SlavedGroupServerStore, self).stream_positions() - result["groups"] = self._group_updates_id_gen.get_current_token() + result["groups"] = {"master": self._group_updates_id_gen.get_current_token()} return result def process_replication_rows(self, stream_name, token, rows): diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py index ad8f0c15a9..d8baa0e365 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py @@ -46,7 +46,7 @@ class SlavedPresenceStore(BaseSlavedStore): if self.hs.config.use_presence: position = self._presence_id_gen.get_current_token() - result["presence"] = position + result["presence"] = {"master": position} return result diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py index eebd5a1fb6..a2f1522e73 100644 --- a/synapse/replication/slave/storage/push_rule.py +++ b/synapse/replication/slave/storage/push_rule.py @@ -39,7 +39,9 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore): def stream_positions(self): result = super(SlavedPushRuleStore, self).stream_positions() - result["push_rules"] = self._push_rules_stream_id_gen.get_current_token() + result["push_rules"] = { + "master": self._push_rules_stream_id_gen.get_current_token() + } return result def process_replication_rows(self, stream_name, token, rows): diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py index bce8a3d115..d2bef05280 100644 --- a/synapse/replication/slave/storage/pushers.py +++ b/synapse/replication/slave/storage/pushers.py @@ -30,7 +30,7 @@ class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore): def stream_positions(self): result = super(SlavedPusherStore, self).stream_positions() - result["pushers"] = self._pushers_id_gen.get_current_token() + result["pushers"] = {"master": self._pushers_id_gen.get_current_token()} return result def get_pushers_stream_token(self): diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py index d40dc6e1f5..0c4d26ff85 100644 --- a/synapse/replication/slave/storage/receipts.py +++ b/synapse/replication/slave/storage/receipts.py @@ -44,7 +44,7 @@ class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore): def stream_positions(self): result = super(SlavedReceiptsStore, self).stream_positions() - result["receipts"] = self._receipts_id_gen.get_current_token() + result["receipts"] = {"master": self._receipts_id_gen.get_current_token()} return result def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py index 3a20f45316..12ca3b64b6 100644 --- a/synapse/replication/slave/storage/room.py +++ b/synapse/replication/slave/storage/room.py @@ -32,7 +32,9 @@ class RoomStore(RoomWorkerStore, BaseSlavedStore): def stream_positions(self): result = super(RoomStore, self).stream_positions() - result["public_rooms"] = self._public_room_id_gen.get_current_token() + result["public_rooms"] = { + "master": self._public_room_id_gen.get_current_token() + } return result def process_replication_rows(self, stream_name, token, rows): diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index ffc6e7d398..401e4c1d4d 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -228,8 +228,10 @@ class ReplicationClientHandler: return # Find where we previously streamed up to. - current_token = self.replication_data_handler.get_streams_to_replicate().get( - cmd.stream_name + current_token = ( + self.replication_data_handler.get_streams_to_replicate() + .get(cmd.stream_name, {}) + .get(cmd.instance_name) ) if current_token is None: logger.debug( @@ -370,12 +372,6 @@ class ReplicationDataHandler: if self.slaved_store: args = self.store.stream_positions() - user_account_data = args.pop("user_account_data", None) - room_account_data = args.pop("room_account_data", None) - if user_account_data: - args["account_data"] = user_account_data - elif room_account_data: - args["account_data"] = room_account_data if self.slaved_typing: args.update(self.typing_handler.stream_positions()) |