summary refs log tree commit diff
path: root/synapse/replication/tcp/streams/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/streams/events.py')
-rw-r--r--synapse/replication/tcp/streams/events.py28
1 files changed, 14 insertions, 14 deletions
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index e7e87bac92..a030e9299e 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -65,7 +65,7 @@ class BaseEventsStreamRow:
     """
 
     # Unique string that ids the type. Must be overridden in sub classes.
-    TypeId = None  # type: str
+    TypeId: str
 
     @classmethod
     def from_data(cls, data):
@@ -103,10 +103,10 @@ class EventsStreamCurrentStateRow(BaseEventsStreamRow):
     event_id = attr.ib()  # str, optional
 
 
-_EventRows = (
+_EventRows: Tuple[Type[BaseEventsStreamRow], ...] = (
     EventsStreamEventRow,
     EventsStreamCurrentStateRow,
-)  # type: Tuple[Type[BaseEventsStreamRow], ...]
+)
 
 TypeToRow = {Row.TypeId: Row for Row in _EventRows}
 
@@ -157,9 +157,9 @@ class EventsStream(Stream):
 
         # now we fetch up to that many rows from the events table
 
-        event_rows = await self._store.get_all_new_forward_event_rows(
+        event_rows: List[Tuple] = await self._store.get_all_new_forward_event_rows(
             instance_name, from_token, current_token, target_row_count
-        )  # type: List[Tuple]
+        )
 
         # we rely on get_all_new_forward_event_rows strictly honouring the limit, so
         # that we know it is safe to just take upper_limit = event_rows[-1][0].
@@ -172,7 +172,7 @@ class EventsStream(Stream):
 
         if len(event_rows) == target_row_count:
             limited = True
-            upper_limit = event_rows[-1][0]  # type: int
+            upper_limit: int = event_rows[-1][0]
         else:
             limited = False
             upper_limit = current_token
@@ -191,30 +191,30 @@ class EventsStream(Stream):
         # finally, fetch the ex-outliers rows. We assume there are few enough of these
         # not to bother with the limit.
 
-        ex_outliers_rows = await self._store.get_ex_outlier_stream_rows(
+        ex_outliers_rows: List[Tuple] = await self._store.get_ex_outlier_stream_rows(
             instance_name, from_token, upper_limit
-        )  # type: List[Tuple]
+        )
 
         # we now need to turn the raw database rows returned into tuples suitable
         # for the replication protocol (basically, we add an identifier to
         # distinguish the row type). At the same time, we can limit the event_rows
         # to the max stream_id from state_rows.
 
-        event_updates = (
+        event_updates: Iterable[Tuple[int, Tuple]] = (
             (stream_id, (EventsStreamEventRow.TypeId, rest))
             for (stream_id, *rest) in event_rows
             if stream_id <= upper_limit
-        )  # type: Iterable[Tuple[int, Tuple]]
+        )
 
-        state_updates = (
+        state_updates: Iterable[Tuple[int, Tuple]] = (
             (stream_id, (EventsStreamCurrentStateRow.TypeId, rest))
             for (stream_id, *rest) in state_rows
-        )  # type: Iterable[Tuple[int, Tuple]]
+        )
 
-        ex_outliers_updates = (
+        ex_outliers_updates: Iterable[Tuple[int, Tuple]] = (
             (stream_id, (EventsStreamEventRow.TypeId, rest))
             for (stream_id, *rest) in ex_outliers_rows
-        )  # type: Iterable[Tuple[int, Tuple]]
+        )
 
         # we need to return a sorted list, so merge them together.
         updates = list(heapq.merge(event_updates, state_updates, ex_outliers_updates))