From fcd69614411428fae1072704978a349e8c28be3d Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Wed, 10 Jun 2020 17:44:34 +0100 Subject: Add option to enable encryption by default for new rooms (#7639) Fixes https://github.com/matrix-org/synapse/issues/2431 Adds config option `encryption_enabled_by_default_for_room_type`, which determines whether encryption should be enabled with the default encryption algorithm in private or public rooms upon creation. Whether the room is private or public is decided based upon the room creation preset that is used. Part of this PR is also pulling out all of the individual instances of `m.megolm.v1.aes-sha2` into a constant variable to eliminate typos ala https://github.com/matrix-org/synapse/pull/7637 Based on #7637 --- synapse/config/homeserver.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/config/homeserver.py') diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 2c7b3a699f..264c274c52 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -36,6 +36,7 @@ from .ratelimiting import RatelimitConfig from .redis import RedisConfig from .registration import RegistrationConfig from .repository import ContentRepositoryConfig +from .room import RoomConfig from .room_directory import RoomDirectoryConfig from .saml2_config import SAML2Config from .server import ServerConfig @@ -79,6 +80,7 @@ class HomeServerConfig(RootConfig): PasswordAuthProviderConfig, PushConfig, SpamCheckerConfig, + RoomConfig, GroupsConfig, UserDirectoryConfig, ConsentConfig, -- cgit 1.5.1 From f299441cc67f31dcd47b8fdfda4a218bee9df9ba Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 10 Jul 2020 18:26:36 +0100 Subject: Add ability to shard the federation sender (#7798) --- changelog.d/7798.feature | 1 + docs/sample_config.yaml | 65 ++--- synapse/app/generic_worker.py | 59 ++--- synapse/config/federation.py | 129 ++++++++++ synapse/config/homeserver.py | 3 + synapse/config/server.py | 66 ----- synapse/federation/send_queue.py | 14 +- synapse/federation/sender/__init__.py | 48 +++- synapse/federation/sender/per_destination_queue.py | 22 ++ synapse/replication/tcp/commands.py | 10 +- synapse/replication/tcp/handler.py | 4 +- .../delta/58/10federation_pos_instance_name.sql | 22 ++ synapse/storage/data_stores/main/stream.py | 97 ++++++- tests/replication/test_federation_ack.py | 1 + tests/replication/test_federation_sender_shard.py | 286 +++++++++++++++++++++ 15 files changed, 670 insertions(+), 157 deletions(-) create mode 100644 changelog.d/7798.feature create mode 100644 synapse/config/federation.py create mode 100644 synapse/storage/data_stores/main/schema/delta/58/10federation_pos_instance_name.sql create mode 100644 tests/replication/test_federation_sender_shard.py (limited to 'synapse/config/homeserver.py') diff --git a/changelog.d/7798.feature b/changelog.d/7798.feature new file mode 100644 index 0000000000..56ffaf0d4a --- /dev/null +++ b/changelog.d/7798.feature @@ -0,0 +1 @@ +Add experimental support for running multiple federation sender processes. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 164a104045..1a2d9fb153 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -118,38 +118,6 @@ pid_file: DATADIR/homeserver.pid # #enable_search: false -# Restrict federation to the following whitelist of domains. -# N.B. we recommend also firewalling your federation listener to limit -# inbound federation traffic as early as possible, rather than relying -# purely on this application-layer restriction. If not specified, the -# default is to whitelist everything. -# -#federation_domain_whitelist: -# - lon.example.com -# - nyc.example.com -# - syd.example.com - -# Prevent federation requests from being sent to the following -# blacklist IP address CIDR ranges. If this option is not specified, or -# specified with an empty list, no ip range blacklist will be enforced. -# -# As of Synapse v1.4.0 this option also affects any outbound requests to identity -# servers provided by user input. -# -# (0.0.0.0 and :: are always blacklisted, whether or not they are explicitly -# listed here, since they correspond to unroutable addresses.) -# -federation_ip_range_blacklist: - - '127.0.0.0/8' - - '10.0.0.0/8' - - '172.16.0.0/12' - - '192.168.0.0/16' - - '100.64.0.0/10' - - '169.254.0.0/16' - - '::1/128' - - 'fe80::/64' - - 'fc00::/7' - # List of ports that Synapse should listen on, their purpose and their # configuration. # @@ -608,6 +576,39 @@ acme: +# Restrict federation to the following whitelist of domains. +# N.B. we recommend also firewalling your federation listener to limit +# inbound federation traffic as early as possible, rather than relying +# purely on this application-layer restriction. If not specified, the +# default is to whitelist everything. +# +#federation_domain_whitelist: +# - lon.example.com +# - nyc.example.com +# - syd.example.com + +# Prevent federation requests from being sent to the following +# blacklist IP address CIDR ranges. If this option is not specified, or +# specified with an empty list, no ip range blacklist will be enforced. +# +# As of Synapse v1.4.0 this option also affects any outbound requests to identity +# servers provided by user input. +# +# (0.0.0.0 and :: are always blacklisted, whether or not they are explicitly +# listed here, since they correspond to unroutable addresses.) +# +federation_ip_range_blacklist: + - '127.0.0.0/8' + - '10.0.0.0/8' + - '172.16.0.0/12' + - '192.168.0.0/16' + - '100.64.0.0/10' + - '169.254.0.0/16' + - '::1/128' + - 'fe80::/64' + - 'fc00::/7' + + ## Caching ## # Caching can be configured through the following options. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index f6792d9fc8..e90695f026 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -511,25 +511,7 @@ class GenericWorkerSlavedStore( SearchWorkerStore, BaseSlavedStore, ): - def __init__(self, database, db_conn, hs): - super(GenericWorkerSlavedStore, self).__init__(database, db_conn, hs) - - # We pull out the current federation stream position now so that we - # always have a known value for the federation position in memory so - # that we don't have to bounce via a deferred once when we start the - # replication streams. - self.federation_out_pos_startup = self._get_federation_out_pos(db_conn) - - def _get_federation_out_pos(self, db_conn): - sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?" - sql = self.database_engine.convert_param_style(sql) - - txn = db_conn.cursor() - txn.execute(sql, ("federation",)) - rows = txn.fetchall() - txn.close() - - return rows[0][0] if rows else -1 + pass class GenericWorkerServer(HomeServer): @@ -812,19 +794,11 @@ class FederationSenderHandler(object): self.federation_sender = hs.get_federation_sender() self._hs = hs - # if the worker is restarted, we want to pick up where we left off in - # the replication stream, so load the position from the database. - # - # XXX is this actually worthwhile? Whenever the master is restarted, we'll - # drop some rows anyway (which is mostly fine because we're only dropping - # typing and presence notifications). If the replication stream is - # unreliable, why do we do all this hoop-jumping to store the position in the - # database? See also https://github.com/matrix-org/synapse/issues/7535. - # - self.federation_position = self.store.federation_out_pos_startup + # Stores the latest position in the federation stream we've gotten up + # to. This is always set before we use it. + self.federation_position = None self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") - self._last_ack = self.federation_position def on_start(self): # There may be some events that are persisted but haven't been sent, @@ -932,7 +906,6 @@ class FederationSenderHandler(object): # We ACK this token over replication so that the master can drop # its in memory queues self._hs.get_tcp_replication().send_federation_ack(current_position) - self._last_ack = current_position except Exception: logger.exception("Error updating federation stream position") @@ -960,7 +933,7 @@ def start(config_options): ) if config.worker_app == "synapse.app.appservice": - if config.notify_appservices: + if config.appservice.notify_appservices: sys.stderr.write( "\nThe appservices must be disabled in the main synapse process" "\nbefore they can be run in a separate worker." @@ -970,13 +943,13 @@ def start(config_options): sys.exit(1) # Force the appservice to start since they will be disabled in the main config - config.notify_appservices = True + config.appservice.notify_appservices = True else: # For other worker types we force this to off. - config.notify_appservices = False + config.appservice.notify_appservices = False if config.worker_app == "synapse.app.pusher": - if config.start_pushers: + if config.server.start_pushers: sys.stderr.write( "\nThe pushers must be disabled in the main synapse process" "\nbefore they can be run in a separate worker." @@ -986,13 +959,13 @@ def start(config_options): sys.exit(1) # Force the pushers to start since they will be disabled in the main config - config.start_pushers = True + config.server.start_pushers = True else: # For other worker types we force this to off. - config.start_pushers = False + config.server.start_pushers = False if config.worker_app == "synapse.app.user_dir": - if config.update_user_directory: + if config.server.update_user_directory: sys.stderr.write( "\nThe update_user_directory must be disabled in the main synapse process" "\nbefore they can be run in a separate worker." @@ -1002,13 +975,13 @@ def start(config_options): sys.exit(1) # Force the pushers to start since they will be disabled in the main config - config.update_user_directory = True + config.server.update_user_directory = True else: # For other worker types we force this to off. - config.update_user_directory = False + config.server.update_user_directory = False if config.worker_app == "synapse.app.federation_sender": - if config.send_federation: + if config.federation.send_federation: sys.stderr.write( "\nThe send_federation must be disabled in the main synapse process" "\nbefore they can be run in a separate worker." @@ -1018,10 +991,10 @@ def start(config_options): sys.exit(1) # Force the pushers to start since they will be disabled in the main config - config.send_federation = True + config.federation.send_federation = True else: # For other worker types we force this to off. - config.send_federation = False + config.federation.send_federation = False synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts diff --git a/synapse/config/federation.py b/synapse/config/federation.py new file mode 100644 index 0000000000..7782ab4c9d --- /dev/null +++ b/synapse/config/federation.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from hashlib import sha256 +from typing import List, Optional + +import attr +from netaddr import IPSet + +from ._base import Config, ConfigError + + +@attr.s +class ShardedFederationSendingConfig: + """Algorithm for choosing which federation sender instance is responsible + for which destionation host. + """ + + instances = attr.ib(type=List[str]) + + def should_send_to(self, instance_name: str, destination: str) -> bool: + """Whether this instance is responsible for sending transcations for + the given host. + """ + + # If multiple federation senders are not defined we always return true. + if not self.instances or len(self.instances) == 1: + return True + + # We shard by taking the hash, modulo it by the number of federation + # senders and then checking whether this instance matches the instance + # at that index. + # + # (Technically this introduces some bias and is not entirely uniform, but + # since the hash is so large the bias is ridiculously small). + dest_hash = sha256(destination.encode("utf8")).digest() + dest_int = int.from_bytes(dest_hash, byteorder="little") + remainder = dest_int % (len(self.instances)) + return self.instances[remainder] == instance_name + + +class FederationConfig(Config): + section = "federation" + + def read_config(self, config, **kwargs): + # Whether to send federation traffic out in this process. This only + # applies to some federation traffic, and so shouldn't be used to + # "disable" federation + self.send_federation = config.get("send_federation", True) + + federation_sender_instances = config.get("federation_sender_instances") or [] + self.federation_shard_config = ShardedFederationSendingConfig( + federation_sender_instances + ) + + # FIXME: federation_domain_whitelist needs sytests + self.federation_domain_whitelist = None # type: Optional[dict] + federation_domain_whitelist = config.get("federation_domain_whitelist", None) + + if federation_domain_whitelist is not None: + # turn the whitelist into a hash for speed of lookup + self.federation_domain_whitelist = {} + + for domain in federation_domain_whitelist: + self.federation_domain_whitelist[domain] = True + + self.federation_ip_range_blacklist = config.get( + "federation_ip_range_blacklist", [] + ) + + # Attempt to create an IPSet from the given ranges + try: + self.federation_ip_range_blacklist = IPSet( + self.federation_ip_range_blacklist + ) + + # Always blacklist 0.0.0.0, :: + self.federation_ip_range_blacklist.update(["0.0.0.0", "::"]) + except Exception as e: + raise ConfigError( + "Invalid range(s) provided in federation_ip_range_blacklist: %s" % e + ) + + def generate_config_section(self, config_dir_path, server_name, **kwargs): + return """\ + # Restrict federation to the following whitelist of domains. + # N.B. we recommend also firewalling your federation listener to limit + # inbound federation traffic as early as possible, rather than relying + # purely on this application-layer restriction. If not specified, the + # default is to whitelist everything. + # + #federation_domain_whitelist: + # - lon.example.com + # - nyc.example.com + # - syd.example.com + + # Prevent federation requests from being sent to the following + # blacklist IP address CIDR ranges. If this option is not specified, or + # specified with an empty list, no ip range blacklist will be enforced. + # + # As of Synapse v1.4.0 this option also affects any outbound requests to identity + # servers provided by user input. + # + # (0.0.0.0 and :: are always blacklisted, whether or not they are explicitly + # listed here, since they correspond to unroutable addresses.) + # + federation_ip_range_blacklist: + - '127.0.0.0/8' + - '10.0.0.0/8' + - '172.16.0.0/12' + - '192.168.0.0/16' + - '100.64.0.0/10' + - '169.254.0.0/16' + - '::1/128' + - 'fe80::/64' + - 'fc00::/7' + """ diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 264c274c52..8e93d31394 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -23,6 +23,7 @@ from .cas import CasConfig from .consent_config import ConsentConfig from .database import DatabaseConfig from .emailconfig import EmailConfig +from .federation import FederationConfig from .groups import GroupsConfig from .jwt_config import JWTConfig from .key import KeyConfig @@ -57,6 +58,7 @@ class HomeServerConfig(RootConfig): config_classes = [ ServerConfig, TlsConfig, + FederationConfig, CacheConfig, DatabaseConfig, LoggingConfig, @@ -90,4 +92,5 @@ class HomeServerConfig(RootConfig): ThirdPartyRulesConfig, TracerConfig, RedisConfig, + FederationConfig, ] diff --git a/synapse/config/server.py b/synapse/config/server.py index 8204664883..b6afa642ca 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -23,7 +23,6 @@ from typing import Any, Dict, Iterable, List, Optional import attr import yaml -from netaddr import IPSet from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.http.endpoint import parse_and_validate_server_name @@ -136,11 +135,6 @@ class ServerConfig(Config): self.use_frozen_dicts = config.get("use_frozen_dicts", False) self.public_baseurl = config.get("public_baseurl") - # Whether to send federation traffic out in this process. This only - # applies to some federation traffic, and so shouldn't be used to - # "disable" federation - self.send_federation = config.get("send_federation", True) - # Whether to enable user presence. self.use_presence = config.get("use_presence", True) @@ -263,34 +257,6 @@ class ServerConfig(Config): # due to resource constraints self.admin_contact = config.get("admin_contact", None) - # FIXME: federation_domain_whitelist needs sytests - self.federation_domain_whitelist = None # type: Optional[dict] - federation_domain_whitelist = config.get("federation_domain_whitelist", None) - - if federation_domain_whitelist is not None: - # turn the whitelist into a hash for speed of lookup - self.federation_domain_whitelist = {} - - for domain in federation_domain_whitelist: - self.federation_domain_whitelist[domain] = True - - self.federation_ip_range_blacklist = config.get( - "federation_ip_range_blacklist", [] - ) - - # Attempt to create an IPSet from the given ranges - try: - self.federation_ip_range_blacklist = IPSet( - self.federation_ip_range_blacklist - ) - - # Always blacklist 0.0.0.0, :: - self.federation_ip_range_blacklist.update(["0.0.0.0", "::"]) - except Exception as e: - raise ConfigError( - "Invalid range(s) provided in federation_ip_range_blacklist: %s" % e - ) - if self.public_baseurl is not None: if self.public_baseurl[-1] != "/": self.public_baseurl += "/" @@ -743,38 +709,6 @@ class ServerConfig(Config): # #enable_search: false - # Restrict federation to the following whitelist of domains. - # N.B. we recommend also firewalling your federation listener to limit - # inbound federation traffic as early as possible, rather than relying - # purely on this application-layer restriction. If not specified, the - # default is to whitelist everything. - # - #federation_domain_whitelist: - # - lon.example.com - # - nyc.example.com - # - syd.example.com - - # Prevent federation requests from being sent to the following - # blacklist IP address CIDR ranges. If this option is not specified, or - # specified with an empty list, no ip range blacklist will be enforced. - # - # As of Synapse v1.4.0 this option also affects any outbound requests to identity - # servers provided by user input. - # - # (0.0.0.0 and :: are always blacklisted, whether or not they are explicitly - # listed here, since they correspond to unroutable addresses.) - # - federation_ip_range_blacklist: - - '127.0.0.0/8' - - '10.0.0.0/8' - - '172.16.0.0/12' - - '192.168.0.0/16' - - '100.64.0.0/10' - - '169.254.0.0/16' - - '::1/128' - - 'fe80::/64' - - 'fc00::/7' - # List of ports that Synapse should listen on, their purpose and their # configuration. # diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 860b03f7b9..4fc9ff92e5 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -55,6 +55,11 @@ class FederationRemoteSendQueue(object): self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id + # We may have multiple federation sender instances, so we need to track + # their positions separately. + self._sender_instances = hs.config.federation.federation_shard_config.instances + self._sender_positions = {} + # Pending presence map user_id -> UserPresenceState self.presence_map = {} # type: Dict[str, UserPresenceState] @@ -261,7 +266,14 @@ class FederationRemoteSendQueue(object): def get_current_token(self): return self.pos - 1 - def federation_ack(self, token): + def federation_ack(self, instance_name, token): + if self._sender_instances: + # If we have configured multiple federation sender instances we need + # to track their positions separately, and only clear the queue up + # to the token all instances have acked. + self._sender_positions[instance_name] = token + token = min(self._sender_positions.values()) + self._clear_queue_before_pos(token) async def get_replication_rows( diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 464d7a41de..4b63a0755f 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -69,6 +69,9 @@ class FederationSender(object): self._transaction_manager = TransactionManager(hs) + self._instance_name = hs.get_instance_name() + self._federation_shard_config = hs.config.federation.federation_shard_config + # map from destination to PerDestinationQueue self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue] @@ -191,7 +194,13 @@ class FederationSender(object): ) return - destinations = set(destinations) + destinations = { + d + for d in destinations + if self._federation_shard_config.should_send_to( + self._instance_name, d + ) + } if send_on_behalf_of is not None: # If we are sending the event on behalf of another server @@ -322,7 +331,12 @@ class FederationSender(object): # Work out which remote servers should be poked and poke them. domains = yield self.state.get_current_hosts_in_room(room_id) - domains = [d for d in domains if d != self.server_name] + domains = [ + d + for d in domains + if d != self.server_name + and self._federation_shard_config.should_send_to(self._instance_name, d) + ] if not domains: return @@ -427,6 +441,10 @@ class FederationSender(object): for destination in destinations: if destination == self.server_name: continue + if not self._federation_shard_config.should_send_to( + self._instance_name, destination + ): + continue self._get_per_destination_queue(destination).send_presence(states) @measure_func("txnqueue._process_presence") @@ -441,6 +459,12 @@ class FederationSender(object): for destination in destinations: if destination == self.server_name: continue + + if not self._federation_shard_config.should_send_to( + self._instance_name, destination + ): + continue + self._get_per_destination_queue(destination).send_presence(states) def build_and_send_edu( @@ -462,6 +486,11 @@ class FederationSender(object): logger.info("Not sending EDU to ourselves") return + if not self._federation_shard_config.should_send_to( + self._instance_name, destination + ): + return + edu = Edu( origin=self.server_name, destination=destination, @@ -478,6 +507,11 @@ class FederationSender(object): edu: edu to send key: clobbering key for this edu """ + if not self._federation_shard_config.should_send_to( + self._instance_name, edu.destination + ): + return + queue = self._get_per_destination_queue(edu.destination) if key: queue.send_keyed_edu(edu, key) @@ -489,6 +523,11 @@ class FederationSender(object): logger.warning("Not sending device update to ourselves") return + if not self._federation_shard_config.should_send_to( + self._instance_name, destination + ): + return + self._get_per_destination_queue(destination).attempt_new_transaction() def wake_destination(self, destination: str): @@ -502,6 +541,11 @@ class FederationSender(object): logger.warning("Not waking up ourselves") return + if not self._federation_shard_config.should_send_to( + self._instance_name, destination + ): + return + self._get_per_destination_queue(destination).attempt_new_transaction() @staticmethod diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 12966e239b..6402136e8a 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -74,6 +74,20 @@ class PerDestinationQueue(object): self._clock = hs.get_clock() self._store = hs.get_datastore() self._transaction_manager = transaction_manager + self._instance_name = hs.get_instance_name() + self._federation_shard_config = hs.config.federation.federation_shard_config + + self._should_send_on_this_instance = True + if not self._federation_shard_config.should_send_to( + self._instance_name, destination + ): + # We don't raise an exception here to avoid taking out any other + # processing. We have a guard in `attempt_new_transaction` that + # ensure we don't start sending stuff. + logger.error( + "Create a per destination queue for %s on wrong worker", destination, + ) + self._should_send_on_this_instance = False self._destination = destination self.transmission_loop_running = False @@ -180,6 +194,14 @@ class PerDestinationQueue(object): logger.debug("TX [%s] Transaction already in progress", self._destination) return + if not self._should_send_on_this_instance: + # We don't raise an exception here to avoid taking out any other + # processing. + logger.error( + "Trying to start a transaction to %s on wrong worker", self._destination + ) + return + logger.debug("TX [%s] Starting transaction loop", self._destination) run_as_background_process( diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index ccc7f1f0d1..f33801f883 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -293,20 +293,22 @@ class FederationAckCommand(Command): Format:: - FEDERATION_ACK + FEDERATION_ACK """ NAME = "FEDERATION_ACK" - def __init__(self, token): + def __init__(self, instance_name, token): + self.instance_name = instance_name self.token = token @classmethod def from_line(cls, line): - return cls(int(line)) + instance_name, token = line.split(" ") + return cls(instance_name, int(token)) def to_line(self): - return str(self.token) + return "%s %s" % (self.instance_name, self.token) class RemovePusherCommand(Command): diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 55b3b79008..80f5df60f9 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -238,7 +238,7 @@ class ReplicationCommandHandler: federation_ack_counter.inc() if self._federation_sender: - self._federation_sender.federation_ack(cmd.token) + self._federation_sender.federation_ack(cmd.instance_name, cmd.token) async def on_REMOVE_PUSHER( self, conn: AbstractConnection, cmd: RemovePusherCommand @@ -527,7 +527,7 @@ class ReplicationCommandHandler: """Ack data for the federation stream. This allows the master to drop data stored purely in memory. """ - self.send_command(FederationAckCommand(token)) + self.send_command(FederationAckCommand(self._instance_name, token)) def send_user_sync( self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int diff --git a/synapse/storage/data_stores/main/schema/delta/58/10federation_pos_instance_name.sql b/synapse/storage/data_stores/main/schema/delta/58/10federation_pos_instance_name.sql new file mode 100644 index 0000000000..1cc2633aad --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/58/10federation_pos_instance_name.sql @@ -0,0 +1,22 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- We need to store the stream positions by instance in a sharded config world. +-- +-- We default to master as we want the column to be NOT NULL and we correctly +-- reset the instance name to match the config each time we start up. +ALTER TABLE federation_stream_position ADD COLUMN instance_name TEXT NOT NULL DEFAULT 'master'; + +CREATE UNIQUE INDEX federation_stream_position_instance ON federation_stream_position(type, instance_name); diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 379d758b5d..5e32c7aa1e 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -45,7 +45,7 @@ from twisted.internet import defer from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.storage._base import SQLBaseStore from synapse.storage.data_stores.main.events_worker import EventsWorkerStore -from synapse.storage.database import Database +from synapse.storage.database import Database, make_in_list_sql_clause from synapse.storage.engines import PostgresEngine from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -253,6 +253,16 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): def __init__(self, database: Database, db_conn, hs): super(StreamWorkerStore, self).__init__(database, db_conn, hs) + self._instance_name = hs.get_instance_name() + self._send_federation = hs.should_send_federation() + self._federation_shard_config = hs.config.federation.federation_shard_config + + # If we're a process that sends federation we may need to reset the + # `federation_stream_position` table to match the current sharding + # config. We don't do this now as otherwise two processes could conflict + # during startup which would cause one to die. + self._need_to_reset_federation_stream_positions = self._send_federation + events_max = self.get_room_max_stream_ordering() event_cache_prefill, min_event_val = self.db.get_cache_dict( db_conn, @@ -793,22 +803,95 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return upper_bound, events - def get_federation_out_pos(self, typ): - return self.db.simple_select_one_onecol( + async def get_federation_out_pos(self, typ: str) -> int: + if self._need_to_reset_federation_stream_positions: + await self.db.runInteraction( + "_reset_federation_positions_txn", self._reset_federation_positions_txn + ) + self._need_to_reset_federation_stream_positions = False + + return await self.db.simple_select_one_onecol( table="federation_stream_position", retcol="stream_id", - keyvalues={"type": typ}, + keyvalues={"type": typ, "instance_name": self._instance_name}, desc="get_federation_out_pos", ) - def update_federation_out_pos(self, typ, stream_id): - return self.db.simple_update_one( + async def update_federation_out_pos(self, typ, stream_id): + if self._need_to_reset_federation_stream_positions: + await self.db.runInteraction( + "_reset_federation_positions_txn", self._reset_federation_positions_txn + ) + self._need_to_reset_federation_stream_positions = False + + return await self.db.simple_update_one( table="federation_stream_position", - keyvalues={"type": typ}, + keyvalues={"type": typ, "instance_name": self._instance_name}, updatevalues={"stream_id": stream_id}, desc="update_federation_out_pos", ) + def _reset_federation_positions_txn(self, txn): + """Fiddles with the `federation_stream_position` table to make it match + the configured federation sender instances during start up. + """ + + # The federation sender instances may have changed, so we need to + # massage the `federation_stream_position` table to have a row per type + # per instance sending federation. If there is a mismatch we update the + # table with the correct rows using the *minimum* stream ID seen. This + # may result in resending of events/EDUs to remote servers, but that is + # preferable to dropping them. + + if not self._send_federation: + return + + # Pull out the configured instances. If we don't have a shard config then + # we assume that we're the only instance sending. + configured_instances = self._federation_shard_config.instances + if not configured_instances: + configured_instances = [self._instance_name] + elif self._instance_name not in configured_instances: + return + + instances_in_table = self.db.simple_select_onecol_txn( + txn, + table="federation_stream_position", + keyvalues={}, + retcol="instance_name", + ) + + if set(instances_in_table) == set(configured_instances): + # Nothing to do + return + + sql = """ + SELECT type, MIN(stream_id) FROM federation_stream_position + GROUP BY type + """ + txn.execute(sql) + min_positions = dict(txn) # Map from type -> min position + + # Ensure we do actually have some values here + assert set(min_positions) == {"federation", "events"} + + sql = """ + DELETE FROM federation_stream_position + WHERE NOT (%s) + """ + clause, args = make_in_list_sql_clause( + txn.database_engine, "instance_name", configured_instances + ) + txn.execute(sql % (clause,), args) + + for typ, stream_id in min_positions.items(): + self.db.simple_upsert_txn( + txn, + table="federation_stream_position", + keyvalues={"type": typ, "instance_name": self._instance_name}, + values={"stream_id": stream_id}, + ) + def has_room_changed_since(self, room_id, stream_id): return self._events_stream_cache.has_entity_changed(room_id, stream_id) diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py index 5448d9f0dc..23be1167a3 100644 --- a/tests/replication/test_federation_ack.py +++ b/tests/replication/test_federation_ack.py @@ -32,6 +32,7 @@ class FederationAckTestCase(HomeserverTestCase): def make_homeserver(self, reactor, clock): hs = self.setup_test_homeserver(homeserverToUse=GenericWorkerServer) + return hs def test_federation_ack_sent(self): diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py new file mode 100644 index 0000000000..519a2dc510 --- /dev/null +++ b/tests/replication/test_federation_sender_shard.py @@ -0,0 +1,286 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from mock import Mock + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, Membership +from synapse.app.generic_worker import GenericWorkerServer +from synapse.events.builder import EventBuilderFactory +from synapse.replication.http import streams +from synapse.replication.tcp.handler import ReplicationCommandHandler +from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol +from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory +from synapse.rest.admin import register_servlets_for_client_rest_resource +from synapse.rest.client.v1 import login, room +from synapse.types import UserID + +from tests import unittest +from tests.server import FakeTransport + +logger = logging.getLogger(__name__) + + +class BaseStreamTestCase(unittest.HomeserverTestCase): + """Base class for tests of the replication streams""" + + servlets = [ + streams.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + # build a replication server + self.server_factory = ReplicationStreamProtocolFactory(hs) + self.streamer = hs.get_replication_streamer() + + store = hs.get_datastore() + self.database = store.db + + self.reactor.lookups["testserv"] = "1.2.3.4" + + def default_config(self): + conf = super().default_config() + conf["send_federation"] = False + return conf + + def make_worker_hs(self, extra_config={}): + config = self._get_worker_hs_config() + config.update(extra_config) + + mock_federation_client = Mock(spec=["put_json"]) + mock_federation_client.put_json.side_effect = lambda *_, **__: defer.succeed({}) + + worker_hs = self.setup_test_homeserver( + http_client=mock_federation_client, + homeserverToUse=GenericWorkerServer, + config=config, + reactor=self.reactor, + ) + + store = worker_hs.get_datastore() + store.db._db_pool = self.database._db_pool + + repl_handler = ReplicationCommandHandler(worker_hs) + client = ClientReplicationStreamProtocol( + worker_hs, "client", "test", self.clock, repl_handler, + ) + server = self.server_factory.buildProtocol(None) + + client_transport = FakeTransport(server, self.reactor) + client.makeConnection(client_transport) + + server_transport = FakeTransport(client, self.reactor) + server.makeConnection(server_transport) + + return worker_hs + + def _get_worker_hs_config(self) -> dict: + config = self.default_config() + config["worker_app"] = "synapse.app.federation_sender" + config["worker_replication_host"] = "testserv" + config["worker_replication_http_port"] = "8765" + return config + + def replicate(self): + """Tell the master side of replication that something has happened, and then + wait for the replication to occur. + """ + self.streamer.on_notifier_poke() + self.pump() + + def create_room_with_remote_server(self, user, token, remote_server="other_server"): + room = self.helper.create_room_as(user, tok=token) + store = self.hs.get_datastore() + federation = self.hs.get_handlers().federation_handler + + prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room)) + room_version = self.get_success(store.get_room_version(room)) + + factory = EventBuilderFactory(self.hs) + factory.hostname = remote_server + + user_id = UserID("user", remote_server).to_string() + + event_dict = { + "type": EventTypes.Member, + "state_key": user_id, + "content": {"membership": Membership.JOIN}, + "sender": user_id, + "room_id": room, + } + + builder = factory.for_room_version(room_version, event_dict) + join_event = self.get_success(builder.build(prev_event_ids)) + + self.get_success(federation.on_send_join_request(remote_server, join_event)) + self.replicate() + + return room + + +class FederationSenderTestCase(BaseStreamTestCase): + servlets = [ + login.register_servlets, + register_servlets_for_client_rest_resource, + room.register_servlets, + ] + + def test_send_event_single_sender(self): + """Test that using a single federation sender worker correctly sends a + new event. + """ + worker_hs = self.make_worker_hs({"send_federation": True}) + mock_client = worker_hs.get_http_client() + + user = self.register_user("user", "pass") + token = self.login("user", "pass") + + room = self.create_room_with_remote_server(user, token) + + mock_client.put_json.reset_mock() + + self.create_and_send_event(room, UserID.from_string(user)) + self.replicate() + + # Assert that the event was sent out over federation. + mock_client.put_json.assert_called() + self.assertEqual(mock_client.put_json.call_args[0][0], "other_server") + self.assertTrue(mock_client.put_json.call_args[1]["data"].get("pdus")) + + def test_send_event_sharded(self): + """Test that using two federation sender workers correctly sends + new events. + """ + worker1 = self.make_worker_hs( + { + "send_federation": True, + "worker_name": "sender1", + "federation_sender_instances": ["sender1", "sender2"], + } + ) + mock_client1 = worker1.get_http_client() + + worker2 = self.make_worker_hs( + { + "send_federation": True, + "worker_name": "sender2", + "federation_sender_instances": ["sender1", "sender2"], + } + ) + mock_client2 = worker2.get_http_client() + + user = self.register_user("user2", "pass") + token = self.login("user2", "pass") + + sent_on_1 = False + sent_on_2 = False + for i in range(20): + server_name = "other_server_%d" % (i,) + room = self.create_room_with_remote_server(user, token, server_name) + mock_client1.reset_mock() + mock_client2.reset_mock() + + self.create_and_send_event(room, UserID.from_string(user)) + self.replicate() + + if mock_client1.put_json.called: + sent_on_1 = True + mock_client2.put_json.assert_not_called() + self.assertEqual(mock_client1.put_json.call_args[0][0], server_name) + self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("pdus")) + elif mock_client2.put_json.called: + sent_on_2 = True + mock_client1.put_json.assert_not_called() + self.assertEqual(mock_client2.put_json.call_args[0][0], server_name) + self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("pdus")) + else: + raise AssertionError( + "Expected send transaction from one or the other sender" + ) + + if sent_on_1 and sent_on_2: + break + + self.assertTrue(sent_on_1) + self.assertTrue(sent_on_2) + + def test_send_typing_sharded(self): + """Test that using two federation sender workers correctly sends + new typing EDUs. + """ + worker1 = self.make_worker_hs( + { + "send_federation": True, + "worker_name": "sender1", + "federation_sender_instances": ["sender1", "sender2"], + } + ) + mock_client1 = worker1.get_http_client() + + worker2 = self.make_worker_hs( + { + "send_federation": True, + "worker_name": "sender2", + "federation_sender_instances": ["sender1", "sender2"], + } + ) + mock_client2 = worker2.get_http_client() + + user = self.register_user("user3", "pass") + token = self.login("user3", "pass") + + typing_handler = self.hs.get_typing_handler() + + sent_on_1 = False + sent_on_2 = False + for i in range(20): + server_name = "other_server_%d" % (i,) + room = self.create_room_with_remote_server(user, token, server_name) + mock_client1.reset_mock() + mock_client2.reset_mock() + + self.get_success( + typing_handler.started_typing( + target_user=UserID.from_string(user), + auth_user=UserID.from_string(user), + room_id=room, + timeout=20000, + ) + ) + + self.replicate() + + if mock_client1.put_json.called: + sent_on_1 = True + mock_client2.put_json.assert_not_called() + self.assertEqual(mock_client1.put_json.call_args[0][0], server_name) + self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("edus")) + elif mock_client2.put_json.called: + sent_on_2 = True + mock_client1.put_json.assert_not_called() + self.assertEqual(mock_client2.put_json.call_args[0][0], server_name) + self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("edus")) + else: + raise AssertionError( + "Expected send transaction from one or the other sender" + ) + + if sent_on_1 and sent_on_2: + break + + self.assertTrue(sent_on_1) + self.assertTrue(sent_on_2) -- cgit 1.5.1 From 2c1b9d676322fad8cb57c92f97f81393bcfcbe56 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 29 Jul 2020 23:22:13 +0100 Subject: Update worker docs with recent enhancements (#7969) --- changelog.d/7969.doc | 1 + docs/sample_config.yaml | 54 +++ docs/synctl_workers.md | 32 ++ docs/workers.md | 459 +++++++++++---------- synapse/app/generic_worker.py | 6 +- synapse/config/federation.py | 12 +- synapse/config/homeserver.py | 2 +- synapse/config/logger.py | 2 +- synapse/config/redis.py | 23 +- synapse/config/workers.py | 49 ++- synapse/federation/send_queue.py | 2 +- synapse/federation/sender/__init__.py | 2 +- synapse/federation/sender/per_destination_queue.py | 2 +- synapse/storage/data_stores/main/stream.py | 2 +- 14 files changed, 413 insertions(+), 235 deletions(-) create mode 100644 changelog.d/7969.doc create mode 100644 docs/synctl_workers.md (limited to 'synapse/config/homeserver.py') diff --git a/changelog.d/7969.doc b/changelog.d/7969.doc new file mode 100644 index 0000000000..68d2ed5fad --- /dev/null +++ b/changelog.d/7969.doc @@ -0,0 +1 @@ +Update worker docs with latest enhancements. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 3227294e0b..b21e36bb6d 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -2398,3 +2398,57 @@ opentracing: # # logging: # false + + +## Workers ## + +# Disables sending of outbound federation transactions on the main process. +# Uncomment if using a federation sender worker. +# +#send_federation: false + +# It is possible to run multiple federation sender workers, in which case the +# work is balanced across them. +# +# This configuration must be shared between all federation sender workers, and if +# changed all federation sender workers must be stopped at the same time and then +# started, to ensure that all instances are running with the same config (otherwise +# events may be dropped). +# +#federation_sender_instances: +# - federation_sender1 + +# When using workers this should be a map from `worker_name` to the +# HTTP replication listener of the worker, if configured. +# +#instance_map: +# worker1: +# host: localhost +# port: 8034 + +# Experimental: When using workers you can define which workers should +# handle event persistence and typing notifications. Any worker +# specified here must also be in the `instance_map`. +# +#stream_writers: +# events: worker1 +# typing: worker1 + + +# Configuration for Redis when using workers. This *must* be enabled when +# using workers (unless using old style direct TCP configuration). +# +redis: + # Uncomment the below to enable Redis support. + # + #enabled: true + + # Optional host and port to use to connect to redis. Defaults to + # localhost and 6379 + # + #host: localhost + #port: 6379 + + # Optional password if configured on the Redis instance + # + #password: diff --git a/docs/synctl_workers.md b/docs/synctl_workers.md new file mode 100644 index 0000000000..8da4a31852 --- /dev/null +++ b/docs/synctl_workers.md @@ -0,0 +1,32 @@ +### Using synctl with workers + +If you want to use `synctl` to manage your synapse processes, you will need to +create an an additional configuration file for the main synapse process. That +configuration should look like this: + +```yaml +worker_app: synapse.app.homeserver +``` + +Additionally, each worker app must be configured with the name of a "pid file", +to which it will write its process ID when it starts. For example, for a +synchrotron, you might write: + +```yaml +worker_pid_file: /home/matrix/synapse/worker1.pid +``` + +Finally, to actually run your worker-based synapse, you must pass synctl the `-a` +commandline option to tell it to operate on all the worker configurations found +in the given directory, e.g.: + + synctl -a $CONFIG/workers start + +Currently one should always restart all workers when restarting or upgrading +synapse, unless you explicitly know it's safe not to. For instance, restarting +synapse without restarting all the synchrotrons may result in broken typing +notifications. + +To manipulate a specific worker, you pass the -w option to synctl: + + synctl -w $CONFIG/workers/worker1.yaml restart diff --git a/docs/workers.md b/docs/workers.md index f4cbbc0400..38bd758e57 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -16,69 +16,106 @@ workers only work with PostgreSQL-based Synapse deployments. SQLite should only be used for demo purposes and any admin considering workers should already be running PostgreSQL. -## Master/worker communication +## Main process/worker communication -The workers communicate with the master process via a Synapse-specific protocol -called 'replication' (analogous to MySQL- or Postgres-style database -replication) which feeds a stream of relevant data from the master to the -workers so they can be kept in sync with the master process and database state. +The processes communicate with each other via a Synapse-specific protocol called +'replication' (analogous to MySQL- or Postgres-style database replication) which +feeds streams of newly written data between processes so they can be kept in +sync with the database state. -Additionally, workers may make HTTP requests to the master, to send information -in the other direction. Typically this is used for operations which need to -wait for a reply - such as sending an event. +Additionally, processes may make HTTP requests to each other. Typically this is +used for operations which need to wait for a reply - such as sending an event. -## Configuration +As of Synapse v1.13.0, it is possible to configure Synapse to send replication +via a [Redis pub/sub channel](https://redis.io/topics/pubsub), and is now the +recommended way of configuring replication. This is an alternative to the old +direct TCP connections to the main process: rather than all the workers +connecting to the main process, all the workers and the main process connect to +Redis, which relays replication commands between processes. This can give a +significant cpu saving on the main process and will be a prerequisite for +upcoming performance improvements. + +(See the [Architectural diagram](#architectural-diagram) section at the end for +a visualisation of what this looks like) + + +## Setting up workers + +A Redis server is required to manage the communication between the processes. +(The older direct TCP connections are now deprecated.) The Redis server +should be installed following the normal procedure for your distribution (e.g. +`apt install redis-server` on Debian). It is safe to use an existing Redis +deployment if you have one. + +Once installed, check that Redis is running and accessible from the host running +Synapse, for example by executing `echo PING | nc -q1 localhost 6379` and seeing +a response of `+PONG`. + +The appropriate dependencies must also be installed for Synapse. If using a +virtualenv, these can be installed with: + +```sh +pip install matrix-synapse[redis] +``` + +Note that these dependencies are included when synapse is installed with `pip +install matrix-synapse[all]`. They are also included in the debian packages from +`matrix.org` and in the docker images at +https://hub.docker.com/r/matrixdotorg/synapse/. To make effective use of the workers, you will need to configure an HTTP reverse-proxy such as nginx or haproxy, which will direct incoming requests to -the correct worker, or to the main synapse instance. Note that this includes -requests made to the federation port. See [reverse_proxy.md](reverse_proxy.md) +the correct worker, or to the main synapse instance. See [reverse_proxy.md](reverse_proxy.md) for information on setting up a reverse proxy. -To enable workers, you need to add *two* replication listeners to the -main Synapse configuration file (`homeserver.yaml`). For example: +To enable workers you should create a configuration file for each worker +process. Each worker configuration file inherits the configuration of the shared +homeserver configuration file. You can then override configuration specific to +that worker, e.g. the HTTP listener that it provides (if any); logging +configuration; etc. You should minimise the number of overrides though to +maintain a usable config. + +Next you need to add both a HTTP replication listener and redis config to the +shared Synapse configuration file (`homeserver.yaml`). For example: ```yaml +# extend the existing `listeners` section. This defines the ports that the +# main process will listen on. listeners: - # The TCP replication port - - port: 9092 - bind_address: '127.0.0.1' - type: replication - # The HTTP replication port - port: 9093 bind_address: '127.0.0.1' type: http resources: - names: [replication] + +redis: + enabled: true ``` -Under **no circumstances** should these replication API listeners be exposed to -the public internet; they have no authentication and are unencrypted. +See the sample config for the full documentation of each option. -You should then create a set of configs for the various worker processes. Each -worker configuration file inherits the configuration of the main homeserver -configuration file. You can then override configuration specific to that -worker, e.g. the HTTP listener that it provides (if any); logging -configuration; etc. You should minimise the number of overrides though to -maintain a usable config. +Under **no circumstances** should the replication listener be exposed to the +public internet; it has no authentication and is unencrypted. In the config file for each worker, you must specify the type of worker -application (`worker_app`). The currently available worker applications are -listed below. You must also specify the replication endpoints that it should -talk to on the main synapse process. `worker_replication_host` should specify -the host of the main synapse, `worker_replication_port` should point to the TCP -replication listener port and `worker_replication_http_port` should point to -the HTTP replication port. +application (`worker_app`), and you should specify a unqiue name for the worker +(`worker_name`). The currently available worker applications are listed below. +You must also specify the HTTP replication endpoint that it should talk to on +the main synapse process. `worker_replication_host` should specify the host of +the main synapse and `worker_replication_http_port` should point to the HTTP +replication port. If the worker will handle HTTP requests then the +`worker_listeners` option should be set with a `http` listener, in the same way +as the `listeners` option in the shared config. For example: ```yaml -worker_app: synapse.app.synchrotron +worker_app: synapse.app.generic_worker +worker_name: worker1 -# The replication listener on the synapse to talk to. +# The replication listener on the main synapse process. worker_replication_host: 127.0.0.1 -worker_replication_port: 9092 worker_replication_http_port: 9093 worker_listeners: @@ -87,13 +124,14 @@ worker_listeners: resources: - names: - client + - federation -worker_log_config: /home/matrix/synapse/config/synchrotron_log_config.yaml +worker_log_config: /home/matrix/synapse/config/worker1_log_config.yaml ``` -...is a full configuration for a synchrotron worker instance, which will expose a -plain HTTP `/sync` endpoint on port 8083 separately from the `/sync` endpoint provided -by the main synapse. +...is a full configuration for a generic worker instance, which will expose a +plain HTTP endpoint on port 8083 separately serving various endpoints, e.g. +`/sync`, which are listed below. Obviously you should configure your reverse-proxy to route the relevant endpoints to the worker (`localhost:8083` in the above example). @@ -102,127 +140,24 @@ Finally, you need to start your worker processes. This can be done with either `synctl` or your distribution's preferred service manager such as `systemd`. We recommend the use of `systemd` where available: for information on setting up `systemd` to start synapse workers, see -[systemd-with-workers](systemd-with-workers). To use `synctl`, see below. +[systemd-with-workers](systemd-with-workers). To use `synctl`, see +[synctl_workers.md](synctl_workers.md). -### **Experimental** support for replication over redis - -As of Synapse v1.13.0, it is possible to configure Synapse to send replication -via a [Redis pub/sub channel](https://redis.io/topics/pubsub). This is an -alternative to direct TCP connections to the master: rather than all the -workers connecting to the master, all the workers and the master connect to -Redis, which relays replication commands between processes. This can give a -significant cpu saving on the master and will be a prerequisite for upcoming -performance improvements. - -Note that this support is currently experimental; you may experience lost -messages and similar problems! It is strongly recommended that admins setting -up workers for the first time use direct TCP replication as above. - -To configure Synapse to use Redis: - -1. Install Redis following the normal procedure for your distribution - for - example, on Debian, `apt install redis-server`. (It is safe to use an - existing Redis deployment if you have one: we use a pub/sub stream named - according to the `server_name` of your synapse server.) -2. Check Redis is running and accessible: you should be able to `echo PING | nc -q1 - localhost 6379` and get a response of `+PONG`. -3. Install the python prerequisites. If you installed synapse into a - virtualenv, this can be done with: - ```sh - pip install matrix-synapse[redis] - ``` - The debian packages from matrix.org already include the required - dependencies. -4. Add config to the shared configuration (`homeserver.yaml`): - ```yaml - redis: - enabled: true - ``` - Optional parameters which can go alongside `enabled` are `host`, `port`, - `password`. Normally none of these are required. -5. Restart master and all workers. - -Once redis replication is in use, `worker_replication_port` is redundant and -can be removed from the worker configuration files. Similarly, the -configuration for the `listener` for the TCP replication port can be removed -from the main configuration file. Note that the HTTP replication port is -still required. - -### Using synctl - -If you want to use `synctl` to manage your synapse processes, you will need to -create an an additional configuration file for the master synapse process. That -configuration should look like this: - -```yaml -worker_app: synapse.app.homeserver -``` - -Additionally, each worker app must be configured with the name of a "pid file", -to which it will write its process ID when it starts. For example, for a -synchrotron, you might write: - -```yaml -worker_pid_file: /home/matrix/synapse/synchrotron.pid -``` - -Finally, to actually run your worker-based synapse, you must pass synctl the `-a` -commandline option to tell it to operate on all the worker configurations found -in the given directory, e.g.: - - synctl -a $CONFIG/workers start - -Currently one should always restart all workers when restarting or upgrading -synapse, unless you explicitly know it's safe not to. For instance, restarting -synapse without restarting all the synchrotrons may result in broken typing -notifications. - -To manipulate a specific worker, you pass the -w option to synctl: - - synctl -w $CONFIG/workers/synchrotron.yaml restart ## Available worker applications -### `synapse.app.pusher` - -Handles sending push notifications to sygnal and email. Doesn't handle any -REST endpoints itself, but you should set `start_pushers: False` in the -shared configuration file to stop the main synapse sending these notifications. - -Note this worker cannot be load-balanced: only one instance should be active. - -### `synapse.app.synchrotron` +### `synapse.app.generic_worker` -The synchrotron handles `sync` requests from clients. In particular, it can -handle REST endpoints matching the following regular expressions: +This worker can handle API requests matching the following regular +expressions: + # Sync requests ^/_matrix/client/(v2_alpha|r0)/sync$ ^/_matrix/client/(api/v1|v2_alpha|r0)/events$ ^/_matrix/client/(api/v1|r0)/initialSync$ ^/_matrix/client/(api/v1|r0)/rooms/[^/]+/initialSync$ -The above endpoints should all be routed to the synchrotron worker by the -reverse-proxy configuration. - -It is possible to run multiple instances of the synchrotron to scale -horizontally. In this case the reverse-proxy should be configured to -load-balance across the instances, though it will be more efficient if all -requests from a particular user are routed to a single instance. Extracting -a userid from the access token is currently left as an exercise for the reader. - -### `synapse.app.appservice` - -Handles sending output traffic to Application Services. Doesn't handle any -REST endpoints itself, but you should set `notify_appservices: False` in the -shared configuration file to stop the main synapse sending these notifications. - -Note this worker cannot be load-balanced: only one instance should be active. - -### `synapse.app.federation_reader` - -Handles a subset of federation endpoints. In particular, it can handle REST -endpoints matching the following regular expressions: - + # Federation requests ^/_matrix/federation/v1/event/ ^/_matrix/federation/v1/state/ ^/_matrix/federation/v1/state_ids/ @@ -242,40 +177,145 @@ endpoints matching the following regular expressions: ^/_matrix/federation/v1/event_auth/ ^/_matrix/federation/v1/exchange_third_party_invite/ ^/_matrix/federation/v1/user/devices/ - ^/_matrix/federation/v1/send/ ^/_matrix/federation/v1/get_groups_publicised$ ^/_matrix/key/v2/query + # Inbound federation transaction request + ^/_matrix/federation/v1/send/ + + # Client API requests + ^/_matrix/client/(api/v1|r0|unstable)/publicRooms$ + ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$ + ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$ + ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$ + ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$ + ^/_matrix/client/(api/v1|r0|unstable)/account/3pid$ + ^/_matrix/client/(api/v1|r0|unstable)/keys/query$ + ^/_matrix/client/(api/v1|r0|unstable)/keys/changes$ + ^/_matrix/client/versions$ + ^/_matrix/client/(api/v1|r0|unstable)/voip/turnServer$ + ^/_matrix/client/(api/v1|r0|unstable)/joined_groups$ + ^/_matrix/client/(api/v1|r0|unstable)/publicised_groups$ + ^/_matrix/client/(api/v1|r0|unstable)/publicised_groups/ + + # Registration/login requests + ^/_matrix/client/(api/v1|r0|unstable)/login$ + ^/_matrix/client/(r0|unstable)/register$ + ^/_matrix/client/(r0|unstable)/auth/.*/fallback/web$ + + # Event sending requests + ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send + ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state/ + ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$ + ^/_matrix/client/(api/v1|r0|unstable)/join/ + ^/_matrix/client/(api/v1|r0|unstable)/profile/ + + Additionally, the following REST endpoints can be handled for GET requests: ^/_matrix/federation/v1/groups/ -The above endpoints should all be routed to the federation_reader worker by the -reverse-proxy configuration. +Pagination requests can also be handled, but all requests for a given +room must be routed to the same instance. Additionally, care must be taken to +ensure that the purge history admin API is not used while pagination requests +for the room are in flight: + + ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/messages$ + +Note that a HTTP listener with `client` and `federation` resources must be +configured in the `worker_listeners` option in the worker config. + + +#### Load balancing + +It is possible to run multiple instances of this worker app, with incoming requests +being load-balanced between them by the reverse-proxy. However, different endpoints +have different characteristics and so admins +may wish to run multiple groups of workers handling different endpoints so that +load balancing can be done in different ways. + +For `/sync` and `/initialSync` requests it will be more efficient if all +requests from a particular user are routed to a single instance. Extracting a +user ID from the access token or `Authorization` header is currently left as an +exercise for the reader. Admins may additionally wish to separate out `/sync` +requests that have a `since` query parameter from those that don't (and +`/initialSync`), as requests that don't are known as "initial sync" that happens +when a user logs in on a new device and can be *very* resource intensive, so +isolating these requests will stop them from interfering with other users ongoing +syncs. + +Federation and client requests can be balanced via simple round robin. -The `^/_matrix/federation/v1/send/` endpoint must only be handled by a single -instance. +The inbound federation transaction request `^/_matrix/federation/v1/send/` +should be balanced by source IP so that transactions from the same remote server +go to the same process. -Note that `federation` must be added to the listener resources in the worker config: +Registration/login requests can be handled separately purely to help ensure that +unexpected load doesn't affect new logins and sign ups. + +Finally, event sending requests can be balanced by the room ID in the URI (or +the full URI, or even just round robin), the room ID is the path component after +`/rooms/`. If there is a large bridge connected that is sending or may send lots +of events, then a dedicated set of workers can be provisioned to limit the +effects of bursts of events from that bridge on events sent by normal users. + +#### Stream writers + +Additionally, there is *experimental* support for moving writing of specific +streams (such as events) off of the main process to a particular worker. (This +is only supported with Redis-based replication.) + +Currently support streams are `events` and `typing`. + +To enable this, the worker must have a HTTP replication listener configured, +have a `worker_name` and be listed in the `instance_map` config. For example to +move event persistence off to a dedicated worker, the shared configuration would +include: ```yaml -worker_app: synapse.app.federation_reader -... -worker_listeners: - - type: http - port: - resources: - - names: - - federation +instance_map: + event_persister1: + host: localhost + port: 8034 + +streams_writers: + events: event_persister1 ``` + +### `synapse.app.pusher` + +Handles sending push notifications to sygnal and email. Doesn't handle any +REST endpoints itself, but you should set `start_pushers: False` in the +shared configuration file to stop the main synapse sending push notifications. + +Note this worker cannot be load-balanced: only one instance should be active. + +### `synapse.app.appservice` + +Handles sending output traffic to Application Services. Doesn't handle any +REST endpoints itself, but you should set `notify_appservices: False` in the +shared configuration file to stop the main synapse sending appservice notifications. + +Note this worker cannot be load-balanced: only one instance should be active. + + ### `synapse.app.federation_sender` Handles sending federation traffic to other servers. Doesn't handle any REST endpoints itself, but you should set `send_federation: False` in the shared configuration file to stop the main synapse sending this traffic. -Note this worker cannot be load-balanced: only one instance should be active. +If running multiple federation senders then you must list each +instance in the `federation_sender_instances` option by their `worker_name`. +All instances must be stopped and started when adding or removing instances. +For example: + +```yaml +federation_sender_instances: + - federation_sender1 + - federation_sender2 +``` ### `synapse.app.media_repository` @@ -314,46 +354,6 @@ and you must configure a single instance to run the background tasks, e.g.: media_instance_running_background_jobs: "media-repository-1" ``` -### `synapse.app.client_reader` - -Handles client API endpoints. It can handle REST endpoints matching the -following regular expressions: - - ^/_matrix/client/(api/v1|r0|unstable)/publicRooms$ - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$ - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$ - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$ - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$ - ^/_matrix/client/(api/v1|r0|unstable)/login$ - ^/_matrix/client/(api/v1|r0|unstable)/account/3pid$ - ^/_matrix/client/(api/v1|r0|unstable)/keys/query$ - ^/_matrix/client/(api/v1|r0|unstable)/keys/changes$ - ^/_matrix/client/versions$ - ^/_matrix/client/(api/v1|r0|unstable)/voip/turnServer$ - ^/_matrix/client/(api/v1|r0|unstable)/joined_groups$ - ^/_matrix/client/(api/v1|r0|unstable)/publicised_groups$ - ^/_matrix/client/(api/v1|r0|unstable)/publicised_groups/ - -Additionally, the following REST endpoints can be handled for GET requests: - - ^/_matrix/client/(api/v1|r0|unstable)/pushrules/.*$ - ^/_matrix/client/(api/v1|r0|unstable)/groups/.*$ - ^/_matrix/client/(api/v1|r0|unstable)/user/[^/]*/account_data/ - ^/_matrix/client/(api/v1|r0|unstable)/user/[^/]*/rooms/[^/]*/account_data/ - -Additionally, the following REST endpoints can be handled, but all requests must -be routed to the same instance: - - ^/_matrix/client/(r0|unstable)/register$ - ^/_matrix/client/(r0|unstable)/auth/.*/fallback/web$ - -Pagination requests can also be handled, but all requests with the same path -room must be routed to the same instance. Additionally, care must be taken to -ensure that the purge history admin API is not used while pagination requests -for the room are in flight: - - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/messages$ - ### `synapse.app.user_dir` Handles searches in the user directory. It can handle REST endpoints matching @@ -388,15 +388,48 @@ file. For example: worker_main_http_uri: http://127.0.0.1:8008 -### `synapse.app.event_creator` +### Historical apps -Handles some event creation. It can handle REST endpoints matching: +*Note:* Historically there used to be more apps, however they have been +amalgamated into a single `synapse.app.generic_worker` app. The remaining apps +are ones that do specific processing unrelated to requests, e.g. the `pusher` +that handles sending out push notifications for new events. The intention is for +all these to be folded into the `generic_worker` app and to use config to define +which processes handle the various proccessing such as push notifications. - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state/ - ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$ - ^/_matrix/client/(api/v1|r0|unstable)/join/ - ^/_matrix/client/(api/v1|r0|unstable)/profile/ -It will create events locally and then send them on to the main synapse -instance to be persisted and handled. +## Architectural diagram + +The following shows an example setup using Redis and a reverse proxy: + +``` + Clients & Federation + | + v + +-----------+ + | | + | Reverse | + | Proxy | + | | + +-----------+ + | | | + | | | HTTP requests + +-------------------+ | +-----------+ + | +---+ | + | | | + v v v ++--------------+ +--------------+ +--------------+ +--------------+ +| Main | | Generic | | Generic | | Event | +| Process | | Worker 1 | | Worker 2 | | Persister | ++--------------+ +--------------+ +--------------+ +--------------+ + ^ ^ | ^ | | ^ | ^ ^ + | | | | | | | | | | + | | | | | HTTP | | | | | + | +----------+<--|---|---------+ | | | | + | | +-------------|-->+----------+ | + | | | | + | | | | + v v v v +==================================================================== + Redis pub/sub channel +``` diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index ec0dbddb8c..5841454c9a 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -940,7 +940,7 @@ def start(config_options): config.server.update_user_directory = False if config.worker_app == "synapse.app.federation_sender": - if config.federation.send_federation: + if config.worker.send_federation: sys.stderr.write( "\nThe send_federation must be disabled in the main synapse process" "\nbefore they can be run in a separate worker." @@ -950,10 +950,10 @@ def start(config_options): sys.exit(1) # Force the pushers to start since they will be disabled in the main config - config.federation.send_federation = True + config.worker.send_federation = True else: # For other worker types we force this to off. - config.federation.send_federation = False + config.worker.send_federation = False synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts diff --git a/synapse/config/federation.py b/synapse/config/federation.py index 82ff9664de..2c77d8f85b 100644 --- a/synapse/config/federation.py +++ b/synapse/config/federation.py @@ -17,23 +17,13 @@ from typing import Optional from netaddr import IPSet -from ._base import Config, ConfigError, ShardedWorkerHandlingConfig +from ._base import Config, ConfigError class FederationConfig(Config): section = "federation" def read_config(self, config, **kwargs): - # Whether to send federation traffic out in this process. This only - # applies to some federation traffic, and so shouldn't be used to - # "disable" federation - self.send_federation = config.get("send_federation", True) - - federation_sender_instances = config.get("federation_sender_instances") or [] - self.federation_shard_config = ShardedWorkerHandlingConfig( - federation_sender_instances - ) - # FIXME: federation_domain_whitelist needs sytests self.federation_domain_whitelist = None # type: Optional[dict] federation_domain_whitelist = config.get("federation_domain_whitelist", None) diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 8e93d31394..556e291495 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -78,7 +78,6 @@ class HomeServerConfig(RootConfig): JWTConfig, PasswordConfig, EmailConfig, - WorkerConfig, PasswordAuthProviderConfig, PushConfig, SpamCheckerConfig, @@ -91,6 +90,7 @@ class HomeServerConfig(RootConfig): RoomDirectoryConfig, ThirdPartyRulesConfig, TracerConfig, + WorkerConfig, RedisConfig, FederationConfig, ] diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 49f6c32beb..dd775a97e8 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -214,7 +214,7 @@ def setup_logging( Set up the logging subsystem. Args: - config (LoggingConfig | synapse.config.workers.WorkerConfig): + config (LoggingConfig | synapse.config.worker.WorkerConfig): configuration data use_worker_options (bool): True to use the 'worker_log_config' option diff --git a/synapse/config/redis.py b/synapse/config/redis.py index d5d3ca1c9e..1373302335 100644 --- a/synapse/config/redis.py +++ b/synapse/config/redis.py @@ -21,7 +21,7 @@ class RedisConfig(Config): section = "redis" def read_config(self, config, **kwargs): - redis_config = config.get("redis", {}) + redis_config = config.get("redis") or {} self.redis_enabled = redis_config.get("enabled", False) if not self.redis_enabled: @@ -32,3 +32,24 @@ class RedisConfig(Config): self.redis_host = redis_config.get("host", "localhost") self.redis_port = redis_config.get("port", 6379) self.redis_password = redis_config.get("password") + + def generate_config_section(self, config_dir_path, server_name, **kwargs): + return """\ + # Configuration for Redis when using workers. This *must* be enabled when + # using workers (unless using old style direct TCP configuration). + # + redis: + # Uncomment the below to enable Redis support. + # + #enabled: true + + # Optional host and port to use to connect to redis. Defaults to + # localhost and 6379 + # + #host: localhost + #port: 6379 + + # Optional password if configured on the Redis instance + # + #password: + """ diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 2574cd3aa1..c784a71508 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -15,7 +15,7 @@ import attr -from ._base import Config, ConfigError +from ._base import Config, ConfigError, ShardedWorkerHandlingConfig from .server import ListenerConfig, parse_listener_def @@ -85,6 +85,16 @@ class WorkerConfig(Config): ) ) + # Whether to send federation traffic out in this process. This only + # applies to some federation traffic, and so shouldn't be used to + # "disable" federation + self.send_federation = config.get("send_federation", True) + + federation_sender_instances = config.get("federation_sender_instances") or [] + self.federation_shard_config = ShardedWorkerHandlingConfig( + federation_sender_instances + ) + # A map from instance name to host/port of their HTTP replication endpoint. instance_map = config.get("instance_map") or {} self.instance_map = { @@ -105,6 +115,43 @@ class WorkerConfig(Config): % (instance, stream) ) + def generate_config_section(self, config_dir_path, server_name, **kwargs): + return """\ + ## Workers ## + + # Disables sending of outbound federation transactions on the main process. + # Uncomment if using a federation sender worker. + # + #send_federation: false + + # It is possible to run multiple federation sender workers, in which case the + # work is balanced across them. + # + # This configuration must be shared between all federation sender workers, and if + # changed all federation sender workers must be stopped at the same time and then + # started, to ensure that all instances are running with the same config (otherwise + # events may be dropped). + # + #federation_sender_instances: + # - federation_sender1 + + # When using workers this should be a map from `worker_name` to the + # HTTP replication listener of the worker, if configured. + # + #instance_map: + # worker1: + # host: localhost + # port: 8034 + + # Experimental: When using workers you can define which workers should + # handle event persistence and typing notifications. Any worker + # specified here must also be in the `instance_map`. + # + #stream_writers: + # events: worker1 + # typing: worker1 + """ + def read_arguments(self, args): # We support a bunch of command line arguments that override options in # the config. A lot of these options have a worker_* prefix when running diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 4fc9ff92e5..2b0ab2dcbf 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -57,7 +57,7 @@ class FederationRemoteSendQueue(object): # We may have multiple federation sender instances, so we need to track # their positions separately. - self._sender_instances = hs.config.federation.federation_shard_config.instances + self._sender_instances = hs.config.worker.federation_shard_config.instances self._sender_positions = {} # Pending presence map user_id -> UserPresenceState diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index ba4ddd2370..6ae6522f87 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -70,7 +70,7 @@ class FederationSender(object): self._transaction_manager = TransactionManager(hs) self._instance_name = hs.get_instance_name() - self._federation_shard_config = hs.config.federation.federation_shard_config + self._federation_shard_config = hs.config.worker.federation_shard_config # map from destination to PerDestinationQueue self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue] diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 3436741783..dd150f89a6 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -75,7 +75,7 @@ class PerDestinationQueue(object): self._store = hs.get_datastore() self._transaction_manager = transaction_manager self._instance_name = hs.get_instance_name() - self._federation_shard_config = hs.config.federation.federation_shard_config + self._federation_shard_config = hs.config.worker.federation_shard_config self._should_send_on_this_instance = True if not self._federation_shard_config.should_handle( diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 5e32c7aa1e..10d39b3699 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -255,7 +255,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): self._instance_name = hs.get_instance_name() self._send_federation = hs.should_send_federation() - self._federation_shard_config = hs.config.federation.federation_shard_config + self._federation_shard_config = hs.config.worker.federation_shard_config # If we're a process that sends federation we may need to reset the # `federation_stream_position` table to match the current sharding -- cgit 1.5.1