diff options
Diffstat (limited to 'synapse/replication/tcp/streams/events.py')
-rw-r--r-- | synapse/replication/tcp/streams/events.py | 16 |
1 files changed, 9 insertions, 7 deletions
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index c6a595629f..051114596b 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -15,11 +15,11 @@ # limitations under the License. import heapq -from typing import Tuple, Type +from typing import Iterable, Tuple, Type import attr -from ._base import Stream, db_query_to_update_function +from ._base import Stream, Token, db_query_to_update_function """Handling of the 'events' replication stream @@ -116,12 +116,14 @@ class EventsStream(Stream): def __init__(self, hs): self._store = hs.get_datastore() - self.current_token = self._store.get_current_events_token # type: ignore - self.update_function = db_query_to_update_function(self._update_function) # type: ignore - - super(EventsStream, self).__init__(hs) + super().__init__( + self._store.get_current_events_token, + db_query_to_update_function(self._update_function), + ) - async def _update_function(self, from_token, current_token, limit=None): + async def _update_function( + self, from_token: Token, current_token: Token, limit: int + ) -> Iterable[tuple]: event_rows = await self._store.get_all_new_forward_event_rows( from_token, current_token, limit ) |