diff --git a/synapse/notifier.py b/synapse/notifier.py
index 60e5409895..bbabdb0587 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -13,7 +13,6 @@
# limitations under the License.
import logging
-from collections import namedtuple
from typing import (
Awaitable,
Callable,
@@ -44,7 +43,13 @@ from synapse.logging.opentracing import log_kv, start_active_span
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
-from synapse.types import PersistedEventPosition, RoomStreamToken, StreamToken, UserID
+from synapse.types import (
+ JsonDict,
+ PersistedEventPosition,
+ RoomStreamToken,
+ StreamToken,
+ UserID,
+)
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client
@@ -178,7 +183,12 @@ class _NotifierUserStream:
return _NotificationListener(self.notify_deferred.observe())
-class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class EventStreamResult:
+ events: List[Union[JsonDict, EventBase]]
+ start_token: StreamToken
+ end_token: StreamToken
+
def __bool__(self):
return bool(self.events)
@@ -582,9 +592,12 @@ class Notifier:
before_token: StreamToken, after_token: StreamToken
) -> EventStreamResult:
if after_token == before_token:
- return EventStreamResult([], (from_token, from_token))
+ return EventStreamResult([], from_token, from_token)
- events: List[EventBase] = []
+ # The events fetched from each source are a JsonDict, EventBase, or
+ # UserPresenceState, but see below for UserPresenceState being
+ # converted to JsonDict.
+ events: List[Union[JsonDict, EventBase]] = []
end_token = from_token
for name, source in self.event_sources.sources.get_sources():
@@ -623,7 +636,7 @@ class Notifier:
events.extend(new_events)
end_token = end_token.copy_and_replace(keyname, new_key)
- return EventStreamResult(events, (from_token, end_token))
+ return EventStreamResult(events, from_token, end_token)
user_id_for_stream = user.to_string()
if is_peeking:
|