From c836cb988ec61b12b7cccc92776e1c473c589151 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Sat, 13 Jan 2024 11:18:48 +0000 Subject: Better event stream typing --- synapse/notifier.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index dec47add7e..4213fd7c6f 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -198,12 +198,12 @@ class _NotifierUserStream: @attr.s(slots=True, frozen=True, auto_attribs=True) class EventStreamResult: - events: List[Union[JsonDict, EventBase]] + events_by_source: Dict[StreamKeyType, List[Union[JsonDict, EventBase]]] start_token: StreamToken end_token: StreamToken def __bool__(self) -> bool: - return bool(self.events) + return any(bool(e) for e in self.events_by_source.values()) @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -694,12 +694,12 @@ class Notifier: before_token: StreamToken, after_token: StreamToken ) -> EventStreamResult: if after_token == before_token: - return EventStreamResult([], from_token, from_token) + return EventStreamResult({}, from_token, from_token) # The events fetched from each source are a JsonDict, EventBase, or # UserPresenceState, but see below for UserPresenceState being # converted to JsonDict. - events: List[Union[JsonDict, EventBase]] = [] + events_by_source: Dict[StreamKeyType, List[Union[JsonDict, EventBase]]] = {} end_token = from_token for keyname, source in self.event_sources.sources.get_sources(): @@ -734,10 +734,12 @@ class Notifier: for event in new_events ] - events.extend(new_events) + if new_events: + events_by_source.setdefault(keyname, []).extend(new_events) + end_token = end_token.copy_and_replace(keyname, new_key) - return EventStreamResult(events, from_token, end_token) + return EventStreamResult(events_by_source, from_token, end_token) user_id_for_stream = user.to_string() if is_peeking: -- cgit 1.5.1