diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index e922b7ff4a..bc20b9c201 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -52,7 +52,6 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth()
- self._room_pdu_linearizer = Linearizer("fed_room_pdu")
self._server_linearizer = Linearizer("fed_server")
# We cache responses to state queries, as they take a while and often
@@ -147,11 +146,15 @@ class FederationServer(FederationBase):
# check that it's actually being sent from a valid destination to
# workaround bug #1753 in 0.18.5 and 0.18.6
if transaction.origin != get_domain_from_id(pdu.event_id):
+ # We continue to accept join events from any server; this is
+ # necessary for the federation join dance to work correctly.
+ # (When we join over federation, the "helper" server is
+ # responsible for sending out the join event, rather than the
+ # origin. See bug #1893).
if not (
pdu.type == 'm.room.member' and
pdu.content and
- pdu.content.get("membership", None) == 'join' and
- self.hs.is_mine_id(pdu.state_key)
+ pdu.content.get("membership", None) == 'join'
):
logger.info(
"Discarding PDU %s from invalid origin %s",
@@ -165,7 +168,7 @@ class FederationServer(FederationBase):
)
try:
- yield self._handle_new_pdu(transaction.origin, pdu)
+ yield self._handle_received_pdu(transaction.origin, pdu)
results.append({})
except FederationError as e:
self.send_failure(e, transaction.origin)
@@ -497,27 +500,16 @@ class FederationServer(FederationBase):
)
@defer.inlineCallbacks
- @log_function
- def _handle_new_pdu(self, origin, pdu, get_missing=True):
-
- # We reprocess pdus when we have seen them only as outliers
- existing = yield self._get_persisted_pdu(
- origin, pdu.event_id, do_auth=False
- )
-
- # FIXME: Currently we fetch an event again when we already have it
- # if it has been marked as an outlier.
+ def _handle_received_pdu(self, origin, pdu):
+ """ Process a PDU received in a federation /send/ transaction.
- already_seen = (
- existing and (
- not existing.internal_metadata.is_outlier()
- or pdu.internal_metadata.is_outlier()
- )
- )
- if already_seen:
- logger.debug("Already seen pdu %s", pdu.event_id)
- return
+ Args:
+ origin (str): server which sent the pdu
+ pdu (FrozenEvent): received pdu
+ Returns (Deferred): completes with None
+ Raises: FederationError if the signatures / hash do not match
+ """
# Check signature.
try:
pdu = yield self._check_sigs_and_hash(pdu)
@@ -529,143 +521,7 @@ class FederationServer(FederationBase):
affected=pdu.event_id,
)
- state = None
-
- auth_chain = []
-
- have_seen = yield self.store.have_events(
- [ev for ev, _ in pdu.prev_events]
- )
-
- fetch_state = False
-
- # Get missing pdus if necessary.
- if not pdu.internal_metadata.is_outlier():
- # We only backfill backwards to the min depth.
- min_depth = yield self.handler.get_min_depth_for_context(
- pdu.room_id
- )
-
- logger.debug(
- "_handle_new_pdu min_depth for %s: %d",
- pdu.room_id, min_depth
- )
-
- prevs = {e_id for e_id, _ in pdu.prev_events}
- seen = set(have_seen.keys())
-
- if min_depth and pdu.depth < min_depth:
- # This is so that we don't notify the user about this
- # message, to work around the fact that some events will
- # reference really really old events we really don't want to
- # send to the clients.
- pdu.internal_metadata.outlier = True
- elif min_depth and pdu.depth > min_depth:
- if get_missing and prevs - seen:
- # If we're missing stuff, ensure we only fetch stuff one
- # at a time.
- logger.info(
- "Acquiring lock for room %r to fetch %d missing events: %r...",
- pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
- )
- with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
- logger.info(
- "Acquired lock for room %r to fetch %d missing events",
- pdu.room_id, len(prevs - seen),
- )
-
- # We recalculate seen, since it may have changed.
- have_seen = yield self.store.have_events(prevs)
- seen = set(have_seen.keys())
-
- if prevs - seen:
- latest = yield self.store.get_latest_event_ids_in_room(
- pdu.room_id
- )
-
- # We add the prev events that we have seen to the latest
- # list to ensure the remote server doesn't give them to us
- latest = set(latest)
- latest |= seen
-
- logger.info(
- "Missing %d events for room %r: %r...",
- len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
- )
-
- # XXX: we set timeout to 10s to help workaround
- # https://github.com/matrix-org/synapse/issues/1733.
- # The reason is to avoid holding the linearizer lock
- # whilst processing inbound /send transactions, causing
- # FDs to stack up and block other inbound transactions
- # which empirically can currently take up to 30 minutes.
- #
- # N.B. this explicitly disables retry attempts.
- #
- # N.B. this also increases our chances of falling back to
- # fetching fresh state for the room if the missing event
- # can't be found, which slightly reduces our security.
- # it may also increase our DAG extremity count for the room,
- # causing additional state resolution? See #1760.
- # However, fetching state doesn't hold the linearizer lock
- # apparently.
- #
- # see https://github.com/matrix-org/synapse/pull/1744
-
- missing_events = yield self.get_missing_events(
- origin,
- pdu.room_id,
- earliest_events_ids=list(latest),
- latest_events=[pdu],
- limit=10,
- min_depth=min_depth,
- timeout=10000,
- )
-
- # We want to sort these by depth so we process them and
- # tell clients about them in order.
- missing_events.sort(key=lambda x: x.depth)
-
- for e in missing_events:
- yield self._handle_new_pdu(
- origin,
- e,
- get_missing=False
- )
-
- have_seen = yield self.store.have_events(
- [ev for ev, _ in pdu.prev_events]
- )
-
- prevs = {e_id for e_id, _ in pdu.prev_events}
- seen = set(have_seen.keys())
- if prevs - seen:
- logger.info(
- "Still missing %d events for room %r: %r...",
- len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
- )
- fetch_state = True
-
- if fetch_state:
- # We need to get the state at this event, since we haven't
- # processed all the prev events.
- logger.debug(
- "_handle_new_pdu getting state for %s",
- pdu.room_id
- )
- try:
- state, auth_chain = yield self.get_state_for_room(
- origin, pdu.room_id, pdu.event_id,
- )
- except:
- logger.exception("Failed to get state for event: %s", pdu.event_id)
-
- yield self.handler.on_receive_pdu(
- origin,
- pdu,
- state=state,
- auth_chain=auth_chain,
- )
+ yield self.handler.on_receive_pdu(origin, pdu, get_missing=True)
def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name
|