diff options
author | Erik Johnston <erikj@matrix.org> | 2023-10-25 16:16:19 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-25 16:16:19 +0100 |
commit | ba47fea5286e084ec70d568aa62eb4820b857c47 (patch) | |
tree | 6e2c608feb1ea0c23b2b9cc40d11211cc3a10aa5 /synapse/handlers/receipts.py | |
parent | Fix tests on Twisted trunk. (#16528) (diff) | |
download | synapse-ba47fea5286e084ec70d568aa62eb4820b857c47.tar.xz |
Allow multiple workers to write to receipts stream. (#16432)
Fixes #16417
Diffstat (limited to 'synapse/handlers/receipts.py')
-rw-r--r-- | synapse/handlers/receipts.py | 19 |
1 files changed, 10 insertions, 9 deletions
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 69ac468f75..b5f7a8b47e 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -20,6 +20,7 @@ from synapse.streams import EventSource from synapse.types import ( JsonDict, JsonMapping, + MultiWriterStreamToken, ReadReceipt, StreamKeyType, UserID, @@ -200,7 +201,7 @@ class ReceiptsHandler: await self.federation_sender.send_read_receipt(receipt) -class ReceiptEventSource(EventSource[int, JsonMapping]): +class ReceiptEventSource(EventSource[MultiWriterStreamToken, JsonMapping]): def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main self.config = hs.config @@ -273,13 +274,12 @@ class ReceiptEventSource(EventSource[int, JsonMapping]): async def get_new_events( self, user: UserID, - from_key: int, + from_key: MultiWriterStreamToken, limit: int, room_ids: Iterable[str], is_guest: bool, explicit_room_id: Optional[str] = None, - ) -> Tuple[List[JsonMapping], int]: - from_key = int(from_key) + ) -> Tuple[List[JsonMapping], MultiWriterStreamToken]: to_key = self.get_current_key() if from_key == to_key: @@ -296,8 +296,11 @@ class ReceiptEventSource(EventSource[int, JsonMapping]): return events, to_key async def get_new_events_as( - self, from_key: int, to_key: int, service: ApplicationService - ) -> Tuple[List[JsonMapping], int]: + self, + from_key: MultiWriterStreamToken, + to_key: MultiWriterStreamToken, + service: ApplicationService, + ) -> Tuple[List[JsonMapping], MultiWriterStreamToken]: """Returns a set of new read receipt events that an appservice may be interested in. @@ -312,8 +315,6 @@ class ReceiptEventSource(EventSource[int, JsonMapping]): appservice may be interested in. * The current read receipt stream token. """ - from_key = int(from_key) - if from_key == to_key: return [], to_key @@ -333,5 +334,5 @@ class ReceiptEventSource(EventSource[int, JsonMapping]): return events, to_key - def get_current_key(self) -> int: + def get_current_key(self) -> MultiWriterStreamToken: return self.store.get_max_receipt_stream_id() |