diff options
Diffstat (limited to 'synapse/handlers/appservice.py')
-rw-r--r-- | synapse/handlers/appservice.py | 42 |
1 files changed, 23 insertions, 19 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index c200a45f3a..873dadc3bd 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -47,6 +47,7 @@ from synapse.types import ( DeviceListUpdates, JsonDict, JsonMapping, + MultiWriterStreamToken, RoomAlias, RoomStreamToken, StreamKeyType, @@ -217,7 +218,7 @@ class ApplicationServicesHandler: def notify_interested_services_ephemeral( self, stream_key: StreamKeyType, - new_token: Union[int, RoomStreamToken], + new_token: Union[int, RoomStreamToken, MultiWriterStreamToken], users: Collection[Union[str, UserID]], ) -> None: """ @@ -259,19 +260,6 @@ class ApplicationServicesHandler: ): return - # Assert that new_token is an integer (and not a RoomStreamToken). - # All of the supported streams that this function handles use an - # integer to track progress (rather than a RoomStreamToken - a - # vector clock implementation) as they don't support multiple - # stream writers. - # - # As a result, we simply assert that new_token is an integer. - # If we do end up needing to pass a RoomStreamToken down here - # in the future, using RoomStreamToken.stream (the minimum stream - # position) to convert to an ascending integer value should work. - # Additional context: https://github.com/matrix-org/synapse/pull/11137 - assert isinstance(new_token, int) - # Ignore to-device messages if the feature flag is not enabled if ( stream_key == StreamKeyType.TO_DEVICE @@ -286,6 +274,9 @@ class ApplicationServicesHandler: ): return + # We know we're not a `RoomStreamToken` at this point. + assert not isinstance(new_token, RoomStreamToken) + # Check whether there are any appservices which have registered to receive # ephemeral events. # @@ -327,7 +318,7 @@ class ApplicationServicesHandler: self, services: List[ApplicationService], stream_key: StreamKeyType, - new_token: int, + new_token: Union[int, MultiWriterStreamToken], users: Collection[Union[str, UserID]], ) -> None: logger.debug("Checking interested services for %s", stream_key) @@ -340,6 +331,7 @@ class ApplicationServicesHandler: # # Instead we simply grab the latest typing updates in _handle_typing # and, if they apply to this application service, send it off. + assert isinstance(new_token, int) events = await self._handle_typing(service, new_token) if events: self.scheduler.enqueue_for_appservice(service, ephemeral=events) @@ -350,15 +342,23 @@ class ApplicationServicesHandler: (service.id, stream_key) ): if stream_key == StreamKeyType.RECEIPT: + assert isinstance(new_token, MultiWriterStreamToken) + + # We store appservice tokens as integers, so we ignore + # the `instance_map` components and instead simply + # follow the base stream position. + new_token = MultiWriterStreamToken(stream=new_token.stream) + events = await self._handle_receipts(service, new_token) self.scheduler.enqueue_for_appservice(service, ephemeral=events) # Persist the latest handled stream token for this appservice await self.store.set_appservice_stream_type_pos( - service, "read_receipt", new_token + service, "read_receipt", new_token.stream ) elif stream_key == StreamKeyType.PRESENCE: + assert isinstance(new_token, int) events = await self._handle_presence(service, users, new_token) self.scheduler.enqueue_for_appservice(service, ephemeral=events) @@ -368,6 +368,7 @@ class ApplicationServicesHandler: ) elif stream_key == StreamKeyType.TO_DEVICE: + assert isinstance(new_token, int) # Retrieve a list of to-device message events, as well as the # maximum stream token of the messages we were able to retrieve. to_device_messages = await self._get_to_device_messages( @@ -383,6 +384,7 @@ class ApplicationServicesHandler: ) elif stream_key == StreamKeyType.DEVICE_LIST: + assert isinstance(new_token, int) device_list_summary = await self._get_device_list_summary( service, new_token ) @@ -432,7 +434,7 @@ class ApplicationServicesHandler: return typing async def _handle_receipts( - self, service: ApplicationService, new_token: int + self, service: ApplicationService, new_token: MultiWriterStreamToken ) -> List[JsonMapping]: """ Return the latest read receipts that the given application service should receive. @@ -455,15 +457,17 @@ class ApplicationServicesHandler: from_key = await self.store.get_type_stream_id_for_appservice( service, "read_receipt" ) - if new_token is not None and new_token <= from_key: + if new_token is not None and new_token.stream <= from_key: logger.debug( "Rejecting token lower than or equal to stored: %s" % (new_token,) ) return [] + from_token = MultiWriterStreamToken(stream=from_key) + receipts_source = self.event_sources.sources.receipt receipts, _ = await receipts_source.get_new_events_as( - service=service, from_key=from_key, to_key=new_token + service=service, from_key=from_token, to_key=new_token ) return receipts |