summary refs log tree commit diff
path: root/synapse/handlers/presence.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r--synapse/handlers/presence.py56
1 files changed, 36 insertions, 20 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7fd28ffa54..9938be3821 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -122,7 +122,8 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
 
 
 class BasePresenceHandler(abc.ABC):
-    """Parts of the PresenceHandler that are shared between workers and master"""
+    """Parts of the PresenceHandler that are shared between workers and presence
+    writer"""
 
     def __init__(self, hs: "HomeServer"):
         self.clock = hs.get_clock()
@@ -309,8 +310,16 @@ class WorkerPresenceHandler(BasePresenceHandler):
         super().__init__(hs)
         self.hs = hs
 
+        self._presence_writer_instance = hs.config.worker.writers.presence[0]
+
         self._presence_enabled = hs.config.use_presence
 
+        # Route presence EDUs to the right worker
+        hs.get_federation_registry().register_instances_for_edu(
+            "m.presence",
+            hs.config.worker.writers.presence,
+        )
+
         # The number of ongoing syncs on this process, by user id.
         # Empty if _presence_enabled is false.
         self._user_to_num_current_syncs = {}  # type: Dict[str, int]
@@ -318,8 +327,8 @@ class WorkerPresenceHandler(BasePresenceHandler):
         self.notifier = hs.get_notifier()
         self.instance_id = hs.get_instance_id()
 
-        # user_id -> last_sync_ms. Lists the users that have stopped syncing
-        # but we haven't notified the master of that yet
+        # user_id -> last_sync_ms. Lists the users that have stopped syncing but
+        # we haven't notified the presence writer of that yet
         self.users_going_offline = {}
 
         self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
@@ -352,22 +361,23 @@ class WorkerPresenceHandler(BasePresenceHandler):
             )
 
     def mark_as_coming_online(self, user_id):
-        """A user has started syncing. Send a UserSync to the master, unless they
-        had recently stopped syncing.
+        """A user has started syncing. Send a UserSync to the presence writer,
+        unless they had recently stopped syncing.
 
         Args:
             user_id (str)
         """
         going_offline = self.users_going_offline.pop(user_id, None)
         if not going_offline:
-            # Safe to skip because we haven't yet told the master they were offline
+            # Safe to skip because we haven't yet told the presence writer they
+            # were offline
             self.send_user_sync(user_id, True, self.clock.time_msec())
 
     def mark_as_going_offline(self, user_id):
-        """A user has stopped syncing. We wait before notifying the master as
-        its likely they'll come back soon. This allows us to avoid sending
-        a stopped syncing immediately followed by a started syncing notification
-        to the master
+        """A user has stopped syncing. We wait before notifying the presence
+        writer as its likely they'll come back soon. This allows us to avoid
+        sending a stopped syncing immediately followed by a started syncing
+        notification to the presence writer
 
         Args:
             user_id (str)
@@ -375,8 +385,8 @@ class WorkerPresenceHandler(BasePresenceHandler):
         self.users_going_offline[user_id] = self.clock.time_msec()
 
     def send_stop_syncing(self):
-        """Check if there are any users who have stopped syncing a while ago
-        and haven't come back yet. If there are poke the master about them.
+        """Check if there are any users who have stopped syncing a while ago and
+        haven't come back yet. If there are poke the presence writer about them.
         """
         now = self.clock.time_msec()
         for user_id, last_sync_ms in list(self.users_going_offline.items()):
@@ -492,9 +502,12 @@ class WorkerPresenceHandler(BasePresenceHandler):
         if not self.hs.config.use_presence:
             return
 
-        # Proxy request to master
+        # Proxy request to instance that writes presence
         await self._set_state_client(
-            user_id=user_id, state=state, ignore_status_msg=ignore_status_msg
+            instance_name=self._presence_writer_instance,
+            user_id=user_id,
+            state=state,
+            ignore_status_msg=ignore_status_msg,
         )
 
     async def bump_presence_active_time(self, user):
@@ -505,9 +518,11 @@ class WorkerPresenceHandler(BasePresenceHandler):
         if not self.hs.config.use_presence:
             return
 
-        # Proxy request to master
+        # Proxy request to instance that writes presence
         user_id = user.to_string()
-        await self._bump_active_client(user_id=user_id)
+        await self._bump_active_client(
+            instance_name=self._presence_writer_instance, user_id=user_id
+        )
 
 
 class PresenceHandler(BasePresenceHandler):
@@ -1909,7 +1924,7 @@ class PresenceFederationQueue:
         self._queue_presence_updates = True
 
         # Whether this instance is a presence writer.
-        self._presence_writer = hs.config.worker.worker_app is None
+        self._presence_writer = self._instance_name in hs.config.worker.writers.presence
 
         # The FederationSender instance, if this process sends federation traffic directly.
         self._federation = None
@@ -1957,7 +1972,7 @@ class PresenceFederationQueue:
         Will forward to the local federation sender (if there is one) and queue
         to send over replication (if there are other federation sender instances.).
 
-        Must only be called on the master process.
+        Must only be called on the presence writer process.
         """
 
         # This should only be called on a presence writer.
@@ -2003,10 +2018,11 @@ class PresenceFederationQueue:
         We return rows in the form of `(destination, user_id)` to keep the size
         of each row bounded (rather than returning the sets in a row).
 
-        On workers this will query the master process via HTTP replication.
+        On workers this will query the presence writer process via HTTP replication.
         """
         if instance_name != self._instance_name:
-            # If not local we query over http replication from the master
+            # If not local we query over http replication from the presence
+            # writer
             result = await self._repl_client(
                 instance_name=instance_name,
                 stream_name=PresenceFederationStream.NAME,