diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 4ebb0e8bc0..36c83c3027 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -21,6 +21,7 @@ from prometheus_client import Counter
from twisted.internet import defer
+import synapse
import synapse.metrics
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
@@ -54,7 +55,7 @@ sent_pdus_destination_dist_total = Counter(
class FederationSender(object):
- def __init__(self, hs):
+ def __init__(self, hs: "synapse.server.HomeServer"):
self.hs = hs
self.server_name = hs.hostname
@@ -482,7 +483,20 @@ class FederationSender(object):
def send_device_messages(self, destination):
if destination == self.server_name:
- logger.info("Not sending device update to ourselves")
+ logger.warning("Not sending device update to ourselves")
+ return
+
+ self._get_per_destination_queue(destination).attempt_new_transaction()
+
+ def wake_destination(self, destination: str):
+ """Called when we want to retry sending transactions to a remote.
+
+ This is mainly useful if the remote server has been down and we think it
+ might have come back.
+ """
+
+ if destination == self.server_name:
+ logger.warning("Not waking up ourselves")
return
self._get_per_destination_queue(destination).attempt_new_transaction()
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index b4cbf23394..d8cf9ed299 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -44,6 +44,7 @@ from synapse.logging.opentracing import (
tags,
whitelisted_homeserver,
)
+from synapse.server import HomeServer
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
@@ -101,12 +102,17 @@ class NoAuthenticationError(AuthenticationError):
class Authenticator(object):
- def __init__(self, hs):
+ def __init__(self, hs: HomeServer):
self._clock = hs.get_clock()
self.keyring = hs.get_keyring()
self.server_name = hs.hostname
self.store = hs.get_datastore()
self.federation_domain_whitelist = hs.config.federation_domain_whitelist
+ self.notifer = hs.get_notifier()
+
+ self.replication_client = None
+ if hs.config.worker.worker_app:
+ self.replication_client = hs.get_tcp_replication()
# A method just so we can pass 'self' as the authenticator to the Servlets
async def authenticate_request(self, request, content):
@@ -166,6 +172,17 @@ class Authenticator(object):
try:
logger.info("Marking origin %r as up", origin)
await self.store.set_destination_retry_timings(origin, None, 0, 0)
+
+ # Inform the relevant places that the remote server is back up.
+ self.notifer.notify_remote_server_up(origin)
+ if self.replication_client:
+ # If we're on a worker we try and inform master about this. The
+ # replication client doesn't hook into the notifier to avoid
+ # infinite loops where we send a `REMOTE_SERVER_UP` command to
+ # master, which then echoes it back to us which in turn pokes
+ # the notifier.
+ self.replication_client.send_remote_server_up(origin)
+
except Exception:
logger.exception("Error resetting retry timings on %s", origin)
|