diff --git a/synapse/notifier.py b/synapse/notifier.py
index dec47add7e..4213fd7c6f 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -198,12 +198,12 @@ class _NotifierUserStream:
@attr.s(slots=True, frozen=True, auto_attribs=True)
class EventStreamResult:
- events: List[Union[JsonDict, EventBase]]
+ events_by_source: Dict[StreamKeyType, List[Union[JsonDict, EventBase]]]
start_token: StreamToken
end_token: StreamToken
def __bool__(self) -> bool:
- return bool(self.events)
+ return any(bool(e) for e in self.events_by_source.values())
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -694,12 +694,12 @@ class Notifier:
before_token: StreamToken, after_token: StreamToken
) -> EventStreamResult:
if after_token == before_token:
- return EventStreamResult([], from_token, from_token)
+ return EventStreamResult({}, from_token, from_token)
# The events fetched from each source are a JsonDict, EventBase, or
# UserPresenceState, but see below for UserPresenceState being
# converted to JsonDict.
- events: List[Union[JsonDict, EventBase]] = []
+ events_by_source: Dict[StreamKeyType, List[Union[JsonDict, EventBase]]] = {}
end_token = from_token
for keyname, source in self.event_sources.sources.get_sources():
@@ -734,10 +734,12 @@ class Notifier:
for event in new_events
]
- events.extend(new_events)
+ if new_events:
+ events_by_source.setdefault(keyname, []).extend(new_events)
+
end_token = end_token.copy_and_replace(keyname, new_key)
- return EventStreamResult(events, from_token, end_token)
+ return EventStreamResult(events_by_source, from_token, end_token)
user_id_for_stream = user.to_string()
if is_peeking:
|