diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index dc2484109d..9bcd13b009 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -15,14 +15,10 @@
# limitations under the License.
from collections import namedtuple
-from ._base import Stream
-
-FederationStreamRow = namedtuple(
- "FederationStreamRow",
- (
- "type", # str, the type of data as defined in the BaseFederationRows
- "data", # dict, serialization of a federation.send_queue.BaseFederationRow
- ),
+from synapse.replication.tcp.streams._base import (
+ Stream,
+ current_token_without_instance,
+ make_http_update_function,
)
@@ -31,13 +27,47 @@ class FederationStream(Stream):
sending disabled.
"""
+ FederationStreamRow = namedtuple(
+ "FederationStreamRow",
+ (
+ "type", # str, the type of data as defined in the BaseFederationRows
+ "data", # dict, serialization of a federation.send_queue.BaseFederationRow
+ ),
+ )
+
NAME = "federation"
ROW_TYPE = FederationStreamRow
def __init__(self, hs):
- federation_sender = hs.get_federation_sender()
+ if hs.config.worker_app is None:
+ # master process: get updates from the FederationRemoteSendQueue.
+ # (if the master is configured to send federation itself, federation_sender
+ # will be a real FederationSender, which has stubs for current_token and
+ # get_replication_rows.)
+ federation_sender = hs.get_federation_sender()
+ current_token = current_token_without_instance(
+ federation_sender.get_current_token
+ )
+ update_function = federation_sender.get_replication_rows
+
+ elif hs.should_send_federation():
+ # federation sender: Query master process
+ update_function = make_http_update_function(hs, self.NAME)
+ current_token = self._stub_current_token
+
+ else:
+ # other worker: stub out the update function (we're not interested in
+ # any updates so when we get a POSITION we do nothing)
+ update_function = self._stub_update_function
+ current_token = self._stub_current_token
+
+ super().__init__(hs.get_instance_name(), current_token, update_function)
- self.current_token = federation_sender.get_current_token
- self.update_function = federation_sender.get_replication_rows
+ @staticmethod
+ def _stub_current_token(instance_name: str) -> int:
+ # dummy current-token method for use on workers
+ return 0
- super(FederationStream, self).__init__(hs)
+ @staticmethod
+ async def _stub_update_function(instance_name, from_token, upto_token, limit):
+ return [], upto_token, False
|