summary refs log tree commit diff
path: root/synapse/replication/tcp/streams/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/streams/events.py')
-rw-r--r--synapse/replication/tcp/streams/events.py16
1 files changed, 9 insertions, 7 deletions
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index c6a595629f..051114596b 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -15,11 +15,11 @@
 # limitations under the License.
 
 import heapq
-from typing import Tuple, Type
+from typing import Iterable, Tuple, Type
 
 import attr
 
-from ._base import Stream, db_query_to_update_function
+from ._base import Stream, Token, db_query_to_update_function
 
 
 """Handling of the 'events' replication stream
@@ -116,12 +116,14 @@ class EventsStream(Stream):
 
     def __init__(self, hs):
         self._store = hs.get_datastore()
-        self.current_token = self._store.get_current_events_token  # type: ignore
-        self.update_function = db_query_to_update_function(self._update_function)  # type: ignore
-
-        super(EventsStream, self).__init__(hs)
+        super().__init__(
+            self._store.get_current_events_token,
+            db_query_to_update_function(self._update_function),
+        )
 
-    async def _update_function(self, from_token, current_token, limit=None):
+    async def _update_function(
+        self, from_token: Token, current_token: Token, limit: int
+    ) -> Iterable[tuple]:
         event_rows = await self._store.get_all_new_forward_event_rows(
             from_token, current_token, limit
         )