summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/appservice.py22
-rw-r--r--synapse/notifier.py38
-rw-r--r--synapse/storage/databases/main/devices.py4
-rw-r--r--synapse/storage/databases/main/presence.py2
4 files changed, 29 insertions, 37 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 36c206dae6..67f8ffcaff 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -182,7 +182,7 @@ class ApplicationServicesHandler:
     def notify_interested_services_ephemeral(
         self,
         stream_key: str,
-        new_token: Optional[int],
+        new_token: Union[int, RoomStreamToken],
         users: Optional[Collection[Union[str, UserID]]] = None,
     ) -> None:
         """
@@ -203,7 +203,7 @@ class ApplicationServicesHandler:
                 Appservices will only receive ephemeral events that fall within their
                 registered user and room namespaces.
 
-            new_token: The latest stream token.
+            new_token: The stream token of the event.
             users: The users that should be informed of the new event, if any.
         """
         if not self.notify_appservices:
@@ -212,6 +212,19 @@ class ApplicationServicesHandler:
         if stream_key not in ("typing_key", "receipt_key", "presence_key"):
             return
 
+        # Assert that new_token is an integer (and not a RoomStreamToken).
+        # All of the supported streams that this function handles use an
+        # integer to track progress (rather than a RoomStreamToken - a
+        # vector clock implementation) as they don't support multiple
+        # stream writers.
+        #
+        # As a result, we simply assert that new_token is an integer.
+        # If we do end up needing to pass a RoomStreamToken down here
+        # in the future, using RoomStreamToken.stream (the minimum stream
+        # position) to convert to an ascending integer value should work.
+        # Additional context: https://github.com/matrix-org/synapse/pull/11137
+        assert isinstance(new_token, int)
+
         services = [
             service
             for service in self.store.get_app_services()
@@ -231,14 +244,13 @@ class ApplicationServicesHandler:
         self,
         services: List[ApplicationService],
         stream_key: str,
-        new_token: Optional[int],
+        new_token: int,
         users: Collection[Union[str, UserID]],
     ) -> None:
         logger.debug("Checking interested services for %s" % (stream_key))
         with Measure(self.clock, "notify_interested_services_ephemeral"):
             for service in services:
-                # Only handle typing if we have the latest token
-                if stream_key == "typing_key" and new_token is not None:
+                if stream_key == "typing_key":
                     # Note that we don't persist the token (via set_type_stream_id_for_appservice)
                     # for typing_key due to performance reasons and due to their highly
                     # ephemeral nature.
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 1882fffd2a..60e5409895 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -383,29 +383,6 @@ class Notifier:
         except Exception:
             logger.exception("Error notifying application services of event")
 
-    def _notify_app_services_ephemeral(
-        self,
-        stream_key: str,
-        new_token: Union[int, RoomStreamToken],
-        users: Optional[Collection[Union[str, UserID]]] = None,
-    ) -> None:
-        """Notify application services of ephemeral event activity.
-
-        Args:
-            stream_key: The stream the event came from.
-            new_token: The value of the new stream token.
-            users: The users that should be informed of the new event, if any.
-        """
-        try:
-            stream_token = None
-            if isinstance(new_token, int):
-                stream_token = new_token
-            self.appservice_handler.notify_interested_services_ephemeral(
-                stream_key, stream_token, users or []
-            )
-        except Exception:
-            logger.exception("Error notifying application services of event")
-
     def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
         try:
             self._pusher_pool.on_new_notifications(max_room_stream_token)
@@ -467,12 +444,15 @@ class Notifier:
 
             self.notify_replication()
 
-            # Notify appservices
-            self._notify_app_services_ephemeral(
-                stream_key,
-                new_token,
-                users,
-            )
+            # Notify appservices.
+            try:
+                self.appservice_handler.notify_interested_services_ephemeral(
+                    stream_key,
+                    new_token,
+                    users,
+                )
+            except Exception:
+                logger.exception("Error notifying application services of event")
 
     def on_new_replication_data(self) -> None:
         """Used to inform replication listeners that something has happened
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index b15cd030e0..9ccc66e589 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -427,7 +427,7 @@ class DeviceWorkerStore(SQLBaseStore):
             user_ids: the users who were signed
 
         Returns:
-            THe new stream ID.
+            The new stream ID.
         """
 
         async with self._device_list_id_gen.get_next() as stream_id:
@@ -1322,7 +1322,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
 
     async def add_device_change_to_streams(
         self, user_id: str, device_ids: Collection[str], hosts: List[str]
-    ):
+    ) -> int:
         """Persist that a user's devices have been updated, and which hosts
         (if any) should be poked.
         """
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 12cf6995eb..cc0eebdb46 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -92,7 +92,7 @@ class PresenceStore(PresenceBackgroundUpdateStore):
             prefilled_cache=presence_cache_prefill,
         )
 
-    async def update_presence(self, presence_states):
+    async def update_presence(self, presence_states) -> Tuple[int, int]:
         assert self._can_persist_presence
 
         stream_ordering_manager = self._presence_id_gen.get_next_mult(