summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/app/federation_sender.py12
-rw-r--r--synapse/config/emailconfig.py222
-rw-r--r--synapse/config/push.py2
-rw-r--r--synapse/config/registration.py83
-rw-r--r--synapse/config/saml2_config.py1
-rw-r--r--synapse/federation/sender/__init__.py18
-rw-r--r--synapse/federation/transport/server.py19
-rw-r--r--synapse/handlers/saml_handler.py27
-rw-r--r--synapse/notifier.py31
-rw-r--r--synapse/replication/tcp/client.py3
-rw-r--r--synapse/replication/tcp/commands.py17
-rw-r--r--synapse/replication/tcp/protocol.py15
-rw-r--r--synapse/replication/tcp/resource.py9
-rw-r--r--synapse/server.pyi12
14 files changed, 318 insertions, 153 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index a57cf991ac..38d11fdd0f 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -158,6 +158,13 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
         args.update(self.send_handler.stream_positions())
         return args
 
+    def on_remote_server_up(self, server: str):
+        """Called when get a new REMOTE_SERVER_UP command."""
+
+        # Let's wake up the transaction queue for the server in case we have
+        # pending stuff to send to it.
+        self.send_handler.wake_destination(server)
+
 
 def start(config_options):
     try:
@@ -205,7 +212,7 @@ class FederationSenderHandler(object):
     to the federation sender.
     """
 
-    def __init__(self, hs, replication_client):
+    def __init__(self, hs: FederationSenderServer, replication_client):
         self.store = hs.get_datastore()
         self._is_mine_id = hs.is_mine_id
         self.federation_sender = hs.get_federation_sender()
@@ -226,6 +233,9 @@ class FederationSenderHandler(object):
             self.store.get_room_max_stream_ordering()
         )
 
+    def wake_destination(self, server: str):
+        self.federation_sender.wake_destination(server)
+
     def stream_positions(self):
         return {"federation": self.federation_position}
 
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index 35756bed87..74853f9faa 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -37,10 +37,12 @@ class EmailConfig(Config):
 
         self.email_enable_notifs = False
 
-        email_config = config.get("email", {})
+        email_config = config.get("email")
+        if email_config is None:
+            email_config = {}
 
-        self.email_smtp_host = email_config.get("smtp_host", None)
-        self.email_smtp_port = email_config.get("smtp_port", None)
+        self.email_smtp_host = email_config.get("smtp_host", "localhost")
+        self.email_smtp_port = email_config.get("smtp_port", 25)
         self.email_smtp_user = email_config.get("smtp_user", None)
         self.email_smtp_pass = email_config.get("smtp_pass", None)
         self.require_transport_security = email_config.get(
@@ -74,9 +76,9 @@ class EmailConfig(Config):
         self.email_template_dir = os.path.abspath(template_dir)
 
         self.email_enable_notifs = email_config.get("enable_notifs", False)
-        account_validity_renewal_enabled = config.get("account_validity", {}).get(
-            "renew_at"
-        )
+
+        account_validity_config = config.get("account_validity") or {}
+        account_validity_renewal_enabled = account_validity_config.get("renew_at")
 
         self.threepid_behaviour_email = (
             # Have Synapse handle the email sending if account_threepid_delegates.email
@@ -278,7 +280,9 @@ class EmailConfig(Config):
             self.email_notif_for_new_users = email_config.get(
                 "notif_for_new_users", True
             )
-            self.email_riot_base_url = email_config.get("riot_base_url", None)
+            self.email_riot_base_url = email_config.get(
+                "client_base_url", email_config.get("riot_base_url", None)
+            )
 
         if account_validity_renewal_enabled:
             self.email_expiry_template_html = email_config.get(
@@ -294,107 +298,111 @@ class EmailConfig(Config):
                     raise ConfigError("Unable to find email template file %s" % (p,))
 
     def generate_config_section(self, config_dir_path, server_name, **kwargs):
-        return """
-        # Enable sending emails for password resets, notification events or
-        # account expiry notices
-        #
-        # If your SMTP server requires authentication, the optional smtp_user &
-        # smtp_pass variables should be used
-        #
-        #email:
-        #   enable_notifs: false
-        #   smtp_host: "localhost"
-        #   smtp_port: 25 # SSL: 465, STARTTLS: 587
-        #   smtp_user: "exampleusername"
-        #   smtp_pass: "examplepassword"
-        #   require_transport_security: false
-        #
-        #   # notif_from defines the "From" address to use when sending emails.
-        #   # It must be set if email sending is enabled.
-        #   #
-        #   # The placeholder '%(app)s' will be replaced by the application name,
-        #   # which is normally 'app_name' (below), but may be overridden by the
-        #   # Matrix client application.
-        #   #
-        #   # Note that the placeholder must be written '%(app)s', including the
-        #   # trailing 's'.
-        #   #
-        #   notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
-        #
-        #   # app_name defines the default value for '%(app)s' in notif_from. It
-        #   # defaults to 'Matrix'.
-        #   #
-        #   #app_name: my_branded_matrix_server
-        #
-        #   # Enable email notifications by default
-        #   #
-        #   notif_for_new_users: true
-        #
-        #   # Defining a custom URL for Riot is only needed if email notifications
-        #   # should contain links to a self-hosted installation of Riot; when set
-        #   # the "app_name" setting is ignored
-        #   #
-        #   riot_base_url: "http://localhost/riot"
-        #
-        #   # Configure the time that a validation email or text message code
-        #   # will expire after sending
-        #   #
-        #   # This is currently used for password resets
-        #   #
-        #   #validation_token_lifetime: 1h
-        #
-        #   # Template directory. All template files should be stored within this
-        #   # directory. If not set, default templates from within the Synapse
-        #   # package will be used
-        #   #
-        #   # For the list of default templates, please see
-        #   # https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
-        #   #
-        #   #template_dir: res/templates
-        #
-        #   # Templates for email notifications
-        #   #
-        #   notif_template_html: notif_mail.html
-        #   notif_template_text: notif_mail.txt
-        #
-        #   # Templates for account expiry notices
-        #   #
-        #   expiry_template_html: notice_expiry.html
-        #   expiry_template_text: notice_expiry.txt
-        #
-        #   # Templates for password reset emails sent by the homeserver
-        #   #
-        #   #password_reset_template_html: password_reset.html
-        #   #password_reset_template_text: password_reset.txt
-        #
-        #   # Templates for registration emails sent by the homeserver
-        #   #
-        #   #registration_template_html: registration.html
-        #   #registration_template_text: registration.txt
-        #
-        #   # Templates for validation emails sent by the homeserver when adding an email to
-        #   # your user account
-        #   #
-        #   #add_threepid_template_html: add_threepid.html
-        #   #add_threepid_template_text: add_threepid.txt
-        #
-        #   # Templates for password reset success and failure pages that a user
-        #   # will see after attempting to reset their password
-        #   #
-        #   #password_reset_template_success_html: password_reset_success.html
-        #   #password_reset_template_failure_html: password_reset_failure.html
-        #
-        #   # Templates for registration success and failure pages that a user
-        #   # will see after attempting to register using an email or phone
-        #   #
-        #   #registration_template_success_html: registration_success.html
-        #   #registration_template_failure_html: registration_failure.html
+        return """\
+        # Configuration for sending emails from Synapse.
         #
-        #   # Templates for success and failure pages that a user will see after attempting
-        #   # to add an email or phone to their account
-        #   #
-        #   #add_threepid_success_html: add_threepid_success.html
-        #   #add_threepid_failure_html: add_threepid_failure.html
+        email:
+          # The hostname of the outgoing SMTP server to use. Defaults to 'localhost'.
+          #
+          #smtp_host: mail.server
+
+          # The port on the mail server for outgoing SMTP. Defaults to 25.
+          #
+          #smtp_port: 587
+
+          # Username/password for authentication to the SMTP server. By default, no
+          # authentication is attempted.
+          #
+          # smtp_user: "exampleusername"
+          # smtp_pass: "examplepassword"
+
+          # Uncomment the following to require TLS transport security for SMTP.
+          # By default, Synapse will connect over plain text, and will then switch to
+          # TLS via STARTTLS *if the SMTP server supports it*. If this option is set,
+          # Synapse will refuse to connect unless the server supports STARTTLS.
+          #
+          #require_transport_security: true
+
+          # Enable sending emails for messages that the user has missed
+          #
+          #enable_notifs: false
+
+          # notif_from defines the "From" address to use when sending emails.
+          # It must be set if email sending is enabled.
+          #
+          # The placeholder '%(app)s' will be replaced by the application name,
+          # which is normally 'app_name' (below), but may be overridden by the
+          # Matrix client application.
+          #
+          # Note that the placeholder must be written '%(app)s', including the
+          # trailing 's'.
+          #
+          #notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
+
+          # app_name defines the default value for '%(app)s' in notif_from. It
+          # defaults to 'Matrix'.
+          #
+          #app_name: my_branded_matrix_server
+
+          # Uncomment the following to disable automatic subscription to email
+          # notifications for new users. Enabled by default.
+          #
+          #notif_for_new_users: false
+
+          # Custom URL for client links within the email notifications. By default
+          # links will be based on "https://matrix.to".
+          #
+          # (This setting used to be called riot_base_url; the old name is still
+          # supported for backwards-compatibility but is now deprecated.)
+          #
+          #client_base_url: "http://localhost/riot"
+
+          # Configure the time that a validation email will expire after sending.
+          # Defaults to 1h.
+          #
+          #validation_token_lifetime: 15m
+
+          # Directory in which Synapse will try to find the template files below.
+          # If not set, default templates from within the Synapse package will be used.
+          #
+          # DO NOT UNCOMMENT THIS SETTING unless you want to customise the templates.
+          # If you *do* uncomment it, you will need to make sure that all the templates
+          # below are in the directory.
+          #
+          # Synapse will look for the following templates in this directory:
+          #
+          # * The contents of email notifications of missed events: 'notif_mail.html' and
+          #   'notif_mail.txt'.
+          #
+          # * The contents of account expiry notice emails: 'notice_expiry.html' and
+          #   'notice_expiry.txt'.
+          #
+          # * The contents of password reset emails sent by the homeserver:
+          #   'password_reset.html' and 'password_reset.txt'
+          #
+          # * HTML pages for success and failure that a user will see when they follow
+          #   the link in the password reset email: 'password_reset_success.html' and
+          #   'password_reset_failure.html'
+          #
+          # * The contents of address verification emails sent during registration:
+          #   'registration.html' and 'registration.txt'
+          #
+          # * HTML pages for success and failure that a user will see when they follow
+          #   the link in an address verification email sent during registration:
+          #   'registration_success.html' and 'registration_failure.html'
+          #
+          # * The contents of address verification emails sent when an address is added
+          #   to a Matrix account: 'add_threepid.html' and 'add_threepid.txt'
+          #
+          # * HTML pages for success and failure that a user will see when they follow
+          #   the link in an address verification email sent when an address is added
+          #   to a Matrix account: 'add_threepid_success.html' and
+          #   'add_threepid_failure.html'
+          #
+          # You can see the default templates at:
+          # https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
+          #
+          #template_dir: "res/templates"
         """
 
 
diff --git a/synapse/config/push.py b/synapse/config/push.py
index 0910958649..6f2b3a7faa 100644
--- a/synapse/config/push.py
+++ b/synapse/config/push.py
@@ -35,7 +35,7 @@ class PushConfig(Config):
 
         # Now check for the one in the 'email' section and honour it,
         # with a warning.
-        push_config = config.get("email", {})
+        push_config = config.get("email") or {}
         redact_content = push_config.get("redact_content")
         if redact_content is not None:
             print(
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index ee9614c5f7..b873995a49 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -27,6 +27,8 @@ class AccountValidityConfig(Config):
     section = "accountvalidity"
 
     def __init__(self, config, synapse_config):
+        if config is None:
+            return
         self.enabled = config.get("enabled", False)
         self.renew_by_email_enabled = "renew_at" in config
 
@@ -159,23 +161,6 @@ class RegistrationConfig(Config):
         # Optional account validity configuration. This allows for accounts to be denied
         # any request after a given period.
         #
-        # ``enabled`` defines whether the account validity feature is enabled. Defaults
-        # to False.
-        #
-        # ``period`` allows setting the period after which an account is valid
-        # after its registration. When renewing the account, its validity period
-        # will be extended by this amount of time. This parameter is required when using
-        # the account validity feature.
-        #
-        # ``renew_at`` is the amount of time before an account's expiry date at which
-        # Synapse will send an email to the account's email address with a renewal link.
-        # This needs the ``email`` and ``public_baseurl`` configuration sections to be
-        # filled.
-        #
-        # ``renew_email_subject`` is the subject of the email sent out with the renewal
-        # link. ``%%(app)s`` can be used as a placeholder for the ``app_name`` parameter
-        # from the ``email`` section.
-        #
         # Once this feature is enabled, Synapse will look for registered users without an
         # expiration date at startup and will add one to every account it found using the
         # current settings at that time.
@@ -186,21 +171,55 @@ class RegistrationConfig(Config):
         # date will be randomly selected within a range [now + period - d ; now + period],
         # where d is equal to 10%% of the validity period.
         #
-        #account_validity:
-        #  enabled: true
-        #  period: 6w
-        #  renew_at: 1w
-        #  renew_email_subject: "Renew your %%(app)s account"
-        #  # Directory in which Synapse will try to find the HTML files to serve to the
-        #  # user when trying to renew an account. Optional, defaults to
-        #  # synapse/res/templates.
-        #  template_dir: "res/templates"
-        #  # HTML to be displayed to the user after they successfully renewed their
-        #  # account. Optional.
-        #  account_renewed_html_path: "account_renewed.html"
-        #  # HTML to be displayed when the user tries to renew an account with an invalid
-        #  # renewal token. Optional.
-        #  invalid_token_html_path: "invalid_token.html"
+        account_validity:
+          # The account validity feature is disabled by default. Uncomment the
+          # following line to enable it.
+          #
+          #enabled: true
+
+          # The period after which an account is valid after its registration. When
+          # renewing the account, its validity period will be extended by this amount
+          # of time. This parameter is required when using the account validity
+          # feature.
+          #
+          #period: 6w
+
+          # The amount of time before an account's expiry date at which Synapse will
+          # send an email to the account's email address with a renewal link. By
+          # default, no such emails are sent.
+          #
+          # If you enable this setting, you will also need to fill out the 'email' and
+          # 'public_baseurl' configuration sections.
+          #
+          #renew_at: 1w
+
+          # The subject of the email sent out with the renewal link. '%%(app)s' can be
+          # used as a placeholder for the 'app_name' parameter from the 'email'
+          # section.
+          #
+          # Note that the placeholder must be written '%%(app)s', including the
+          # trailing 's'.
+          #
+          # If this is not set, a default value is used.
+          #
+          #renew_email_subject: "Renew your %%(app)s account"
+
+          # Directory in which Synapse will try to find templates for the HTML files to
+          # serve to the user when trying to renew an account. If not set, default
+          # templates from within the Synapse package will be used.
+          #
+          #template_dir: "res/templates"
+
+          # File within 'template_dir' giving the HTML to be displayed to the user after
+          # they successfully renewed their account. If not set, default text is used.
+          #
+          #account_renewed_html_path: "account_renewed.html"
+
+          # File within 'template_dir' giving the HTML to be displayed when the user
+          # tries to renew an account with an invalid renewal token. If not set,
+          # default text is used.
+          #
+          #invalid_token_html_path: "invalid_token.html"
 
         # Time that a user's session remains valid for, after they log in.
         #
diff --git a/synapse/config/saml2_config.py b/synapse/config/saml2_config.py
index b91414aa35..423c158b11 100644
--- a/synapse/config/saml2_config.py
+++ b/synapse/config/saml2_config.py
@@ -121,6 +121,7 @@ class SAML2Config(Config):
         required_methods = [
             "get_saml_attributes",
             "saml_response_to_user_attributes",
+            "get_remote_user_id",
         ]
         missing_methods = [
             method
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 4ebb0e8bc0..36c83c3027 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -21,6 +21,7 @@ from prometheus_client import Counter
 
 from twisted.internet import defer
 
+import synapse
 import synapse.metrics
 from synapse.federation.sender.per_destination_queue import PerDestinationQueue
 from synapse.federation.sender.transaction_manager import TransactionManager
@@ -54,7 +55,7 @@ sent_pdus_destination_dist_total = Counter(
 
 
 class FederationSender(object):
-    def __init__(self, hs):
+    def __init__(self, hs: "synapse.server.HomeServer"):
         self.hs = hs
         self.server_name = hs.hostname
 
@@ -482,7 +483,20 @@ class FederationSender(object):
 
     def send_device_messages(self, destination):
         if destination == self.server_name:
-            logger.info("Not sending device update to ourselves")
+            logger.warning("Not sending device update to ourselves")
+            return
+
+        self._get_per_destination_queue(destination).attempt_new_transaction()
+
+    def wake_destination(self, destination: str):
+        """Called when we want to retry sending transactions to a remote.
+
+        This is mainly useful if the remote server has been down and we think it
+        might have come back.
+        """
+
+        if destination == self.server_name:
+            logger.warning("Not waking up ourselves")
             return
 
         self._get_per_destination_queue(destination).attempt_new_transaction()
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index b4cbf23394..d8cf9ed299 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -44,6 +44,7 @@ from synapse.logging.opentracing import (
     tags,
     whitelisted_homeserver,
 )
+from synapse.server import HomeServer
 from synapse.types import ThirdPartyInstanceID, get_domain_from_id
 from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.versionstring import get_version_string
@@ -101,12 +102,17 @@ class NoAuthenticationError(AuthenticationError):
 
 
 class Authenticator(object):
-    def __init__(self, hs):
+    def __init__(self, hs: HomeServer):
         self._clock = hs.get_clock()
         self.keyring = hs.get_keyring()
         self.server_name = hs.hostname
         self.store = hs.get_datastore()
         self.federation_domain_whitelist = hs.config.federation_domain_whitelist
+        self.notifer = hs.get_notifier()
+
+        self.replication_client = None
+        if hs.config.worker.worker_app:
+            self.replication_client = hs.get_tcp_replication()
 
     # A method just so we can pass 'self' as the authenticator to the Servlets
     async def authenticate_request(self, request, content):
@@ -166,6 +172,17 @@ class Authenticator(object):
         try:
             logger.info("Marking origin %r as up", origin)
             await self.store.set_destination_retry_timings(origin, None, 0, 0)
+
+            # Inform the relevant places that the remote server is back up.
+            self.notifer.notify_remote_server_up(origin)
+            if self.replication_client:
+                # If we're on a worker we try and inform master about this. The
+                # replication client doesn't hook into the notifier to avoid
+                # infinite loops where we send a `REMOTE_SERVER_UP` command to
+                # master, which then echoes it back to us which in turn pokes
+                # the notifier.
+                self.replication_client.send_remote_server_up(origin)
+
         except Exception:
             logger.exception("Error resetting retry timings on %s", origin)
 
diff --git a/synapse/handlers/saml_handler.py b/synapse/handlers/saml_handler.py
index 32638671c9..7f411b53b9 100644
--- a/synapse/handlers/saml_handler.py
+++ b/synapse/handlers/saml_handler.py
@@ -146,14 +146,15 @@ class SamlHandler:
 
         logger.info("SAML2 mapped attributes: %s", saml2_auth.ava)
 
-        try:
-            remote_user_id = saml2_auth.ava["uid"][0]
-        except KeyError:
-            logger.warning("SAML2 response lacks a 'uid' attestation")
-            raise SynapseError(400, "'uid' not in SAML2 response")
-
         self._outstanding_requests_dict.pop(saml2_auth.in_response_to, None)
 
+        remote_user_id = self._user_mapping_provider.get_remote_user_id(
+            saml2_auth, client_redirect_url
+        )
+
+        if not remote_user_id:
+            raise Exception("Failed to extract remote user id from SAML response")
+
         with (await self._mapping_lock.queue(self._auth_provider_id)):
             # first of all, check if we already have a mapping for this user
             logger.info(
@@ -290,6 +291,20 @@ class DefaultSamlMappingProvider(object):
         self._mxid_source_attribute = parsed_config.mxid_source_attribute
         self._mxid_mapper = parsed_config.mxid_mapper
 
+        self._grandfathered_mxid_source_attribute = (
+            module_api._hs.config.saml2_grandfathered_mxid_source_attribute
+        )
+
+    def get_remote_user_id(
+        self, saml_response: saml2.response.AuthnResponse, client_redirect_url: str
+    ):
+        """Extracts the remote user id from the SAML response"""
+        try:
+            return saml_response.ava["uid"][0]
+        except KeyError:
+            logger.warning("SAML2 response lacks a 'uid' attestation")
+            raise SynapseError(400, "'uid' not in SAML2 response")
+
     def saml_response_to_user_attributes(
         self,
         saml_response: saml2.response.AuthnResponse,
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 5f5f765bea..6132727cbd 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -15,11 +15,13 @@
 
 import logging
 from collections import namedtuple
+from typing import Callable, List
 
 from prometheus_client import Counter
 
 from twisted.internet import defer
 
+import synapse.server
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
 from synapse.handlers.presence import format_user_presence_state
@@ -154,7 +156,7 @@ class Notifier(object):
 
     UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
 
-    def __init__(self, hs):
+    def __init__(self, hs: "synapse.server.HomeServer"):
         self.user_to_user_stream = {}
         self.room_to_user_streams = {}
 
@@ -164,7 +166,12 @@ class Notifier(object):
         self.store = hs.get_datastore()
         self.pending_new_room_events = []
 
-        self.replication_callbacks = []
+        # Called when there are new things to stream over replication
+        self.replication_callbacks = []  # type: List[Callable[[], None]]
+
+        # Called when remote servers have come back online after having been
+        # down.
+        self.remote_server_up_callbacks = []  # type: List[Callable[[str], None]]
 
         self.clock = hs.get_clock()
         self.appservice_handler = hs.get_application_service_handler()
@@ -205,7 +212,7 @@ class Notifier(object):
             "synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream)
         )
 
-    def add_replication_callback(self, cb):
+    def add_replication_callback(self, cb: Callable[[], None]):
         """Add a callback that will be called when some new data is available.
         Callback is not given any arguments. It should *not* return a Deferred - if
         it needs to do any asynchronous work, a background thread should be started and
@@ -213,6 +220,12 @@ class Notifier(object):
         """
         self.replication_callbacks.append(cb)
 
+    def add_remote_server_up_callback(self, cb: Callable[[str], None]):
+        """Add a callback that will be called when synapse detects a server
+        has been
+        """
+        self.remote_server_up_callbacks.append(cb)
+
     def on_new_room_event(
         self, event, room_stream_id, max_room_stream_id, extra_users=[]
     ):
@@ -522,3 +535,15 @@ class Notifier(object):
         """Notify the any replication listeners that there's a new event"""
         for cb in self.replication_callbacks:
             cb()
+
+    def notify_remote_server_up(self, server: str):
+        """Notify any replication that a remote server has come back up
+        """
+        # We call federation_sender directly rather than registering as a
+        # callback as a) we already have a reference to it and b) it introduces
+        # circular dependencies.
+        if self.federation_sender:
+            self.federation_sender.wake_destination(server)
+
+        for cb in self.remote_server_up_callbacks:
+            cb(server)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 52a0aefe68..fc06a7b053 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -143,6 +143,9 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
         if d:
             d.callback(data)
 
+    def on_remote_server_up(self, server: str):
+        """Called when get a new REMOTE_SERVER_UP command."""
+
     def get_streams_to_replicate(self) -> Dict[str, int]:
         """Called when a new connection has been established and we need to
         subscribe to streams.
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index cbb36b9acf..451671412d 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -387,6 +387,20 @@ class UserIpCommand(Command):
         )
 
 
+class RemoteServerUpCommand(Command):
+    """Sent when a worker has detected that a remote server is no longer
+    "down" and retry timings should be reset.
+
+    If sent from a client the server will relay to all other workers.
+
+    Format::
+
+        REMOTE_SERVER_UP <server>
+    """
+
+    NAME = "REMOTE_SERVER_UP"
+
+
 _COMMANDS = (
     ServerCommand,
     RdataCommand,
@@ -401,6 +415,7 @@ _COMMANDS = (
     RemovePusherCommand,
     InvalidateCacheCommand,
     UserIpCommand,
+    RemoteServerUpCommand,
 )  # type: Tuple[Type[Command], ...]
 
 # Map of command name to command type.
@@ -414,6 +429,7 @@ VALID_SERVER_COMMANDS = (
     ErrorCommand.NAME,
     PingCommand.NAME,
     SyncCommand.NAME,
+    RemoteServerUpCommand.NAME,
 )
 
 # The commands the client is allowed to send
@@ -427,4 +443,5 @@ VALID_CLIENT_COMMANDS = (
     InvalidateCacheCommand.NAME,
     UserIpCommand.NAME,
     ErrorCommand.NAME,
+    RemoteServerUpCommand.NAME,
 )
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 5f4bdf84d2..131e5acb09 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -76,6 +76,7 @@ from synapse.replication.tcp.commands import (
     PingCommand,
     PositionCommand,
     RdataCommand,
+    RemoteServerUpCommand,
     ReplicateCommand,
     ServerCommand,
     SyncCommand,
@@ -460,6 +461,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
     async def on_INVALIDATE_CACHE(self, cmd):
         self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
 
+    async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
+        self.streamer.on_remote_server_up(cmd.data)
+
     async def on_USER_IP(self, cmd):
         self.streamer.on_user_ip(
             cmd.user_id,
@@ -555,6 +559,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
     def send_sync(self, data):
         self.send_command(SyncCommand(data))
 
+    def send_remote_server_up(self, server: str):
+        self.send_command(RemoteServerUpCommand(server))
+
     def on_connection_closed(self):
         BaseReplicationStreamProtocol.on_connection_closed(self)
         self.streamer.lost_connection(self)
@@ -589,6 +596,11 @@ class AbstractReplicationClientHandler(metaclass=abc.ABCMeta):
         raise NotImplementedError()
 
     @abc.abstractmethod
+    async def on_remote_server_up(self, server: str):
+        """Called when get a new REMOTE_SERVER_UP command."""
+        raise NotImplementedError()
+
+    @abc.abstractmethod
     def get_streams_to_replicate(self):
         """Called when a new connection has been established and we need to
         subscribe to streams.
@@ -707,6 +719,9 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
     async def on_SYNC(self, cmd):
         self.handler.on_sync(cmd.data)
 
+    async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
+        self.handler.on_remote_server_up(cmd.data)
+
     def replicate(self, stream_name, token):
         """Send the subscription request to the server
         """
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index b1752e88cd..6ebf944f66 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -120,6 +120,7 @@ class ReplicationStreamer(object):
             self.federation_sender = hs.get_federation_sender()
 
         self.notifier.add_replication_callback(self.on_notifier_poke)
+        self.notifier.add_remote_server_up_callback(self.send_remote_server_up)
 
         # Keeps track of whether we are currently checking for updates
         self.is_looping = False
@@ -288,6 +289,14 @@ class ReplicationStreamer(object):
         )
         await self._server_notices_sender.on_user_ip(user_id)
 
+    @measure_func("repl.on_remote_server_up")
+    def on_remote_server_up(self, server: str):
+        self.notifier.notify_remote_server_up(server)
+
+    def send_remote_server_up(self, server: str):
+        for conn in self.connections:
+            conn.send_remote_server_up(server)
+
     def send_sync_to_all_connections(self, data):
         """Sends a SYNC command to all clients.
 
diff --git a/synapse/server.pyi b/synapse/server.pyi
index b5e0b57095..0731403047 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -1,3 +1,5 @@
+import twisted.internet
+
 import synapse.api.auth
 import synapse.config.homeserver
 import synapse.federation.sender
@@ -9,10 +11,12 @@ import synapse.handlers.deactivate_account
 import synapse.handlers.device
 import synapse.handlers.e2e_keys
 import synapse.handlers.message
+import synapse.handlers.presence
 import synapse.handlers.room
 import synapse.handlers.room_member
 import synapse.handlers.set_password
 import synapse.http.client
+import synapse.notifier
 import synapse.rest.media.v1.media_repository
 import synapse.server_notices.server_notices_manager
 import synapse.server_notices.server_notices_sender
@@ -85,3 +89,11 @@ class HomeServer(object):
         self,
     ) -> synapse.server_notices.server_notices_sender.ServerNoticesSender:
         pass
+    def get_notifier(self) -> synapse.notifier.Notifier:
+        pass
+    def get_presence_handler(self) -> synapse.handlers.presence.PresenceHandler:
+        pass
+    def get_clock(self) -> synapse.util.Clock:
+        pass
+    def get_reactor(self) -> twisted.internet.base.ReactorBase:
+        pass