diff options
Diffstat (limited to 'synapse/handlers/appservice.py')
-rw-r--r-- | synapse/handlers/appservice.py | 22 |
1 files changed, 17 insertions, 5 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 36c206dae6..67f8ffcaff 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -182,7 +182,7 @@ class ApplicationServicesHandler: def notify_interested_services_ephemeral( self, stream_key: str, - new_token: Optional[int], + new_token: Union[int, RoomStreamToken], users: Optional[Collection[Union[str, UserID]]] = None, ) -> None: """ @@ -203,7 +203,7 @@ class ApplicationServicesHandler: Appservices will only receive ephemeral events that fall within their registered user and room namespaces. - new_token: The latest stream token. + new_token: The stream token of the event. users: The users that should be informed of the new event, if any. """ if not self.notify_appservices: @@ -212,6 +212,19 @@ class ApplicationServicesHandler: if stream_key not in ("typing_key", "receipt_key", "presence_key"): return + # Assert that new_token is an integer (and not a RoomStreamToken). + # All of the supported streams that this function handles use an + # integer to track progress (rather than a RoomStreamToken - a + # vector clock implementation) as they don't support multiple + # stream writers. + # + # As a result, we simply assert that new_token is an integer. + # If we do end up needing to pass a RoomStreamToken down here + # in the future, using RoomStreamToken.stream (the minimum stream + # position) to convert to an ascending integer value should work. + # Additional context: https://github.com/matrix-org/synapse/pull/11137 + assert isinstance(new_token, int) + services = [ service for service in self.store.get_app_services() @@ -231,14 +244,13 @@ class ApplicationServicesHandler: self, services: List[ApplicationService], stream_key: str, - new_token: Optional[int], + new_token: int, users: Collection[Union[str, UserID]], ) -> None: logger.debug("Checking interested services for %s" % (stream_key)) with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: - # Only handle typing if we have the latest token - if stream_key == "typing_key" and new_token is not None: + if stream_key == "typing_key": # Note that we don't persist the token (via set_type_stream_id_for_appservice) # for typing_key due to performance reasons and due to their highly # ephemeral nature. |