From fd48fc45853eb193a22a08d874eb473e668e2d6a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Mar 2024 16:29:23 +0000 Subject: Fixups to new push stream (#17038) Follow on from #17037 --- synapse/config/workers.py | 8 ++++---- synapse/handlers/room_member.py | 6 ++++-- synapse/replication/tcp/handler.py | 2 +- synapse/rest/client/push_rule.py | 4 +++- synapse/storage/databases/main/push_rule.py | 4 +++- 5 files changed, 15 insertions(+), 9 deletions(-) (limited to 'synapse') diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 9f81a73d6f..7ecf349e4a 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -156,7 +156,7 @@ class WriterLocations: can only be a single instance. presence: The instances that write to the presence stream. Currently can only be a single instance. - push: The instances that write to the push stream. Currently + push_rules: The instances that write to the push stream. Currently can only be a single instance. """ @@ -184,7 +184,7 @@ class WriterLocations: default=["master"], converter=_instance_to_list_converter, ) - push: List[str] = attr.ib( + push_rules: List[str] = attr.ib( default=["master"], converter=_instance_to_list_converter, ) @@ -347,7 +347,7 @@ class WorkerConfig(Config): "account_data", "receipts", "presence", - "push", + "push_rules", ): instances = _instance_to_list_converter(getattr(self.writers, stream)) for instance in instances: @@ -385,7 +385,7 @@ class WorkerConfig(Config): "Must only specify one instance to handle `presence` messages." ) - if len(self.writers.push) != 1: + if len(self.writers.push_rules) != 1: raise ConfigError( "Must only specify one instance to handle `push` messages." ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ee2e807afc..601d37341b 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -182,8 +182,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): hs.config.server.forgotten_room_retention_period ) - self._is_push_writer = hs.get_instance_name() in hs.config.worker.writers.push - self._push_writer = hs.config.worker.writers.push[0] + self._is_push_writer = ( + hs.get_instance_name() in hs.config.worker.writers.push_rules + ) + self._push_writer = hs.config.worker.writers.push_rules[0] self._copy_push_client = ReplicationCopyPusherRestServlet.make_client(hs) def _on_user_joined_room(self, event_id: str, room_id: str) -> None: diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 4342d6ce70..72a42cb6cc 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -180,7 +180,7 @@ class ReplicationCommandHandler: continue if isinstance(stream, PushRulesStream): - if hs.get_instance_name() in hs.config.worker.writers.push: + if hs.get_instance_name() in hs.config.worker.writers.push_rules: self._streams_to_replicate.append(stream) continue diff --git a/synapse/rest/client/push_rule.py b/synapse/rest/client/push_rule.py index 9c9bfc9794..af042504c9 100644 --- a/synapse/rest/client/push_rule.py +++ b/synapse/rest/client/push_rule.py @@ -59,7 +59,9 @@ class PushRuleRestServlet(RestServlet): self.auth = hs.get_auth() self.store = hs.get_datastores().main self.notifier = hs.get_notifier() - self._is_push_worker = hs.get_instance_name() in hs.config.worker.writers.push + self._is_push_worker = ( + hs.get_instance_name() in hs.config.worker.writers.push_rules + ) self._push_rules_handler = hs.get_push_rules_handler() self._push_rule_linearizer = Linearizer(name="push_rules") diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index ed734f03ac..660c834518 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -136,7 +136,9 @@ class PushRulesWorkerStore( ): super().__init__(database, db_conn, hs) - self._is_push_writer = hs.get_instance_name() in hs.config.worker.writers.push + self._is_push_writer = ( + hs.get_instance_name() in hs.config.worker.writers.push_rules + ) # In the worker store this is an ID tracker which we overwrite in the non-worker # class below that is used on the main process. -- cgit 1.4.1