diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7fd28ffa54..9938be3821 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -122,7 +122,8 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
class BasePresenceHandler(abc.ABC):
- """Parts of the PresenceHandler that are shared between workers and master"""
+ """Parts of the PresenceHandler that are shared between workers and presence
+ writer"""
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
@@ -309,8 +310,16 @@ class WorkerPresenceHandler(BasePresenceHandler):
super().__init__(hs)
self.hs = hs
+ self._presence_writer_instance = hs.config.worker.writers.presence[0]
+
self._presence_enabled = hs.config.use_presence
+ # Route presence EDUs to the right worker
+ hs.get_federation_registry().register_instances_for_edu(
+ "m.presence",
+ hs.config.worker.writers.presence,
+ )
+
# The number of ongoing syncs on this process, by user id.
# Empty if _presence_enabled is false.
self._user_to_num_current_syncs = {} # type: Dict[str, int]
@@ -318,8 +327,8 @@ class WorkerPresenceHandler(BasePresenceHandler):
self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()
- # user_id -> last_sync_ms. Lists the users that have stopped syncing
- # but we haven't notified the master of that yet
+ # user_id -> last_sync_ms. Lists the users that have stopped syncing but
+ # we haven't notified the presence writer of that yet
self.users_going_offline = {}
self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
@@ -352,22 +361,23 @@ class WorkerPresenceHandler(BasePresenceHandler):
)
def mark_as_coming_online(self, user_id):
- """A user has started syncing. Send a UserSync to the master, unless they
- had recently stopped syncing.
+ """A user has started syncing. Send a UserSync to the presence writer,
+ unless they had recently stopped syncing.
Args:
user_id (str)
"""
going_offline = self.users_going_offline.pop(user_id, None)
if not going_offline:
- # Safe to skip because we haven't yet told the master they were offline
+ # Safe to skip because we haven't yet told the presence writer they
+ # were offline
self.send_user_sync(user_id, True, self.clock.time_msec())
def mark_as_going_offline(self, user_id):
- """A user has stopped syncing. We wait before notifying the master as
- its likely they'll come back soon. This allows us to avoid sending
- a stopped syncing immediately followed by a started syncing notification
- to the master
+ """A user has stopped syncing. We wait before notifying the presence
+ writer as its likely they'll come back soon. This allows us to avoid
+ sending a stopped syncing immediately followed by a started syncing
+ notification to the presence writer
Args:
user_id (str)
@@ -375,8 +385,8 @@ class WorkerPresenceHandler(BasePresenceHandler):
self.users_going_offline[user_id] = self.clock.time_msec()
def send_stop_syncing(self):
- """Check if there are any users who have stopped syncing a while ago
- and haven't come back yet. If there are poke the master about them.
+ """Check if there are any users who have stopped syncing a while ago and
+ haven't come back yet. If there are poke the presence writer about them.
"""
now = self.clock.time_msec()
for user_id, last_sync_ms in list(self.users_going_offline.items()):
@@ -492,9 +502,12 @@ class WorkerPresenceHandler(BasePresenceHandler):
if not self.hs.config.use_presence:
return
- # Proxy request to master
+ # Proxy request to instance that writes presence
await self._set_state_client(
- user_id=user_id, state=state, ignore_status_msg=ignore_status_msg
+ instance_name=self._presence_writer_instance,
+ user_id=user_id,
+ state=state,
+ ignore_status_msg=ignore_status_msg,
)
async def bump_presence_active_time(self, user):
@@ -505,9 +518,11 @@ class WorkerPresenceHandler(BasePresenceHandler):
if not self.hs.config.use_presence:
return
- # Proxy request to master
+ # Proxy request to instance that writes presence
user_id = user.to_string()
- await self._bump_active_client(user_id=user_id)
+ await self._bump_active_client(
+ instance_name=self._presence_writer_instance, user_id=user_id
+ )
class PresenceHandler(BasePresenceHandler):
@@ -1909,7 +1924,7 @@ class PresenceFederationQueue:
self._queue_presence_updates = True
# Whether this instance is a presence writer.
- self._presence_writer = hs.config.worker.worker_app is None
+ self._presence_writer = self._instance_name in hs.config.worker.writers.presence
# The FederationSender instance, if this process sends federation traffic directly.
self._federation = None
@@ -1957,7 +1972,7 @@ class PresenceFederationQueue:
Will forward to the local federation sender (if there is one) and queue
to send over replication (if there are other federation sender instances.).
- Must only be called on the master process.
+ Must only be called on the presence writer process.
"""
# This should only be called on a presence writer.
@@ -2003,10 +2018,11 @@ class PresenceFederationQueue:
We return rows in the form of `(destination, user_id)` to keep the size
of each row bounded (rather than returning the sets in a row).
- On workers this will query the master process via HTTP replication.
+ On workers this will query the presence writer process via HTTP replication.
"""
if instance_name != self._instance_name:
- # If not local we query over http replication from the master
+ # If not local we query over http replication from the presence
+ # writer
result = await self._repl_client(
instance_name=instance_name,
stream_name=PresenceFederationStream.NAME,
|