diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2020-04-17 14:49:55 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-17 14:49:55 +0100 |
commit | 67ff7b8ba0d3647f3c370341dff3f035b3a1160a (patch) | |
tree | 3a4bed20db43138a514ea1ff06d7d0bf1f584891 /synapse/replication/tcp/streams/events.py | |
parent | Clarify the comments for media_storage_providers options (#7272) (diff) | |
download | synapse-67ff7b8ba0d3647f3c370341dff3f035b3a1160a.tar.xz |
Improve type checking in `replication.tcp.Stream` (#7291)
The general idea here is to get rid of the type: ignore annotations on all of the current_token and update_function assignments, which would have caught #7290. After a bit of experimentation, it seems like the least-awful way to do this is to pass the offending functions in as parameters to the Stream constructor. Unfortunately that means that the concrete implementations no longer have the same constructor signature as Stream itself, which means that it gets hard to correctly annotate STREAMS_MAP. I've also introduced a couple of new types, to take out some duplication.
Diffstat (limited to 'synapse/replication/tcp/streams/events.py')
-rw-r--r-- | synapse/replication/tcp/streams/events.py | 16 |
1 files changed, 9 insertions, 7 deletions
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index c6a595629f..051114596b 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -15,11 +15,11 @@ # limitations under the License. import heapq -from typing import Tuple, Type +from typing import Iterable, Tuple, Type import attr -from ._base import Stream, db_query_to_update_function +from ._base import Stream, Token, db_query_to_update_function """Handling of the 'events' replication stream @@ -116,12 +116,14 @@ class EventsStream(Stream): def __init__(self, hs): self._store = hs.get_datastore() - self.current_token = self._store.get_current_events_token # type: ignore - self.update_function = db_query_to_update_function(self._update_function) # type: ignore - - super(EventsStream, self).__init__(hs) + super().__init__( + self._store.get_current_events_token, + db_query_to_update_function(self._update_function), + ) - async def _update_function(self, from_token, current_token, limit=None): + async def _update_function( + self, from_token: Token, current_token: Token, limit: int + ) -> Iterable[tuple]: event_rows = await self._store.get_all_new_forward_event_rows( from_token, current_token, limit ) |