summary refs log tree commit diff
path: root/synapse/federation/federation_server.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-02-23 13:58:02 +0000
committerErik Johnston <erik@matrix.org>2015-02-23 13:58:02 +0000
commitdb215b7e0007a207b8775d78c6693153e16f2731 (patch)
treef62c3f26b8de39624b01620f958273d1e05db605 /synapse/federation/federation_server.py
parentInitial stab at implementing a batched get_missing_pdus request (diff)
downloadsynapse-db215b7e0007a207b8775d78c6693153e16f2731.tar.xz
Implement and use new batched get missing pdu
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r--synapse/federation/federation_server.py150
1 files changed, 51 insertions, 99 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 34bc397e8a..f74e16abd5 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -142,7 +142,15 @@ class FederationServer(FederationBase):
             if r[0]:
                 ret.append({})
             else:
-                logger.exception(r[1])
+                failure = r[1]
+                logger.error(
+                    "Failed to handle PDU",
+                    exc_info=(
+                        failure.type,
+                        failure.value,
+                        failure.getTracebackObject()
+                    )
+                )
                 ret.append({"error": str(r[1].value)})
 
         logger.debug("Returning: %s", str(ret))
@@ -306,75 +314,17 @@ class FederationServer(FederationBase):
         )
 
     @defer.inlineCallbacks
-    def get_missing_events(self, origin, room_id, earliest_events,
-                           latest_events, limit, min_depth):
-        limit = max(limit, 50)
-        min_depth = max(min_depth, 0)
-
-        missing_events = yield self.store.get_missing_events(
-            room_id=room_id,
-            earliest_events=earliest_events,
-            latest_events=latest_events,
-            limit=limit,
-            min_depth=min_depth,
+    @log_function
+    def on_get_missing_events(self, origin, room_id, earliest_events,
+                              latest_events, limit, min_depth):
+        missing_events = yield self.handler.on_get_missing_events(
+            origin, room_id, earliest_events, latest_events, limit, min_depth
         )
 
-        known_ids = {e.event_id for e in missing_events} | {earliest_events}
-
-        back_edges = {
-            e for e in missing_events
-            if {i for i, h in e.prev_events.items()} <= known_ids
-        }
-
-        decoded_auth_events = set()
-        state = {}
-        auth_events = set()
-        auth_and_state = {}
-        for event in back_edges:
-            state_pdus = yield self.handler.get_state_for_pdu(
-                origin, room_id, event.event_id,
-                do_auth=False,
-            )
-
-            state[event.event_id] = [s.event_id for s in state_pdus]
-
-            auth_and_state.update({
-                s.event_id: s for s in state_pdus
-            })
-
-            state_ids = {pdu.event_id for pdu in state_pdus}
-            prev_ids = {i for i, h in event.prev_events.items()}
-            partial_auth_chain = yield self.store.get_auth_chain(
-                state_ids | prev_ids, have_ids=decoded_auth_events.keys()
-            )
-
-            for p in partial_auth_chain:
-                p.signatures.update(
-                    compute_event_signature(
-                        p,
-                        self.hs.hostname,
-                        self.hs.config.signing_key[0]
-                    )
-                )
-
-            auth_events.update(
-                a.event_id for a in partial_auth_chain
-            )
-
-            auth_and_state.update({
-                a.event_id: a for a in partial_auth_chain
-            })
-
         time_now = self._clock.time_msec()
 
         defer.returnValue({
             "events": [ev.get_pdu_json(time_now) for ev in missing_events],
-            "state_for_events": state,
-            "auth_events": auth_events,
-            "event_map": {
-                k: ev.get_pdu_json(time_now)
-                for k, ev in auth_and_state.items()
-            },
         })
 
     @log_function
@@ -403,7 +353,7 @@ class FederationServer(FederationBase):
 
     @defer.inlineCallbacks
     @log_function
-    def _handle_new_pdu(self, origin, pdu, max_recursion=10):
+    def _handle_new_pdu(self, origin, pdu, get_missing=True):
         # We reprocess pdus when we have seen them only as outliers
         existing = yield self._get_persisted_pdu(
             origin, pdu.event_id, do_auth=False
@@ -455,48 +405,50 @@ class FederationServer(FederationBase):
                 pdu.room_id, min_depth
             )
 
+            prevs = {e_id for e_id, _ in pdu.prev_events}
+            seen = set(have_seen.keys())
+
             if min_depth and pdu.depth < min_depth:
                 # This is so that we don't notify the user about this
                 # message, to work around the fact that some events will
                 # reference really really old events we really don't want to
                 # send to the clients.
                 pdu.internal_metadata.outlier = True
-            elif min_depth and pdu.depth > min_depth and max_recursion > 0:
-                for event_id, hashes in pdu.prev_events:
-                    if event_id not in have_seen:
-                        logger.debug(
-                            "_handle_new_pdu requesting pdu %s",
-                            event_id
+            elif min_depth and pdu.depth > min_depth:
+                if get_missing and prevs - seen:
+                    latest_tuples = yield self.store.get_latest_events_in_room(
+                        pdu.room_id
+                    )
+
+                    # We add the prev events that we have seen to the latest
+                    # list to ensure the remote server doesn't give them to us
+                    latest = set(e_id for e_id, _, _ in latest_tuples)
+                    latest |= seen
+
+                    missing_events = yield self.get_missing_events(
+                        origin,
+                        pdu.room_id,
+                        earliest_events=list(latest),
+                        latest_events=[pdu.event_id],
+                        limit=10,
+                        min_depth=min_depth,
+                    )
+
+                    for e in missing_events:
+                        yield self._handle_new_pdu(
+                            origin,
+                            e,
+                            get_missing=False
                         )
 
-                        try:
-                            new_pdu = yield self.federation_client.get_pdu(
-                                [origin, pdu.origin],
-                                event_id=event_id,
-                            )
-
-                            if new_pdu:
-                                yield self._handle_new_pdu(
-                                    origin,
-                                    new_pdu,
-                                    max_recursion=max_recursion-1
-                                )
-
-                                logger.debug("Processed pdu %s", event_id)
-                            else:
-                                logger.warn("Failed to get PDU %s", event_id)
-                                fetch_state = True
-                        except:
-                            # TODO(erikj): Do some more intelligent retries.
-                            logger.exception("Failed to get PDU")
-                            fetch_state = True
-            else:
-                prevs = {e_id for e_id, _ in pdu.prev_events}
-                seen = set(have_seen.keys())
-                if prevs - seen:
-                    fetch_state = True
-        else:
-            fetch_state = True
+                    have_seen = yield self.store.have_events(
+                        [ev for ev, _ in pdu.prev_events]
+                    )
+
+            prevs = {e_id for e_id, _ in pdu.prev_events}
+            seen = set(have_seen.keys())
+            if prevs - seen:
+                fetch_state = True
 
         if fetch_state:
             # We need to get the state at this event, since we haven't