diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/app/federation_sender.py | 12 | ||||
-rw-r--r-- | synapse/config/emailconfig.py | 222 | ||||
-rw-r--r-- | synapse/config/push.py | 2 | ||||
-rw-r--r-- | synapse/config/registration.py | 83 | ||||
-rw-r--r-- | synapse/config/saml2_config.py | 1 | ||||
-rw-r--r-- | synapse/federation/sender/__init__.py | 18 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 19 | ||||
-rw-r--r-- | synapse/handlers/saml_handler.py | 27 | ||||
-rw-r--r-- | synapse/notifier.py | 31 | ||||
-rw-r--r-- | synapse/replication/tcp/client.py | 3 | ||||
-rw-r--r-- | synapse/replication/tcp/commands.py | 17 | ||||
-rw-r--r-- | synapse/replication/tcp/protocol.py | 15 | ||||
-rw-r--r-- | synapse/replication/tcp/resource.py | 9 | ||||
-rw-r--r-- | synapse/server.pyi | 12 |
14 files changed, 318 insertions, 153 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index a57cf991ac..38d11fdd0f 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -158,6 +158,13 @@ class FederationSenderReplicationHandler(ReplicationClientHandler): args.update(self.send_handler.stream_positions()) return args + def on_remote_server_up(self, server: str): + """Called when get a new REMOTE_SERVER_UP command.""" + + # Let's wake up the transaction queue for the server in case we have + # pending stuff to send to it. + self.send_handler.wake_destination(server) + def start(config_options): try: @@ -205,7 +212,7 @@ class FederationSenderHandler(object): to the federation sender. """ - def __init__(self, hs, replication_client): + def __init__(self, hs: FederationSenderServer, replication_client): self.store = hs.get_datastore() self._is_mine_id = hs.is_mine_id self.federation_sender = hs.get_federation_sender() @@ -226,6 +233,9 @@ class FederationSenderHandler(object): self.store.get_room_max_stream_ordering() ) + def wake_destination(self, server: str): + self.federation_sender.wake_destination(server) + def stream_positions(self): return {"federation": self.federation_position} diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py index 35756bed87..74853f9faa 100644 --- a/synapse/config/emailconfig.py +++ b/synapse/config/emailconfig.py @@ -37,10 +37,12 @@ class EmailConfig(Config): self.email_enable_notifs = False - email_config = config.get("email", {}) + email_config = config.get("email") + if email_config is None: + email_config = {} - self.email_smtp_host = email_config.get("smtp_host", None) - self.email_smtp_port = email_config.get("smtp_port", None) + self.email_smtp_host = email_config.get("smtp_host", "localhost") + self.email_smtp_port = email_config.get("smtp_port", 25) self.email_smtp_user = email_config.get("smtp_user", None) self.email_smtp_pass = email_config.get("smtp_pass", None) self.require_transport_security = email_config.get( @@ -74,9 +76,9 @@ class EmailConfig(Config): self.email_template_dir = os.path.abspath(template_dir) self.email_enable_notifs = email_config.get("enable_notifs", False) - account_validity_renewal_enabled = config.get("account_validity", {}).get( - "renew_at" - ) + + account_validity_config = config.get("account_validity") or {} + account_validity_renewal_enabled = account_validity_config.get("renew_at") self.threepid_behaviour_email = ( # Have Synapse handle the email sending if account_threepid_delegates.email @@ -278,7 +280,9 @@ class EmailConfig(Config): self.email_notif_for_new_users = email_config.get( "notif_for_new_users", True ) - self.email_riot_base_url = email_config.get("riot_base_url", None) + self.email_riot_base_url = email_config.get( + "client_base_url", email_config.get("riot_base_url", None) + ) if account_validity_renewal_enabled: self.email_expiry_template_html = email_config.get( @@ -294,107 +298,111 @@ class EmailConfig(Config): raise ConfigError("Unable to find email template file %s" % (p,)) def generate_config_section(self, config_dir_path, server_name, **kwargs): - return """ - # Enable sending emails for password resets, notification events or - # account expiry notices - # - # If your SMTP server requires authentication, the optional smtp_user & - # smtp_pass variables should be used - # - #email: - # enable_notifs: false - # smtp_host: "localhost" - # smtp_port: 25 # SSL: 465, STARTTLS: 587 - # smtp_user: "exampleusername" - # smtp_pass: "examplepassword" - # require_transport_security: false - # - # # notif_from defines the "From" address to use when sending emails. - # # It must be set if email sending is enabled. - # # - # # The placeholder '%(app)s' will be replaced by the application name, - # # which is normally 'app_name' (below), but may be overridden by the - # # Matrix client application. - # # - # # Note that the placeholder must be written '%(app)s', including the - # # trailing 's'. - # # - # notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>" - # - # # app_name defines the default value for '%(app)s' in notif_from. It - # # defaults to 'Matrix'. - # # - # #app_name: my_branded_matrix_server - # - # # Enable email notifications by default - # # - # notif_for_new_users: true - # - # # Defining a custom URL for Riot is only needed if email notifications - # # should contain links to a self-hosted installation of Riot; when set - # # the "app_name" setting is ignored - # # - # riot_base_url: "http://localhost/riot" - # - # # Configure the time that a validation email or text message code - # # will expire after sending - # # - # # This is currently used for password resets - # # - # #validation_token_lifetime: 1h - # - # # Template directory. All template files should be stored within this - # # directory. If not set, default templates from within the Synapse - # # package will be used - # # - # # For the list of default templates, please see - # # https://github.com/matrix-org/synapse/tree/master/synapse/res/templates - # # - # #template_dir: res/templates - # - # # Templates for email notifications - # # - # notif_template_html: notif_mail.html - # notif_template_text: notif_mail.txt - # - # # Templates for account expiry notices - # # - # expiry_template_html: notice_expiry.html - # expiry_template_text: notice_expiry.txt - # - # # Templates for password reset emails sent by the homeserver - # # - # #password_reset_template_html: password_reset.html - # #password_reset_template_text: password_reset.txt - # - # # Templates for registration emails sent by the homeserver - # # - # #registration_template_html: registration.html - # #registration_template_text: registration.txt - # - # # Templates for validation emails sent by the homeserver when adding an email to - # # your user account - # # - # #add_threepid_template_html: add_threepid.html - # #add_threepid_template_text: add_threepid.txt - # - # # Templates for password reset success and failure pages that a user - # # will see after attempting to reset their password - # # - # #password_reset_template_success_html: password_reset_success.html - # #password_reset_template_failure_html: password_reset_failure.html - # - # # Templates for registration success and failure pages that a user - # # will see after attempting to register using an email or phone - # # - # #registration_template_success_html: registration_success.html - # #registration_template_failure_html: registration_failure.html + return """\ + # Configuration for sending emails from Synapse. # - # # Templates for success and failure pages that a user will see after attempting - # # to add an email or phone to their account - # # - # #add_threepid_success_html: add_threepid_success.html - # #add_threepid_failure_html: add_threepid_failure.html + email: + # The hostname of the outgoing SMTP server to use. Defaults to 'localhost'. + # + #smtp_host: mail.server + + # The port on the mail server for outgoing SMTP. Defaults to 25. + # + #smtp_port: 587 + + # Username/password for authentication to the SMTP server. By default, no + # authentication is attempted. + # + # smtp_user: "exampleusername" + # smtp_pass: "examplepassword" + + # Uncomment the following to require TLS transport security for SMTP. + # By default, Synapse will connect over plain text, and will then switch to + # TLS via STARTTLS *if the SMTP server supports it*. If this option is set, + # Synapse will refuse to connect unless the server supports STARTTLS. + # + #require_transport_security: true + + # Enable sending emails for messages that the user has missed + # + #enable_notifs: false + + # notif_from defines the "From" address to use when sending emails. + # It must be set if email sending is enabled. + # + # The placeholder '%(app)s' will be replaced by the application name, + # which is normally 'app_name' (below), but may be overridden by the + # Matrix client application. + # + # Note that the placeholder must be written '%(app)s', including the + # trailing 's'. + # + #notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>" + + # app_name defines the default value for '%(app)s' in notif_from. It + # defaults to 'Matrix'. + # + #app_name: my_branded_matrix_server + + # Uncomment the following to disable automatic subscription to email + # notifications for new users. Enabled by default. + # + #notif_for_new_users: false + + # Custom URL for client links within the email notifications. By default + # links will be based on "https://matrix.to". + # + # (This setting used to be called riot_base_url; the old name is still + # supported for backwards-compatibility but is now deprecated.) + # + #client_base_url: "http://localhost/riot" + + # Configure the time that a validation email will expire after sending. + # Defaults to 1h. + # + #validation_token_lifetime: 15m + + # Directory in which Synapse will try to find the template files below. + # If not set, default templates from within the Synapse package will be used. + # + # DO NOT UNCOMMENT THIS SETTING unless you want to customise the templates. + # If you *do* uncomment it, you will need to make sure that all the templates + # below are in the directory. + # + # Synapse will look for the following templates in this directory: + # + # * The contents of email notifications of missed events: 'notif_mail.html' and + # 'notif_mail.txt'. + # + # * The contents of account expiry notice emails: 'notice_expiry.html' and + # 'notice_expiry.txt'. + # + # * The contents of password reset emails sent by the homeserver: + # 'password_reset.html' and 'password_reset.txt' + # + # * HTML pages for success and failure that a user will see when they follow + # the link in the password reset email: 'password_reset_success.html' and + # 'password_reset_failure.html' + # + # * The contents of address verification emails sent during registration: + # 'registration.html' and 'registration.txt' + # + # * HTML pages for success and failure that a user will see when they follow + # the link in an address verification email sent during registration: + # 'registration_success.html' and 'registration_failure.html' + # + # * The contents of address verification emails sent when an address is added + # to a Matrix account: 'add_threepid.html' and 'add_threepid.txt' + # + # * HTML pages for success and failure that a user will see when they follow + # the link in an address verification email sent when an address is added + # to a Matrix account: 'add_threepid_success.html' and + # 'add_threepid_failure.html' + # + # You can see the default templates at: + # https://github.com/matrix-org/synapse/tree/master/synapse/res/templates + # + #template_dir: "res/templates" """ diff --git a/synapse/config/push.py b/synapse/config/push.py index 0910958649..6f2b3a7faa 100644 --- a/synapse/config/push.py +++ b/synapse/config/push.py @@ -35,7 +35,7 @@ class PushConfig(Config): # Now check for the one in the 'email' section and honour it, # with a warning. - push_config = config.get("email", {}) + push_config = config.get("email") or {} redact_content = push_config.get("redact_content") if redact_content is not None: print( diff --git a/synapse/config/registration.py b/synapse/config/registration.py index ee9614c5f7..b873995a49 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -27,6 +27,8 @@ class AccountValidityConfig(Config): section = "accountvalidity" def __init__(self, config, synapse_config): + if config is None: + return self.enabled = config.get("enabled", False) self.renew_by_email_enabled = "renew_at" in config @@ -159,23 +161,6 @@ class RegistrationConfig(Config): # Optional account validity configuration. This allows for accounts to be denied # any request after a given period. # - # ``enabled`` defines whether the account validity feature is enabled. Defaults - # to False. - # - # ``period`` allows setting the period after which an account is valid - # after its registration. When renewing the account, its validity period - # will be extended by this amount of time. This parameter is required when using - # the account validity feature. - # - # ``renew_at`` is the amount of time before an account's expiry date at which - # Synapse will send an email to the account's email address with a renewal link. - # This needs the ``email`` and ``public_baseurl`` configuration sections to be - # filled. - # - # ``renew_email_subject`` is the subject of the email sent out with the renewal - # link. ``%%(app)s`` can be used as a placeholder for the ``app_name`` parameter - # from the ``email`` section. - # # Once this feature is enabled, Synapse will look for registered users without an # expiration date at startup and will add one to every account it found using the # current settings at that time. @@ -186,21 +171,55 @@ class RegistrationConfig(Config): # date will be randomly selected within a range [now + period - d ; now + period], # where d is equal to 10%% of the validity period. # - #account_validity: - # enabled: true - # period: 6w - # renew_at: 1w - # renew_email_subject: "Renew your %%(app)s account" - # # Directory in which Synapse will try to find the HTML files to serve to the - # # user when trying to renew an account. Optional, defaults to - # # synapse/res/templates. - # template_dir: "res/templates" - # # HTML to be displayed to the user after they successfully renewed their - # # account. Optional. - # account_renewed_html_path: "account_renewed.html" - # # HTML to be displayed when the user tries to renew an account with an invalid - # # renewal token. Optional. - # invalid_token_html_path: "invalid_token.html" + account_validity: + # The account validity feature is disabled by default. Uncomment the + # following line to enable it. + # + #enabled: true + + # The period after which an account is valid after its registration. When + # renewing the account, its validity period will be extended by this amount + # of time. This parameter is required when using the account validity + # feature. + # + #period: 6w + + # The amount of time before an account's expiry date at which Synapse will + # send an email to the account's email address with a renewal link. By + # default, no such emails are sent. + # + # If you enable this setting, you will also need to fill out the 'email' and + # 'public_baseurl' configuration sections. + # + #renew_at: 1w + + # The subject of the email sent out with the renewal link. '%%(app)s' can be + # used as a placeholder for the 'app_name' parameter from the 'email' + # section. + # + # Note that the placeholder must be written '%%(app)s', including the + # trailing 's'. + # + # If this is not set, a default value is used. + # + #renew_email_subject: "Renew your %%(app)s account" + + # Directory in which Synapse will try to find templates for the HTML files to + # serve to the user when trying to renew an account. If not set, default + # templates from within the Synapse package will be used. + # + #template_dir: "res/templates" + + # File within 'template_dir' giving the HTML to be displayed to the user after + # they successfully renewed their account. If not set, default text is used. + # + #account_renewed_html_path: "account_renewed.html" + + # File within 'template_dir' giving the HTML to be displayed when the user + # tries to renew an account with an invalid renewal token. If not set, + # default text is used. + # + #invalid_token_html_path: "invalid_token.html" # Time that a user's session remains valid for, after they log in. # diff --git a/synapse/config/saml2_config.py b/synapse/config/saml2_config.py index b91414aa35..423c158b11 100644 --- a/synapse/config/saml2_config.py +++ b/synapse/config/saml2_config.py @@ -121,6 +121,7 @@ class SAML2Config(Config): required_methods = [ "get_saml_attributes", "saml_response_to_user_attributes", + "get_remote_user_id", ] missing_methods = [ method diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 4ebb0e8bc0..36c83c3027 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -21,6 +21,7 @@ from prometheus_client import Counter from twisted.internet import defer +import synapse import synapse.metrics from synapse.federation.sender.per_destination_queue import PerDestinationQueue from synapse.federation.sender.transaction_manager import TransactionManager @@ -54,7 +55,7 @@ sent_pdus_destination_dist_total = Counter( class FederationSender(object): - def __init__(self, hs): + def __init__(self, hs: "synapse.server.HomeServer"): self.hs = hs self.server_name = hs.hostname @@ -482,7 +483,20 @@ class FederationSender(object): def send_device_messages(self, destination): if destination == self.server_name: - logger.info("Not sending device update to ourselves") + logger.warning("Not sending device update to ourselves") + return + + self._get_per_destination_queue(destination).attempt_new_transaction() + + def wake_destination(self, destination: str): + """Called when we want to retry sending transactions to a remote. + + This is mainly useful if the remote server has been down and we think it + might have come back. + """ + + if destination == self.server_name: + logger.warning("Not waking up ourselves") return self._get_per_destination_queue(destination).attempt_new_transaction() diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index b4cbf23394..d8cf9ed299 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -44,6 +44,7 @@ from synapse.logging.opentracing import ( tags, whitelisted_homeserver, ) +from synapse.server import HomeServer from synapse.types import ThirdPartyInstanceID, get_domain_from_id from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.versionstring import get_version_string @@ -101,12 +102,17 @@ class NoAuthenticationError(AuthenticationError): class Authenticator(object): - def __init__(self, hs): + def __init__(self, hs: HomeServer): self._clock = hs.get_clock() self.keyring = hs.get_keyring() self.server_name = hs.hostname self.store = hs.get_datastore() self.federation_domain_whitelist = hs.config.federation_domain_whitelist + self.notifer = hs.get_notifier() + + self.replication_client = None + if hs.config.worker.worker_app: + self.replication_client = hs.get_tcp_replication() # A method just so we can pass 'self' as the authenticator to the Servlets async def authenticate_request(self, request, content): @@ -166,6 +172,17 @@ class Authenticator(object): try: logger.info("Marking origin %r as up", origin) await self.store.set_destination_retry_timings(origin, None, 0, 0) + + # Inform the relevant places that the remote server is back up. + self.notifer.notify_remote_server_up(origin) + if self.replication_client: + # If we're on a worker we try and inform master about this. The + # replication client doesn't hook into the notifier to avoid + # infinite loops where we send a `REMOTE_SERVER_UP` command to + # master, which then echoes it back to us which in turn pokes + # the notifier. + self.replication_client.send_remote_server_up(origin) + except Exception: logger.exception("Error resetting retry timings on %s", origin) diff --git a/synapse/handlers/saml_handler.py b/synapse/handlers/saml_handler.py index 32638671c9..7f411b53b9 100644 --- a/synapse/handlers/saml_handler.py +++ b/synapse/handlers/saml_handler.py @@ -146,14 +146,15 @@ class SamlHandler: logger.info("SAML2 mapped attributes: %s", saml2_auth.ava) - try: - remote_user_id = saml2_auth.ava["uid"][0] - except KeyError: - logger.warning("SAML2 response lacks a 'uid' attestation") - raise SynapseError(400, "'uid' not in SAML2 response") - self._outstanding_requests_dict.pop(saml2_auth.in_response_to, None) + remote_user_id = self._user_mapping_provider.get_remote_user_id( + saml2_auth, client_redirect_url + ) + + if not remote_user_id: + raise Exception("Failed to extract remote user id from SAML response") + with (await self._mapping_lock.queue(self._auth_provider_id)): # first of all, check if we already have a mapping for this user logger.info( @@ -290,6 +291,20 @@ class DefaultSamlMappingProvider(object): self._mxid_source_attribute = parsed_config.mxid_source_attribute self._mxid_mapper = parsed_config.mxid_mapper + self._grandfathered_mxid_source_attribute = ( + module_api._hs.config.saml2_grandfathered_mxid_source_attribute + ) + + def get_remote_user_id( + self, saml_response: saml2.response.AuthnResponse, client_redirect_url: str + ): + """Extracts the remote user id from the SAML response""" + try: + return saml_response.ava["uid"][0] + except KeyError: + logger.warning("SAML2 response lacks a 'uid' attestation") + raise SynapseError(400, "'uid' not in SAML2 response") + def saml_response_to_user_attributes( self, saml_response: saml2.response.AuthnResponse, diff --git a/synapse/notifier.py b/synapse/notifier.py index 5f5f765bea..6132727cbd 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -15,11 +15,13 @@ import logging from collections import namedtuple +from typing import Callable, List from prometheus_client import Counter from twisted.internet import defer +import synapse.server from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError from synapse.handlers.presence import format_user_presence_state @@ -154,7 +156,7 @@ class Notifier(object): UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 - def __init__(self, hs): + def __init__(self, hs: "synapse.server.HomeServer"): self.user_to_user_stream = {} self.room_to_user_streams = {} @@ -164,7 +166,12 @@ class Notifier(object): self.store = hs.get_datastore() self.pending_new_room_events = [] - self.replication_callbacks = [] + # Called when there are new things to stream over replication + self.replication_callbacks = [] # type: List[Callable[[], None]] + + # Called when remote servers have come back online after having been + # down. + self.remote_server_up_callbacks = [] # type: List[Callable[[str], None]] self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() @@ -205,7 +212,7 @@ class Notifier(object): "synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream) ) - def add_replication_callback(self, cb): + def add_replication_callback(self, cb: Callable[[], None]): """Add a callback that will be called when some new data is available. Callback is not given any arguments. It should *not* return a Deferred - if it needs to do any asynchronous work, a background thread should be started and @@ -213,6 +220,12 @@ class Notifier(object): """ self.replication_callbacks.append(cb) + def add_remote_server_up_callback(self, cb: Callable[[str], None]): + """Add a callback that will be called when synapse detects a server + has been + """ + self.remote_server_up_callbacks.append(cb) + def on_new_room_event( self, event, room_stream_id, max_room_stream_id, extra_users=[] ): @@ -522,3 +535,15 @@ class Notifier(object): """Notify the any replication listeners that there's a new event""" for cb in self.replication_callbacks: cb() + + def notify_remote_server_up(self, server: str): + """Notify any replication that a remote server has come back up + """ + # We call federation_sender directly rather than registering as a + # callback as a) we already have a reference to it and b) it introduces + # circular dependencies. + if self.federation_sender: + self.federation_sender.wake_destination(server) + + for cb in self.remote_server_up_callbacks: + cb(server) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 52a0aefe68..fc06a7b053 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -143,6 +143,9 @@ class ReplicationClientHandler(AbstractReplicationClientHandler): if d: d.callback(data) + def on_remote_server_up(self, server: str): + """Called when get a new REMOTE_SERVER_UP command.""" + def get_streams_to_replicate(self) -> Dict[str, int]: """Called when a new connection has been established and we need to subscribe to streams. diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index cbb36b9acf..451671412d 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -387,6 +387,20 @@ class UserIpCommand(Command): ) +class RemoteServerUpCommand(Command): + """Sent when a worker has detected that a remote server is no longer + "down" and retry timings should be reset. + + If sent from a client the server will relay to all other workers. + + Format:: + + REMOTE_SERVER_UP <server> + """ + + NAME = "REMOTE_SERVER_UP" + + _COMMANDS = ( ServerCommand, RdataCommand, @@ -401,6 +415,7 @@ _COMMANDS = ( RemovePusherCommand, InvalidateCacheCommand, UserIpCommand, + RemoteServerUpCommand, ) # type: Tuple[Type[Command], ...] # Map of command name to command type. @@ -414,6 +429,7 @@ VALID_SERVER_COMMANDS = ( ErrorCommand.NAME, PingCommand.NAME, SyncCommand.NAME, + RemoteServerUpCommand.NAME, ) # The commands the client is allowed to send @@ -427,4 +443,5 @@ VALID_CLIENT_COMMANDS = ( InvalidateCacheCommand.NAME, UserIpCommand.NAME, ErrorCommand.NAME, + RemoteServerUpCommand.NAME, ) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 5f4bdf84d2..131e5acb09 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -76,6 +76,7 @@ from synapse.replication.tcp.commands import ( PingCommand, PositionCommand, RdataCommand, + RemoteServerUpCommand, ReplicateCommand, ServerCommand, SyncCommand, @@ -460,6 +461,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): async def on_INVALIDATE_CACHE(self, cmd): self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) + async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand): + self.streamer.on_remote_server_up(cmd.data) + async def on_USER_IP(self, cmd): self.streamer.on_user_ip( cmd.user_id, @@ -555,6 +559,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): def send_sync(self, data): self.send_command(SyncCommand(data)) + def send_remote_server_up(self, server: str): + self.send_command(RemoteServerUpCommand(server)) + def on_connection_closed(self): BaseReplicationStreamProtocol.on_connection_closed(self) self.streamer.lost_connection(self) @@ -589,6 +596,11 @@ class AbstractReplicationClientHandler(metaclass=abc.ABCMeta): raise NotImplementedError() @abc.abstractmethod + async def on_remote_server_up(self, server: str): + """Called when get a new REMOTE_SERVER_UP command.""" + raise NotImplementedError() + + @abc.abstractmethod def get_streams_to_replicate(self): """Called when a new connection has been established and we need to subscribe to streams. @@ -707,6 +719,9 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): async def on_SYNC(self, cmd): self.handler.on_sync(cmd.data) + async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand): + self.handler.on_remote_server_up(cmd.data) + def replicate(self, stream_name, token): """Send the subscription request to the server """ diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index b1752e88cd..6ebf944f66 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -120,6 +120,7 @@ class ReplicationStreamer(object): self.federation_sender = hs.get_federation_sender() self.notifier.add_replication_callback(self.on_notifier_poke) + self.notifier.add_remote_server_up_callback(self.send_remote_server_up) # Keeps track of whether we are currently checking for updates self.is_looping = False @@ -288,6 +289,14 @@ class ReplicationStreamer(object): ) await self._server_notices_sender.on_user_ip(user_id) + @measure_func("repl.on_remote_server_up") + def on_remote_server_up(self, server: str): + self.notifier.notify_remote_server_up(server) + + def send_remote_server_up(self, server: str): + for conn in self.connections: + conn.send_remote_server_up(server) + def send_sync_to_all_connections(self, data): """Sends a SYNC command to all clients. diff --git a/synapse/server.pyi b/synapse/server.pyi index b5e0b57095..0731403047 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -1,3 +1,5 @@ +import twisted.internet + import synapse.api.auth import synapse.config.homeserver import synapse.federation.sender @@ -9,10 +11,12 @@ import synapse.handlers.deactivate_account import synapse.handlers.device import synapse.handlers.e2e_keys import synapse.handlers.message +import synapse.handlers.presence import synapse.handlers.room import synapse.handlers.room_member import synapse.handlers.set_password import synapse.http.client +import synapse.notifier import synapse.rest.media.v1.media_repository import synapse.server_notices.server_notices_manager import synapse.server_notices.server_notices_sender @@ -85,3 +89,11 @@ class HomeServer(object): self, ) -> synapse.server_notices.server_notices_sender.ServerNoticesSender: pass + def get_notifier(self) -> synapse.notifier.Notifier: + pass + def get_presence_handler(self) -> synapse.handlers.presence.PresenceHandler: + pass + def get_clock(self) -> synapse.util.Clock: + pass + def get_reactor(self) -> twisted.internet.base.ReactorBase: + pass |