summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erikj@matrix.org>2023-10-25 16:16:19 +0100
committerGitHub <noreply@github.com>2023-10-25 16:16:19 +0100
commitba47fea5286e084ec70d568aa62eb4820b857c47 (patch)
tree6e2c608feb1ea0c23b2b9cc40d11211cc3a10aa5 /synapse/handlers
parentFix tests on Twisted trunk. (#16528) (diff)
downloadsynapse-ba47fea5286e084ec70d568aa62eb4820b857c47.tar.xz
Allow multiple workers to write to receipts stream. (#16432)
Fixes #16417
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/appservice.py42
-rw-r--r--synapse/handlers/initial_sync.py2
-rw-r--r--synapse/handlers/receipts.py19
-rw-r--r--synapse/handlers/sync.py7
4 files changed, 40 insertions, 30 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index c200a45f3a..873dadc3bd 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -47,6 +47,7 @@ from synapse.types import (
     DeviceListUpdates,
     JsonDict,
     JsonMapping,
+    MultiWriterStreamToken,
     RoomAlias,
     RoomStreamToken,
     StreamKeyType,
@@ -217,7 +218,7 @@ class ApplicationServicesHandler:
     def notify_interested_services_ephemeral(
         self,
         stream_key: StreamKeyType,
-        new_token: Union[int, RoomStreamToken],
+        new_token: Union[int, RoomStreamToken, MultiWriterStreamToken],
         users: Collection[Union[str, UserID]],
     ) -> None:
         """
@@ -259,19 +260,6 @@ class ApplicationServicesHandler:
         ):
             return
 
-        # Assert that new_token is an integer (and not a RoomStreamToken).
-        # All of the supported streams that this function handles use an
-        # integer to track progress (rather than a RoomStreamToken - a
-        # vector clock implementation) as they don't support multiple
-        # stream writers.
-        #
-        # As a result, we simply assert that new_token is an integer.
-        # If we do end up needing to pass a RoomStreamToken down here
-        # in the future, using RoomStreamToken.stream (the minimum stream
-        # position) to convert to an ascending integer value should work.
-        # Additional context: https://github.com/matrix-org/synapse/pull/11137
-        assert isinstance(new_token, int)
-
         # Ignore to-device messages if the feature flag is not enabled
         if (
             stream_key == StreamKeyType.TO_DEVICE
@@ -286,6 +274,9 @@ class ApplicationServicesHandler:
         ):
             return
 
+        # We know we're not a `RoomStreamToken` at this point.
+        assert not isinstance(new_token, RoomStreamToken)
+
         # Check whether there are any appservices which have registered to receive
         # ephemeral events.
         #
@@ -327,7 +318,7 @@ class ApplicationServicesHandler:
         self,
         services: List[ApplicationService],
         stream_key: StreamKeyType,
-        new_token: int,
+        new_token: Union[int, MultiWriterStreamToken],
         users: Collection[Union[str, UserID]],
     ) -> None:
         logger.debug("Checking interested services for %s", stream_key)
@@ -340,6 +331,7 @@ class ApplicationServicesHandler:
                     #
                     # Instead we simply grab the latest typing updates in _handle_typing
                     # and, if they apply to this application service, send it off.
+                    assert isinstance(new_token, int)
                     events = await self._handle_typing(service, new_token)
                     if events:
                         self.scheduler.enqueue_for_appservice(service, ephemeral=events)
@@ -350,15 +342,23 @@ class ApplicationServicesHandler:
                     (service.id, stream_key)
                 ):
                     if stream_key == StreamKeyType.RECEIPT:
+                        assert isinstance(new_token, MultiWriterStreamToken)
+
+                        # We store appservice tokens as integers, so we ignore
+                        # the `instance_map` components and instead simply
+                        # follow the base stream position.
+                        new_token = MultiWriterStreamToken(stream=new_token.stream)
+
                         events = await self._handle_receipts(service, new_token)
                         self.scheduler.enqueue_for_appservice(service, ephemeral=events)
 
                         # Persist the latest handled stream token for this appservice
                         await self.store.set_appservice_stream_type_pos(
-                            service, "read_receipt", new_token
+                            service, "read_receipt", new_token.stream
                         )
 
                     elif stream_key == StreamKeyType.PRESENCE:
+                        assert isinstance(new_token, int)
                         events = await self._handle_presence(service, users, new_token)
                         self.scheduler.enqueue_for_appservice(service, ephemeral=events)
 
@@ -368,6 +368,7 @@ class ApplicationServicesHandler:
                         )
 
                     elif stream_key == StreamKeyType.TO_DEVICE:
+                        assert isinstance(new_token, int)
                         # Retrieve a list of to-device message events, as well as the
                         # maximum stream token of the messages we were able to retrieve.
                         to_device_messages = await self._get_to_device_messages(
@@ -383,6 +384,7 @@ class ApplicationServicesHandler:
                         )
 
                     elif stream_key == StreamKeyType.DEVICE_LIST:
+                        assert isinstance(new_token, int)
                         device_list_summary = await self._get_device_list_summary(
                             service, new_token
                         )
@@ -432,7 +434,7 @@ class ApplicationServicesHandler:
         return typing
 
     async def _handle_receipts(
-        self, service: ApplicationService, new_token: int
+        self, service: ApplicationService, new_token: MultiWriterStreamToken
     ) -> List[JsonMapping]:
         """
         Return the latest read receipts that the given application service should receive.
@@ -455,15 +457,17 @@ class ApplicationServicesHandler:
         from_key = await self.store.get_type_stream_id_for_appservice(
             service, "read_receipt"
         )
-        if new_token is not None and new_token <= from_key:
+        if new_token is not None and new_token.stream <= from_key:
             logger.debug(
                 "Rejecting token lower than or equal to stored: %s" % (new_token,)
             )
             return []
 
+        from_token = MultiWriterStreamToken(stream=from_key)
+
         receipts_source = self.event_sources.sources.receipt
         receipts, _ = await receipts_source.get_new_events_as(
-            service=service, from_key=from_key, to_key=new_token
+            service=service, from_key=from_token, to_key=new_token
         )
         return receipts
 
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index c34bd7db95..b1d8be866f 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -145,7 +145,7 @@ class InitialSyncHandler:
         joined_rooms = [r.room_id for r in room_list if r.membership == Membership.JOIN]
         receipt = await self.store.get_linearized_receipts_for_rooms(
             joined_rooms,
-            to_key=int(now_token.receipt_key),
+            to_key=now_token.receipt_key,
         )
 
         receipt = ReceiptEventSource.filter_out_private_receipts(receipt, user_id)
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 69ac468f75..b5f7a8b47e 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -20,6 +20,7 @@ from synapse.streams import EventSource
 from synapse.types import (
     JsonDict,
     JsonMapping,
+    MultiWriterStreamToken,
     ReadReceipt,
     StreamKeyType,
     UserID,
@@ -200,7 +201,7 @@ class ReceiptsHandler:
             await self.federation_sender.send_read_receipt(receipt)
 
 
-class ReceiptEventSource(EventSource[int, JsonMapping]):
+class ReceiptEventSource(EventSource[MultiWriterStreamToken, JsonMapping]):
     def __init__(self, hs: "HomeServer"):
         self.store = hs.get_datastores().main
         self.config = hs.config
@@ -273,13 +274,12 @@ class ReceiptEventSource(EventSource[int, JsonMapping]):
     async def get_new_events(
         self,
         user: UserID,
-        from_key: int,
+        from_key: MultiWriterStreamToken,
         limit: int,
         room_ids: Iterable[str],
         is_guest: bool,
         explicit_room_id: Optional[str] = None,
-    ) -> Tuple[List[JsonMapping], int]:
-        from_key = int(from_key)
+    ) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
         to_key = self.get_current_key()
 
         if from_key == to_key:
@@ -296,8 +296,11 @@ class ReceiptEventSource(EventSource[int, JsonMapping]):
         return events, to_key
 
     async def get_new_events_as(
-        self, from_key: int, to_key: int, service: ApplicationService
-    ) -> Tuple[List[JsonMapping], int]:
+        self,
+        from_key: MultiWriterStreamToken,
+        to_key: MultiWriterStreamToken,
+        service: ApplicationService,
+    ) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
         """Returns a set of new read receipt events that an appservice
         may be interested in.
 
@@ -312,8 +315,6 @@ class ReceiptEventSource(EventSource[int, JsonMapping]):
                   appservice may be interested in.
                 * The current read receipt stream token.
         """
-        from_key = int(from_key)
-
         if from_key == to_key:
             return [], to_key
 
@@ -333,5 +334,5 @@ class ReceiptEventSource(EventSource[int, JsonMapping]):
 
         return events, to_key
 
-    def get_current_key(self) -> int:
+    def get_current_key(self) -> MultiWriterStreamToken:
         return self.store.get_max_receipt_stream_id()
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index f131c0e8e0..f75c1548ca 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -57,6 +57,7 @@ from synapse.types import (
     DeviceListUpdates,
     JsonDict,
     JsonMapping,
+    MultiWriterStreamToken,
     MutableStateMap,
     Requester,
     RoomStreamToken,
@@ -477,7 +478,11 @@ class SyncHandler:
                 event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
                 ephemeral_by_room.setdefault(room_id, []).append(event_copy)
 
-            receipt_key = since_token.receipt_key if since_token else 0
+            receipt_key = (
+                since_token.receipt_key
+                if since_token
+                else MultiWriterStreamToken(stream=0)
+            )
 
             receipt_source = self.event_sources.sources.receipt
             receipts, receipt_key = await receipt_source.get_new_events(