summary refs log tree commit diff
path: root/synapse/replication/tcp/streams/federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/streams/federation.py')
-rw-r--r--synapse/replication/tcp/streams/federation.py19
1 files changed, 14 insertions, 5 deletions
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 615f3dc9ac..5d9e87188b 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -15,7 +15,9 @@
 # limitations under the License.
 from collections import namedtuple
 
-from ._base import Stream
+from twisted.internet import defer
+
+from synapse.replication.tcp.streams._base import Stream
 
 FederationStreamRow = namedtuple(
     "FederationStreamRow",
@@ -33,11 +35,18 @@ class FederationStream(Stream):
 
     NAME = "federation"
     ROW_TYPE = FederationStreamRow
+    _QUERY_MASTER = True
 
     def __init__(self, hs):
-        federation_sender = hs.get_federation_sender()
-
-        self.current_token = federation_sender.get_current_token  # type: ignore
-        self.update_function = federation_sender.get_replication_rows  # type: ignore
+        # Not all synapse instances will have a federation sender instance,
+        # whether that's a `FederationSender` or a `FederationRemoteSendQueue`,
+        # so we stub the stream out when that is the case.
+        if hs.config.worker_app is None or hs.should_send_federation():
+            federation_sender = hs.get_federation_sender()
+            self.current_token = federation_sender.get_current_token  # type: ignore
+            self.update_function = federation_sender.get_replication_rows  # type: ignore
+        else:
+            self.current_token = lambda: 0  # type: ignore
+            self.update_function = lambda *args, **kwargs: defer.succeed([])  # type: ignore
 
         super(FederationStream, self).__init__(hs)