diff options
author | Erik Johnston <erik@matrix.org> | 2020-03-25 14:54:01 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-25 14:54:01 +0000 |
commit | 4cff617df1ba6f241fee6957cc44859f57edcc0e (patch) | |
tree | c0c0c3ffc496262981fc917362942468191da9d4 /synapse/replication/tcp/streams/federation.py | |
parent | Various cleanups to INSTALL.md (#7141) (diff) | |
download | synapse-4cff617df1ba6f241fee6957cc44859f57edcc0e.tar.xz |
Move catchup of replication streams to worker. (#7024)
This changes the replication protocol so that the server does not send down `RDATA` for rows that happened before the client connected. Instead, the server will send a `POSITION` and clients then query the database (or master out of band) to get up to date.
Diffstat (limited to 'synapse/replication/tcp/streams/federation.py')
-rw-r--r-- | synapse/replication/tcp/streams/federation.py | 19 |
1 files changed, 14 insertions, 5 deletions
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py index f5f9336430..48c1d45718 100644 --- a/synapse/replication/tcp/streams/federation.py +++ b/synapse/replication/tcp/streams/federation.py @@ -15,7 +15,9 @@ # limitations under the License. from collections import namedtuple -from ._base import Stream +from twisted.internet import defer + +from synapse.replication.tcp.streams._base import Stream, db_query_to_update_function class FederationStream(Stream): @@ -33,11 +35,18 @@ class FederationStream(Stream): NAME = "federation" ROW_TYPE = FederationStreamRow + _QUERY_MASTER = True def __init__(self, hs): - federation_sender = hs.get_federation_sender() - - self.current_token = federation_sender.get_current_token # type: ignore - self.update_function = federation_sender.get_replication_rows # type: ignore + # Not all synapse instances will have a federation sender instance, + # whether that's a `FederationSender` or a `FederationRemoteSendQueue`, + # so we stub the stream out when that is the case. + if hs.config.worker_app is None or hs.should_send_federation(): + federation_sender = hs.get_federation_sender() + self.current_token = federation_sender.get_current_token # type: ignore + self.update_function = db_query_to_update_function(federation_sender.get_replication_rows) # type: ignore + else: + self.current_token = lambda: 0 # type: ignore + self.update_function = lambda from_token, upto_token, limit: defer.succeed(([], upto_token, bool)) # type: ignore super(FederationStream, self).__init__(hs) |