summary refs log tree commit diff
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2021-12-15 11:10:02 -0500
committerGitHub <noreply@github.com>2021-12-15 11:10:02 -0500
commit323151b787cbc511919dd54a3acea76dc2ad0603 (patch)
tree8c5a8de579db53329c35971e1a01f189535b3384
parentAdd experimental support for MSC3202: allowing application services to masque... (diff)
downloadsynapse-323151b787cbc511919dd54a3acea76dc2ad0603.tar.xz
Convert EventStreamResult to attrs. (#11574)
-rw-r--r--changelog.d/11574.misc1
-rw-r--r--synapse/handlers/events.py7
-rw-r--r--synapse/notifier.py25
3 files changed, 24 insertions, 9 deletions
diff --git a/changelog.d/11574.misc b/changelog.d/11574.misc
new file mode 100644

index 0000000000..2b090a3780 --- /dev/null +++ b/changelog.d/11574.misc
@@ -0,0 +1 @@ +Convert `EventStreamResult` from a `namedtuple` to `attrs` to improve type hints. diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 32b0254c5f..afed80ba14 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py
@@ -79,13 +79,14 @@ class EventStreamHandler: # thundering herds on restart. timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1)) - events, tokens = await self.notifier.get_events_for( + stream_result = await self.notifier.get_events_for( auth_user, pagin_config, timeout, is_guest=is_guest, explicit_room_id=room_id, ) + events = stream_result.events time_now = self.clock.time_msec() @@ -128,8 +129,8 @@ class EventStreamHandler: chunk = { "chunk": chunks, - "start": await tokens[0].to_string(self.store), - "end": await tokens[1].to_string(self.store), + "start": await stream_result.start_token.to_string(self.store), + "end": await stream_result.end_token.to_string(self.store), } return chunk diff --git a/synapse/notifier.py b/synapse/notifier.py
index 60e5409895..bbabdb0587 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py
@@ -13,7 +13,6 @@ # limitations under the License. import logging -from collections import namedtuple from typing import ( Awaitable, Callable, @@ -44,7 +43,13 @@ from synapse.logging.opentracing import log_kv, start_active_span from synapse.logging.utils import log_function from synapse.metrics import LaterGauge from synapse.streams.config import PaginationConfig -from synapse.types import PersistedEventPosition, RoomStreamToken, StreamToken, UserID +from synapse.types import ( + JsonDict, + PersistedEventPosition, + RoomStreamToken, + StreamToken, + UserID, +) from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.metrics import Measure from synapse.visibility import filter_events_for_client @@ -178,7 +183,12 @@ class _NotifierUserStream: return _NotificationListener(self.notify_deferred.observe()) -class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))): +@attr.s(slots=True, frozen=True, auto_attribs=True) +class EventStreamResult: + events: List[Union[JsonDict, EventBase]] + start_token: StreamToken + end_token: StreamToken + def __bool__(self): return bool(self.events) @@ -582,9 +592,12 @@ class Notifier: before_token: StreamToken, after_token: StreamToken ) -> EventStreamResult: if after_token == before_token: - return EventStreamResult([], (from_token, from_token)) + return EventStreamResult([], from_token, from_token) - events: List[EventBase] = [] + # The events fetched from each source are a JsonDict, EventBase, or + # UserPresenceState, but see below for UserPresenceState being + # converted to JsonDict. + events: List[Union[JsonDict, EventBase]] = [] end_token = from_token for name, source in self.event_sources.sources.get_sources(): @@ -623,7 +636,7 @@ class Notifier: events.extend(new_events) end_token = end_token.copy_and_replace(keyname, new_key) - return EventStreamResult(events, (from_token, end_token)) + return EventStreamResult(events, from_token, end_token) user_id_for_stream = user.to_string() if is_peeking: