diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 36c206dae6..67f8ffcaff 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -182,7 +182,7 @@ class ApplicationServicesHandler:
def notify_interested_services_ephemeral(
self,
stream_key: str,
- new_token: Optional[int],
+ new_token: Union[int, RoomStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
) -> None:
"""
@@ -203,7 +203,7 @@ class ApplicationServicesHandler:
Appservices will only receive ephemeral events that fall within their
registered user and room namespaces.
- new_token: The latest stream token.
+ new_token: The stream token of the event.
users: The users that should be informed of the new event, if any.
"""
if not self.notify_appservices:
@@ -212,6 +212,19 @@ class ApplicationServicesHandler:
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
return
+ # Assert that new_token is an integer (and not a RoomStreamToken).
+ # All of the supported streams that this function handles use an
+ # integer to track progress (rather than a RoomStreamToken - a
+ # vector clock implementation) as they don't support multiple
+ # stream writers.
+ #
+ # As a result, we simply assert that new_token is an integer.
+ # If we do end up needing to pass a RoomStreamToken down here
+ # in the future, using RoomStreamToken.stream (the minimum stream
+ # position) to convert to an ascending integer value should work.
+ # Additional context: https://github.com/matrix-org/synapse/pull/11137
+ assert isinstance(new_token, int)
+
services = [
service
for service in self.store.get_app_services()
@@ -231,14 +244,13 @@ class ApplicationServicesHandler:
self,
services: List[ApplicationService],
stream_key: str,
- new_token: Optional[int],
+ new_token: int,
users: Collection[Union[str, UserID]],
) -> None:
logger.debug("Checking interested services for %s" % (stream_key))
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
- # Only handle typing if we have the latest token
- if stream_key == "typing_key" and new_token is not None:
+ if stream_key == "typing_key":
# Note that we don't persist the token (via set_type_stream_id_for_appservice)
# for typing_key due to performance reasons and due to their highly
# ephemeral nature.
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 1882fffd2a..60e5409895 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -383,29 +383,6 @@ class Notifier:
except Exception:
logger.exception("Error notifying application services of event")
- def _notify_app_services_ephemeral(
- self,
- stream_key: str,
- new_token: Union[int, RoomStreamToken],
- users: Optional[Collection[Union[str, UserID]]] = None,
- ) -> None:
- """Notify application services of ephemeral event activity.
-
- Args:
- stream_key: The stream the event came from.
- new_token: The value of the new stream token.
- users: The users that should be informed of the new event, if any.
- """
- try:
- stream_token = None
- if isinstance(new_token, int):
- stream_token = new_token
- self.appservice_handler.notify_interested_services_ephemeral(
- stream_key, stream_token, users or []
- )
- except Exception:
- logger.exception("Error notifying application services of event")
-
def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
self._pusher_pool.on_new_notifications(max_room_stream_token)
@@ -467,12 +444,15 @@ class Notifier:
self.notify_replication()
- # Notify appservices
- self._notify_app_services_ephemeral(
- stream_key,
- new_token,
- users,
- )
+ # Notify appservices.
+ try:
+ self.appservice_handler.notify_interested_services_ephemeral(
+ stream_key,
+ new_token,
+ users,
+ )
+ except Exception:
+ logger.exception("Error notifying application services of event")
def on_new_replication_data(self) -> None:
"""Used to inform replication listeners that something has happened
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index b15cd030e0..9ccc66e589 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -427,7 +427,7 @@ class DeviceWorkerStore(SQLBaseStore):
user_ids: the users who were signed
Returns:
- THe new stream ID.
+ The new stream ID.
"""
async with self._device_list_id_gen.get_next() as stream_id:
@@ -1322,7 +1322,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
async def add_device_change_to_streams(
self, user_id: str, device_ids: Collection[str], hosts: List[str]
- ):
+ ) -> int:
"""Persist that a user's devices have been updated, and which hosts
(if any) should be poked.
"""
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 12cf6995eb..cc0eebdb46 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -92,7 +92,7 @@ class PresenceStore(PresenceBackgroundUpdateStore):
prefilled_cache=presence_cache_prefill,
)
- async def update_presence(self, presence_states):
+ async def update_presence(self, presence_states) -> Tuple[int, int]:
assert self._can_persist_presence
stream_ordering_manager = self._presence_id_gen.get_next_mult(
|