diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 163278708c..36c206dae6 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -185,19 +185,26 @@ class ApplicationServicesHandler:
new_token: Optional[int],
users: Optional[Collection[Union[str, UserID]]] = None,
) -> None:
- """This is called by the notifier in the background
- when a ephemeral event handled by the homeserver.
-
- This will determine which appservices
- are interested in the event, and submit them.
+ """
+ This is called by the notifier in the background when an ephemeral event is handled
+ by the homeserver.
- Events will only be pushed to appservices
- that have opted into ephemeral events
+ This will determine which appservices are interested in the event, and submit them.
Args:
stream_key: The stream the event came from.
- new_token: The latest stream token
- users: The user(s) involved with the event.
+
+ `stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other
+ value for `stream_key` will cause this function to return early.
+
+ Ephemeral events will only be pushed to appservices that have opted into
+ them.
+
+ Appservices will only receive ephemeral events that fall within their
+ registered user and room namespaces.
+
+ new_token: The latest stream token.
+ users: The users that should be informed of the new event, if any.
"""
if not self.notify_appservices:
return
@@ -232,21 +239,32 @@ class ApplicationServicesHandler:
for service in services:
# Only handle typing if we have the latest token
if stream_key == "typing_key" and new_token is not None:
+ # Note that we don't persist the token (via set_type_stream_id_for_appservice)
+ # for typing_key due to performance reasons and due to their highly
+ # ephemeral nature.
+ #
+ # Instead we simply grab the latest typing updates in _handle_typing
+ # and, if they apply to this application service, send it off.
events = await self._handle_typing(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
- # We don't persist the token for typing_key for performance reasons
+
elif stream_key == "receipt_key":
events = await self._handle_receipts(service)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
+
+ # Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
)
+
elif stream_key == "presence_key":
events = await self._handle_presence(service, users)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
+
+ # Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)
@@ -254,18 +272,54 @@ class ApplicationServicesHandler:
async def _handle_typing(
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
+ """
+ Return the typing events since the given stream token that the given application
+ service should receive.
+
+ First fetch all typing events between the given typing stream token (non-inclusive)
+ and the latest typing event stream token (inclusive). Then return only those typing
+ events that the given application service may be interested in.
+
+ Args:
+ service: The application service to check for which events it should receive.
+ new_token: A typing event stream token.
+
+ Returns:
+ A list of JSON dictionaries containing data derived from the typing events that
+ should be sent to the given application service.
+ """
typing_source = self.event_sources.sources.typing
# Get the typing events from just before current
typing, _ = await typing_source.get_new_events_as(
service=service,
# For performance reasons, we don't persist the previous
- # token in the DB and instead fetch the latest typing information
+ # token in the DB and instead fetch the latest typing event
# for appservices.
+ # TODO: It'd likely be more efficient to simply fetch the
+ # typing event with the given 'new_token' stream token and
+ # check if the given service was interested, rather than
+ # iterating over all typing events and only grabbing the
+ # latest few.
from_key=new_token - 1,
)
return typing
async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
+ """
+ Return the latest read receipts that the given application service should receive.
+
+ First fetch all read receipts between the last receipt stream token that this
+ application service should have previously received (non-inclusive) and the
+ latest read receipt stream token (inclusive). Then from that set, return only
+ those read receipts that the given application service may be interested in.
+
+ Args:
+ service: The application service to check for which events it should receive.
+
+ Returns:
+ A list of JSON dictionaries containing data derived from the read receipts that
+ should be sent to the given application service.
+ """
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
@@ -278,6 +332,22 @@ class ApplicationServicesHandler:
async def _handle_presence(
self, service: ApplicationService, users: Collection[Union[str, UserID]]
) -> List[JsonDict]:
+ """
+ Return the latest presence updates that the given application service should receive.
+
+ First, filter the given users list to those that the application service is
+ interested in. Then retrieve the latest presence updates since the
+ the last-known previously received presence stream token for the given
+ application service. Return those presence updates.
+
+ Args:
+ service: The application service that ephemeral events are being sent to.
+ users: The users that should receive the presence update.
+
+ Returns:
+ A list of json dictionaries containing data derived from the presence events
+ that should be sent to the given application service.
+ """
events: List[JsonDict] = []
presence_source = self.event_sources.sources.presence
from_key = await self.store.get_type_stream_id_for_appservice(
@@ -290,9 +360,9 @@ class ApplicationServicesHandler:
interested = await service.is_interested_in_presence(user, self.store)
if not interested:
continue
+
presence_events, _ = await presence_source.get_new_events(
user=user,
- service=service,
from_key=from_key,
)
time_now = self.clock.time_msec()
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 6eafbea25d..68b446eb66 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -454,6 +454,10 @@ class DeviceHandler(DeviceWorkerHandler):
) -> None:
"""Notify that a user's device(s) has changed. Pokes the notifier, and
remote servers if the user is local.
+
+ Args:
+ user_id: The Matrix ID of the user who's device list has been updated.
+ device_ids: The device IDs that have changed.
"""
if not device_ids:
# No changes to notify about, so this is a no-op.
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index b5968e047b..fdab50da37 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -52,7 +52,6 @@ import synapse.metrics
from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
from synapse.api.presence import UserPresenceState
-from synapse.appservice import ApplicationService
from synapse.events.presence_router import PresenceRouter
from synapse.logging.context import run_in_background
from synapse.logging.utils import log_function
@@ -1483,11 +1482,37 @@ def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) ->
def format_user_presence_state(
state: UserPresenceState, now: int, include_user_id: bool = True
) -> JsonDict:
- """Convert UserPresenceState to a format that can be sent down to clients
+ """Convert UserPresenceState to a JSON format that can be sent down to clients
and to other servers.
- The "user_id" is optional so that this function can be used to format presence
- updates for client /sync responses and for federation /send requests.
+ Args:
+ state: The user presence state to format.
+ now: The current timestamp since the epoch in ms.
+ include_user_id: Whether to include `user_id` in the returned dictionary.
+ As this function can be used both to format presence updates for client /sync
+ responses and for federation /send requests, only the latter needs the include
+ the `user_id` field.
+
+ Returns:
+ A JSON dictionary with the following keys:
+ * presence: The presence state as a str.
+ * user_id: Optional. Included if `include_user_id` is truthy. The canonical
+ Matrix ID of the user.
+ * last_active_ago: Optional. Included if `last_active_ts` is set on `state`.
+ The timestamp that the user was last active.
+ * status_msg: Optional. Included if `status_msg` is set on `state`. The user's
+ status.
+ * currently_active: Optional. Included only if `state.state` is "online".
+
+ Example:
+
+ {
+ "presence": "online",
+ "user_id": "@alice:example.com",
+ "last_active_ago": 16783813918,
+ "status_msg": "Hello world!",
+ "currently_active": True
+ }
"""
content: JsonDict = {"presence": state.state}
if include_user_id:
@@ -1526,7 +1551,6 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
is_guest: bool = False,
explicit_room_id: Optional[str] = None,
include_offline: bool = True,
- service: Optional[ApplicationService] = None,
) -> Tuple[List[UserPresenceState], int]:
# The process for getting presence events are:
# 1. Get the rooms the user is in.
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 374e961e3b..4911a11535 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -241,12 +241,18 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
async def get_new_events_as(
self, from_key: int, service: ApplicationService
) -> Tuple[List[JsonDict], int]:
- """Returns a set of new receipt events that an appservice
+ """Returns a set of new read receipt events that an appservice
may be interested in.
Args:
from_key: the stream position at which events should be fetched from
service: The appservice which may be interested
+
+ Returns:
+ A two-tuple containing the following:
+ * A list of json dictionaries derived from read receipts that the
+ appservice may be interested in.
+ * The current read receipt stream token.
"""
from_key = int(from_key)
to_key = self.get_current_key()
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index d10e9b8ec4..c411d69924 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -465,17 +465,23 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
may be interested in.
Args:
- from_key: the stream position at which events should be fetched from
- service: The appservice which may be interested
+ from_key: the stream position at which events should be fetched from.
+ service: The appservice which may be interested.
+
+ Returns:
+ A two-tuple containing the following:
+ * A list of json dictionaries derived from typing events that the
+ appservice may be interested in.
+ * The latest known room serial.
"""
with Measure(self.clock, "typing.get_new_events_as"):
- from_key = int(from_key)
handler = self.get_typing_handler()
events = []
for room_id in handler._room_serials.keys():
if handler._room_serials[room_id] <= from_key:
continue
+
if not await service.matches_user_in_member_list(
room_id, handler.store
):
|