summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--changelog.d/11574.misc1
-rw-r--r--synapse/handlers/events.py7
-rw-r--r--synapse/notifier.py25
3 files changed, 24 insertions, 9 deletions
diff --git a/changelog.d/11574.misc b/changelog.d/11574.misc
new file mode 100644
index 0000000000..2b090a3780
--- /dev/null
+++ b/changelog.d/11574.misc
@@ -0,0 +1 @@
+Convert `EventStreamResult` from a `namedtuple` to `attrs` to improve type hints.
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 32b0254c5f..afed80ba14 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -79,13 +79,14 @@ class EventStreamHandler:
                 # thundering herds on restart.
                 timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))
 
-            events, tokens = await self.notifier.get_events_for(
+            stream_result = await self.notifier.get_events_for(
                 auth_user,
                 pagin_config,
                 timeout,
                 is_guest=is_guest,
                 explicit_room_id=room_id,
             )
+            events = stream_result.events
 
             time_now = self.clock.time_msec()
 
@@ -128,8 +129,8 @@ class EventStreamHandler:
 
             chunk = {
                 "chunk": chunks,
-                "start": await tokens[0].to_string(self.store),
-                "end": await tokens[1].to_string(self.store),
+                "start": await stream_result.start_token.to_string(self.store),
+                "end": await stream_result.end_token.to_string(self.store),
             }
 
             return chunk
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 60e5409895..bbabdb0587 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 import logging
-from collections import namedtuple
 from typing import (
     Awaitable,
     Callable,
@@ -44,7 +43,13 @@ from synapse.logging.opentracing import log_kv, start_active_span
 from synapse.logging.utils import log_function
 from synapse.metrics import LaterGauge
 from synapse.streams.config import PaginationConfig
-from synapse.types import PersistedEventPosition, RoomStreamToken, StreamToken, UserID
+from synapse.types import (
+    JsonDict,
+    PersistedEventPosition,
+    RoomStreamToken,
+    StreamToken,
+    UserID,
+)
 from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
 from synapse.util.metrics import Measure
 from synapse.visibility import filter_events_for_client
@@ -178,7 +183,12 @@ class _NotifierUserStream:
             return _NotificationListener(self.notify_deferred.observe())
 
 
-class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class EventStreamResult:
+    events: List[Union[JsonDict, EventBase]]
+    start_token: StreamToken
+    end_token: StreamToken
+
     def __bool__(self):
         return bool(self.events)
 
@@ -582,9 +592,12 @@ class Notifier:
             before_token: StreamToken, after_token: StreamToken
         ) -> EventStreamResult:
             if after_token == before_token:
-                return EventStreamResult([], (from_token, from_token))
+                return EventStreamResult([], from_token, from_token)
 
-            events: List[EventBase] = []
+            # The events fetched from each source are a JsonDict, EventBase, or
+            # UserPresenceState, but see below for UserPresenceState being
+            # converted to JsonDict.
+            events: List[Union[JsonDict, EventBase]] = []
             end_token = from_token
 
             for name, source in self.event_sources.sources.get_sources():
@@ -623,7 +636,7 @@ class Notifier:
                 events.extend(new_events)
                 end_token = end_token.copy_and_replace(keyname, new_key)
 
-            return EventStreamResult(events, (from_token, end_token))
+            return EventStreamResult(events, from_token, end_token)
 
         user_id_for_stream = user.to_string()
         if is_peeking: