summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--docs/tcp_replication.md4
-rw-r--r--synapse/federation/send_queue.py556
-rw-r--r--synapse/federation/sender/__init__.py8
-rw-r--r--synapse/replication/tcp/client.py61
-rw-r--r--synapse/replication/tcp/commands.py29
-rw-r--r--synapse/replication/tcp/handler.py22
-rw-r--r--synapse/replication/tcp/streams/__init__.py2
-rw-r--r--synapse/replication/tcp/streams/federation.py80
-rw-r--r--synapse/server.py7
-rw-r--r--tests/replication/tcp/streams/test_federation.py80
-rw-r--r--tests/replication/test_federation_ack.py73
11 files changed, 3 insertions, 919 deletions
diff --git a/docs/tcp_replication.md b/docs/tcp_replication.md
index 15df949deb..d894ad7aeb 100644
--- a/docs/tcp_replication.md
+++ b/docs/tcp_replication.md
@@ -216,10 +216,6 @@ Asks the server for the current position of all streams.
 
    This is used when a worker is shutting down.
 
-#### FEDERATION_ACK (C)
-
-   Acknowledge receipt of some federation data
-
 ### REMOTE_SERVER_UP (S, C)
 
    Inform other processes that a remote server may have come back online.
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
deleted file mode 100644
index ba4e8e2d37..0000000000
--- a/synapse/federation/send_queue.py
+++ /dev/null
@@ -1,556 +0,0 @@
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A federation sender that forwards things to be sent across replication to
-a worker process.
-
-It assumes there is a single worker process feeding off of it.
-
-Each row in the replication stream consists of a type and some json, where the
-types indicate whether they are presence, or edus, etc.
-
-Ephemeral or non-event data are queued up in-memory. When the worker requests
-updates since a particular point, all in-memory data since before that point is
-dropped. We also expire things in the queue after 5 minutes, to ensure that a
-dead worker doesn't cause the queues to grow limitlessly.
-
-Events are replicated via a separate events stream.
-"""
-
-import logging
-from collections import namedtuple
-from typing import (
-    TYPE_CHECKING,
-    Dict,
-    Hashable,
-    Iterable,
-    List,
-    Optional,
-    Sized,
-    Tuple,
-    Type,
-)
-
-from sortedcontainers import SortedDict
-
-from synapse.api.presence import UserPresenceState
-from synapse.federation.sender import AbstractFederationSender, FederationSender
-from synapse.metrics import LaterGauge
-from synapse.replication.tcp.streams.federation import FederationStream
-from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
-from synapse.util.metrics import Measure
-
-from .units import Edu
-
-if TYPE_CHECKING:
-    from synapse.server import HomeServer
-
-logger = logging.getLogger(__name__)
-
-
-class FederationRemoteSendQueue(AbstractFederationSender):
-    """A drop in replacement for FederationSender"""
-
-    def __init__(self, hs: "HomeServer"):
-        self.server_name = hs.hostname
-        self.clock = hs.get_clock()
-        self.notifier = hs.get_notifier()
-        self.is_mine_id = hs.is_mine_id
-
-        # We may have multiple federation sender instances, so we need to track
-        # their positions separately.
-        self._sender_instances = hs.config.worker.federation_shard_config.instances
-        self._sender_positions = {}  # type: Dict[str, int]
-
-        # Pending presence map user_id -> UserPresenceState
-        self.presence_map = {}  # type: Dict[str, UserPresenceState]
-
-        # Stream position -> list[user_id]
-        self.presence_changed = SortedDict()  # type: SortedDict[int, List[str]]
-
-        # Stores the destinations we need to explicitly send presence to about a
-        # given user.
-        # Stream position -> (user_id, destinations)
-        self.presence_destinations = (
-            SortedDict()
-        )  # type: SortedDict[int, Tuple[str, Iterable[str]]]
-
-        # (destination, key) -> EDU
-        self.keyed_edu = {}  # type: Dict[Tuple[str, tuple], Edu]
-
-        # stream position -> (destination, key)
-        self.keyed_edu_changed = (
-            SortedDict()
-        )  # type: SortedDict[int, Tuple[str, tuple]]
-
-        self.edus = SortedDict()  # type: SortedDict[int, Edu]
-
-        # stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
-        self.pos = 1
-
-        # map from stream ID to the time that stream entry was generated, so that we
-        # can clear out entries after a while
-        self.pos_time = SortedDict()  # type: SortedDict[int, int]
-
-        # EVERYTHING IS SAD. In particular, python only makes new scopes when
-        # we make a new function, so we need to make a new function so the inner
-        # lambda binds to the queue rather than to the name of the queue which
-        # changes. ARGH.
-        def register(name: str, queue: Sized) -> None:
-            LaterGauge(
-                "synapse_federation_send_queue_%s_size" % (queue_name,),
-                "",
-                [],
-                lambda: len(queue),
-            )
-
-        for queue_name in [
-            "presence_map",
-            "presence_changed",
-            "keyed_edu",
-            "keyed_edu_changed",
-            "edus",
-            "pos_time",
-            "presence_destinations",
-        ]:
-            register(queue_name, getattr(self, queue_name))
-
-        self.clock.looping_call(self._clear_queue, 30 * 1000)
-
-    def _next_pos(self) -> int:
-        pos = self.pos
-        self.pos += 1
-        self.pos_time[self.clock.time_msec()] = pos
-        return pos
-
-    def _clear_queue(self) -> None:
-        """Clear the queues for anything older than N minutes"""
-
-        FIVE_MINUTES_AGO = 5 * 60 * 1000
-        now = self.clock.time_msec()
-
-        keys = self.pos_time.keys()
-        time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
-        if not keys[:time]:
-            return
-
-        position_to_delete = max(keys[:time])
-        for key in keys[:time]:
-            del self.pos_time[key]
-
-        self._clear_queue_before_pos(position_to_delete)
-
-    def _clear_queue_before_pos(self, position_to_delete: int) -> None:
-        """Clear all the queues from before a given position"""
-        with Measure(self.clock, "send_queue._clear"):
-            # Delete things out of presence maps
-            keys = self.presence_changed.keys()
-            i = self.presence_changed.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.presence_changed[key]
-
-            user_ids = {
-                user_id for uids in self.presence_changed.values() for user_id in uids
-            }
-
-            keys = self.presence_destinations.keys()
-            i = self.presence_destinations.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.presence_destinations[key]
-
-            user_ids.update(
-                user_id for user_id, _ in self.presence_destinations.values()
-            )
-
-            to_del = [
-                user_id for user_id in self.presence_map if user_id not in user_ids
-            ]
-            for user_id in to_del:
-                del self.presence_map[user_id]
-
-            # Delete things out of keyed edus
-            keys = self.keyed_edu_changed.keys()
-            i = self.keyed_edu_changed.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.keyed_edu_changed[key]
-
-            live_keys = set()
-            for edu_key in self.keyed_edu_changed.values():
-                live_keys.add(edu_key)
-
-            keys_to_del = [
-                edu_key for edu_key in self.keyed_edu if edu_key not in live_keys
-            ]
-            for edu_key in keys_to_del:
-                del self.keyed_edu[edu_key]
-
-            # Delete things out of edu map
-            keys = self.edus.keys()
-            i = self.edus.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.edus[key]
-
-    def notify_new_events(self, max_token: RoomStreamToken) -> None:
-        """As per FederationSender"""
-        # This should never get called.
-        raise NotImplementedError()
-
-    def build_and_send_edu(
-        self,
-        destination: str,
-        edu_type: str,
-        content: JsonDict,
-        key: Optional[Hashable] = None,
-    ) -> None:
-        """As per FederationSender"""
-        if destination == self.server_name:
-            logger.info("Not sending EDU to ourselves")
-            return
-
-        pos = self._next_pos()
-
-        edu = Edu(
-            origin=self.server_name,
-            destination=destination,
-            edu_type=edu_type,
-            content=content,
-        )
-
-        if key:
-            assert isinstance(key, tuple)
-            self.keyed_edu[(destination, key)] = edu
-            self.keyed_edu_changed[pos] = (destination, key)
-        else:
-            self.edus[pos] = edu
-
-        self.notifier.on_new_replication_data()
-
-    async def send_read_receipt(self, receipt: ReadReceipt) -> None:
-        """As per FederationSender
-
-        Args:
-            receipt:
-        """
-        # nothing to do here: the replication listener will handle it.
-
-    def send_presence_to_destinations(
-        self, states: Iterable[UserPresenceState], destinations: Iterable[str]
-    ) -> None:
-        """As per FederationSender
-
-        Args:
-            states
-            destinations
-        """
-        for state in states:
-            pos = self._next_pos()
-            self.presence_map.update({state.user_id: state for state in states})
-            self.presence_destinations[pos] = (state.user_id, destinations)
-
-        self.notifier.on_new_replication_data()
-
-    def send_device_messages(self, destination: str) -> None:
-        """As per FederationSender"""
-        # We don't need to replicate this as it gets sent down a different
-        # stream.
-
-    def wake_destination(self, server: str) -> None:
-        pass
-
-    def get_current_token(self) -> int:
-        return self.pos - 1
-
-    def federation_ack(self, instance_name: str, token: int) -> None:
-        if self._sender_instances:
-            # If we have configured multiple federation sender instances we need
-            # to track their positions separately, and only clear the queue up
-            # to the token all instances have acked.
-            self._sender_positions[instance_name] = token
-            token = min(self._sender_positions.values())
-
-        self._clear_queue_before_pos(token)
-
-    async def get_replication_rows(
-        self, instance_name: str, from_token: int, to_token: int, target_row_count: int
-    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
-        """Get rows to be sent over federation between the two tokens
-
-        Args:
-            instance_name: the name of the current process
-            from_token: the previous stream token: the starting point for fetching the
-                updates
-            to_token: the new stream token: the point to get updates up to
-            target_row_count: a target for the number of rows to be returned.
-
-        Returns: a triplet `(updates, new_last_token, limited)`, where:
-           * `updates` is a list of `(token, row)` entries.
-           * `new_last_token` is the new position in stream.
-           * `limited` is whether there are more updates to fetch.
-        """
-        # TODO: Handle target_row_count.
-
-        # To handle restarts where we wrap around
-        if from_token > self.pos:
-            from_token = -1
-
-        # list of tuple(int, BaseFederationRow), where the first is the position
-        # of the federation stream.
-        rows = []  # type: List[Tuple[int, BaseFederationRow]]
-
-        # Fetch changed presence
-        i = self.presence_changed.bisect_right(from_token)
-        j = self.presence_changed.bisect_right(to_token) + 1
-        dest_user_ids = [
-            (pos, user_id)
-            for pos, user_id_list in self.presence_changed.items()[i:j]
-            for user_id in user_id_list
-        ]
-
-        for (key, user_id) in dest_user_ids:
-            rows.append((key, PresenceRow(state=self.presence_map[user_id])))
-
-        # Fetch presence to send to destinations
-        i = self.presence_destinations.bisect_right(from_token)
-        j = self.presence_destinations.bisect_right(to_token) + 1
-
-        for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
-            rows.append(
-                (
-                    pos,
-                    PresenceDestinationsRow(
-                        state=self.presence_map[user_id], destinations=list(dests)
-                    ),
-                )
-            )
-
-        # Fetch changes keyed edus
-        i = self.keyed_edu_changed.bisect_right(from_token)
-        j = self.keyed_edu_changed.bisect_right(to_token) + 1
-        # We purposefully clobber based on the key here, python dict comprehensions
-        # always use the last value, so this will correctly point to the last
-        # stream position.
-        keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
-
-        for ((destination, edu_key), pos) in keyed_edus.items():
-            rows.append(
-                (
-                    pos,
-                    KeyedEduRow(
-                        key=edu_key, edu=self.keyed_edu[(destination, edu_key)]
-                    ),
-                )
-            )
-
-        # Fetch changed edus
-        i = self.edus.bisect_right(from_token)
-        j = self.edus.bisect_right(to_token) + 1
-        edus = self.edus.items()[i:j]
-
-        for (pos, edu) in edus:
-            rows.append((pos, EduRow(edu)))
-
-        # Sort rows based on pos
-        rows.sort()
-
-        return (
-            [(pos, (row.TypeId, row.to_data())) for pos, row in rows],
-            to_token,
-            False,
-        )
-
-
-class BaseFederationRow:
-    """Base class for rows to be sent in the federation stream.
-
-    Specifies how to identify, serialize and deserialize the different types.
-    """
-
-    TypeId = ""  # Unique string that ids the type. Must be overridden in sub classes.
-
-    @staticmethod
-    def from_data(data):
-        """Parse the data from the federation stream into a row.
-
-        Args:
-            data: The value of ``data`` from FederationStreamRow.data, type
-                depends on the type of stream
-        """
-        raise NotImplementedError()
-
-    def to_data(self):
-        """Serialize this row to be sent over the federation stream.
-
-        Returns:
-            The value to be sent in FederationStreamRow.data. The type depends
-            on the type of stream.
-        """
-        raise NotImplementedError()
-
-    def add_to_buffer(self, buff):
-        """Add this row to the appropriate field in the buffer ready for this
-        to be sent over federation.
-
-        We use a buffer so that we can batch up events that have come in at
-        the same time and send them all at once.
-
-        Args:
-            buff (BufferedToSend)
-        """
-        raise NotImplementedError()
-
-
-class PresenceRow(
-    BaseFederationRow, namedtuple("PresenceRow", ("state",))  # UserPresenceState
-):
-    TypeId = "p"
-
-    @staticmethod
-    def from_data(data):
-        return PresenceRow(state=UserPresenceState.from_dict(data))
-
-    def to_data(self):
-        return self.state.as_dict()
-
-    def add_to_buffer(self, buff):
-        buff.presence.append(self.state)
-
-
-class PresenceDestinationsRow(
-    BaseFederationRow,
-    namedtuple(
-        "PresenceDestinationsRow",
-        ("state", "destinations"),  # UserPresenceState  # list[str]
-    ),
-):
-    TypeId = "pd"
-
-    @staticmethod
-    def from_data(data):
-        return PresenceDestinationsRow(
-            state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"]
-        )
-
-    def to_data(self):
-        return {"state": self.state.as_dict(), "dests": self.destinations}
-
-    def add_to_buffer(self, buff):
-        buff.presence_destinations.append((self.state, self.destinations))
-
-
-class KeyedEduRow(
-    BaseFederationRow,
-    namedtuple(
-        "KeyedEduRow",
-        ("key", "edu"),  # tuple(str) - the edu key passed to send_edu  # Edu
-    ),
-):
-    """Streams EDUs that have an associated key that is ued to clobber. For example,
-    typing EDUs clobber based on room_id.
-    """
-
-    TypeId = "k"
-
-    @staticmethod
-    def from_data(data):
-        return KeyedEduRow(key=tuple(data["key"]), edu=Edu(**data["edu"]))
-
-    def to_data(self):
-        return {"key": self.key, "edu": self.edu.get_internal_dict()}
-
-    def add_to_buffer(self, buff):
-        buff.keyed_edus.setdefault(self.edu.destination, {})[self.key] = self.edu
-
-
-class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))):  # Edu
-    """Streams EDUs that don't have keys. See KeyedEduRow"""
-
-    TypeId = "e"
-
-    @staticmethod
-    def from_data(data):
-        return EduRow(Edu(**data))
-
-    def to_data(self):
-        return self.edu.get_internal_dict()
-
-    def add_to_buffer(self, buff):
-        buff.edus.setdefault(self.edu.destination, []).append(self.edu)
-
-
-_rowtypes = (
-    PresenceRow,
-    PresenceDestinationsRow,
-    KeyedEduRow,
-    EduRow,
-)  # type: Tuple[Type[BaseFederationRow], ...]
-
-TypeToRow = {Row.TypeId: Row for Row in _rowtypes}
-
-
-ParsedFederationStreamData = namedtuple(
-    "ParsedFederationStreamData",
-    (
-        "presence",  # list(UserPresenceState)
-        "presence_destinations",  # list of tuples of UserPresenceState and destinations
-        "keyed_edus",  # dict of destination -> { key -> Edu }
-        "edus",  # dict of destination -> [Edu]
-    ),
-)
-
-
-def process_rows_for_federation(
-    transaction_queue: FederationSender,
-    rows: List[FederationStream.FederationStreamRow],
-) -> None:
-    """Parse a list of rows from the federation stream and put them in the
-    transaction queue ready for sending to the relevant homeservers.
-
-    Args:
-        transaction_queue
-        rows
-    """
-
-    # The federation stream contains a bunch of different types of
-    # rows that need to be handled differently. We parse the rows, put
-    # them into the appropriate collection and then send them off.
-
-    buff = ParsedFederationStreamData(
-        presence=[],
-        presence_destinations=[],
-        keyed_edus={},
-        edus={},
-    )
-
-    # Parse the rows in the stream and add to the buffer
-    for row in rows:
-        if row.type not in TypeToRow:
-            logger.error("Unrecognized federation row type %r", row.type)
-            continue
-
-        RowType = TypeToRow[row.type]
-        parsed_row = RowType.from_data(row.data)
-        parsed_row.add_to_buffer(buff)
-
-    for state, destinations in buff.presence_destinations:
-        transaction_queue.send_presence_to_destinations(
-            states=[state], destinations=destinations
-        )
-
-    for destination, edu_map in buff.keyed_edus.items():
-        for key, edu in edu_map.items():
-            transaction_queue.send_edu(edu, key)
-
-    for destination, edu_list in buff.edus.items():
-        for edu in edu_list:
-            transaction_queue.send_edu(edu, None)
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 6266accaf5..cdb04ec0c4 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -124,10 +124,6 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
         raise NotImplementedError()
 
     @abc.abstractmethod
-    def federation_ack(self, instance_name: str, token: int) -> None:
-        raise NotImplementedError()
-
-    @abc.abstractmethod
     async def get_replication_rows(
         self, instance_name: str, from_token: int, to_token: int, target_row_count: int
     ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
@@ -635,10 +631,6 @@ class FederationSender(AbstractFederationSender):
         # to a worker.
         return 0
 
-    def federation_ack(self, instance_name: str, token: int) -> None:
-        # It is not expected that this gets called on FederationSender.
-        raise NotImplementedError()
-
     @staticmethod
     async def get_replication_rows(
         instance_name: str, from_token: int, to_token: int, target_row_count: int
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 874cb9c25e..1c3b6e1bc0 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -20,10 +20,8 @@ from twisted.internet.defer import Deferred
 from twisted.internet.protocol import ReconnectingClientFactory
 
 from synapse.api.constants import EventTypes
-from synapse.federation import send_queue
 from synapse.federation.sender import FederationSender
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
-from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
 from synapse.replication.tcp.streams import (
     AccountDataStream,
@@ -358,14 +356,8 @@ class FederationSenderHandler:
         self.federation_sender.wake_destination(server)
 
     async def process_replication_rows(self, stream_name, token, rows):
-        # The federation stream contains things that we want to send out, e.g.
-        # presence, typing, etc.
-        if stream_name == "federation":
-            send_queue.process_rows_for_federation(self.federation_sender, rows)
-            await self.update_token(token)
-
         # ... and when new receipts happen
-        elif stream_name == ReceiptsStream.NAME:
+        if stream_name == ReceiptsStream.NAME:
             await self._on_new_receipts(rows)
 
         # ... as well as device updates and messages
@@ -403,54 +395,3 @@ class FederationSenderHandler:
                 receipt.data,
             )
             await self.federation_sender.send_read_receipt(receipt_info)
-
-    async def update_token(self, token):
-        """Update the record of where we have processed to in the federation stream.
-
-        Called after we have processed a an update received over replication. Sends
-        a FEDERATION_ACK back to the master, and stores the token that we have processed
-         in `federation_stream_position` so that we can restart where we left off.
-        """
-        self.federation_position = token
-
-        # We save and send the ACK to master asynchronously, so we don't block
-        # processing on persistence. We don't need to do this operation for
-        # every single RDATA we receive, we just need to do it periodically.
-
-        if self._fed_position_linearizer.is_queued(None):
-            # There is already a task queued up to save and send the token, so
-            # no need to queue up another task.
-            return
-
-        run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
-
-    async def _save_and_send_ack(self):
-        """Save the current federation position in the database and send an ACK
-        to master with where we're up to.
-        """
-        # We should only be calling this once we've got a token.
-        assert self.federation_position is not None
-
-        try:
-            # We linearize here to ensure we don't have races updating the token
-            #
-            # XXX this appears to be redundant, since the ReplicationCommandHandler
-            # has a linearizer which ensures that we only process one line of
-            # replication data at a time. Should we remove it, or is it doing useful
-            # service for robustness? Or could we replace it with an assertion that
-            # we're not being re-entered?
-
-            with (await self._fed_position_linearizer.queue(None)):
-                # We persist and ack the same position, so we take a copy of it
-                # here as otherwise it can get modified from underneath us.
-                current_position = self.federation_position
-
-                await self.store.update_federation_out_pos(
-                    "federation", current_position
-                )
-
-                # We ACK this token over replication so that the master can drop
-                # its in memory queues
-                self._hs.get_tcp_replication().send_federation_ack(current_position)
-        except Exception:
-            logger.exception("Error updating federation stream position")
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 505d450e19..ede8875439 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -297,33 +297,6 @@ class ClearUserSyncsCommand(Command):
         return self.instance_id
 
 
-class FederationAckCommand(Command):
-    """Sent by the client when it has processed up to a given point in the
-    federation stream. This allows the master to drop in-memory caches of the
-    federation stream.
-
-    This must only be sent from one worker (i.e. the one sending federation)
-
-    Format::
-
-        FEDERATION_ACK <instance_name> <token>
-    """
-
-    NAME = "FEDERATION_ACK"
-
-    def __init__(self, instance_name: str, token: int):
-        self.instance_name = instance_name
-        self.token = token
-
-    @classmethod
-    def from_line(cls, line: str) -> "FederationAckCommand":
-        instance_name, token = line.split(" ")
-        return cls(instance_name, int(token))
-
-    def to_line(self) -> str:
-        return "%s %s" % (self.instance_name, self.token)
-
-
 class UserIpCommand(Command):
     """Sent periodically when a worker sees activity from a client.
 
@@ -389,7 +362,6 @@ _COMMANDS = (
     NameCommand,
     ReplicateCommand,
     UserSyncCommand,
-    FederationAckCommand,
     UserIpCommand,
     RemoteServerUpCommand,
     ClearUserSyncsCommand,
@@ -415,7 +387,6 @@ VALID_CLIENT_COMMANDS = (
     PingCommand.NAME,
     UserSyncCommand.NAME,
     ClearUserSyncsCommand.NAME,
-    FederationAckCommand.NAME,
     UserIpCommand.NAME,
     ErrorCommand.NAME,
     RemoteServerUpCommand.NAME,
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 2ce1b9f222..21b59def6f 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -39,7 +39,6 @@ from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
 from synapse.replication.tcp.commands import (
     ClearUserSyncsCommand,
     Command,
-    FederationAckCommand,
     PositionCommand,
     RdataCommand,
     RemoteServerUpCommand,
@@ -54,7 +53,6 @@ from synapse.replication.tcp.streams import (
     BackfillStream,
     CachesStream,
     EventsStream,
-    FederationStream,
     ReceiptsStream,
     Stream,
     TagAccountDataStream,
@@ -73,7 +71,6 @@ inbound_rdata_count = Counter(
     "synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
 )
 user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
-federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
 remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
 
 user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
@@ -157,11 +154,6 @@ class ReplicationCommandHandler:
             if hs.config.worker_app is not None:
                 continue
 
-            if stream.NAME == FederationStream.NAME and hs.config.send_federation:
-                # We only support federation stream if federation sending
-                # has been disabled on the master.
-                continue
-
             self._streams_to_replicate.append(stream)
 
         # Map of stream name to batched updates. See RdataCommand for info on
@@ -365,14 +357,6 @@ class ReplicationCommandHandler:
         else:
             return None
 
-    def on_FEDERATION_ACK(
-        self, conn: IReplicationConnection, cmd: FederationAckCommand
-    ):
-        federation_ack_counter.inc()
-
-        if self._federation_sender:
-            self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
-
     def on_USER_IP(
         self, conn: IReplicationConnection, cmd: UserIpCommand
     ) -> Optional[Awaitable[None]]:
@@ -655,12 +639,6 @@ class ReplicationCommandHandler:
         else:
             logger.warning("Dropping command as not connected: %r", cmd.NAME)
 
-    def send_federation_ack(self, token: int):
-        """Ack data for the federation stream. This allows the master to drop
-        data stored purely in memory.
-        """
-        self.send_command(FederationAckCommand(self._instance_name, token))
-
     def send_user_sync(
         self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
     ):
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index 4c0023c68a..a18c799a45 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -43,7 +43,6 @@ from synapse.replication.tcp.streams._base import (
     UserSignatureStream,
 )
 from synapse.replication.tcp.streams.events import EventsStream
-from synapse.replication.tcp.streams.federation import FederationStream
 
 STREAMS_MAP = {
     stream.NAME: stream
@@ -60,7 +59,6 @@ STREAMS_MAP = {
         PublicRoomsStream,
         DeviceListsStream,
         ToDeviceStream,
-        FederationStream,
         TagAccountDataStream,
         AccountDataStream,
         GroupServerStream,
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
deleted file mode 100644
index 096a85d363..0000000000
--- a/synapse/replication/tcp/streams/federation.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Copyright 2017 Vector Creations Ltd
-# Copyright 2019 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from collections import namedtuple
-from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Tuple
-
-from synapse.replication.tcp.streams._base import (
-    Stream,
-    current_token_without_instance,
-    make_http_update_function,
-)
-
-if TYPE_CHECKING:
-    from synapse.server import HomeServer
-
-
-class FederationStream(Stream):
-    """Data to be sent over federation. Only available when master has federation
-    sending disabled.
-    """
-
-    FederationStreamRow = namedtuple(
-        "FederationStreamRow",
-        (
-            "type",  # str, the type of data as defined in the BaseFederationRows
-            "data",  # dict, serialization of a federation.send_queue.BaseFederationRow
-        ),
-    )
-
-    NAME = "federation"
-    ROW_TYPE = FederationStreamRow
-
-    def __init__(self, hs: "HomeServer"):
-        if hs.config.worker_app is None:
-            # master process: get updates from the FederationRemoteSendQueue.
-            # (if the master is configured to send federation itself, federation_sender
-            # will be a real FederationSender, which has stubs for current_token and
-            # get_replication_rows.)
-            federation_sender = hs.get_federation_sender()
-            current_token = current_token_without_instance(
-                federation_sender.get_current_token
-            )
-            update_function = (
-                federation_sender.get_replication_rows
-            )  # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]]
-
-        elif hs.should_send_federation():
-            # federation sender: Query master process
-            update_function = make_http_update_function(hs, self.NAME)
-            current_token = self._stub_current_token
-
-        else:
-            # other worker: stub out the update function (we're not interested in
-            # any updates so when we get a POSITION we do nothing)
-            update_function = self._stub_update_function
-            current_token = self._stub_current_token
-
-        super().__init__(hs.get_instance_name(), current_token, update_function)
-
-    @staticmethod
-    def _stub_current_token(instance_name: str) -> int:
-        # dummy current-token method for use on workers
-        return 0
-
-    @staticmethod
-    async def _stub_update_function(
-        instance_name: str, from_token: int, upto_token: int, limit: int
-    ) -> Tuple[list, int, bool]:
-        return [], upto_token, False
diff --git a/synapse/server.py b/synapse/server.py
index 42d2fad8e8..eaeaec9d77 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -59,7 +59,6 @@ from synapse.federation.federation_server import (
     FederationHandlerRegistry,
     FederationServer,
 )
-from synapse.federation.send_queue import FederationRemoteSendQueue
 from synapse.federation.sender import AbstractFederationSender, FederationSender
 from synapse.federation.transport.client import TransportLayerClient
 from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
@@ -580,13 +579,11 @@ class HomeServer(metaclass=abc.ABCMeta):
         return TransportLayerClient(self)
 
     @cache_in_self
-    def get_federation_sender(self) -> AbstractFederationSender:
+    def get_federation_sender(self) -> Optional[AbstractFederationSender]:
         if self.should_send_federation():
             return FederationSender(self)
-        elif not self.config.worker_app:
-            return FederationRemoteSendQueue(self)
         else:
-            raise Exception("Workers cannot send federation traffic")
+            return None
 
     @cache_in_self
     def get_receipts_handler(self) -> ReceiptsHandler:
diff --git a/tests/replication/tcp/streams/test_federation.py b/tests/replication/tcp/streams/test_federation.py
deleted file mode 100644
index ffec06a0d6..0000000000
--- a/tests/replication/tcp/streams/test_federation.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Copyright 2019 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.federation.send_queue import EduRow
-from synapse.replication.tcp.streams.federation import FederationStream
-
-from tests.replication._base import BaseStreamTestCase
-
-
-class FederationStreamTestCase(BaseStreamTestCase):
-    def _get_worker_hs_config(self) -> dict:
-        # enable federation sending on the worker
-        config = super()._get_worker_hs_config()
-        # TODO: make it so we don't need both of these
-        config["send_federation"] = False
-        config["worker_app"] = "synapse.app.federation_sender"
-        return config
-
-    def test_catchup(self):
-        """Basic test of catchup on reconnect
-
-        Makes sure that updates sent while we are offline are received later.
-        """
-        fed_sender = self.hs.get_federation_sender()
-        received_rows = self.test_handler.received_rdata_rows
-
-        fed_sender.build_and_send_edu("testdest", "m.test_edu", {"a": "b"})
-
-        self.reconnect()
-        self.reactor.advance(0)
-
-        # check we're testing what we think we are: no rows should yet have been
-        # received
-        self.assertEqual(received_rows, [])
-
-        # We should now see an attempt to connect to the master
-        request = self.handle_http_replication_attempt()
-        self.assert_request_is_get_repl_stream_updates(request, "federation")
-
-        # we should have received an update row
-        stream_name, token, row = received_rows.pop()
-        self.assertEqual(stream_name, "federation")
-        self.assertIsInstance(row, FederationStream.FederationStreamRow)
-        self.assertEqual(row.type, EduRow.TypeId)
-        edurow = EduRow.from_data(row.data)
-        self.assertEqual(edurow.edu.edu_type, "m.test_edu")
-        self.assertEqual(edurow.edu.origin, self.hs.hostname)
-        self.assertEqual(edurow.edu.destination, "testdest")
-        self.assertEqual(edurow.edu.content, {"a": "b"})
-
-        self.assertEqual(received_rows, [])
-
-        # additional updates should be transferred without an HTTP hit
-        fed_sender.build_and_send_edu("testdest", "m.test1", {"c": "d"})
-        self.reactor.advance(0)
-        # there should be no http hit
-        self.assertEqual(len(self.reactor.tcpClients), 0)
-        # ... but we should have a row
-        self.assertEqual(len(received_rows), 1)
-
-        stream_name, token, row = received_rows.pop()
-        self.assertEqual(stream_name, "federation")
-        self.assertIsInstance(row, FederationStream.FederationStreamRow)
-        self.assertEqual(row.type, EduRow.TypeId)
-        edurow = EduRow.from_data(row.data)
-        self.assertEqual(edurow.edu.edu_type, "m.test1")
-        self.assertEqual(edurow.edu.origin, self.hs.hostname)
-        self.assertEqual(edurow.edu.destination, "testdest")
-        self.assertEqual(edurow.edu.content, {"c": "d"})
diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py
deleted file mode 100644
index 04a869e295..0000000000
--- a/tests/replication/test_federation_ack.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# Copyright 2020 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from unittest import mock
-
-from synapse.app.generic_worker import GenericWorkerServer
-from synapse.replication.tcp.commands import FederationAckCommand
-from synapse.replication.tcp.protocol import IReplicationConnection
-from synapse.replication.tcp.streams.federation import FederationStream
-
-from tests.unittest import HomeserverTestCase
-
-
-class FederationAckTestCase(HomeserverTestCase):
-    def default_config(self) -> dict:
-        config = super().default_config()
-        config["worker_app"] = "synapse.app.federation_sender"
-        config["send_federation"] = False
-        return config
-
-    def make_homeserver(self, reactor, clock):
-        hs = self.setup_test_homeserver(homeserver_to_use=GenericWorkerServer)
-
-        return hs
-
-    def test_federation_ack_sent(self):
-        """A FEDERATION_ACK should be sent back after each RDATA federation
-
-        This test checks that the federation sender is correctly sending back
-        FEDERATION_ACK messages. The test works by spinning up a federation_sender
-        worker server, and then fishing out its ReplicationCommandHandler. We wire
-        the RCH up to a mock connection (so that we can observe the command being sent)
-        and then poke in an RDATA row.
-
-        XXX: it might be nice to do this by pretending to be a synapse master worker
-        (or a redis server), and having the worker connect to us via a mocked-up TCP
-        transport, rather than assuming that the implementation has a
-        ReplicationCommandHandler.
-        """
-        rch = self.hs.get_tcp_replication()
-
-        # wire up the ReplicationCommandHandler to a mock connection, which needs
-        # to implement IReplicationConnection. (Note that Mock doesn't understand
-        # interfaces, but casing an interface to a list gives the attributes.)
-        mock_connection = mock.Mock(spec=list(IReplicationConnection))
-        rch.new_connection(mock_connection)
-
-        # tell it it received an RDATA row
-        self.get_success(
-            rch.on_rdata(
-                "federation",
-                "master",
-                token=10,
-                rows=[FederationStream.FederationStreamRow(type="x", data=[1, 2, 3])],
-            )
-        )
-
-        # now check that the FEDERATION_ACK was sent
-        mock_connection.send_command.assert_called_once()
-        cmd = mock_connection.send_command.call_args[0][0]
-        assert isinstance(cmd, FederationAckCommand)
-        self.assertEqual(cmd.token, 10)