diff --git a/docs/tcp_replication.md b/docs/tcp_replication.md
index 15df949deb..d894ad7aeb 100644
--- a/docs/tcp_replication.md
+++ b/docs/tcp_replication.md
@@ -216,10 +216,6 @@ Asks the server for the current position of all streams.
This is used when a worker is shutting down.
-#### FEDERATION_ACK (C)
-
- Acknowledge receipt of some federation data
-
### REMOTE_SERVER_UP (S, C)
Inform other processes that a remote server may have come back online.
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
deleted file mode 100644
index ba4e8e2d37..0000000000
--- a/synapse/federation/send_queue.py
+++ /dev/null
@@ -1,556 +0,0 @@
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A federation sender that forwards things to be sent across replication to
-a worker process.
-
-It assumes there is a single worker process feeding off of it.
-
-Each row in the replication stream consists of a type and some json, where the
-types indicate whether they are presence, or edus, etc.
-
-Ephemeral or non-event data are queued up in-memory. When the worker requests
-updates since a particular point, all in-memory data since before that point is
-dropped. We also expire things in the queue after 5 minutes, to ensure that a
-dead worker doesn't cause the queues to grow limitlessly.
-
-Events are replicated via a separate events stream.
-"""
-
-import logging
-from collections import namedtuple
-from typing import (
- TYPE_CHECKING,
- Dict,
- Hashable,
- Iterable,
- List,
- Optional,
- Sized,
- Tuple,
- Type,
-)
-
-from sortedcontainers import SortedDict
-
-from synapse.api.presence import UserPresenceState
-from synapse.federation.sender import AbstractFederationSender, FederationSender
-from synapse.metrics import LaterGauge
-from synapse.replication.tcp.streams.federation import FederationStream
-from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
-from synapse.util.metrics import Measure
-
-from .units import Edu
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-logger = logging.getLogger(__name__)
-
-
-class FederationRemoteSendQueue(AbstractFederationSender):
- """A drop in replacement for FederationSender"""
-
- def __init__(self, hs: "HomeServer"):
- self.server_name = hs.hostname
- self.clock = hs.get_clock()
- self.notifier = hs.get_notifier()
- self.is_mine_id = hs.is_mine_id
-
- # We may have multiple federation sender instances, so we need to track
- # their positions separately.
- self._sender_instances = hs.config.worker.federation_shard_config.instances
- self._sender_positions = {} # type: Dict[str, int]
-
- # Pending presence map user_id -> UserPresenceState
- self.presence_map = {} # type: Dict[str, UserPresenceState]
-
- # Stream position -> list[user_id]
- self.presence_changed = SortedDict() # type: SortedDict[int, List[str]]
-
- # Stores the destinations we need to explicitly send presence to about a
- # given user.
- # Stream position -> (user_id, destinations)
- self.presence_destinations = (
- SortedDict()
- ) # type: SortedDict[int, Tuple[str, Iterable[str]]]
-
- # (destination, key) -> EDU
- self.keyed_edu = {} # type: Dict[Tuple[str, tuple], Edu]
-
- # stream position -> (destination, key)
- self.keyed_edu_changed = (
- SortedDict()
- ) # type: SortedDict[int, Tuple[str, tuple]]
-
- self.edus = SortedDict() # type: SortedDict[int, Edu]
-
- # stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
- self.pos = 1
-
- # map from stream ID to the time that stream entry was generated, so that we
- # can clear out entries after a while
- self.pos_time = SortedDict() # type: SortedDict[int, int]
-
- # EVERYTHING IS SAD. In particular, python only makes new scopes when
- # we make a new function, so we need to make a new function so the inner
- # lambda binds to the queue rather than to the name of the queue which
- # changes. ARGH.
- def register(name: str, queue: Sized) -> None:
- LaterGauge(
- "synapse_federation_send_queue_%s_size" % (queue_name,),
- "",
- [],
- lambda: len(queue),
- )
-
- for queue_name in [
- "presence_map",
- "presence_changed",
- "keyed_edu",
- "keyed_edu_changed",
- "edus",
- "pos_time",
- "presence_destinations",
- ]:
- register(queue_name, getattr(self, queue_name))
-
- self.clock.looping_call(self._clear_queue, 30 * 1000)
-
- def _next_pos(self) -> int:
- pos = self.pos
- self.pos += 1
- self.pos_time[self.clock.time_msec()] = pos
- return pos
-
- def _clear_queue(self) -> None:
- """Clear the queues for anything older than N minutes"""
-
- FIVE_MINUTES_AGO = 5 * 60 * 1000
- now = self.clock.time_msec()
-
- keys = self.pos_time.keys()
- time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
- if not keys[:time]:
- return
-
- position_to_delete = max(keys[:time])
- for key in keys[:time]:
- del self.pos_time[key]
-
- self._clear_queue_before_pos(position_to_delete)
-
- def _clear_queue_before_pos(self, position_to_delete: int) -> None:
- """Clear all the queues from before a given position"""
- with Measure(self.clock, "send_queue._clear"):
- # Delete things out of presence maps
- keys = self.presence_changed.keys()
- i = self.presence_changed.bisect_left(position_to_delete)
- for key in keys[:i]:
- del self.presence_changed[key]
-
- user_ids = {
- user_id for uids in self.presence_changed.values() for user_id in uids
- }
-
- keys = self.presence_destinations.keys()
- i = self.presence_destinations.bisect_left(position_to_delete)
- for key in keys[:i]:
- del self.presence_destinations[key]
-
- user_ids.update(
- user_id for user_id, _ in self.presence_destinations.values()
- )
-
- to_del = [
- user_id for user_id in self.presence_map if user_id not in user_ids
- ]
- for user_id in to_del:
- del self.presence_map[user_id]
-
- # Delete things out of keyed edus
- keys = self.keyed_edu_changed.keys()
- i = self.keyed_edu_changed.bisect_left(position_to_delete)
- for key in keys[:i]:
- del self.keyed_edu_changed[key]
-
- live_keys = set()
- for edu_key in self.keyed_edu_changed.values():
- live_keys.add(edu_key)
-
- keys_to_del = [
- edu_key for edu_key in self.keyed_edu if edu_key not in live_keys
- ]
- for edu_key in keys_to_del:
- del self.keyed_edu[edu_key]
-
- # Delete things out of edu map
- keys = self.edus.keys()
- i = self.edus.bisect_left(position_to_delete)
- for key in keys[:i]:
- del self.edus[key]
-
- def notify_new_events(self, max_token: RoomStreamToken) -> None:
- """As per FederationSender"""
- # This should never get called.
- raise NotImplementedError()
-
- def build_and_send_edu(
- self,
- destination: str,
- edu_type: str,
- content: JsonDict,
- key: Optional[Hashable] = None,
- ) -> None:
- """As per FederationSender"""
- if destination == self.server_name:
- logger.info("Not sending EDU to ourselves")
- return
-
- pos = self._next_pos()
-
- edu = Edu(
- origin=self.server_name,
- destination=destination,
- edu_type=edu_type,
- content=content,
- )
-
- if key:
- assert isinstance(key, tuple)
- self.keyed_edu[(destination, key)] = edu
- self.keyed_edu_changed[pos] = (destination, key)
- else:
- self.edus[pos] = edu
-
- self.notifier.on_new_replication_data()
-
- async def send_read_receipt(self, receipt: ReadReceipt) -> None:
- """As per FederationSender
-
- Args:
- receipt:
- """
- # nothing to do here: the replication listener will handle it.
-
- def send_presence_to_destinations(
- self, states: Iterable[UserPresenceState], destinations: Iterable[str]
- ) -> None:
- """As per FederationSender
-
- Args:
- states
- destinations
- """
- for state in states:
- pos = self._next_pos()
- self.presence_map.update({state.user_id: state for state in states})
- self.presence_destinations[pos] = (state.user_id, destinations)
-
- self.notifier.on_new_replication_data()
-
- def send_device_messages(self, destination: str) -> None:
- """As per FederationSender"""
- # We don't need to replicate this as it gets sent down a different
- # stream.
-
- def wake_destination(self, server: str) -> None:
- pass
-
- def get_current_token(self) -> int:
- return self.pos - 1
-
- def federation_ack(self, instance_name: str, token: int) -> None:
- if self._sender_instances:
- # If we have configured multiple federation sender instances we need
- # to track their positions separately, and only clear the queue up
- # to the token all instances have acked.
- self._sender_positions[instance_name] = token
- token = min(self._sender_positions.values())
-
- self._clear_queue_before_pos(token)
-
- async def get_replication_rows(
- self, instance_name: str, from_token: int, to_token: int, target_row_count: int
- ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
- """Get rows to be sent over federation between the two tokens
-
- Args:
- instance_name: the name of the current process
- from_token: the previous stream token: the starting point for fetching the
- updates
- to_token: the new stream token: the point to get updates up to
- target_row_count: a target for the number of rows to be returned.
-
- Returns: a triplet `(updates, new_last_token, limited)`, where:
- * `updates` is a list of `(token, row)` entries.
- * `new_last_token` is the new position in stream.
- * `limited` is whether there are more updates to fetch.
- """
- # TODO: Handle target_row_count.
-
- # To handle restarts where we wrap around
- if from_token > self.pos:
- from_token = -1
-
- # list of tuple(int, BaseFederationRow), where the first is the position
- # of the federation stream.
- rows = [] # type: List[Tuple[int, BaseFederationRow]]
-
- # Fetch changed presence
- i = self.presence_changed.bisect_right(from_token)
- j = self.presence_changed.bisect_right(to_token) + 1
- dest_user_ids = [
- (pos, user_id)
- for pos, user_id_list in self.presence_changed.items()[i:j]
- for user_id in user_id_list
- ]
-
- for (key, user_id) in dest_user_ids:
- rows.append((key, PresenceRow(state=self.presence_map[user_id])))
-
- # Fetch presence to send to destinations
- i = self.presence_destinations.bisect_right(from_token)
- j = self.presence_destinations.bisect_right(to_token) + 1
-
- for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
- rows.append(
- (
- pos,
- PresenceDestinationsRow(
- state=self.presence_map[user_id], destinations=list(dests)
- ),
- )
- )
-
- # Fetch changes keyed edus
- i = self.keyed_edu_changed.bisect_right(from_token)
- j = self.keyed_edu_changed.bisect_right(to_token) + 1
- # We purposefully clobber based on the key here, python dict comprehensions
- # always use the last value, so this will correctly point to the last
- # stream position.
- keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
-
- for ((destination, edu_key), pos) in keyed_edus.items():
- rows.append(
- (
- pos,
- KeyedEduRow(
- key=edu_key, edu=self.keyed_edu[(destination, edu_key)]
- ),
- )
- )
-
- # Fetch changed edus
- i = self.edus.bisect_right(from_token)
- j = self.edus.bisect_right(to_token) + 1
- edus = self.edus.items()[i:j]
-
- for (pos, edu) in edus:
- rows.append((pos, EduRow(edu)))
-
- # Sort rows based on pos
- rows.sort()
-
- return (
- [(pos, (row.TypeId, row.to_data())) for pos, row in rows],
- to_token,
- False,
- )
-
-
-class BaseFederationRow:
- """Base class for rows to be sent in the federation stream.
-
- Specifies how to identify, serialize and deserialize the different types.
- """
-
- TypeId = "" # Unique string that ids the type. Must be overridden in sub classes.
-
- @staticmethod
- def from_data(data):
- """Parse the data from the federation stream into a row.
-
- Args:
- data: The value of ``data`` from FederationStreamRow.data, type
- depends on the type of stream
- """
- raise NotImplementedError()
-
- def to_data(self):
- """Serialize this row to be sent over the federation stream.
-
- Returns:
- The value to be sent in FederationStreamRow.data. The type depends
- on the type of stream.
- """
- raise NotImplementedError()
-
- def add_to_buffer(self, buff):
- """Add this row to the appropriate field in the buffer ready for this
- to be sent over federation.
-
- We use a buffer so that we can batch up events that have come in at
- the same time and send them all at once.
-
- Args:
- buff (BufferedToSend)
- """
- raise NotImplementedError()
-
-
-class PresenceRow(
- BaseFederationRow, namedtuple("PresenceRow", ("state",)) # UserPresenceState
-):
- TypeId = "p"
-
- @staticmethod
- def from_data(data):
- return PresenceRow(state=UserPresenceState.from_dict(data))
-
- def to_data(self):
- return self.state.as_dict()
-
- def add_to_buffer(self, buff):
- buff.presence.append(self.state)
-
-
-class PresenceDestinationsRow(
- BaseFederationRow,
- namedtuple(
- "PresenceDestinationsRow",
- ("state", "destinations"), # UserPresenceState # list[str]
- ),
-):
- TypeId = "pd"
-
- @staticmethod
- def from_data(data):
- return PresenceDestinationsRow(
- state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"]
- )
-
- def to_data(self):
- return {"state": self.state.as_dict(), "dests": self.destinations}
-
- def add_to_buffer(self, buff):
- buff.presence_destinations.append((self.state, self.destinations))
-
-
-class KeyedEduRow(
- BaseFederationRow,
- namedtuple(
- "KeyedEduRow",
- ("key", "edu"), # tuple(str) - the edu key passed to send_edu # Edu
- ),
-):
- """Streams EDUs that have an associated key that is ued to clobber. For example,
- typing EDUs clobber based on room_id.
- """
-
- TypeId = "k"
-
- @staticmethod
- def from_data(data):
- return KeyedEduRow(key=tuple(data["key"]), edu=Edu(**data["edu"]))
-
- def to_data(self):
- return {"key": self.key, "edu": self.edu.get_internal_dict()}
-
- def add_to_buffer(self, buff):
- buff.keyed_edus.setdefault(self.edu.destination, {})[self.key] = self.edu
-
-
-class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu
- """Streams EDUs that don't have keys. See KeyedEduRow"""
-
- TypeId = "e"
-
- @staticmethod
- def from_data(data):
- return EduRow(Edu(**data))
-
- def to_data(self):
- return self.edu.get_internal_dict()
-
- def add_to_buffer(self, buff):
- buff.edus.setdefault(self.edu.destination, []).append(self.edu)
-
-
-_rowtypes = (
- PresenceRow,
- PresenceDestinationsRow,
- KeyedEduRow,
- EduRow,
-) # type: Tuple[Type[BaseFederationRow], ...]
-
-TypeToRow = {Row.TypeId: Row for Row in _rowtypes}
-
-
-ParsedFederationStreamData = namedtuple(
- "ParsedFederationStreamData",
- (
- "presence", # list(UserPresenceState)
- "presence_destinations", # list of tuples of UserPresenceState and destinations
- "keyed_edus", # dict of destination -> { key -> Edu }
- "edus", # dict of destination -> [Edu]
- ),
-)
-
-
-def process_rows_for_federation(
- transaction_queue: FederationSender,
- rows: List[FederationStream.FederationStreamRow],
-) -> None:
- """Parse a list of rows from the federation stream and put them in the
- transaction queue ready for sending to the relevant homeservers.
-
- Args:
- transaction_queue
- rows
- """
-
- # The federation stream contains a bunch of different types of
- # rows that need to be handled differently. We parse the rows, put
- # them into the appropriate collection and then send them off.
-
- buff = ParsedFederationStreamData(
- presence=[],
- presence_destinations=[],
- keyed_edus={},
- edus={},
- )
-
- # Parse the rows in the stream and add to the buffer
- for row in rows:
- if row.type not in TypeToRow:
- logger.error("Unrecognized federation row type %r", row.type)
- continue
-
- RowType = TypeToRow[row.type]
- parsed_row = RowType.from_data(row.data)
- parsed_row.add_to_buffer(buff)
-
- for state, destinations in buff.presence_destinations:
- transaction_queue.send_presence_to_destinations(
- states=[state], destinations=destinations
- )
-
- for destination, edu_map in buff.keyed_edus.items():
- for key, edu in edu_map.items():
- transaction_queue.send_edu(edu, key)
-
- for destination, edu_list in buff.edus.items():
- for edu in edu_list:
- transaction_queue.send_edu(edu, None)
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 6266accaf5..cdb04ec0c4 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -124,10 +124,6 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
raise NotImplementedError()
@abc.abstractmethod
- def federation_ack(self, instance_name: str, token: int) -> None:
- raise NotImplementedError()
-
- @abc.abstractmethod
async def get_replication_rows(
self, instance_name: str, from_token: int, to_token: int, target_row_count: int
) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
@@ -635,10 +631,6 @@ class FederationSender(AbstractFederationSender):
# to a worker.
return 0
- def federation_ack(self, instance_name: str, token: int) -> None:
- # It is not expected that this gets called on FederationSender.
- raise NotImplementedError()
-
@staticmethod
async def get_replication_rows(
instance_name: str, from_token: int, to_token: int, target_row_count: int
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 874cb9c25e..1c3b6e1bc0 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -20,10 +20,8 @@ from twisted.internet.defer import Deferred
from twisted.internet.protocol import ReconnectingClientFactory
from synapse.api.constants import EventTypes
-from synapse.federation import send_queue
from synapse.federation.sender import FederationSender
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
-from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.streams import (
AccountDataStream,
@@ -358,14 +356,8 @@ class FederationSenderHandler:
self.federation_sender.wake_destination(server)
async def process_replication_rows(self, stream_name, token, rows):
- # The federation stream contains things that we want to send out, e.g.
- # presence, typing, etc.
- if stream_name == "federation":
- send_queue.process_rows_for_federation(self.federation_sender, rows)
- await self.update_token(token)
-
# ... and when new receipts happen
- elif stream_name == ReceiptsStream.NAME:
+ if stream_name == ReceiptsStream.NAME:
await self._on_new_receipts(rows)
# ... as well as device updates and messages
@@ -403,54 +395,3 @@ class FederationSenderHandler:
receipt.data,
)
await self.federation_sender.send_read_receipt(receipt_info)
-
- async def update_token(self, token):
- """Update the record of where we have processed to in the federation stream.
-
- Called after we have processed a an update received over replication. Sends
- a FEDERATION_ACK back to the master, and stores the token that we have processed
- in `federation_stream_position` so that we can restart where we left off.
- """
- self.federation_position = token
-
- # We save and send the ACK to master asynchronously, so we don't block
- # processing on persistence. We don't need to do this operation for
- # every single RDATA we receive, we just need to do it periodically.
-
- if self._fed_position_linearizer.is_queued(None):
- # There is already a task queued up to save and send the token, so
- # no need to queue up another task.
- return
-
- run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
-
- async def _save_and_send_ack(self):
- """Save the current federation position in the database and send an ACK
- to master with where we're up to.
- """
- # We should only be calling this once we've got a token.
- assert self.federation_position is not None
-
- try:
- # We linearize here to ensure we don't have races updating the token
- #
- # XXX this appears to be redundant, since the ReplicationCommandHandler
- # has a linearizer which ensures that we only process one line of
- # replication data at a time. Should we remove it, or is it doing useful
- # service for robustness? Or could we replace it with an assertion that
- # we're not being re-entered?
-
- with (await self._fed_position_linearizer.queue(None)):
- # We persist and ack the same position, so we take a copy of it
- # here as otherwise it can get modified from underneath us.
- current_position = self.federation_position
-
- await self.store.update_federation_out_pos(
- "federation", current_position
- )
-
- # We ACK this token over replication so that the master can drop
- # its in memory queues
- self._hs.get_tcp_replication().send_federation_ack(current_position)
- except Exception:
- logger.exception("Error updating federation stream position")
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 505d450e19..ede8875439 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -297,33 +297,6 @@ class ClearUserSyncsCommand(Command):
return self.instance_id
-class FederationAckCommand(Command):
- """Sent by the client when it has processed up to a given point in the
- federation stream. This allows the master to drop in-memory caches of the
- federation stream.
-
- This must only be sent from one worker (i.e. the one sending federation)
-
- Format::
-
- FEDERATION_ACK <instance_name> <token>
- """
-
- NAME = "FEDERATION_ACK"
-
- def __init__(self, instance_name: str, token: int):
- self.instance_name = instance_name
- self.token = token
-
- @classmethod
- def from_line(cls, line: str) -> "FederationAckCommand":
- instance_name, token = line.split(" ")
- return cls(instance_name, int(token))
-
- def to_line(self) -> str:
- return "%s %s" % (self.instance_name, self.token)
-
-
class UserIpCommand(Command):
"""Sent periodically when a worker sees activity from a client.
@@ -389,7 +362,6 @@ _COMMANDS = (
NameCommand,
ReplicateCommand,
UserSyncCommand,
- FederationAckCommand,
UserIpCommand,
RemoteServerUpCommand,
ClearUserSyncsCommand,
@@ -415,7 +387,6 @@ VALID_CLIENT_COMMANDS = (
PingCommand.NAME,
UserSyncCommand.NAME,
ClearUserSyncsCommand.NAME,
- FederationAckCommand.NAME,
UserIpCommand.NAME,
ErrorCommand.NAME,
RemoteServerUpCommand.NAME,
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 2ce1b9f222..21b59def6f 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -39,7 +39,6 @@ from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
Command,
- FederationAckCommand,
PositionCommand,
RdataCommand,
RemoteServerUpCommand,
@@ -54,7 +53,6 @@ from synapse.replication.tcp.streams import (
BackfillStream,
CachesStream,
EventsStream,
- FederationStream,
ReceiptsStream,
Stream,
TagAccountDataStream,
@@ -73,7 +71,6 @@ inbound_rdata_count = Counter(
"synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
)
user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
-federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
@@ -157,11 +154,6 @@ class ReplicationCommandHandler:
if hs.config.worker_app is not None:
continue
- if stream.NAME == FederationStream.NAME and hs.config.send_federation:
- # We only support federation stream if federation sending
- # has been disabled on the master.
- continue
-
self._streams_to_replicate.append(stream)
# Map of stream name to batched updates. See RdataCommand for info on
@@ -365,14 +357,6 @@ class ReplicationCommandHandler:
else:
return None
- def on_FEDERATION_ACK(
- self, conn: IReplicationConnection, cmd: FederationAckCommand
- ):
- federation_ack_counter.inc()
-
- if self._federation_sender:
- self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
-
def on_USER_IP(
self, conn: IReplicationConnection, cmd: UserIpCommand
) -> Optional[Awaitable[None]]:
@@ -655,12 +639,6 @@ class ReplicationCommandHandler:
else:
logger.warning("Dropping command as not connected: %r", cmd.NAME)
- def send_federation_ack(self, token: int):
- """Ack data for the federation stream. This allows the master to drop
- data stored purely in memory.
- """
- self.send_command(FederationAckCommand(self._instance_name, token))
-
def send_user_sync(
self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
):
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index 4c0023c68a..a18c799a45 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -43,7 +43,6 @@ from synapse.replication.tcp.streams._base import (
UserSignatureStream,
)
from synapse.replication.tcp.streams.events import EventsStream
-from synapse.replication.tcp.streams.federation import FederationStream
STREAMS_MAP = {
stream.NAME: stream
@@ -60,7 +59,6 @@ STREAMS_MAP = {
PublicRoomsStream,
DeviceListsStream,
ToDeviceStream,
- FederationStream,
TagAccountDataStream,
AccountDataStream,
GroupServerStream,
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
deleted file mode 100644
index 096a85d363..0000000000
--- a/synapse/replication/tcp/streams/federation.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Copyright 2017 Vector Creations Ltd
-# Copyright 2019 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from collections import namedtuple
-from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Tuple
-
-from synapse.replication.tcp.streams._base import (
- Stream,
- current_token_without_instance,
- make_http_update_function,
-)
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-
-class FederationStream(Stream):
- """Data to be sent over federation. Only available when master has federation
- sending disabled.
- """
-
- FederationStreamRow = namedtuple(
- "FederationStreamRow",
- (
- "type", # str, the type of data as defined in the BaseFederationRows
- "data", # dict, serialization of a federation.send_queue.BaseFederationRow
- ),
- )
-
- NAME = "federation"
- ROW_TYPE = FederationStreamRow
-
- def __init__(self, hs: "HomeServer"):
- if hs.config.worker_app is None:
- # master process: get updates from the FederationRemoteSendQueue.
- # (if the master is configured to send federation itself, federation_sender
- # will be a real FederationSender, which has stubs for current_token and
- # get_replication_rows.)
- federation_sender = hs.get_federation_sender()
- current_token = current_token_without_instance(
- federation_sender.get_current_token
- )
- update_function = (
- federation_sender.get_replication_rows
- ) # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]]
-
- elif hs.should_send_federation():
- # federation sender: Query master process
- update_function = make_http_update_function(hs, self.NAME)
- current_token = self._stub_current_token
-
- else:
- # other worker: stub out the update function (we're not interested in
- # any updates so when we get a POSITION we do nothing)
- update_function = self._stub_update_function
- current_token = self._stub_current_token
-
- super().__init__(hs.get_instance_name(), current_token, update_function)
-
- @staticmethod
- def _stub_current_token(instance_name: str) -> int:
- # dummy current-token method for use on workers
- return 0
-
- @staticmethod
- async def _stub_update_function(
- instance_name: str, from_token: int, upto_token: int, limit: int
- ) -> Tuple[list, int, bool]:
- return [], upto_token, False
diff --git a/synapse/server.py b/synapse/server.py
index 42d2fad8e8..eaeaec9d77 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -59,7 +59,6 @@ from synapse.federation.federation_server import (
FederationHandlerRegistry,
FederationServer,
)
-from synapse.federation.send_queue import FederationRemoteSendQueue
from synapse.federation.sender import AbstractFederationSender, FederationSender
from synapse.federation.transport.client import TransportLayerClient
from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
@@ -580,13 +579,11 @@ class HomeServer(metaclass=abc.ABCMeta):
return TransportLayerClient(self)
@cache_in_self
- def get_federation_sender(self) -> AbstractFederationSender:
+ def get_federation_sender(self) -> Optional[AbstractFederationSender]:
if self.should_send_federation():
return FederationSender(self)
- elif not self.config.worker_app:
- return FederationRemoteSendQueue(self)
else:
- raise Exception("Workers cannot send federation traffic")
+ return None
@cache_in_self
def get_receipts_handler(self) -> ReceiptsHandler:
diff --git a/tests/replication/tcp/streams/test_federation.py b/tests/replication/tcp/streams/test_federation.py
deleted file mode 100644
index ffec06a0d6..0000000000
--- a/tests/replication/tcp/streams/test_federation.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Copyright 2019 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.federation.send_queue import EduRow
-from synapse.replication.tcp.streams.federation import FederationStream
-
-from tests.replication._base import BaseStreamTestCase
-
-
-class FederationStreamTestCase(BaseStreamTestCase):
- def _get_worker_hs_config(self) -> dict:
- # enable federation sending on the worker
- config = super()._get_worker_hs_config()
- # TODO: make it so we don't need both of these
- config["send_federation"] = False
- config["worker_app"] = "synapse.app.federation_sender"
- return config
-
- def test_catchup(self):
- """Basic test of catchup on reconnect
-
- Makes sure that updates sent while we are offline are received later.
- """
- fed_sender = self.hs.get_federation_sender()
- received_rows = self.test_handler.received_rdata_rows
-
- fed_sender.build_and_send_edu("testdest", "m.test_edu", {"a": "b"})
-
- self.reconnect()
- self.reactor.advance(0)
-
- # check we're testing what we think we are: no rows should yet have been
- # received
- self.assertEqual(received_rows, [])
-
- # We should now see an attempt to connect to the master
- request = self.handle_http_replication_attempt()
- self.assert_request_is_get_repl_stream_updates(request, "federation")
-
- # we should have received an update row
- stream_name, token, row = received_rows.pop()
- self.assertEqual(stream_name, "federation")
- self.assertIsInstance(row, FederationStream.FederationStreamRow)
- self.assertEqual(row.type, EduRow.TypeId)
- edurow = EduRow.from_data(row.data)
- self.assertEqual(edurow.edu.edu_type, "m.test_edu")
- self.assertEqual(edurow.edu.origin, self.hs.hostname)
- self.assertEqual(edurow.edu.destination, "testdest")
- self.assertEqual(edurow.edu.content, {"a": "b"})
-
- self.assertEqual(received_rows, [])
-
- # additional updates should be transferred without an HTTP hit
- fed_sender.build_and_send_edu("testdest", "m.test1", {"c": "d"})
- self.reactor.advance(0)
- # there should be no http hit
- self.assertEqual(len(self.reactor.tcpClients), 0)
- # ... but we should have a row
- self.assertEqual(len(received_rows), 1)
-
- stream_name, token, row = received_rows.pop()
- self.assertEqual(stream_name, "federation")
- self.assertIsInstance(row, FederationStream.FederationStreamRow)
- self.assertEqual(row.type, EduRow.TypeId)
- edurow = EduRow.from_data(row.data)
- self.assertEqual(edurow.edu.edu_type, "m.test1")
- self.assertEqual(edurow.edu.origin, self.hs.hostname)
- self.assertEqual(edurow.edu.destination, "testdest")
- self.assertEqual(edurow.edu.content, {"c": "d"})
diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py
deleted file mode 100644
index 04a869e295..0000000000
--- a/tests/replication/test_federation_ack.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# Copyright 2020 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from unittest import mock
-
-from synapse.app.generic_worker import GenericWorkerServer
-from synapse.replication.tcp.commands import FederationAckCommand
-from synapse.replication.tcp.protocol import IReplicationConnection
-from synapse.replication.tcp.streams.federation import FederationStream
-
-from tests.unittest import HomeserverTestCase
-
-
-class FederationAckTestCase(HomeserverTestCase):
- def default_config(self) -> dict:
- config = super().default_config()
- config["worker_app"] = "synapse.app.federation_sender"
- config["send_federation"] = False
- return config
-
- def make_homeserver(self, reactor, clock):
- hs = self.setup_test_homeserver(homeserver_to_use=GenericWorkerServer)
-
- return hs
-
- def test_federation_ack_sent(self):
- """A FEDERATION_ACK should be sent back after each RDATA federation
-
- This test checks that the federation sender is correctly sending back
- FEDERATION_ACK messages. The test works by spinning up a federation_sender
- worker server, and then fishing out its ReplicationCommandHandler. We wire
- the RCH up to a mock connection (so that we can observe the command being sent)
- and then poke in an RDATA row.
-
- XXX: it might be nice to do this by pretending to be a synapse master worker
- (or a redis server), and having the worker connect to us via a mocked-up TCP
- transport, rather than assuming that the implementation has a
- ReplicationCommandHandler.
- """
- rch = self.hs.get_tcp_replication()
-
- # wire up the ReplicationCommandHandler to a mock connection, which needs
- # to implement IReplicationConnection. (Note that Mock doesn't understand
- # interfaces, but casing an interface to a list gives the attributes.)
- mock_connection = mock.Mock(spec=list(IReplicationConnection))
- rch.new_connection(mock_connection)
-
- # tell it it received an RDATA row
- self.get_success(
- rch.on_rdata(
- "federation",
- "master",
- token=10,
- rows=[FederationStream.FederationStreamRow(type="x", data=[1, 2, 3])],
- )
- )
-
- # now check that the FEDERATION_ACK was sent
- mock_connection.send_command.assert_called_once()
- cmd = mock_connection.send_command.call_args[0][0]
- assert isinstance(cmd, FederationAckCommand)
- self.assertEqual(cmd.token, 10)
|