diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index e8e9db4b91..f31e18328b 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -23,6 +23,7 @@ The methods that define policy are:
"""
import abc
import contextlib
+import itertools
import logging
from bisect import bisect
from contextlib import contextmanager
@@ -151,15 +152,13 @@ class BasePresenceHandler(abc.ABC):
self._federation_queue = PresenceFederationQueue(hs, self)
- self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
-
self.VALID_PRESENCE: Tuple[str, ...] = (
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.OFFLINE,
)
- if self._busy_presence_enabled:
+ if hs.config.experimental.msc3026_enabled:
self.VALID_PRESENCE += (PresenceState.BUSY,)
active_presence = self.store.take_presence_startup_info()
@@ -167,7 +166,11 @@ class BasePresenceHandler(abc.ABC):
@abc.abstractmethod
async def user_syncing(
- self, user_id: str, affect_presence: bool, presence_state: str
+ self,
+ user_id: str,
+ device_id: Optional[str],
+ affect_presence: bool,
+ presence_state: str,
) -> ContextManager[None]:
"""Returns a context manager that should surround any stream requests
from the user.
@@ -178,6 +181,7 @@ class BasePresenceHandler(abc.ABC):
Args:
user_id: the user that is starting a sync
+ device_id: the user's device that is starting a sync
affect_presence: If false this function will be a no-op.
Useful for streams that are not associated with an actual
client that is being used by a user.
@@ -185,15 +189,17 @@ class BasePresenceHandler(abc.ABC):
"""
@abc.abstractmethod
- def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
- """Get an iterable of syncing users on this worker, to send to the presence handler
+ def get_currently_syncing_users_for_replication(
+ self,
+ ) -> Iterable[Tuple[str, Optional[str]]]:
+ """Get an iterable of syncing users and devices on this worker, to send to the presence handler
This is called when a replication connection is established. It should return
- a list of user ids, which are then sent as USER_SYNC commands to inform the
- process handling presence about those users.
+ a list of tuples of user ID & device ID, which are then sent as USER_SYNC commands
+ to inform the process handling presence about those users/devices.
Returns:
- An iterable of user_id strings.
+ An iterable of tuples of user ID and device ID.
"""
async def get_state(self, target_user: UserID) -> UserPresenceState:
@@ -254,28 +260,39 @@ class BasePresenceHandler(abc.ABC):
async def set_state(
self,
target_user: UserID,
+ device_id: Optional[str],
state: JsonDict,
- ignore_status_msg: bool = False,
force_notify: bool = False,
+ is_sync: bool = False,
) -> None:
"""Set the presence state of the user.
Args:
target_user: The ID of the user to set the presence state of.
+ device_id: the device that the user is setting the presence state of.
state: The presence state as a JSON dictionary.
- ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
- If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
+ is_sync: True if this update was from a sync, which results in
+ *not* overriding a previously set BUSY status, updating the
+ user's last_user_sync_ts, and ignoring the "status_msg" field of
+ the `state` dict.
"""
@abc.abstractmethod
- async def bump_presence_active_time(self, user: UserID) -> None:
+ async def bump_presence_active_time(
+ self, user: UserID, device_id: Optional[str]
+ ) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
async def update_external_syncs_row( # noqa: B027 (no-op by design)
- self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
+ self,
+ process_id: str,
+ user_id: str,
+ device_id: Optional[str],
+ is_syncing: bool,
+ sync_time_msec: int,
) -> None:
"""Update the syncing users for an external process as a delta.
@@ -286,6 +303,7 @@ class BasePresenceHandler(abc.ABC):
syncing against. This allows synapse to process updates
as user start and stop syncing against a given process.
user_id: The user who has started or stopped syncing
+ device_id: The user's device that has started or stopped syncing
is_syncing: Whether or not the user is now syncing
sync_time_msec: Time in ms when the user was last syncing
"""
@@ -336,7 +354,9 @@ class BasePresenceHandler(abc.ABC):
)
for destination, host_states in hosts_to_states.items():
- self._federation.send_presence_to_destinations(host_states, [destination])
+ await self._federation.send_presence_to_destinations(
+ host_states, [destination]
+ )
async def send_full_presence_to_users(self, user_ids: StrCollection) -> None:
"""
@@ -381,7 +401,9 @@ class BasePresenceHandler(abc.ABC):
# We set force_notify=True here so that this presence update is guaranteed to
# increment the presence stream ID (which resending the current user's presence
# otherwise would not do).
- await self.set_state(UserID.from_string(user_id), state, force_notify=True)
+ await self.set_state(
+ UserID.from_string(user_id), None, state, force_notify=True
+ )
async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
raise NotImplementedError(
@@ -414,16 +436,18 @@ class WorkerPresenceHandler(BasePresenceHandler):
hs.config.worker.writers.presence,
)
- # The number of ongoing syncs on this process, by user id.
+ # The number of ongoing syncs on this process, by (user ID, device ID).
# Empty if _presence_enabled is false.
- self._user_to_num_current_syncs: Dict[str, int] = {}
+ self._user_device_to_num_current_syncs: Dict[
+ Tuple[str, Optional[str]], int
+ ] = {}
self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()
- # user_id -> last_sync_ms. Lists the users that have stopped syncing but
- # we haven't notified the presence writer of that yet
- self.users_going_offline: Dict[str, int] = {}
+ # (user_id, device_id) -> last_sync_ms. Lists the devices that have stopped
+ # syncing but we haven't notified the presence writer of that yet
+ self._user_devices_going_offline: Dict[Tuple[str, Optional[str]], int] = {}
self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
self._set_state_client = ReplicationPresenceSetState.make_client(hs)
@@ -446,42 +470,54 @@ class WorkerPresenceHandler(BasePresenceHandler):
ClearUserSyncsCommand(self.instance_id)
)
- def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None:
+ def send_user_sync(
+ self,
+ user_id: str,
+ device_id: Optional[str],
+ is_syncing: bool,
+ last_sync_ms: int,
+ ) -> None:
if self._presence_enabled:
self.hs.get_replication_command_handler().send_user_sync(
- self.instance_id, user_id, is_syncing, last_sync_ms
+ self.instance_id, user_id, device_id, is_syncing, last_sync_ms
)
- def mark_as_coming_online(self, user_id: str) -> None:
+ def mark_as_coming_online(self, user_id: str, device_id: Optional[str]) -> None:
"""A user has started syncing. Send a UserSync to the presence writer,
unless they had recently stopped syncing.
"""
- going_offline = self.users_going_offline.pop(user_id, None)
+ going_offline = self._user_devices_going_offline.pop((user_id, device_id), None)
if not going_offline:
# Safe to skip because we haven't yet told the presence writer they
# were offline
- self.send_user_sync(user_id, True, self.clock.time_msec())
+ self.send_user_sync(user_id, device_id, True, self.clock.time_msec())
- def mark_as_going_offline(self, user_id: str) -> None:
+ def mark_as_going_offline(self, user_id: str, device_id: Optional[str]) -> None:
"""A user has stopped syncing. We wait before notifying the presence
writer as its likely they'll come back soon. This allows us to avoid
sending a stopped syncing immediately followed by a started syncing
notification to the presence writer
"""
- self.users_going_offline[user_id] = self.clock.time_msec()
+ self._user_devices_going_offline[(user_id, device_id)] = self.clock.time_msec()
def send_stop_syncing(self) -> None:
"""Check if there are any users who have stopped syncing a while ago and
haven't come back yet. If there are poke the presence writer about them.
"""
now = self.clock.time_msec()
- for user_id, last_sync_ms in list(self.users_going_offline.items()):
+ for (user_id, device_id), last_sync_ms in list(
+ self._user_devices_going_offline.items()
+ ):
if now - last_sync_ms > UPDATE_SYNCING_USERS_MS:
- self.users_going_offline.pop(user_id, None)
- self.send_user_sync(user_id, False, last_sync_ms)
+ self._user_devices_going_offline.pop((user_id, device_id), None)
+ self.send_user_sync(user_id, device_id, False, last_sync_ms)
async def user_syncing(
- self, user_id: str, affect_presence: bool, presence_state: str
+ self,
+ user_id: str,
+ device_id: Optional[str],
+ affect_presence: bool,
+ presence_state: str,
) -> ContextManager[None]:
"""Record that a user is syncing.
@@ -491,36 +527,32 @@ class WorkerPresenceHandler(BasePresenceHandler):
if not affect_presence or not self._presence_enabled:
return _NullContextManager()
- prev_state = await self.current_state_for_user(user_id)
- if prev_state.state != PresenceState.BUSY:
- # We set state here but pass ignore_status_msg = True as we don't want to
- # cause the status message to be cleared.
- # Note that this causes last_active_ts to be incremented which is not
- # what the spec wants: see comment in the BasePresenceHandler version
- # of this function.
- await self.set_state(
- UserID.from_string(user_id),
- {"presence": presence_state},
- ignore_status_msg=True,
- )
+ # Note that this causes last_active_ts to be incremented which is not
+ # what the spec wants.
+ await self.set_state(
+ UserID.from_string(user_id),
+ device_id,
+ state={"presence": presence_state},
+ is_sync=True,
+ )
- curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
- self._user_to_num_current_syncs[user_id] = curr_sync + 1
+ curr_sync = self._user_device_to_num_current_syncs.get((user_id, device_id), 0)
+ self._user_device_to_num_current_syncs[(user_id, device_id)] = curr_sync + 1
- # If we went from no in flight sync to some, notify replication
- if self._user_to_num_current_syncs[user_id] == 1:
- self.mark_as_coming_online(user_id)
+ # If this is the first in-flight sync, notify replication
+ if self._user_device_to_num_current_syncs[(user_id, device_id)] == 1:
+ self.mark_as_coming_online(user_id, device_id)
def _end() -> None:
# We check that the user_id is in user_to_num_current_syncs because
# user_to_num_current_syncs may have been cleared if we are
# shutting down.
- if user_id in self._user_to_num_current_syncs:
- self._user_to_num_current_syncs[user_id] -= 1
+ if (user_id, device_id) in self._user_device_to_num_current_syncs:
+ self._user_device_to_num_current_syncs[(user_id, device_id)] -= 1
- # If we went from one in flight sync to non, notify replication
- if self._user_to_num_current_syncs[user_id] == 0:
- self.mark_as_going_offline(user_id)
+ # If there are no more in-flight syncs, notify replication
+ if self._user_device_to_num_current_syncs[(user_id, device_id)] == 0:
+ self.mark_as_going_offline(user_id, device_id)
@contextlib.contextmanager
def _user_syncing() -> Generator[None, None, None]:
@@ -587,28 +619,34 @@ class WorkerPresenceHandler(BasePresenceHandler):
# If this is a federation sender, notify about presence updates.
await self.maybe_send_presence_to_interested_destinations(state_to_notify)
- def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
+ def get_currently_syncing_users_for_replication(
+ self,
+ ) -> Iterable[Tuple[str, Optional[str]]]:
return [
- user_id
- for user_id, count in self._user_to_num_current_syncs.items()
+ user_id_device_id
+ for user_id_device_id, count in self._user_device_to_num_current_syncs.items()
if count > 0
]
async def set_state(
self,
target_user: UserID,
+ device_id: Optional[str],
state: JsonDict,
- ignore_status_msg: bool = False,
force_notify: bool = False,
+ is_sync: bool = False,
) -> None:
"""Set the presence state of the user.
Args:
target_user: The ID of the user to set the presence state of.
+ device_id: the device that the user is setting the presence state of.
state: The presence state as a JSON dictionary.
- ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
- If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
+ is_sync: True if this update was from a sync, which results in
+ *not* overriding a previously set BUSY status, updating the
+ user's last_user_sync_ts, and ignoring the "status_msg" field of
+ the `state` dict.
"""
presence = state["presence"]
@@ -625,12 +663,15 @@ class WorkerPresenceHandler(BasePresenceHandler):
await self._set_state_client(
instance_name=self._presence_writer_instance,
user_id=user_id,
+ device_id=device_id,
state=state,
- ignore_status_msg=ignore_status_msg,
force_notify=force_notify,
+ is_sync=is_sync,
)
- async def bump_presence_active_time(self, user: UserID) -> None:
+ async def bump_presence_active_time(
+ self, user: UserID, device_id: Optional[str]
+ ) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
@@ -641,7 +682,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
# Proxy request to instance that writes presence
user_id = user.to_string()
await self._bump_active_client(
- instance_name=self._presence_writer_instance, user_id=user_id
+ instance_name=self._presence_writer_instance,
+ user_id=user_id,
+ device_id=device_id,
)
@@ -703,17 +746,23 @@ class PresenceHandler(BasePresenceHandler):
# Keeps track of the number of *ongoing* syncs on this process. While
# this is non zero a user will never go offline.
- self.user_to_num_current_syncs: Dict[str, int] = {}
+ self._user_device_to_num_current_syncs: Dict[
+ Tuple[str, Optional[str]], int
+ ] = {}
# Keeps track of the number of *ongoing* syncs on other processes.
+ #
# While any sync is ongoing on another process the user will never
# go offline.
+ #
# Each process has a unique identifier and an update frequency. If
# no update is received from that process within the update period then
# we assume that all the sync requests on that process have stopped.
- # Stored as a dict from process_id to set of user_id, and a dict of
- # process_id to millisecond timestamp last updated.
- self.external_process_to_current_syncs: Dict[str, Set[str]] = {}
+ # Stored as a dict from process_id to set of (user_id, device_id), and
+ # a dict of process_id to millisecond timestamp last updated.
+ self.external_process_to_current_syncs: Dict[
+ str, Set[Tuple[str, Optional[str]]]
+ ] = {}
self.external_process_last_updated_ms: Dict[str, int] = {}
self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
@@ -889,7 +938,7 @@ class PresenceHandler(BasePresenceHandler):
)
for destination, states in hosts_to_states.items():
- self._federation_queue.send_presence_to_destinations(
+ await self._federation_queue.send_presence_to_destinations(
states, [destination]
)
@@ -918,7 +967,10 @@ class PresenceHandler(BasePresenceHandler):
# that were syncing on that process to see if they need to be timed
# out.
users_to_check.update(
- self.external_process_to_current_syncs.pop(process_id, ())
+ user_id
+ for user_id, device_id in self.external_process_to_current_syncs.pop(
+ process_id, ()
+ )
)
self.external_process_last_updated_ms.pop(process_id)
@@ -931,11 +983,15 @@ class PresenceHandler(BasePresenceHandler):
syncing_user_ids = {
user_id
- for user_id, count in self.user_to_num_current_syncs.items()
+ for (user_id, _), count in self._user_device_to_num_current_syncs.items()
if count
}
- for user_ids in self.external_process_to_current_syncs.values():
- syncing_user_ids.update(user_ids)
+ syncing_user_ids.update(
+ user_id
+ for user_id, _ in itertools.chain(
+ *self.external_process_to_current_syncs.values()
+ )
+ )
changes = handle_timeouts(
states,
@@ -946,7 +1002,9 @@ class PresenceHandler(BasePresenceHandler):
return await self._update_states(changes)
- async def bump_presence_active_time(self, user: UserID) -> None:
+ async def bump_presence_active_time(
+ self, user: UserID, device_id: Optional[str]
+ ) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
@@ -969,6 +1027,7 @@ class PresenceHandler(BasePresenceHandler):
async def user_syncing(
self,
user_id: str,
+ device_id: Optional[str],
affect_presence: bool = True,
presence_state: str = PresenceState.ONLINE,
) -> ContextManager[None]:
@@ -980,7 +1039,8 @@ class PresenceHandler(BasePresenceHandler):
when users disconnect/reconnect.
Args:
- user_id
+ user_id: the user that is starting a sync
+ device_id: the user's device that is starting a sync
affect_presence: If false this function will be a no-op.
Useful for streams that are not associated with an actual
client that is being used by a user.
@@ -989,52 +1049,21 @@ class PresenceHandler(BasePresenceHandler):
if not affect_presence or not self._presence_enabled:
return _NullContextManager()
- curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
- self.user_to_num_current_syncs[user_id] = curr_sync + 1
+ curr_sync = self._user_device_to_num_current_syncs.get((user_id, device_id), 0)
+ self._user_device_to_num_current_syncs[(user_id, device_id)] = curr_sync + 1
- prev_state = await self.current_state_for_user(user_id)
-
- # If they're busy then they don't stop being busy just by syncing,
- # so just update the last sync time.
- if prev_state.state != PresenceState.BUSY:
- # XXX: We set_state separately here and just update the last_active_ts above
- # This keeps the logic as similar as possible between the worker and single
- # process modes. Using set_state will actually cause last_active_ts to be
- # updated always, which is not what the spec calls for, but synapse has done
- # this for... forever, I think.
- await self.set_state(
- UserID.from_string(user_id),
- {"presence": presence_state},
- ignore_status_msg=True,
- )
- # Retrieve the new state for the logic below. This should come from the
- # in-memory cache.
- prev_state = await self.current_state_for_user(user_id)
-
- # To keep the single process behaviour consistent with worker mode, run the
- # same logic as `update_external_syncs_row`, even though it looks weird.
- if prev_state.state == PresenceState.OFFLINE:
- await self._update_states(
- [
- prev_state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=self.clock.time_msec(),
- last_user_sync_ts=self.clock.time_msec(),
- )
- ]
- )
- # otherwise, set the new presence state & update the last sync time,
- # but don't update last_active_ts as this isn't an indication that
- # they've been active (even though it's probably been updated by
- # set_state above)
- else:
- await self._update_states(
- [prev_state.copy_and_replace(last_user_sync_ts=self.clock.time_msec())]
- )
+ # Note that this causes last_active_ts to be incremented which is not
+ # what the spec wants.
+ await self.set_state(
+ UserID.from_string(user_id),
+ device_id,
+ state={"presence": presence_state},
+ is_sync=True,
+ )
async def _end() -> None:
try:
- self.user_to_num_current_syncs[user_id] -= 1
+ self._user_device_to_num_current_syncs[(user_id, device_id)] -= 1
prev_state = await self.current_state_for_user(user_id)
await self._update_states(
@@ -1056,12 +1085,19 @@ class PresenceHandler(BasePresenceHandler):
return _user_syncing()
- def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
+ def get_currently_syncing_users_for_replication(
+ self,
+ ) -> Iterable[Tuple[str, Optional[str]]]:
# since we are the process handling presence, there is nothing to do here.
return []
async def update_external_syncs_row(
- self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
+ self,
+ process_id: str,
+ user_id: str,
+ device_id: Optional[str],
+ is_syncing: bool,
+ sync_time_msec: int,
) -> None:
"""Update the syncing users for an external process as a delta.
@@ -1070,6 +1106,7 @@ class PresenceHandler(BasePresenceHandler):
syncing against. This allows synapse to process updates
as user start and stop syncing against a given process.
user_id: The user who has started or stopped syncing
+ device_id: The user's device that has started or stopped syncing
is_syncing: Whether or not the user is now syncing
sync_time_msec: Time in ms when the user was last syncing
"""
@@ -1080,31 +1117,27 @@ class PresenceHandler(BasePresenceHandler):
process_id, set()
)
- updates = []
- if is_syncing and user_id not in process_presence:
- if prev_state.state == PresenceState.OFFLINE:
- updates.append(
- prev_state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=sync_time_msec,
- last_user_sync_ts=sync_time_msec,
- )
- )
- else:
- updates.append(
- prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
- )
- process_presence.add(user_id)
- elif user_id in process_presence:
- updates.append(
- prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
+ # USER_SYNC is sent when a user's device starts or stops syncing on
+ # a remote # process. (But only for the initial and last sync for that
+ # device.)
+ #
+ # When a device *starts* syncing it also calls set_state(...) which
+ # will update the state, last_active_ts, and last_user_sync_ts.
+ # Simply ensure the user & device is tracked as syncing in this case.
+ #
+ # When a device *stops* syncing, update the last_user_sync_ts and mark
+ # them as no longer syncing. Note this doesn't quite match the
+ # monolith behaviour, which updates last_user_sync_ts at the end of
+ # every sync, not just the last in-flight sync.
+ if is_syncing and (user_id, device_id) not in process_presence:
+ process_presence.add((user_id, device_id))
+ elif not is_syncing and (user_id, device_id) in process_presence:
+ new_state = prev_state.copy_and_replace(
+ last_user_sync_ts=sync_time_msec
)
+ await self._update_states([new_state])
- if not is_syncing:
- process_presence.discard(user_id)
-
- if updates:
- await self._update_states(updates)
+ process_presence.discard((user_id, device_id))
self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
@@ -1118,7 +1151,9 @@ class PresenceHandler(BasePresenceHandler):
process_presence = self.external_process_to_current_syncs.pop(
process_id, set()
)
- prev_states = await self.current_state_for_users(process_presence)
+ prev_states = await self.current_state_for_users(
+ {user_id for user_id, device_id in process_presence}
+ )
time_now_ms = self.clock.time_msec()
await self._update_states(
@@ -1203,18 +1238,22 @@ class PresenceHandler(BasePresenceHandler):
async def set_state(
self,
target_user: UserID,
+ device_id: Optional[str],
state: JsonDict,
- ignore_status_msg: bool = False,
force_notify: bool = False,
+ is_sync: bool = False,
) -> None:
"""Set the presence state of the user.
Args:
target_user: The ID of the user to set the presence state of.
+ device_id: the device that the user is setting the presence state of.
state: The presence state as a JSON dictionary.
- ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
- If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
+ is_sync: True if this update was from a sync, which results in
+ *not* overriding a previously set BUSY status, updating the
+ user's last_user_sync_ts, and ignoring the "status_msg" field of
+ the `state` dict.
"""
status_msg = state.get("status_msg", None)
presence = state["presence"]
@@ -1227,18 +1266,27 @@ class PresenceHandler(BasePresenceHandler):
return
user_id = target_user.to_string()
+ now = self.clock.time_msec()
prev_state = await self.current_state_for_user(user_id)
+ # Syncs do not override a previous presence of busy.
+ #
+ # TODO: This is a hack for lack of multi-device support. Unfortunately
+ # removing this requires coordination with clients.
+ if prev_state.state == PresenceState.BUSY and is_sync:
+ presence = PresenceState.BUSY
+
new_fields = {"state": presence}
- if not ignore_status_msg:
- new_fields["status_msg"] = status_msg
+ if presence == PresenceState.ONLINE or presence == PresenceState.BUSY:
+ new_fields["last_active_ts"] = now
- if presence == PresenceState.ONLINE or (
- presence == PresenceState.BUSY and self._busy_presence_enabled
- ):
- new_fields["last_active_ts"] = self.clock.time_msec()
+ if is_sync:
+ new_fields["last_user_sync_ts"] = now
+ else:
+ # Syncs do not override the status message.
+ new_fields["status_msg"] = status_msg
await self._update_states(
[prev_state.copy_and_replace(**new_fields)], force_notify=force_notify
@@ -1462,7 +1510,7 @@ class PresenceHandler(BasePresenceHandler):
or state.status_msg is not None
]
- self._federation_queue.send_presence_to_destinations(
+ await self._federation_queue.send_presence_to_destinations(
destinations=newly_joined_remote_hosts,
states=states,
)
@@ -1473,7 +1521,7 @@ class PresenceHandler(BasePresenceHandler):
prev_remote_hosts or newly_joined_remote_hosts
):
local_states = await self.current_state_for_users(newly_joined_local_users)
- self._federation_queue.send_presence_to_destinations(
+ await self._federation_queue.send_presence_to_destinations(
destinations=prev_remote_hosts | newly_joined_remote_hosts,
states=list(local_states.values()),
)
@@ -2136,7 +2184,7 @@ class PresenceFederationQueue:
index = bisect(self._queue, (clear_before,))
self._queue = self._queue[index:]
- def send_presence_to_destinations(
+ async def send_presence_to_destinations(
self, states: Collection[UserPresenceState], destinations: StrCollection
) -> None:
"""Send the presence states to the given destinations.
@@ -2156,7 +2204,7 @@ class PresenceFederationQueue:
return
if self._federation:
- self._federation.send_presence_to_destinations(
+ await self._federation.send_presence_to_destinations(
states=states,
destinations=destinations,
)
@@ -2279,7 +2327,7 @@ class PresenceFederationQueue:
for host, user_ids in hosts_to_users.items():
states = await self._presence_handler.current_state_for_users(user_ids)
- self._federation.send_presence_to_destinations(
+ await self._federation.send_presence_to_destinations(
states=states.values(),
destinations=[host],
)
|