diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 48c1d45718..75133d7e40 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -15,8 +15,6 @@
# limitations under the License.
from collections import namedtuple
-from twisted.internet import defer
-
from synapse.replication.tcp.streams._base import Stream, db_query_to_update_function
@@ -35,7 +33,6 @@ class FederationStream(Stream):
NAME = "federation"
ROW_TYPE = FederationStreamRow
- _QUERY_MASTER = True
def __init__(self, hs):
# Not all synapse instances will have a federation sender instance,
@@ -43,10 +40,16 @@ class FederationStream(Stream):
# so we stub the stream out when that is the case.
if hs.config.worker_app is None or hs.should_send_federation():
federation_sender = hs.get_federation_sender()
- self.current_token = federation_sender.get_current_token # type: ignore
- self.update_function = db_query_to_update_function(federation_sender.get_replication_rows) # type: ignore
+ current_token = federation_sender.get_current_token
+ update_function = db_query_to_update_function(
+ federation_sender.get_replication_rows
+ )
else:
- self.current_token = lambda: 0 # type: ignore
- self.update_function = lambda from_token, upto_token, limit: defer.succeed(([], upto_token, bool)) # type: ignore
+ current_token = lambda: 0
+ update_function = self._stub_update_function
+
+ super().__init__(current_token, update_function)
- super(FederationStream, self).__init__(hs)
+ @staticmethod
+ async def _stub_update_function(from_token, upto_token, limit):
+ return [], upto_token, False
|