Diffstat (limited to 'synapse')
-rw-r--r--  synapse/federation/send_queue.py              | 556
-rw-r--r--  synapse/federation/sender/__init__.py         |   8
-rw-r--r--  synapse/replication/tcp/client.py             |  61
-rw-r--r--  synapse/replication/tcp/commands.py           |  29
-rw-r--r--  synapse/replication/tcp/handler.py            |  22
-rw-r--r--  synapse/replication/tcp/streams/__init__.py   |   2
-rw-r--r--  synapse/replication/tcp/streams/federation.py |  80
-rw-r--r--  synapse/server.py                             |   7
8 files changed, 3 insertions, 762 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
deleted file mode 100644
index ba4e8e2d37..0000000000
--- a/synapse/federation/send_queue.py
+++ /dev/null
@@ -1,556 +0,0 @@
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A federation sender that forwards things to be sent across replication to
-a worker process.
-
-It assumes there is a single worker process feeding off of it.
-
-Each row in the replication stream consists of a type and some json, where the
-types indicate whether they are presence, or edus, etc.
-
-Ephemeral or non-event data are queued up in-memory. When the worker requests
-updates since a particular point, all in-memory data since before that point is
-dropped. We also expire things in the queue after 5 minutes, to ensure that a
-dead worker doesn't cause the queues to grow limitlessly.
-
-Events are replicated via a separate events stream.
-"""
-
-import logging
-from collections import namedtuple
-from typing import (
-    TYPE_CHECKING,
-    Dict,
-    Hashable,
-    Iterable,
-    List,
-    Optional,
-    Sized,
-    Tuple,
-    Type,
-)
-
-from sortedcontainers import SortedDict
-
-from synapse.api.presence import UserPresenceState
-from synapse.federation.sender import AbstractFederationSender, FederationSender
-from synapse.metrics import LaterGauge
-from synapse.replication.tcp.streams.federation import FederationStream
-from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
-from synapse.util.metrics import Measure
-
-from .units import Edu
-
-if TYPE_CHECKING:
-    from synapse.server import HomeServer
-
-logger = logging.getLogger(__name__)
-
-
-class FederationRemoteSendQueue(AbstractFederationSender):
-    """A drop in replacement for FederationSender"""
-
-    def __init__(self, hs: "HomeServer"):
-        self.server_name = hs.hostname
-        self.clock = hs.get_clock()
-        self.notifier = hs.get_notifier()
-        self.is_mine_id = hs.is_mine_id
-
-        # We may have multiple federation sender instances, so we need to track
-        # their positions separately.
-        self._sender_instances = hs.config.worker.federation_shard_config.instances
-        self._sender_positions = {}  # type: Dict[str, int]
-
-        # Pending presence map user_id -> UserPresenceState
-        self.presence_map = {}  # type: Dict[str, UserPresenceState]
-
-        # Stream position -> list[user_id]
-        self.presence_changed = SortedDict()  # type: SortedDict[int, List[str]]
-
-        # Stores the destinations we need to explicitly send presence to about a
-        # given user.
-        # Stream position -> (user_id, destinations)
-        self.presence_destinations = (
-            SortedDict()
-        )  # type: SortedDict[int, Tuple[str, Iterable[str]]]
-
-        # (destination, key) -> EDU
-        self.keyed_edu = {}  # type: Dict[Tuple[str, tuple], Edu]
-
-        # stream position -> (destination, key)
-        self.keyed_edu_changed = (
-            SortedDict()
-        )  # type: SortedDict[int, Tuple[str, tuple]]
-
-        self.edus = SortedDict()  # type: SortedDict[int, Edu]
-
-        # stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
-        self.pos = 1
-
-        # map from stream ID to the time that stream entry was generated, so that we
-        # can clear out entries after a while
-        self.pos_time = SortedDict()  # type: SortedDict[int, int]
-
-        # EVERYTHING IS SAD. In particular, python only makes new scopes when
-        # we make a new function, so we need to make a new function so the inner
-        # lambda binds to the queue rather than to the name of the queue which
-        # changes. ARGH.
-        def register(name: str, queue: Sized) -> None:
-            LaterGauge(
-                "synapse_federation_send_queue_%s_size" % (queue_name,),
-                "",
-                [],
-                lambda: len(queue),
-            )
-
-        for queue_name in [
-            "presence_map",
-            "presence_changed",
-            "keyed_edu",
-            "keyed_edu_changed",
-            "edus",
-            "pos_time",
-            "presence_destinations",
-        ]:
-            register(queue_name, getattr(self, queue_name))
-
-        self.clock.looping_call(self._clear_queue, 30 * 1000)
-
-    def _next_pos(self) -> int:
-        pos = self.pos
-        self.pos += 1
-        self.pos_time[self.clock.time_msec()] = pos
-        return pos
-
-    def _clear_queue(self) -> None:
-        """Clear the queues for anything older than N minutes"""
-
-        FIVE_MINUTES_AGO = 5 * 60 * 1000
-        now = self.clock.time_msec()
-
-        keys = self.pos_time.keys()
-        time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
-        if not keys[:time]:
-            return
-
-        position_to_delete = max(keys[:time])
-        for key in keys[:time]:
-            del self.pos_time[key]
-
-        self._clear_queue_before_pos(position_to_delete)
-
-    def _clear_queue_before_pos(self, position_to_delete: int) -> None:
-        """Clear all the queues from before a given position"""
-        with Measure(self.clock, "send_queue._clear"):
-            # Delete things out of presence maps
-            keys = self.presence_changed.keys()
-            i = self.presence_changed.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.presence_changed[key]
-
-            user_ids = {
-                user_id for uids in self.presence_changed.values() for user_id in uids
-            }
-
-            keys = self.presence_destinations.keys()
-            i = self.presence_destinations.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.presence_destinations[key]
-
-            user_ids.update(
-                user_id for user_id, _ in self.presence_destinations.values()
-            )
-
-            to_del = [
-                user_id for user_id in self.presence_map if user_id not in user_ids
-            ]
-            for user_id in to_del:
-                del self.presence_map[user_id]
-
-            # Delete things out of keyed edus
-            keys = self.keyed_edu_changed.keys()
-            i = self.keyed_edu_changed.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.keyed_edu_changed[key]
-
-            live_keys = set()
-            for edu_key in self.keyed_edu_changed.values():
-                live_keys.add(edu_key)
-
-            keys_to_del = [
-                edu_key for edu_key in self.keyed_edu if edu_key not in live_keys
-            ]
-            for edu_key in keys_to_del:
-                del self.keyed_edu[edu_key]
-
-            # Delete things out of edu map
-            keys = self.edus.keys()
-            i = self.edus.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.edus[key]
-
-    def notify_new_events(self, max_token: RoomStreamToken) -> None:
-        """As per FederationSender"""
-        # This should never get called.
-        raise NotImplementedError()
-
-    def build_and_send_edu(
-        self,
-        destination: str,
-        edu_type: str,
-        content: JsonDict,
-        key: Optional[Hashable] = None,
-    ) -> None:
-        """As per FederationSender"""
-        if destination == self.server_name:
-            logger.info("Not sending EDU to ourselves")
-            return
-
-        pos = self._next_pos()
-
-        edu = Edu(
-            origin=self.server_name,
-            destination=destination,
-            edu_type=edu_type,
-            content=content,
-        )
-
-        if key:
-            assert isinstance(key, tuple)
-            self.keyed_edu[(destination, key)] = edu
-            self.keyed_edu_changed[pos] = (destination, key)
-        else:
-            self.edus[pos] = edu
-
-        self.notifier.on_new_replication_data()
-
-    async def send_read_receipt(self, receipt: ReadReceipt) -> None:
-        """As per FederationSender
-
-        Args:
-            receipt:
-        """
-        # nothing to do here: the replication listener will handle it.
-
-    def send_presence_to_destinations(
-        self, states: Iterable[UserPresenceState], destinations: Iterable[str]
-    ) -> None:
-        """As per FederationSender
-
-        Args:
-            states
-            destinations
-        """
-        for state in states:
-            pos = self._next_pos()
-            self.presence_map.update({state.user_id: state for state in states})
-            self.presence_destinations[pos] = (state.user_id, destinations)
-
-        self.notifier.on_new_replication_data()
-
-    def send_device_messages(self, destination: str) -> None:
-        """As per FederationSender"""
-        # We don't need to replicate this as it gets sent down a different
-        # stream.
-
-    def wake_destination(self, server: str) -> None:
-        pass
-
-    def get_current_token(self) -> int:
-        return self.pos - 1
-
-    def federation_ack(self, instance_name: str, token: int) -> None:
-        if self._sender_instances:
-            # If we have configured multiple federation sender instances we need
-            # to track their positions separately, and only clear the queue up
-            # to the token all instances have acked.
-            self._sender_positions[instance_name] = token
-            token = min(self._sender_positions.values())
-
-        self._clear_queue_before_pos(token)
-
-    async def get_replication_rows(
-        self, instance_name: str, from_token: int, to_token: int, target_row_count: int
-    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
-        """Get rows to be sent over federation between the two tokens
-
-        Args:
-            instance_name: the name of the current process
-            from_token: the previous stream token: the starting point for fetching the
-                updates
-            to_token: the new stream token: the point to get updates up to
-            target_row_count: a target for the number of rows to be returned.
-
-        Returns: a triplet `(updates, new_last_token, limited)`, where:
-            * `updates` is a list of `(token, row)` entries.
-            * `new_last_token` is the new position in stream.
-            * `limited` is whether there are more updates to fetch.
-        """
-        # TODO: Handle target_row_count.
-
-        # To handle restarts where we wrap around
-        if from_token > self.pos:
-            from_token = -1
-
-        # list of tuple(int, BaseFederationRow), where the first is the position
-        # of the federation stream.
-        rows = []  # type: List[Tuple[int, BaseFederationRow]]
-
-        # Fetch changed presence
-        i = self.presence_changed.bisect_right(from_token)
-        j = self.presence_changed.bisect_right(to_token) + 1
-        dest_user_ids = [
-            (pos, user_id)
-            for pos, user_id_list in self.presence_changed.items()[i:j]
-            for user_id in user_id_list
-        ]
-
-        for (key, user_id) in dest_user_ids:
-            rows.append((key, PresenceRow(state=self.presence_map[user_id])))
-
-        # Fetch presence to send to destinations
-        i = self.presence_destinations.bisect_right(from_token)
-        j = self.presence_destinations.bisect_right(to_token) + 1
-
-        for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
-            rows.append(
-                (
-                    pos,
-                    PresenceDestinationsRow(
-                        state=self.presence_map[user_id], destinations=list(dests)
-                    ),
-                )
-            )
-
-        # Fetch changes keyed edus
-        i = self.keyed_edu_changed.bisect_right(from_token)
-        j = self.keyed_edu_changed.bisect_right(to_token) + 1
-        # We purposefully clobber based on the key here, python dict comprehensions
-        # always use the last value, so this will correctly point to the last
-        # stream position.
-        keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
-
-        for ((destination, edu_key), pos) in keyed_edus.items():
-            rows.append(
-                (
-                    pos,
-                    KeyedEduRow(
-                        key=edu_key, edu=self.keyed_edu[(destination, edu_key)]
-                    ),
-                )
-            )
-
-        # Fetch changed edus
-        i = self.edus.bisect_right(from_token)
-        j = self.edus.bisect_right(to_token) + 1
-        edus = self.edus.items()[i:j]
-
-        for (pos, edu) in edus:
-            rows.append((pos, EduRow(edu)))
-
-        # Sort rows based on pos
-        rows.sort()
-
-        return (
-            [(pos, (row.TypeId, row.to_data())) for pos, row in rows],
-            to_token,
-            False,
-        )
-
-
-class BaseFederationRow:
-    """Base class for rows to be sent in the federation stream.
-
-    Specifies how to identify, serialize and deserialize the different types.
-    """
-
-    TypeId = ""  # Unique string that ids the type. Must be overridden in sub classes.
-
-    @staticmethod
-    def from_data(data):
-        """Parse the data from the federation stream into a row.
-
-        Args:
-            data: The value of ``data`` from FederationStreamRow.data, type
-                depends on the type of stream
-        """
-        raise NotImplementedError()
-
-    def to_data(self):
-        """Serialize this row to be sent over the federation stream.
-
-        Returns:
-            The value to be sent in FederationStreamRow.data. The type depends
-            on the type of stream.
-        """
-        raise NotImplementedError()
-
-    def add_to_buffer(self, buff):
-        """Add this row to the appropriate field in the buffer ready for this
-        to be sent over federation.
-
-        We use a buffer so that we can batch up events that have come in at
-        the same time and send them all at once.
-
-        Args:
-            buff (BufferedToSend)
-        """
-        raise NotImplementedError()
-
-
-class PresenceRow(
-    BaseFederationRow, namedtuple("PresenceRow", ("state",))  # UserPresenceState
-):
-    TypeId = "p"
-
-    @staticmethod
-    def from_data(data):
-        return PresenceRow(state=UserPresenceState.from_dict(data))
-
-    def to_data(self):
-        return self.state.as_dict()
-
-    def add_to_buffer(self, buff):
-        buff.presence.append(self.state)
-
-
-class PresenceDestinationsRow(
-    BaseFederationRow,
-    namedtuple(
-        "PresenceDestinationsRow",
-        ("state", "destinations"),  # UserPresenceState  # list[str]
-    ),
-):
-    TypeId = "pd"
-
-    @staticmethod
-    def from_data(data):
-        return PresenceDestinationsRow(
-            state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"]
-        )
-
-    def to_data(self):
-        return {"state": self.state.as_dict(), "dests": self.destinations}
-
-    def add_to_buffer(self, buff):
-        buff.presence_destinations.append((self.state, self.destinations))
-
-
-class KeyedEduRow(
-    BaseFederationRow,
-    namedtuple(
-        "KeyedEduRow",
-        ("key", "edu"),  # tuple(str) - the edu key passed to send_edu  # Edu
-    ),
-):
-    """Streams EDUs that have an associated key that is ued to clobber. For example,
-    typing EDUs clobber based on room_id.
-    """
-
-    TypeId = "k"
-
-    @staticmethod
-    def from_data(data):
-        return KeyedEduRow(key=tuple(data["key"]), edu=Edu(**data["edu"]))
-
-    def to_data(self):
-        return {"key": self.key, "edu": self.edu.get_internal_dict()}
-
-    def add_to_buffer(self, buff):
-        buff.keyed_edus.setdefault(self.edu.destination, {})[self.key] = self.edu
-
-
-class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))):  # Edu
-    """Streams EDUs that don't have keys. See KeyedEduRow"""
-
-    TypeId = "e"
-
-    @staticmethod
-    def from_data(data):
-        return EduRow(Edu(**data))
-
-    def to_data(self):
-        return self.edu.get_internal_dict()
-
-    def add_to_buffer(self, buff):
-        buff.edus.setdefault(self.edu.destination, []).append(self.edu)
-
-
-_rowtypes = (
-    PresenceRow,
-    PresenceDestinationsRow,
-    KeyedEduRow,
-    EduRow,
-)  # type: Tuple[Type[BaseFederationRow], ...]
-
-TypeToRow = {Row.TypeId: Row for Row in _rowtypes}
-
-
-ParsedFederationStreamData = namedtuple(
-    "ParsedFederationStreamData",
-    (
-        "presence",  # list(UserPresenceState)
-        "presence_destinations",  # list of tuples of UserPresenceState and destinations
-        "keyed_edus",  # dict of destination -> { key -> Edu }
-        "edus",  # dict of destination -> [Edu]
-    ),
-)
-
-
-def process_rows_for_federation(
-    transaction_queue: FederationSender,
-    rows: List[FederationStream.FederationStreamRow],
-) -> None:
-    """Parse a list of rows from the federation stream and put them in the
-    transaction queue ready for sending to the relevant homeservers.
-
-    Args:
-        transaction_queue
-        rows
-    """
-
-    # The federation stream contains a bunch of different types of
-    # rows that need to be handled differently. We parse the rows, put
-    # them into the appropriate collection and then send them off.
-
-    buff = ParsedFederationStreamData(
-        presence=[],
-        presence_destinations=[],
-        keyed_edus={},
-        edus={},
-    )
-
-    # Parse the rows in the stream and add to the buffer
-    for row in rows:
-        if row.type not in TypeToRow:
-            logger.error("Unrecognized federation row type %r", row.type)
-            continue
-
-        RowType = TypeToRow[row.type]
-        parsed_row = RowType.from_data(row.data)
-        parsed_row.add_to_buffer(buff)
-
-    for state, destinations in buff.presence_destinations:
-        transaction_queue.send_presence_to_destinations(
-            states=[state], destinations=destinations
-        )
-
-    for destination, edu_map in buff.keyed_edus.items():
-        for key, edu in edu_map.items():
-            transaction_queue.send_edu(edu, key)
-
-    for destination, edu_list in buff.edus.items():
-        for edu in edu_list:
-            transaction_queue.send_edu(edu, None)
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 6266accaf5..cdb04ec0c4 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -124,10 +124,6 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
         raise NotImplementedError()
 
     @abc.abstractmethod
-    def federation_ack(self, instance_name: str, token: int) -> None:
-        raise NotImplementedError()
-
-    @abc.abstractmethod
     async def get_replication_rows(
         self, instance_name: str, from_token: int, to_token: int, target_row_count: int
     ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
@@ -635,10 +631,6 @@ class FederationSender(AbstractFederationSender):
         # to a worker.
         return 0
 
-    def federation_ack(self, instance_name: str, token: int) -> None:
-        # It is not expected that this gets called on FederationSender.
-        raise NotImplementedError()
-
     @staticmethod
     async def get_replication_rows(
         instance_name: str, from_token: int, to_token: int, target_row_count: int
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 874cb9c25e..1c3b6e1bc0 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -20,10 +20,8 @@ from twisted.internet.defer import Deferred
 from twisted.internet.protocol import ReconnectingClientFactory
 
 from synapse.api.constants import EventTypes
-from synapse.federation import send_queue
 from synapse.federation.sender import FederationSender
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
-from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
 from synapse.replication.tcp.streams import (
     AccountDataStream,
@@ -358,14 +356,8 @@ class FederationSenderHandler:
         self.federation_sender.wake_destination(server)
 
     async def process_replication_rows(self, stream_name, token, rows):
-        # The federation stream contains things that we want to send out, e.g.
-        # presence, typing, etc.
-        if stream_name == "federation":
-            send_queue.process_rows_for_federation(self.federation_sender, rows)
-            await self.update_token(token)
-
-        # ... and when new receipts happen
-        elif stream_name == ReceiptsStream.NAME:
+        if stream_name == ReceiptsStream.NAME:
             await self._on_new_receipts(rows)
 
         # ... as well as device updates and messages
@@ -403,54 +395,3 @@ class FederationSenderHandler:
                 receipt.data,
             )
             await self.federation_sender.send_read_receipt(receipt_info)
-
-    async def update_token(self, token):
-        """Update the record of where we have processed to in the federation stream.
-
-        Called after we have processed a an update received over replication. Sends
-        a FEDERATION_ACK back to the master, and stores the token that we have processed
-        in `federation_stream_position` so that we can restart where we left off.
-        """
-        self.federation_position = token
-
-        # We save and send the ACK to master asynchronously, so we don't block
-        # processing on persistence. We don't need to do this operation for
-        # every single RDATA we receive, we just need to do it periodically.
-
-        if self._fed_position_linearizer.is_queued(None):
-            # There is already a task queued up to save and send the token, so
-            # no need to queue up another task.
-            return
-
-        run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
-
-    async def _save_and_send_ack(self):
-        """Save the current federation position in the database and send an ACK
-        to master with where we're up to.
-        """
-        # We should only be calling this once we've got a token.
-        assert self.federation_position is not None
-
-        try:
-            # We linearize here to ensure we don't have races updating the token
-            #
-            # XXX this appears to be redundant, since the ReplicationCommandHandler
-            # has a linearizer which ensures that we only process one line of
-            # replication data at a time. Should we remove it, or is it doing useful
-            # service for robustness? Or could we replace it with an assertion that
-            # we're not being re-entered?
-
-            with (await self._fed_position_linearizer.queue(None)):
-                # We persist and ack the same position, so we take a copy of it
-                # here as otherwise it can get modified from underneath us.
-                current_position = self.federation_position
-
-                await self.store.update_federation_out_pos(
-                    "federation", current_position
-                )
-
-                # We ACK this token over replication so that the master can drop
-                # its in memory queues
-                self._hs.get_tcp_replication().send_federation_ack(current_position)
-        except Exception:
-            logger.exception("Error updating federation stream position")
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 505d450e19..ede8875439 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -297,33 +297,6 @@ class ClearUserSyncsCommand(Command):
         return self.instance_id
 
 
-class FederationAckCommand(Command):
-    """Sent by the client when it has processed up to a given point in the
-    federation stream. This allows the master to drop in-memory caches of the
-    federation stream.
-
-    This must only be sent from one worker (i.e. the one sending federation)
-
-    Format::
-
-        FEDERATION_ACK <instance_name> <token>
-    """
-
-    NAME = "FEDERATION_ACK"
-
-    def __init__(self, instance_name: str, token: int):
-        self.instance_name = instance_name
-        self.token = token
-
-    @classmethod
-    def from_line(cls, line: str) -> "FederationAckCommand":
-        instance_name, token = line.split(" ")
-        return cls(instance_name, int(token))
-
-    def to_line(self) -> str:
-        return "%s %s" % (self.instance_name, self.token)
-
-
 class UserIpCommand(Command):
     """Sent periodically when a worker sees activity from a client.
@@ -389,7 +362,6 @@ _COMMANDS = (
     NameCommand,
     ReplicateCommand,
     UserSyncCommand,
-    FederationAckCommand,
     UserIpCommand,
     RemoteServerUpCommand,
     ClearUserSyncsCommand,
@@ -415,7 +387,6 @@ VALID_CLIENT_COMMANDS = (
     PingCommand.NAME,
     UserSyncCommand.NAME,
     ClearUserSyncsCommand.NAME,
-    FederationAckCommand.NAME,
     UserIpCommand.NAME,
     ErrorCommand.NAME,
     RemoteServerUpCommand.NAME,
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 2ce1b9f222..21b59def6f 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -39,7 +39,6 @@ from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
 from synapse.replication.tcp.commands import (
     ClearUserSyncsCommand,
     Command,
-    FederationAckCommand,
     PositionCommand,
     RdataCommand,
     RemoteServerUpCommand,
@@ -54,7 +53,6 @@ from synapse.replication.tcp.streams import (
     BackfillStream,
     CachesStream,
     EventsStream,
-    FederationStream,
     ReceiptsStream,
     Stream,
     TagAccountDataStream,
@@ -73,7 +71,6 @@ inbound_rdata_count = Counter(
     "synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
 )
 user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
-federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
 remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
 user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
@@ -157,11 +154,6 @@ class ReplicationCommandHandler:
             if hs.config.worker_app is not None:
                 continue
 
-            if stream.NAME == FederationStream.NAME and hs.config.send_federation:
-                # We only support federation stream if federation sending
-                # has been disabled on the master.
-                continue
-
             self._streams_to_replicate.append(stream)
 
         # Map of stream name to batched updates. See RdataCommand for info on
@@ -365,14 +357,6 @@ class ReplicationCommandHandler:
         else:
             return None
 
-    def on_FEDERATION_ACK(
-        self, conn: IReplicationConnection, cmd: FederationAckCommand
-    ):
-        federation_ack_counter.inc()
-
-        if self._federation_sender:
-            self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
-
     def on_USER_IP(
         self, conn: IReplicationConnection, cmd: UserIpCommand
     ) -> Optional[Awaitable[None]]:
@@ -655,12 +639,6 @@ class ReplicationCommandHandler:
         else:
             logger.warning("Dropping command as not connected: %r", cmd.NAME)
 
-    def send_federation_ack(self, token: int):
-        """Ack data for the federation stream. This allows the master to drop
-        data stored purely in memory.
-        """
-        self.send_command(FederationAckCommand(self._instance_name, token))
-
     def send_user_sync(
         self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
     ):
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index 4c0023c68a..a18c799a45 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -43,7 +43,6 @@ from synapse.replication.tcp.streams._base import (
     UserSignatureStream,
 )
 from synapse.replication.tcp.streams.events import EventsStream
-from synapse.replication.tcp.streams.federation import FederationStream
 
 STREAMS_MAP = {
     stream.NAME: stream
@@ -60,7 +59,6 @@ STREAMS_MAP = {
         PublicRoomsStream,
         DeviceListsStream,
         ToDeviceStream,
-        FederationStream,
         TagAccountDataStream,
         AccountDataStream,
         GroupServerStream,
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
deleted file mode 100644
index 096a85d363..0000000000
--- a/synapse/replication/tcp/streams/federation.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Copyright 2017 Vector Creations Ltd
-# Copyright 2019 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from collections import namedtuple
-from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Tuple
-
-from synapse.replication.tcp.streams._base import (
-    Stream,
-    current_token_without_instance,
-    make_http_update_function,
-)
-
-if TYPE_CHECKING:
-    from synapse.server import HomeServer
-
-
-class FederationStream(Stream):
-    """Data to be sent over federation. Only available when master has federation
-    sending disabled.
-    """
-
-    FederationStreamRow = namedtuple(
-        "FederationStreamRow",
-        (
-            "type",  # str, the type of data as defined in the BaseFederationRows
-            "data",  # dict, serialization of a federation.send_queue.BaseFederationRow
-        ),
-    )
-
-    NAME = "federation"
-    ROW_TYPE = FederationStreamRow
-
-    def __init__(self, hs: "HomeServer"):
-        if hs.config.worker_app is None:
-            # master process: get updates from the FederationRemoteSendQueue.
-            # (if the master is configured to send federation itself, federation_sender
-            # will be a real FederationSender, which has stubs for current_token and
-            # get_replication_rows.)
-            federation_sender = hs.get_federation_sender()
-            current_token = current_token_without_instance(
-                federation_sender.get_current_token
-            )
-            update_function = (
-                federation_sender.get_replication_rows
-            )  # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]]
-
-        elif hs.should_send_federation():
-            # federation sender: Query master process
-            update_function = make_http_update_function(hs, self.NAME)
-            current_token = self._stub_current_token
-
-        else:
-            # other worker: stub out the update function (we're not interested in
-            # any updates so when we get a POSITION we do nothing)
-            update_function = self._stub_update_function
-            current_token = self._stub_current_token
-
-        super().__init__(hs.get_instance_name(), current_token, update_function)
-
-    @staticmethod
-    def _stub_current_token(instance_name: str) -> int:
-        # dummy current-token method for use on workers
-        return 0
-
-    @staticmethod
-    async def _stub_update_function(
-        instance_name: str, from_token: int, upto_token: int, limit: int
-    ) -> Tuple[list, int, bool]:
-        return [], upto_token, False
diff --git a/synapse/server.py b/synapse/server.py
index 42d2fad8e8..eaeaec9d77 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -59,7 +59,6 @@ from synapse.federation.federation_server import (
     FederationHandlerRegistry,
     FederationServer,
 )
-from synapse.federation.send_queue import FederationRemoteSendQueue
 from synapse.federation.sender import AbstractFederationSender, FederationSender
 from synapse.federation.transport.client import TransportLayerClient
 from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
@@ -580,13 +579,11 @@ class HomeServer(metaclass=abc.ABCMeta):
         return TransportLayerClient(self)
 
     @cache_in_self
-    def get_federation_sender(self) -> AbstractFederationSender:
+    def get_federation_sender(self) -> Optional[AbstractFederationSender]:
         if self.should_send_federation():
             return FederationSender(self)
-        elif not self.config.worker_app:
-            return FederationRemoteSendQueue(self)
         else:
-            raise Exception("Workers cannot send federation traffic")
+            return None
 
     @cache_in_self
     def get_receipts_handler(self) -> ReceiptsHandler: