summary refs log tree commit diff
path: root/synapse/handlers/receipts.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@matrix.org>2023-10-25 16:16:19 +0100
committerGitHub <noreply@github.com>2023-10-25 16:16:19 +0100
commitba47fea5286e084ec70d568aa62eb4820b857c47 (patch)
tree6e2c608feb1ea0c23b2b9cc40d11211cc3a10aa5 /synapse/handlers/receipts.py
parentFix tests on Twisted trunk. (#16528) (diff)
downloadsynapse-ba47fea5286e084ec70d568aa62eb4820b857c47.tar.xz
Allow multiple workers to write to receipts stream. (#16432)
Fixes #16417
Diffstat (limited to 'synapse/handlers/receipts.py')
-rw-r--r--synapse/handlers/receipts.py19
1 files changed, 10 insertions, 9 deletions
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 69ac468f75..b5f7a8b47e 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -20,6 +20,7 @@ from synapse.streams import EventSource
 from synapse.types import (
     JsonDict,
     JsonMapping,
+    MultiWriterStreamToken,
     ReadReceipt,
     StreamKeyType,
     UserID,
@@ -200,7 +201,7 @@ class ReceiptsHandler:
             await self.federation_sender.send_read_receipt(receipt)
 
 
-class ReceiptEventSource(EventSource[int, JsonMapping]):
+class ReceiptEventSource(EventSource[MultiWriterStreamToken, JsonMapping]):
     def __init__(self, hs: "HomeServer"):
         self.store = hs.get_datastores().main
         self.config = hs.config
@@ -273,13 +274,12 @@ class ReceiptEventSource(EventSource[int, JsonMapping]):
     async def get_new_events(
         self,
         user: UserID,
-        from_key: int,
+        from_key: MultiWriterStreamToken,
         limit: int,
         room_ids: Iterable[str],
         is_guest: bool,
         explicit_room_id: Optional[str] = None,
-    ) -> Tuple[List[JsonMapping], int]:
-        from_key = int(from_key)
+    ) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
         to_key = self.get_current_key()
 
         if from_key == to_key:
@@ -296,8 +296,11 @@ class ReceiptEventSource(EventSource[int, JsonMapping]):
         return events, to_key
 
     async def get_new_events_as(
-        self, from_key: int, to_key: int, service: ApplicationService
-    ) -> Tuple[List[JsonMapping], int]:
+        self,
+        from_key: MultiWriterStreamToken,
+        to_key: MultiWriterStreamToken,
+        service: ApplicationService,
+    ) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
         """Returns a set of new read receipt events that an appservice
         may be interested in.
 
@@ -312,8 +315,6 @@ class ReceiptEventSource(EventSource[int, JsonMapping]):
                   appservice may be interested in.
                 * The current read receipt stream token.
         """
-        from_key = int(from_key)
-
         if from_key == to_key:
             return [], to_key
 
@@ -333,5 +334,5 @@ class ReceiptEventSource(EventSource[int, JsonMapping]):
 
         return events, to_key
 
-    def get_current_key(self) -> int:
+    def get_current_key(self) -> MultiWriterStreamToken:
         return self.store.get_max_receipt_stream_id()