summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/federation_server.py144
1 files changed, 82 insertions, 62 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index e922b7ff4a..3ef700f7fc 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -574,68 +574,9 @@ class FederationServer(FederationBase):
                             pdu.room_id, len(prevs - seen),
                         )
 
-                        # We recalculate seen, since it may have changed.
-                        have_seen = yield self.store.have_events(prevs)
-                        seen = set(have_seen.keys())
-
-                        if prevs - seen:
-                            latest = yield self.store.get_latest_event_ids_in_room(
-                                pdu.room_id
-                            )
-
-                            # We add the prev events that we have seen to the latest
-                            # list to ensure the remote server doesn't give them to us
-                            latest = set(latest)
-                            latest |= seen
-
-                            logger.info(
-                                "Missing %d events for room %r: %r...",
-                                len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
-                            )
-
-                            # XXX: we set timeout to 10s to help workaround
-                            # https://github.com/matrix-org/synapse/issues/1733.
-                            # The reason is to avoid holding the linearizer lock
-                            # whilst processing inbound /send transactions, causing
-                            # FDs to stack up and block other inbound transactions
-                            # which empirically can currently take up to 30 minutes.
-                            #
-                            # N.B. this explicitly disables retry attempts.
-                            #
-                            # N.B. this also increases our chances of falling back to
-                            # fetching fresh state for the room if the missing event
-                            # can't be found, which slightly reduces our security.
-                            # it may also increase our DAG extremity count for the room,
-                            # causing additional state resolution?  See #1760.
-                            # However, fetching state doesn't hold the linearizer lock
-                            # apparently.
-                            #
-                            # see https://github.com/matrix-org/synapse/pull/1744
-
-                            missing_events = yield self.get_missing_events(
-                                origin,
-                                pdu.room_id,
-                                earliest_events_ids=list(latest),
-                                latest_events=[pdu],
-                                limit=10,
-                                min_depth=min_depth,
-                                timeout=10000,
-                            )
-
-                            # We want to sort these by depth so we process them and
-                            # tell clients about them in order.
-                            missing_events.sort(key=lambda x: x.depth)
-
-                            for e in missing_events:
-                                yield self._handle_new_pdu(
-                                    origin,
-                                    e,
-                                    get_missing=False
-                                )
-
-                            have_seen = yield self.store.have_events(
-                                [ev for ev, _ in pdu.prev_events]
-                            )
+                        yield self._get_missing_events_for_pdu(
+                            origin, pdu, prevs, min_depth
+                        )
 
             prevs = {e_id for e_id, _ in pdu.prev_events}
             seen = set(have_seen.keys())
@@ -667,6 +608,85 @@ class FederationServer(FederationBase):
             auth_chain=auth_chain,
         )
 
+    @defer.inlineCallbacks
+    def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
+        """
+        Args:
+            origin (str): Origin of the pdu. Will be called to get the missing events
+            pdu: received pdu
+            prevs (str[]): List of event ids which we are missing
+            min_depth (int): Minimum depth of events to return.
+
+        Returns:
+            Deferred<dict(str, str?)>: updated have_seen dictionary
+        """
+        # We recalculate seen, since it may have changed.
+        have_seen = yield self.store.have_events(prevs)
+        seen = set(have_seen.keys())
+
+        if not prevs - seen:
+            # nothing left to do
+            defer.returnValue(have_seen)
+
+        latest = yield self.store.get_latest_event_ids_in_room(
+            pdu.room_id
+        )
+
+        # We add the prev events that we have seen to the latest
+        # list to ensure the remote server doesn't give them to us
+        latest = set(latest)
+        latest |= seen
+
+        logger.info(
+            "Missing %d events for room %r: %r...",
+            len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+        )
+
+        # XXX: we set timeout to 10s to help workaround
+        # https://github.com/matrix-org/synapse/issues/1733.
+        # The reason is to avoid holding the linearizer lock
+        # whilst processing inbound /send transactions, causing
+        # FDs to stack up and block other inbound transactions
+        # which empirically can currently take up to 30 minutes.
+        #
+        # N.B. this explicitly disables retry attempts.
+        #
+        # N.B. this also increases our chances of falling back to
+        # fetching fresh state for the room if the missing event
+        # can't be found, which slightly reduces our security.
+        # it may also increase our DAG extremity count for the room,
+        # causing additional state resolution?  See #1760.
+        # However, fetching state doesn't hold the linearizer lock
+        # apparently.
+        #
+        # see https://github.com/matrix-org/synapse/pull/1744
+
+        missing_events = yield self.get_missing_events(
+            origin,
+            pdu.room_id,
+            earliest_events_ids=list(latest),
+            latest_events=[pdu],
+            limit=10,
+            min_depth=min_depth,
+            timeout=10000,
+        )
+
+        # We want to sort these by depth so we process them and
+        # tell clients about them in order.
+        missing_events.sort(key=lambda x: x.depth)
+
+        for e in missing_events:
+            yield self._handle_new_pdu(
+                origin,
+                e,
+                get_missing=False
+            )
+
+        have_seen = yield self.store.have_events(
+            [ev for ev, _ in pdu.prev_events]
+        )
+        defer.returnValue(have_seen)
+
     def __str__(self):
         return "<ReplicationLayer(%s)>" % self.server_name