From 80ec81dcc54bdb823b95c2f870a919868de9a481 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Oct 2023 18:28:40 +0300 Subject: Some refactors around receipts stream (#16426) --- synapse/notifier.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index fc39e5c963..99e7715896 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -126,7 +126,7 @@ class _NotifierUserStream: def notify( self, - stream_key: str, + stream_key: StreamKeyType, stream_id: Union[int, RoomStreamToken], time_now_ms: int, ) -> None: @@ -454,7 +454,7 @@ class Notifier: def on_new_event( self, - stream_key: str, + stream_key: StreamKeyType, new_token: Union[int, RoomStreamToken], users: Optional[Collection[Union[str, UserID]]] = None, rooms: Optional[StrCollection] = None, @@ -655,30 +655,29 @@ class Notifier: events: List[Union[JsonDict, EventBase]] = [] end_token = from_token - for name, source in self.event_sources.sources.get_sources(): - keyname = "%s_key" % name - before_id = getattr(before_token, keyname) - after_id = getattr(after_token, keyname) + for keyname, source in self.event_sources.sources.get_sources(): + before_id = before_token.get_field(keyname) + after_id = after_token.get_field(keyname) if before_id == after_id: continue new_events, new_key = await source.get_new_events( user=user, - from_key=getattr(from_token, keyname), + from_key=from_token.get_field(keyname), limit=limit, is_guest=is_peeking, room_ids=room_ids, explicit_room_id=explicit_room_id, ) - if name == "room": + if keyname == StreamKeyType.ROOM: new_events = await filter_events_for_client( self._storage_controllers, user.to_string(), new_events, is_peeking=is_peeking, ) - elif name == "presence": + elif keyname == StreamKeyType.PRESENCE: now = self.clock.time_msec() new_events[:] = [ { -- cgit 1.4.1