summary refs log tree commit diff
path: root/synapse/handlers/appservice.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/appservice.py')
-rw-r--r--synapse/handlers/appservice.py94
1 files changed, 82 insertions, 12 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 163278708c..36c206dae6 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -185,19 +185,26 @@ class ApplicationServicesHandler:
         new_token: Optional[int],
         users: Optional[Collection[Union[str, UserID]]] = None,
     ) -> None:
-        """This is called by the notifier in the background
-        when a ephemeral event handled by the homeserver.
-
-        This will determine which appservices
-        are interested in the event, and submit them.
+        """
+        This is called by the notifier in the background when an ephemeral event is handled
+        by the homeserver.
 
-        Events will only be pushed to appservices
-        that have opted into ephemeral events
+        This will determine which appservices are interested in the event, and submit them.
 
         Args:
             stream_key: The stream the event came from.
-            new_token: The latest stream token
-            users: The user(s) involved with the event.
+
+                `stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other
+                value for `stream_key` will cause this function to return early.
+
+                Ephemeral events will only be pushed to appservices that have opted into
+                them.
+
+                Appservices will only receive ephemeral events that fall within their
+                registered user and room namespaces.
+
+            new_token: The latest stream token.
+            users: The users that should be informed of the new event, if any.
         """
         if not self.notify_appservices:
             return
@@ -232,21 +239,32 @@ class ApplicationServicesHandler:
             for service in services:
                 # Only handle typing if we have the latest token
                 if stream_key == "typing_key" and new_token is not None:
+                    # Note that we don't persist the token (via set_type_stream_id_for_appservice)
+                    # for typing_key due to performance reasons and due to their highly
+                    # ephemeral nature.
+                    #
+                    # Instead we simply grab the latest typing updates in _handle_typing
+                    # and, if they apply to this application service, send it off.
                     events = await self._handle_typing(service, new_token)
                     if events:
                         self.scheduler.submit_ephemeral_events_for_as(service, events)
-                    # We don't persist the token for typing_key for performance reasons
+
                 elif stream_key == "receipt_key":
                     events = await self._handle_receipts(service)
                     if events:
                         self.scheduler.submit_ephemeral_events_for_as(service, events)
+
+                    # Persist the latest handled stream token for this appservice
                     await self.store.set_type_stream_id_for_appservice(
                         service, "read_receipt", new_token
                     )
+
                 elif stream_key == "presence_key":
                     events = await self._handle_presence(service, users)
                     if events:
                         self.scheduler.submit_ephemeral_events_for_as(service, events)
+
+                    # Persist the latest handled stream token for this appservice
                     await self.store.set_type_stream_id_for_appservice(
                         service, "presence", new_token
                     )
@@ -254,18 +272,54 @@ class ApplicationServicesHandler:
     async def _handle_typing(
         self, service: ApplicationService, new_token: int
     ) -> List[JsonDict]:
+        """
+        Return the typing events since the given stream token that the given application
+        service should receive.
+
+        First fetch all typing events between the given typing stream token (non-inclusive)
+        and the latest typing event stream token (inclusive). Then return only those typing
+        events that the given application service may be interested in.
+
+        Args:
+            service: The application service to check for which events it should receive.
+            new_token: A typing event stream token.
+
+        Returns:
+            A list of JSON dictionaries containing data derived from the typing events that
+            should be sent to the given application service.
+        """
         typing_source = self.event_sources.sources.typing
         # Get the typing events from just before current
         typing, _ = await typing_source.get_new_events_as(
             service=service,
             # For performance reasons, we don't persist the previous
-            # token in the DB and instead fetch the latest typing information
+            # token in the DB and instead fetch the latest typing event
             # for appservices.
+            # TODO: It'd likely be more efficient to simply fetch the
+            #  typing event with the given 'new_token' stream token and
+            #  check if the given service was interested, rather than
+            #  iterating over all typing events and only grabbing the
+            #  latest few.
             from_key=new_token - 1,
         )
         return typing
 
     async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
+        """
+        Return the latest read receipts that the given application service should receive.
+
+        First fetch all read receipts between the last receipt stream token that this
+        application service should have previously received (non-inclusive) and the
+        latest read receipt stream token (inclusive). Then from that set, return only
+        those read receipts that the given application service may be interested in.
+
+        Args:
+            service: The application service to check for which events it should receive.
+
+        Returns:
+            A list of JSON dictionaries containing data derived from the read receipts that
+            should be sent to the given application service.
+        """
         from_key = await self.store.get_type_stream_id_for_appservice(
             service, "read_receipt"
         )
@@ -278,6 +332,22 @@ class ApplicationServicesHandler:
     async def _handle_presence(
         self, service: ApplicationService, users: Collection[Union[str, UserID]]
     ) -> List[JsonDict]:
+        """
+        Return the latest presence updates that the given application service should receive.
+
+        First, filter the given users list to those that the application service is
+        interested in. Then retrieve the latest presence updates since the
+        the last-known previously received presence stream token for the given
+        application service. Return those presence updates.
+
+        Args:
+            service: The application service that ephemeral events are being sent to.
+            users: The users that should receive the presence update.
+
+        Returns:
+            A list of json dictionaries containing data derived from the presence events
+            that should be sent to the given application service.
+        """
         events: List[JsonDict] = []
         presence_source = self.event_sources.sources.presence
         from_key = await self.store.get_type_stream_id_for_appservice(
@@ -290,9 +360,9 @@ class ApplicationServicesHandler:
             interested = await service.is_interested_in_presence(user, self.store)
             if not interested:
                 continue
+
             presence_events, _ = await presence_source.get_new_events(
                 user=user,
-                service=service,
                 from_key=from_key,
             )
             time_now = self.clock.time_msec()