diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2020-05-05 14:15:57 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-05 14:15:57 +0100 |
commit | d5aa7d93ed1f7963524125d16ab640ebf6cb91c2 (patch) | |
tree | 20e85df13577cdd24effdc9038f3a2357a584dfc /synapse/replication | |
parent | Add MultiWriterIdGenerator. (#7281) (diff) | |
download | synapse-d5aa7d93ed1f7963524125d16ab640ebf6cb91c2.tar.xz |
Fix catchup-on-reconnect for the Federation Stream (#7374)
looks like we managed to break this during the refactorathon.
Diffstat (limited to 'synapse/replication')
-rw-r--r-- | synapse/replication/tcp/resource.py | 2 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/_base.py | 3 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/federation.py | 30 |
3 files changed, 24 insertions, 11 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 33d2f589ac..b690abedad 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -80,7 +80,7 @@ class ReplicationStreamer(object): for stream in STREAMS_MAP.values(): if stream == FederationStream and hs.config.send_federation: # We only support federation stream if federation sending - # hase been disabled on the master. + # has been disabled on the master. continue self.streams.append(stream(hs)) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index b0f87c365b..084604e8b0 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -104,7 +104,8 @@ class Stream(object): implemented by subclasses. current_token_function is called to get the current token of the underlying - stream. + stream. It is only meaningful on the process that is the source of the + replication stream (ie, usually the master). update_function is called to get updates for this stream between a pair of stream tokens. See the UpdateFunction type definition for more info. diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py index e8bd52e389..b0505b8a2c 100644 --- a/synapse/replication/tcp/streams/federation.py +++ b/synapse/replication/tcp/streams/federation.py @@ -15,7 +15,7 @@ # limitations under the License. from collections import namedtuple -from synapse.replication.tcp.streams._base import Stream, db_query_to_update_function +from synapse.replication.tcp.streams._base import Stream, make_http_update_function class FederationStream(Stream): @@ -35,21 +35,33 @@ class FederationStream(Stream): ROW_TYPE = FederationStreamRow def __init__(self, hs): - # Not all synapse instances will have a federation sender instance, - # whether that's a `FederationSender` or a `FederationRemoteSendQueue`, - # so we stub the stream out when that is the case. - if hs.config.worker_app is None or hs.should_send_federation(): + if hs.config.worker_app is None: + # master process: get updates from the FederationRemoteSendQueue. + # (if the master is configured to send federation itself, federation_sender + # will be a real FederationSender, which has stubs for current_token and + # get_replication_rows.) federation_sender = hs.get_federation_sender() current_token = federation_sender.get_current_token - update_function = db_query_to_update_function( - federation_sender.get_replication_rows - ) + update_function = federation_sender.get_replication_rows + + elif hs.should_send_federation(): + # federation sender: Query master process + update_function = make_http_update_function(hs, self.NAME) + current_token = self._stub_current_token + else: - current_token = lambda: 0 + # other worker: stub out the update function (we're not interested in + # any updates so when we get a POSITION we do nothing) update_function = self._stub_update_function + current_token = self._stub_current_token super().__init__(hs.get_instance_name(), current_token, update_function) @staticmethod + def _stub_current_token(): + # dummy current-token method for use on workers + return 0 + + @staticmethod async def _stub_update_function(instance_name, from_token, upto_token, limit): return [], upto_token, False |