diff options
Diffstat (limited to 'synapse/replication/tcp/streams/federation.py')
-rw-r--r-- | synapse/replication/tcp/streams/federation.py | 19 |
1 files changed, 11 insertions, 8 deletions
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py index 48c1d45718..75133d7e40 100644 --- a/synapse/replication/tcp/streams/federation.py +++ b/synapse/replication/tcp/streams/federation.py @@ -15,8 +15,6 @@ # limitations under the License. from collections import namedtuple -from twisted.internet import defer - from synapse.replication.tcp.streams._base import Stream, db_query_to_update_function @@ -35,7 +33,6 @@ class FederationStream(Stream): NAME = "federation" ROW_TYPE = FederationStreamRow - _QUERY_MASTER = True def __init__(self, hs): # Not all synapse instances will have a federation sender instance, @@ -43,10 +40,16 @@ class FederationStream(Stream): # so we stub the stream out when that is the case. if hs.config.worker_app is None or hs.should_send_federation(): federation_sender = hs.get_federation_sender() - self.current_token = federation_sender.get_current_token # type: ignore - self.update_function = db_query_to_update_function(federation_sender.get_replication_rows) # type: ignore + current_token = federation_sender.get_current_token + update_function = db_query_to_update_function( + federation_sender.get_replication_rows + ) else: - self.current_token = lambda: 0 # type: ignore - self.update_function = lambda from_token, upto_token, limit: defer.succeed(([], upto_token, bool)) # type: ignore + current_token = lambda: 0 + update_function = self._stub_update_function + + super().__init__(current_token, update_function) - super(FederationStream, self).__init__(hs) + @staticmethod + async def _stub_update_function(from_token, upto_token, limit): + return [], upto_token, False |