diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index deee0f4904..861441708b 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -474,8 +474,13 @@ class FederationClient(FederationBase):
content (object): Any additional data to put into the content field
of the event.
Return:
- A tuple of (origin (str), event (object)) where origin is the remote
- homeserver which generated the event.
+ Deferred: resolves to a tuple of (origin (str), event (object))
+ where origin is the remote homeserver which generated the event.
+
+ Fails with a ``CodeMessageException`` if the chosen remote server
+ returns a 300/400 code.
+
+ Fails with a ``RuntimeError`` if no servers were reachable.
"""
valid_memberships = {Membership.JOIN, Membership.LEAVE}
if membership not in valid_memberships:
@@ -528,6 +533,27 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
def send_join(self, destinations, pdu):
+ """Sends a join event to one of a list of homeservers.
+
+ Doing so will cause the remote server to add the event to the graph,
+ and send the event out to the rest of the federation.
+
+ Args:
+ destinations (str): Candidate homeservers which are probably
+ participating in the room.
+ pdu (BaseEvent): event to be sent
+
+ Return:
+ Deferred: resolves to a dict with members ``origin`` (a string
+ giving the serer the event was sent to, ``state`` (?) and
+ ``auth_chain``.
+
+ Fails with a ``CodeMessageException`` if the chosen remote server
+ returns a 300/400 code.
+
+ Fails with a ``RuntimeError`` if no servers were reachable.
+ """
+
for destination in destinations:
if destination == self.server_name:
continue
@@ -635,6 +661,26 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
def send_leave(self, destinations, pdu):
+ """Sends a leave event to one of a list of homeservers.
+
+ Doing so will cause the remote server to add the event to the graph,
+ and send the event out to the rest of the federation.
+
+ This is mostly useful to reject received invites.
+
+ Args:
+ destinations (str): Candidate homeservers which are probably
+ participating in the room.
+ pdu (BaseEvent): event to be sent
+
+ Return:
+ Deferred: resolves to None.
+
+ Fails with a ``CodeMessageException`` if the chosen remote server
+ returns a non-200 code.
+
+ Fails with a ``RuntimeError`` if no servers were reachable.
+ """
for destination in destinations:
if destination == self.server_name:
continue
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index bc20b9c201..51e3fdea06 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -440,6 +440,16 @@ class FederationServer(FederationBase):
key_id: json.loads(json_bytes)
}
+ logger.info(
+ "Claimed one-time-keys: %s",
+ ",".join((
+ "%s for %s:%s" % (key_id, user_id, device_id)
+ for user_id, user_keys in json_result.iteritems()
+ for device_id, device_keys in user_keys.iteritems()
+ for key_id, _ in device_keys.iteritems()
+ )),
+ )
+
defer.returnValue({"one_time_keys": json_result})
@defer.inlineCallbacks
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index bbb0195228..93e5acebc1 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -31,21 +31,19 @@ Events are replicated via a separate events stream.
from .units import Edu
+from synapse.storage.presence import UserPresenceState
from synapse.util.metrics import Measure
import synapse.metrics
from blist import sorteddict
-import ujson
+from collections import namedtuple
+import logging
-metrics = synapse.metrics.get_metrics_for(__name__)
+logger = logging.getLogger(__name__)
-PRESENCE_TYPE = "p"
-KEYED_EDU_TYPE = "k"
-EDU_TYPE = "e"
-FAILURE_TYPE = "f"
-DEVICE_MESSAGE_TYPE = "d"
+metrics = synapse.metrics.get_metrics_for(__name__)
class FederationRemoteSendQueue(object):
@@ -55,18 +53,19 @@ class FederationRemoteSendQueue(object):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
+ self.is_mine_id = hs.is_mine_id
- self.presence_map = {}
- self.presence_changed = sorteddict()
+ self.presence_map = {} # Pending presence map user_id -> UserPresenceState
+ self.presence_changed = sorteddict() # Stream position -> user_id
- self.keyed_edu = {}
- self.keyed_edu_changed = sorteddict()
+ self.keyed_edu = {} # (destination, key) -> EDU
+ self.keyed_edu_changed = sorteddict() # stream position -> (destination, key)
- self.edus = sorteddict()
+ self.edus = sorteddict() # stream position -> Edu
- self.failures = sorteddict()
+ self.failures = sorteddict() # stream position -> (destination, Failure)
- self.device_messages = sorteddict()
+ self.device_messages = sorteddict() # stream position -> destination
self.pos = 1
self.pos_time = sorteddict()
@@ -122,7 +121,9 @@ class FederationRemoteSendQueue(object):
del self.presence_changed[key]
user_ids = set(
- user_id for uids in self.presence_changed.values() for _, user_id in uids
+ user_id
+ for uids in self.presence_changed.itervalues()
+ for user_id in uids
)
to_del = [
@@ -189,18 +190,20 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
- def send_presence(self, destination, states):
- """As per TransactionQueue"""
+ def send_presence(self, states):
+ """As per TransactionQueue
+
+ Args:
+ states (list(UserPresenceState))
+ """
pos = self._next_pos()
- self.presence_map.update({
- state.user_id: state
- for state in states
- })
+ # We only want to send presence for our own users, so lets always just
+ # filter here just in case.
+ local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
- self.presence_changed[pos] = [
- (destination, state.user_id) for state in states
- ]
+ self.presence_map.update({state.user_id: state for state in local_states})
+ self.presence_changed[pos] = [state.user_id for state in local_states]
self.notifier.on_new_replication_data()
@@ -220,10 +223,15 @@ class FederationRemoteSendQueue(object):
def get_current_token(self):
return self.pos - 1
- def get_replication_rows(self, token, limit, federation_ack=None):
- """
+ def federation_ack(self, token):
+ self._clear_queue_before_pos(token)
+
+ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None):
+ """Get rows to be sent over federation between the two tokens
+
Args:
- token (int)
+ from_token (int)
+ to_token(int)
limit (int)
federation_ack (int): Optional. The position where the worker is
explicitly acknowledged it has handled. Allows us to drop
@@ -232,9 +240,11 @@ class FederationRemoteSendQueue(object):
# TODO: Handle limit.
# To handle restarts where we wrap around
- if token > self.pos:
- token = -1
+ if from_token > self.pos:
+ from_token = -1
+ # list of tuple(int, BaseFederationRow), where the first is the position
+ # of the federation stream.
rows = []
# There should be only one reader, so lets delete everything its
@@ -244,62 +254,295 @@ class FederationRemoteSendQueue(object):
# Fetch changed presence
keys = self.presence_changed.keys()
- i = keys.bisect_right(token)
- dest_user_ids = set(
- (pos, dest_user_id)
- for pos in keys[i:]
- for dest_user_id in self.presence_changed[pos]
- )
+ i = keys.bisect_right(from_token)
+ j = keys.bisect_right(to_token) + 1
+ dest_user_ids = [
+ (pos, user_id)
+ for pos in keys[i:j]
+ for user_id in self.presence_changed[pos]
+ ]
- for (key, (dest, user_id)) in dest_user_ids:
- rows.append((key, PRESENCE_TYPE, ujson.dumps({
- "destination": dest,
- "state": self.presence_map[user_id].as_dict(),
- })))
+ for (key, user_id) in dest_user_ids:
+ rows.append((key, PresenceRow(
+ state=self.presence_map[user_id],
+ )))
# Fetch changes keyed edus
keys = self.keyed_edu_changed.keys()
- i = keys.bisect_right(token)
- keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
-
- for (pos, (destination, edu_key)) in keyed_edus:
- rows.append(
- (pos, KEYED_EDU_TYPE, ujson.dumps({
- "key": edu_key,
- "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(),
- }))
- )
+ i = keys.bisect_right(from_token)
+ j = keys.bisect_right(to_token) + 1
+ # We purposefully clobber based on the key here, python dict comprehensions
+ # always use the last value, so this will correctly point to the last
+ # stream position.
+ keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
+
+ for ((destination, edu_key), pos) in keyed_edus.iteritems():
+ rows.append((pos, KeyedEduRow(
+ key=edu_key,
+ edu=self.keyed_edu[(destination, edu_key)],
+ )))
# Fetch changed edus
keys = self.edus.keys()
- i = keys.bisect_right(token)
- edus = set((k, self.edus[k]) for k in keys[i:])
+ i = keys.bisect_right(from_token)
+ j = keys.bisect_right(to_token) + 1
+ edus = ((k, self.edus[k]) for k in keys[i:j])
for (pos, edu) in edus:
- rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))
+ rows.append((pos, EduRow(edu)))
# Fetch changed failures
keys = self.failures.keys()
- i = keys.bisect_right(token)
- failures = set((k, self.failures[k]) for k in keys[i:])
+ i = keys.bisect_right(from_token)
+ j = keys.bisect_right(to_token) + 1
+ failures = ((k, self.failures[k]) for k in keys[i:j])
for (pos, (destination, failure)) in failures:
- rows.append((pos, FAILURE_TYPE, ujson.dumps({
- "destination": destination,
- "failure": failure,
- })))
+ rows.append((pos, FailureRow(
+ destination=destination,
+ failure=failure,
+ )))
# Fetch changed device messages
keys = self.device_messages.keys()
- i = keys.bisect_right(token)
- device_messages = set((k, self.device_messages[k]) for k in keys[i:])
+ i = keys.bisect_right(from_token)
+ j = keys.bisect_right(to_token) + 1
+ device_messages = {self.device_messages[k]: k for k in keys[i:j]}
- for (pos, destination) in device_messages:
- rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({
- "destination": destination,
- })))
+ for (destination, pos) in device_messages.iteritems():
+ rows.append((pos, DeviceRow(
+ destination=destination,
+ )))
# Sort rows based on pos
rows.sort()
- return rows
+ return [(pos, row.TypeId, row.to_data()) for pos, row in rows]
+
+
+class BaseFederationRow(object):
+ """Base class for rows to be sent in the federation stream.
+
+ Specifies how to identify, serialize and deserialize the different types.
+ """
+
+ TypeId = None # Unique string that ids the type. Must be overriden in sub classes.
+
+ @staticmethod
+ def from_data(data):
+ """Parse the data from the federation stream into a row.
+
+ Args:
+ data: The value of ``data`` from FederationStreamRow.data, type
+ depends on the type of stream
+ """
+ raise NotImplementedError()
+
+ def to_data(self):
+ """Serialize this row to be sent over the federation stream.
+
+ Returns:
+ The value to be sent in FederationStreamRow.data. The type depends
+ on the type of stream.
+ """
+ raise NotImplementedError()
+
+ def add_to_buffer(self, buff):
+ """Add this row to the appropriate field in the buffer ready for this
+ to be sent over federation.
+
+ We use a buffer so that we can batch up events that have come in at
+ the same time and send them all at once.
+
+ Args:
+ buff (BufferedToSend)
+ """
+ raise NotImplementedError()
+
+
+class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
+ "state", # UserPresenceState
+))):
+ TypeId = "p"
+
+ @staticmethod
+ def from_data(data):
+ return PresenceRow(
+ state=UserPresenceState.from_dict(data)
+ )
+
+ def to_data(self):
+ return self.state.as_dict()
+
+ def add_to_buffer(self, buff):
+ buff.presence.append(self.state)
+
+
+class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
+ "key", # tuple(str) - the edu key passed to send_edu
+ "edu", # Edu
+))):
+ """Streams EDUs that have an associated key that is ued to clobber. For example,
+ typing EDUs clobber based on room_id.
+ """
+
+ TypeId = "k"
+
+ @staticmethod
+ def from_data(data):
+ return KeyedEduRow(
+ key=tuple(data["key"]),
+ edu=Edu(**data["edu"]),
+ )
+
+ def to_data(self):
+ return {
+ "key": self.key,
+ "edu": self.edu.get_internal_dict(),
+ }
+
+ def add_to_buffer(self, buff):
+ buff.keyed_edus.setdefault(
+ self.edu.destination, {}
+ )[self.key] = self.edu
+
+
+class EduRow(BaseFederationRow, namedtuple("EduRow", (
+ "edu", # Edu
+))):
+ """Streams EDUs that don't have keys. See KeyedEduRow
+ """
+ TypeId = "e"
+
+ @staticmethod
+ def from_data(data):
+ return EduRow(Edu(**data))
+
+ def to_data(self):
+ return self.edu.get_internal_dict()
+
+ def add_to_buffer(self, buff):
+ buff.edus.setdefault(self.edu.destination, []).append(self.edu)
+
+
+class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
+ "destination", # str
+ "failure",
+))):
+ """Streams failures to a remote server. Failures are issued when there was
+ something wrong with a transaction the remote sent us, e.g. it included
+ an event that was invalid.
+ """
+
+ TypeId = "f"
+
+ @staticmethod
+ def from_data(data):
+ return FailureRow(
+ destination=data["destination"],
+ failure=data["failure"],
+ )
+
+ def to_data(self):
+ return {
+ "destination": self.destination,
+ "failure": self.failure,
+ }
+
+ def add_to_buffer(self, buff):
+ buff.failures.setdefault(self.destination, []).append(self.failure)
+
+
+class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
+ "destination", # str
+))):
+ """Streams the fact that either a) there is pending to device messages for
+ users on the remote, or b) a local users device has changed and needs to
+ be sent to the remote.
+ """
+ TypeId = "d"
+
+ @staticmethod
+ def from_data(data):
+ return DeviceRow(destination=data["destination"])
+
+ def to_data(self):
+ return {"destination": self.destination}
+
+ def add_to_buffer(self, buff):
+ buff.device_destinations.add(self.destination)
+
+
+TypeToRow = {
+ Row.TypeId: Row
+ for Row in (
+ PresenceRow,
+ KeyedEduRow,
+ EduRow,
+ FailureRow,
+ DeviceRow,
+ )
+}
+
+
+ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
+ "presence", # list(UserPresenceState)
+ "keyed_edus", # dict of destination -> { key -> Edu }
+ "edus", # dict of destination -> [Edu]
+ "failures", # dict of destination -> [failures]
+ "device_destinations", # set of destinations
+))
+
+
+def process_rows_for_federation(transaction_queue, rows):
+ """Parse a list of rows from the federation stream and put them in the
+ transaction queue ready for sending to the relevant homeservers.
+
+ Args:
+ transaction_queue (TransactionQueue)
+ rows (list(synapse.replication.tcp.streams.FederationStreamRow))
+ """
+
+ # The federation stream contains a bunch of different types of
+ # rows that need to be handled differently. We parse the rows, put
+ # them into the appropriate collection and then send them off.
+
+ buff = ParsedFederationStreamData(
+ presence=[],
+ keyed_edus={},
+ edus={},
+ failures={},
+ device_destinations=set(),
+ )
+
+ # Parse the rows in the stream and add to the buffer
+ for row in rows:
+ if row.type not in TypeToRow:
+ logger.error("Unrecognized federation row type %r", row.type)
+ continue
+
+ RowType = TypeToRow[row.type]
+ parsed_row = RowType.from_data(row.data)
+ parsed_row.add_to_buffer(buff)
+
+ if buff.presence:
+ transaction_queue.send_presence(buff.presence)
+
+ for destination, edu_map in buff.keyed_edus.iteritems():
+ for key, edu in edu_map.items():
+ transaction_queue.send_edu(
+ edu.destination, edu.edu_type, edu.content, key=key,
+ )
+
+ for destination, edu_list in buff.edus.iteritems():
+ for edu in edu_list:
+ transaction_queue.send_edu(
+ edu.destination, edu.edu_type, edu.content, key=None,
+ )
+
+ for destination, failure_list in buff.failures.iteritems():
+ for failure in failure_list:
+ transaction_queue.send_failure(destination, failure)
+
+ for destination in buff.device_destinations:
+ transaction_queue.send_device_messages(destination)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index c27ce7c5f3..a15198e05d 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -21,11 +21,10 @@ from .units import Transaction, Edu
from synapse.api.errors import HttpResponseException
from synapse.util.async import run_on_reactor
-from synapse.util.logcontext import preserve_context_over_fn
+from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
-from synapse.types import get_domain_from_id
-from synapse.handlers.presence import format_user_presence_state
+from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
import synapse.metrics
import logging
@@ -41,6 +40,8 @@ sent_pdus_destination_dist = client_metrics.register_distribution(
)
sent_edus_counter = client_metrics.register_counter("sent_edus")
+sent_transactions_counter = client_metrics.register_counter("sent_transactions")
+
class TransactionQueue(object):
"""This class makes sure we only have one transaction in flight at
@@ -77,8 +78,18 @@ class TransactionQueue(object):
# destination -> list of tuple(edu, deferred)
self.pending_edus_by_dest = edus = {}
- # Presence needs to be separate as we send single aggragate EDUs
+ # Map of user_id -> UserPresenceState for all the pending presence
+ # to be sent out by user_id. Entries here get processed and put in
+ # pending_presence_by_dest
+ self.pending_presence = {}
+
+ # Map of destination -> user_id -> UserPresenceState of pending presence
+ # to be sent to each destinations
self.pending_presence_by_dest = presence = {}
+
+ # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
+ # based on their key (e.g. typing events by room_id)
+ # Map of destination -> (edu_type, key) -> Edu
self.pending_edus_keyed_by_dest = edus_keyed = {}
metrics.register_callback(
@@ -113,6 +124,8 @@ class TransactionQueue(object):
self._is_processing = False
self._last_poked_id = -1
+ self._processing_pending_presence = False
+
def can_send_to(self, destination):
"""Can we send messages to the given server?
@@ -169,15 +182,12 @@ class TransactionQueue(object):
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
- users_in_room = yield self.state.get_current_user_in_room(
+ destinations = yield self.state.get_current_hosts_in_room(
event.room_id, latest_event_ids=[
prev_id for prev_id, _ in event.prev_events
],
)
- destinations = set(
- get_domain_from_id(user_id) for user_id in users_in_room
- )
if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server
# then it already has the event and there is no reason to
@@ -224,17 +234,71 @@ class TransactionQueue(object):
self._attempt_new_transaction, destination
)
- def send_presence(self, destination, states):
- if not self.can_send_to(destination):
- return
+ @preserve_fn # the caller should not yield on this
+ @defer.inlineCallbacks
+ def send_presence(self, states):
+ """Send the new presence states to the appropriate destinations.
- self.pending_presence_by_dest.setdefault(destination, {}).update({
+ This actually queues up the presence states ready for sending and
+ triggers a background task to process them and send out the transactions.
+
+ Args:
+ states (list(UserPresenceState))
+ """
+
+ # First we queue up the new presence by user ID, so multiple presence
+ # updates in quick successtion are correctly handled
+ # We only want to send presence for our own users, so lets always just
+ # filter here just in case.
+ self.pending_presence.update({
state.user_id: state for state in states
+ if self.is_mine_id(state.user_id)
})
- preserve_context_over_fn(
- self._attempt_new_transaction, destination
- )
+ # We then handle the new pending presence in batches, first figuring
+ # out the destinations we need to send each state to and then poking it
+ # to attempt a new transaction. We linearize this so that we don't
+ # accidentally mess up the ordering and send multiple presence updates
+ # in the wrong order
+ if self._processing_pending_presence:
+ return
+
+ self._processing_pending_presence = True
+ try:
+ while True:
+ states_map = self.pending_presence
+ self.pending_presence = {}
+
+ if not states_map:
+ break
+
+ yield self._process_presence_inner(states_map.values())
+ finally:
+ self._processing_pending_presence = False
+
+ @measure_func("txnqueue._process_presence")
+ @defer.inlineCallbacks
+ def _process_presence_inner(self, states):
+ """Given a list of states populate self.pending_presence_by_dest and
+ poke to send a new transaction to each destination
+
+ Args:
+ states (list(UserPresenceState))
+ """
+ hosts_and_states = yield get_interested_remotes(self.store, states, self.state)
+
+ for destinations, states in hosts_and_states:
+ for destination in destinations:
+ if not self.can_send_to(destination):
+ continue
+
+ self.pending_presence_by_dest.setdefault(
+ destination, {}
+ ).update({
+ state.user_id: state for state in states
+ })
+
+ preserve_fn(self._attempt_new_transaction)(destination)
def send_edu(self, destination, edu_type, content, key=None):
edu = Edu(
@@ -374,6 +438,7 @@ class TransactionQueue(object):
destination, pending_pdus, pending_edus, pending_failures,
)
if success:
+ sent_transactions_counter.inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if device_message_edus:
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 15a03378f5..52b2a717d2 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -193,6 +193,26 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def make_membership_event(self, destination, room_id, user_id, membership):
+ """Asks a remote server to build and sign us a membership event
+
+ Note that this does not append any events to any graphs.
+
+ Args:
+ destination (str): address of remote homeserver
+ room_id (str): room to join/leave
+ user_id (str): user to be joined/left
+ membership (str): one of join/leave
+
+ Returns:
+ Deferred: Succeeds when we get a 2xx HTTP response. The result
+ will be the decoded JSON body (ie, the new event).
+
+ Fails with ``HTTPRequestException`` if we get an HTTP response
+ code >= 300.
+
+ Fails with ``NotRetryingDestination`` if we are not yet ready
+ to retry this server.
+ """
valid_memberships = {Membership.JOIN, Membership.LEAVE}
if membership not in valid_memberships:
raise RuntimeError(
@@ -201,11 +221,23 @@ class TransportLayerClient(object):
)
path = PREFIX + "/make_%s/%s/%s" % (membership, room_id, user_id)
+ ignore_backoff = False
+ retry_on_dns_fail = False
+
+ if membership == Membership.LEAVE:
+ # we particularly want to do our best to send leave events. The
+ # problem is that if it fails, we won't retry it later, so if the
+ # remote server was just having a momentary blip, the room will be
+ # out of sync.
+ ignore_backoff = True
+ retry_on_dns_fail = True
+
content = yield self.client.get_json(
destination=destination,
path=path,
- retry_on_dns_fail=False,
+ retry_on_dns_fail=retry_on_dns_fail,
timeout=20000,
+ ignore_backoff=ignore_backoff,
)
defer.returnValue(content)
@@ -232,6 +264,12 @@ class TransportLayerClient(object):
destination=destination,
path=path,
data=content,
+
+ # we want to do our best to send this through. The problem is
+ # that if it fails, we won't retry it later, so if the remote
+ # server was just having a momentary blip, the room will be out of
+ # sync.
+ ignore_backoff=True,
)
defer.returnValue(response)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index c840da834c..3d676e7d8b 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -24,6 +24,7 @@ from synapse.http.servlet import (
)
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
+from synapse.util.logcontext import preserve_fn
from synapse.types import ThirdPartyInstanceID
import functools
@@ -79,6 +80,7 @@ class Authenticator(object):
def __init__(self, hs):
self.keyring = hs.get_keyring()
self.server_name = hs.hostname
+ self.store = hs.get_datastore()
# A method just so we can pass 'self' as the authenticator to the Servlets
@defer.inlineCallbacks
@@ -138,6 +140,13 @@ class Authenticator(object):
logger.info("Request from %s", origin)
request.authenticated_entity = origin
+ # If we get a valid signed request from the other side, its probably
+ # alive
+ retry_timings = yield self.store.get_destination_retry_timings(origin)
+ if retry_timings and retry_timings["retry_last_ts"]:
+ logger.info("Marking origin %r as up", origin)
+ preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0)
+
defer.returnValue(origin)
|