summary refs log tree commit diff
path: root/synapse/replication/tcp/streams/federation.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2020-05-06 11:43:11 +0100
committerRichard van der Hoff <richard@matrix.org>2020-05-06 11:43:11 +0100
commit30a19daa02cf61d2a811efb0fd619ff48ce6bcf6 (patch)
treec430aa536e76a2329831d04ae7d8d761ed2ec188 /synapse/replication/tcp/streams/federation.py
parentuse an upsert to update device_lists_outbound_last_success (diff)
parentFix typing annotations in synapse/federation (#7382) (diff)
downloadsynapse-30a19daa02cf61d2a811efb0fd619ff48ce6bcf6.tar.xz
Merge branch 'develop' into rav/upsert_for_device_list
Diffstat (limited to 'synapse/replication/tcp/streams/federation.py')
-rw-r--r--synapse/replication/tcp/streams/federation.py34
1 files changed, 23 insertions, 11 deletions
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py

index 75133d7e40..b0505b8a2c 100644 --- a/synapse/replication/tcp/streams/federation.py +++ b/synapse/replication/tcp/streams/federation.py
@@ -15,7 +15,7 @@ # limitations under the License. from collections import namedtuple -from synapse.replication.tcp.streams._base import Stream, db_query_to_update_function +from synapse.replication.tcp.streams._base import Stream, make_http_update_function class FederationStream(Stream): @@ -35,21 +35,33 @@ class FederationStream(Stream): ROW_TYPE = FederationStreamRow def __init__(self, hs): - # Not all synapse instances will have a federation sender instance, - # whether that's a `FederationSender` or a `FederationRemoteSendQueue`, - # so we stub the stream out when that is the case. - if hs.config.worker_app is None or hs.should_send_federation(): + if hs.config.worker_app is None: + # master process: get updates from the FederationRemoteSendQueue. + # (if the master is configured to send federation itself, federation_sender + # will be a real FederationSender, which has stubs for current_token and + # get_replication_rows.) federation_sender = hs.get_federation_sender() current_token = federation_sender.get_current_token - update_function = db_query_to_update_function( - federation_sender.get_replication_rows - ) + update_function = federation_sender.get_replication_rows + + elif hs.should_send_federation(): + # federation sender: Query master process + update_function = make_http_update_function(hs, self.NAME) + current_token = self._stub_current_token + else: - current_token = lambda: 0 + # other worker: stub out the update function (we're not interested in + # any updates so when we get a POSITION we do nothing) update_function = self._stub_update_function + current_token = self._stub_current_token - super().__init__(current_token, update_function) + super().__init__(hs.get_instance_name(), current_token, update_function) + + @staticmethod + def _stub_current_token(): + # dummy current-token method for use on workers + return 0 @staticmethod - async def _stub_update_function(from_token, upto_token, limit): + async def _stub_update_function(instance_name, from_token, upto_token, limit): return [], upto_token, False