diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 7de7bd3289..873dadc3bd 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -47,6 +47,7 @@ from synapse.types import (
DeviceListUpdates,
JsonDict,
JsonMapping,
+ MultiWriterStreamToken,
RoomAlias,
RoomStreamToken,
StreamKeyType,
@@ -216,8 +217,8 @@ class ApplicationServicesHandler:
def notify_interested_services_ephemeral(
self,
- stream_key: str,
- new_token: Union[int, RoomStreamToken],
+ stream_key: StreamKeyType,
+ new_token: Union[int, RoomStreamToken, MultiWriterStreamToken],
users: Collection[Union[str, UserID]],
) -> None:
"""
@@ -259,19 +260,6 @@ class ApplicationServicesHandler:
):
return
- # Assert that new_token is an integer (and not a RoomStreamToken).
- # All of the supported streams that this function handles use an
- # integer to track progress (rather than a RoomStreamToken - a
- # vector clock implementation) as they don't support multiple
- # stream writers.
- #
- # As a result, we simply assert that new_token is an integer.
- # If we do end up needing to pass a RoomStreamToken down here
- # in the future, using RoomStreamToken.stream (the minimum stream
- # position) to convert to an ascending integer value should work.
- # Additional context: https://github.com/matrix-org/synapse/pull/11137
- assert isinstance(new_token, int)
-
# Ignore to-device messages if the feature flag is not enabled
if (
stream_key == StreamKeyType.TO_DEVICE
@@ -286,6 +274,9 @@ class ApplicationServicesHandler:
):
return
+ # We know we're not a `RoomStreamToken` at this point.
+ assert not isinstance(new_token, RoomStreamToken)
+
# Check whether there are any appservices which have registered to receive
# ephemeral events.
#
@@ -326,8 +317,8 @@ class ApplicationServicesHandler:
async def _notify_interested_services_ephemeral(
self,
services: List[ApplicationService],
- stream_key: str,
- new_token: int,
+ stream_key: StreamKeyType,
+ new_token: Union[int, MultiWriterStreamToken],
users: Collection[Union[str, UserID]],
) -> None:
logger.debug("Checking interested services for %s", stream_key)
@@ -340,6 +331,7 @@ class ApplicationServicesHandler:
#
# Instead we simply grab the latest typing updates in _handle_typing
# and, if they apply to this application service, send it off.
+ assert isinstance(new_token, int)
events = await self._handle_typing(service, new_token)
if events:
self.scheduler.enqueue_for_appservice(service, ephemeral=events)
@@ -350,15 +342,23 @@ class ApplicationServicesHandler:
(service.id, stream_key)
):
if stream_key == StreamKeyType.RECEIPT:
+ assert isinstance(new_token, MultiWriterStreamToken)
+
+ # We store appservice tokens as integers, so we ignore
+ # the `instance_map` components and instead simply
+ # follow the base stream position.
+ new_token = MultiWriterStreamToken(stream=new_token.stream)
+
events = await self._handle_receipts(service, new_token)
self.scheduler.enqueue_for_appservice(service, ephemeral=events)
# Persist the latest handled stream token for this appservice
await self.store.set_appservice_stream_type_pos(
- service, "read_receipt", new_token
+ service, "read_receipt", new_token.stream
)
elif stream_key == StreamKeyType.PRESENCE:
+ assert isinstance(new_token, int)
events = await self._handle_presence(service, users, new_token)
self.scheduler.enqueue_for_appservice(service, ephemeral=events)
@@ -368,6 +368,7 @@ class ApplicationServicesHandler:
)
elif stream_key == StreamKeyType.TO_DEVICE:
+ assert isinstance(new_token, int)
# Retrieve a list of to-device message events, as well as the
# maximum stream token of the messages we were able to retrieve.
to_device_messages = await self._get_to_device_messages(
@@ -383,6 +384,7 @@ class ApplicationServicesHandler:
)
elif stream_key == StreamKeyType.DEVICE_LIST:
+ assert isinstance(new_token, int)
device_list_summary = await self._get_device_list_summary(
service, new_token
)
@@ -432,7 +434,7 @@ class ApplicationServicesHandler:
return typing
async def _handle_receipts(
- self, service: ApplicationService, new_token: int
+ self, service: ApplicationService, new_token: MultiWriterStreamToken
) -> List[JsonMapping]:
"""
Return the latest read receipts that the given application service should receive.
@@ -455,15 +457,17 @@ class ApplicationServicesHandler:
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
- if new_token is not None and new_token <= from_key:
+ if new_token is not None and new_token.stream <= from_key:
logger.debug(
"Rejecting token lower than or equal to stored: %s" % (new_token,)
)
return []
+ from_token = MultiWriterStreamToken(stream=from_key)
+
receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
- service=service, from_key=from_key, to_key=new_token
+ service=service, from_key=from_token, to_key=new_token
)
return receipts
|