summary refs log tree commit diff
path: root/synapse/replication/tcp/streams/federation.py
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2020-04-17 14:49:55 +0100
committerGitHub <noreply@github.com>2020-04-17 14:49:55 +0100
commit67ff7b8ba0d3647f3c370341dff3f035b3a1160a (patch)
tree3a4bed20db43138a514ea1ff06d7d0bf1f584891 /synapse/replication/tcp/streams/federation.py
parentClarify the comments for media_storage_providers options (#7272) (diff)
downloadsynapse-67ff7b8ba0d3647f3c370341dff3f035b3a1160a.tar.xz
Improve type checking in `replication.tcp.Stream` (#7291)
The general idea here is to get rid of the type: ignore annotations on all of the current_token and update_function assignments, which would have caught #7290.

After a bit of experimentation, it seems like the least-awful way to do this is to pass the offending functions in as parameters to the Stream constructor. Unfortunately that means that the concrete implementations no longer have the same constructor signature as Stream itself, which means that it gets hard to correctly annotate STREAMS_MAP.

I've also introduced a couple of new types, to take out some duplication.
Diffstat (limited to 'synapse/replication/tcp/streams/federation.py')
-rw-r--r--synapse/replication/tcp/streams/federation.py19
1 files changed, 11 insertions, 8 deletions
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 48c1d45718..75133d7e40 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -15,8 +15,6 @@
 # limitations under the License.
 from collections import namedtuple
 
-from twisted.internet import defer
-
 from synapse.replication.tcp.streams._base import Stream, db_query_to_update_function
 
 
@@ -35,7 +33,6 @@ class FederationStream(Stream):
 
     NAME = "federation"
     ROW_TYPE = FederationStreamRow
-    _QUERY_MASTER = True
 
     def __init__(self, hs):
         # Not all synapse instances will have a federation sender instance,
@@ -43,10 +40,16 @@ class FederationStream(Stream):
         # so we stub the stream out when that is the case.
         if hs.config.worker_app is None or hs.should_send_federation():
             federation_sender = hs.get_federation_sender()
-            self.current_token = federation_sender.get_current_token  # type: ignore
-            self.update_function = db_query_to_update_function(federation_sender.get_replication_rows)  # type: ignore
+            current_token = federation_sender.get_current_token
+            update_function = db_query_to_update_function(
+                federation_sender.get_replication_rows
+            )
         else:
-            self.current_token = lambda: 0  # type: ignore
-            self.update_function = lambda from_token, upto_token, limit: defer.succeed(([], upto_token, bool))  # type: ignore
+            current_token = lambda: 0
+            update_function = self._stub_update_function
+
+        super().__init__(current_token, update_function)
 
-        super(FederationStream, self).__init__(hs)
+    @staticmethod
+    async def _stub_update_function(from_token, upto_token, limit):
+        return [], upto_token, False