summary refs log tree commit diff
diff options
context:
space:
mode:
author Erik Johnston <erik@matrix.org> 2015-03-02 14:54:27 +0000
committer Erik Johnston <erik@matrix.org> 2015-03-02 14:54:27 +0000
commit b2d211847651414d67d4935683e32c76eb944029 (patch)
tree 25ca81b14a72192a89c9399b3d80180ac124ca14
parent Merge pull request #83 from matrix-org/nofile_limit_config (diff)
parent Merge branch 'develop' of github.com:matrix-org/synapse into batched_get_pdu (diff)
download synapse-b2d211847651414d67d4935683e32c76eb944029.tar.xz
Merge pull request #88 from matrix-org/batched_get_pdu
Batched get pdu
-rw-r--r-- synapse/federation/federation_client.py 19
-rw-r--r-- synapse/federation/federation_server.py 87
-rw-r--r-- synapse/federation/transaction_queue.py 6
-rw-r--r-- synapse/federation/transport/client.py 19
-rw-r--r-- synapse/federation/transport/server.py 31
-rw-r--r-- synapse/handlers/federation.py 32
-rw-r--r-- synapse/storage/event_federation.py 51
-rw-r--r-- tests/handlers/test_presence.py 43
8 files changed, 231 insertions, 57 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index cd3c962d50..ca89a0787c 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -439,6 +439,25 @@ class FederationClient(FederationBase):
 
         defer.returnValue(ret)
 
+    @defer.inlineCallbacks
+    def get_missing_events(self, destination, room_id, earliest_events,
+                           latest_events, limit, min_depth):
+        content = yield self.transport_layer.get_missing_events(
+            destination, room_id, earliest_events, latest_events, limit,
+            min_depth,
+        )
+
+        events = [
+            self.event_from_pdu_json(e)
+            for e in content.get("events", [])
+        ]
+
+        signed_events = yield self._check_sigs_and_hash_and_fetch(
+            destination, events, outlier=True
+        )
+
+        defer.returnValue(signed_events)
+
     def event_from_pdu_json(self, pdu_json, outlier=False):
         event = FrozenEvent(
             pdu_json
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index bc9bac809a..4264d857be 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -125,6 +125,7 @@ class FederationServer(FederationBase):
                     results.append({"error": str(e)})
                 except Exception as e:
                     results.append({"error": str(e)})
+                    logger.exception("Failed to handle PDU")
 
             if hasattr(transaction, "edus"):
                 for edu in [Edu(**x) for x in transaction.edus]:
@@ -297,6 +298,20 @@ class FederationServer(FederationBase):
             (200, send_content)
         )
 
+    @defer.inlineCallbacks
+    @log_function
+    def on_get_missing_events(self, origin, room_id, earliest_events,
+                              latest_events, limit, min_depth):
+        missing_events = yield self.handler.on_get_missing_events(
+            origin, room_id, earliest_events, latest_events, limit, min_depth
+        )
+
+        time_now = self._clock.time_msec()
+
+        defer.returnValue({
+            "events": [ev.get_pdu_json(time_now) for ev in missing_events],
+        })
+
     @log_function
     def _get_persisted_pdu(self, origin, event_id, do_auth=True):
         """ Get a PDU from the database with given origin and id.
@@ -323,7 +338,7 @@ class FederationServer(FederationBase):
 
     @defer.inlineCallbacks
     @log_function
-    def _handle_new_pdu(self, origin, pdu, max_recursion=10):
+    def _handle_new_pdu(self, origin, pdu, get_missing=True):
         # We reprocess pdus when we have seen them only as outliers
         existing = yield self._get_persisted_pdu(
             origin, pdu.event_id, do_auth=False
@@ -375,48 +390,50 @@ class FederationServer(FederationBase):
                 pdu.room_id, min_depth
             )
 
+            prevs = {e_id for e_id, _ in pdu.prev_events}
+            seen = set(have_seen.keys())
+
             if min_depth and pdu.depth < min_depth:
                 # This is so that we don't notify the user about this
                 # message, to work around the fact that some events will
                 # reference really really old events we really don't want to
                 # send to the clients.
                 pdu.internal_metadata.outlier = True
-            elif min_depth and pdu.depth > min_depth and max_recursion > 0:
-                for event_id, hashes in pdu.prev_events:
-                    if event_id not in have_seen:
-                        logger.debug(
-                            "_handle_new_pdu requesting pdu %s",
-                            event_id
+            elif min_depth and pdu.depth > min_depth:
+                if get_missing and prevs - seen:
+                    latest_tuples = yield self.store.get_latest_events_in_room(
+                        pdu.room_id
+                    )
+
+                    # We add the prev events that we have seen to the latest
+                    # list to ensure the remote server doesn't give them to us
+                    latest = set(e_id for e_id, _, _ in latest_tuples)
+                    latest |= seen
+
+                    missing_events = yield self.get_missing_events(
+                        origin,
+                        pdu.room_id,
+                        earliest_events=list(latest),
+                        latest_events=[pdu.event_id],
+                        limit=10,
+                        min_depth=min_depth,
+                    )
+
+                    for e in missing_events:
+                        yield self._handle_new_pdu(
+                            origin,
+                            e,
+                            get_missing=False
                         )
 
-                        try:
-                            new_pdu = yield self.federation_client.get_pdu(
-                                [origin, pdu.origin],
-                                event_id=event_id,
-                            )
-
-                            if new_pdu:
-                                yield self._handle_new_pdu(
-                                    origin,
-                                    new_pdu,
-                                    max_recursion=max_recursion-1
-                                )
-
-                                logger.debug("Processed pdu %s", event_id)
-                            else:
-                                logger.warn("Failed to get PDU %s", event_id)
-                                fetch_state = True
-                        except:
-                            # TODO(erikj): Do some more intelligent retries.
-                            logger.exception("Failed to get PDU")
-                            fetch_state = True
-            else:
-                prevs = {e_id for e_id, _ in pdu.prev_events}
-                seen = set(have_seen.keys())
-                if prevs - seen:
-                    fetch_state = True
-        else:
-            fetch_state = True
+                    have_seen = yield self.store.have_events(
+                        [ev for ev, _ in pdu.prev_events]
+                    )
+
+            prevs = {e_id for e_id, _ in pdu.prev_events}
+            seen = set(have_seen.keys())
+            if prevs - seen:
+                fetch_state = True
 
         if fetch_state:
             # We need to get the state at this event, since we haven't
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 7d30c924d1..741a4e7a1a 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -224,6 +224,8 @@ class TransactionQueue(object):
         ]
 
         try:
+            self.pending_transactions[destination] = 1
+
             limiter = yield get_retry_limiter(
                 destination,
                 self._clock,
@@ -239,8 +241,6 @@ class TransactionQueue(object):
                 len(pending_failures)
             )
 
-            self.pending_transactions[destination] = 1
-
             logger.debug("TX [%s] Persisting transaction...", destination)
 
             transaction = Transaction.create_new(
@@ -287,7 +287,7 @@ class TransactionQueue(object):
                     code = 200
 
                     if response:
-                        for e_id, r in getattr(response, "pdus", {}).items():
+                        for e_id, r in response.get("pdus", {}).items():
                             if "error" in r:
                                 logger.warn(
                                     "Transaction returned error for %s: %s",
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 8b137e7128..80d03012b7 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -219,3 +219,22 @@ class TransportLayerClient(object):
         )
 
         defer.returnValue(content)
+
+    @defer.inlineCallbacks
+    @log_function
+    def get_missing_events(self, destination, room_id, earliest_events,
+                           latest_events, limit, min_depth):
+        path = PREFIX + "/get_missing_events/%s" % (room_id,)
+
+        content = yield self.client.post_json(
+            destination=destination,
+            path=path,
+            data={
+                "limit": int(limit),
+                "min_depth": int(min_depth),
+                "earliest_events": earliest_events,
+                "latest_events": latest_events,
+            }
+        )
+
+        defer.returnValue(content)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index fce9c0195e..ece6dbcf62 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -242,6 +242,7 @@ class TransportLayerServer(object):
                 )
             )
         )
+
         self.server.register_path(
             "POST",
             re.compile("^" + PREFIX + "/query_auth/([^/]*)/([^/]*)$"),
@@ -253,6 +254,17 @@ class TransportLayerServer(object):
             )
         )
 
+        self.server.register_path(
+            "POST",
+            re.compile("^" + PREFIX + "/get_missing_events/([^/]*)/?$"),
+            self._with_authentication(
+                lambda origin, content, query, room_id:
+                self._get_missing_events(
+                    origin, content, room_id,
+                )
+            )
+        )
+
     @defer.inlineCallbacks
     @log_function
     def _on_send_request(self, origin, content, query, transaction_id):
@@ -352,3 +364,22 @@ class TransportLayerServer(object):
         )
 
         defer.returnValue((200, new_content))
+
+    @defer.inlineCallbacks
+    @log_function
+    def _get_missing_events(self, origin, content, room_id):
+        limit = int(content.get("limit", 10))
+        min_depth = int(content.get("min_depth", 0))
+        earliest_events = content.get("earliest_events", [])
+        latest_events = content.get("latest_events", [])
+
+        content = yield self.request_handler.on_get_missing_events(
+            origin,
+            room_id=room_id,
+            earliest_events=earliest_events,
+            latest_events=latest_events,
+            min_depth=min_depth,
+            limit=limit,
+        )
+
+        defer.returnValue((200, content))
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 7deed16f9c..ae4e9b316d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -581,12 +581,13 @@ class FederationHandler(BaseHandler):
         defer.returnValue(event)
 
     @defer.inlineCallbacks
-    def get_state_for_pdu(self, origin, room_id, event_id):
+    def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True):
         yield run_on_reactor()
 
-        in_room = yield self.auth.check_host_in_room(room_id, origin)
-        if not in_room:
-            raise AuthError(403, "Host not in room.")
+        if do_auth:
+            in_room = yield self.auth.check_host_in_room(room_id, origin)
+            if not in_room:
+                raise AuthError(403, "Host not in room.")
 
         state_groups = yield self.store.get_state_groups(
             [event_id]
@@ -789,6 +790,29 @@ class FederationHandler(BaseHandler):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
+    def on_get_missing_events(self, origin, room_id, earliest_events,
+                              latest_events, limit, min_depth):
+        in_room = yield self.auth.check_host_in_room(
+            room_id,
+            origin
+        )
+        if not in_room:
+            raise AuthError(403, "Host not in room.")
+
+        limit = min(limit, 20)
+        min_depth = max(min_depth, 0)
+
+        missing_events = yield self.store.get_missing_events(
+            room_id=room_id,
+            earliest_events=earliest_events,
+            latest_events=latest_events,
+            limit=limit,
+            min_depth=min_depth,
+        )
+
+        defer.returnValue(missing_events)
+
+    @defer.inlineCallbacks
     @log_function
     def do_auth(self, origin, event, context, auth_events):
         # Check if we have all the auth events.
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 3fbc090224..2deda8ac50 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -64,6 +64,9 @@ class EventFederationStore(SQLBaseStore):
             for f in front:
                 txn.execute(base_sql, (f,))
                 new_front.update([r[0] for r in txn.fetchall()])
+
+            new_front -= results
+
             front = new_front
             results.update(front)
 
@@ -378,3 +381,51 @@ class EventFederationStore(SQLBaseStore):
             event_results += new_front
 
         return self._get_events_txn(txn, event_results)
+
+    def get_missing_events(self, room_id, earliest_events, latest_events,
+                           limit, min_depth):
+        return self.runInteraction(
+            "get_missing_events",
+            self._get_missing_events,
+            room_id, earliest_events, latest_events, limit, min_depth
+        )
+
+    def _get_missing_events(self, txn, room_id, earliest_events, latest_events,
+                            limit, min_depth):
+
+        earliest_events = set(earliest_events)
+        front = set(latest_events) - earliest_events
+
+        event_results = set()
+
+        query = (
+            "SELECT prev_event_id FROM event_edges "
+            "WHERE room_id = ? AND event_id = ? AND is_state = 0 "
+            "LIMIT ?"
+        )
+
+        while front and len(event_results) < limit:
+            new_front = set()
+            for event_id in front:
+                txn.execute(
+                    query,
+                    (room_id, event_id, limit - len(event_results))
+                )
+
+                for e_id, in txn.fetchall():
+                    new_front.add(e_id)
+
+            new_front -= earliest_events
+            new_front -= event_results
+
+            front = new_front
+            event_results |= new_front
+
+        events = self._get_events_txn(txn, event_results)
+
+        events = sorted(
+            [ev for ev in events if ev.depth >= min_depth],
+            key=lambda e: e.depth,
+        )
+
+        return events[:limit]
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index d88a977be4..6ffc3c99cc 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -389,14 +389,18 @@ class PresenceInvitesTestCase(PresenceTestCase):
 
     @defer.inlineCallbacks
     def test_invite_remote(self):
+        # Use a different destination, otherwise retry logic might fail the
+        # request
+        u_rocket = UserID.from_string("@rocket:there")
+
         put_json = self.mock_http_client.put_json
         put_json.expect_call_and_return(
-            call("elsewhere",
+            call("there",
                 path="/_matrix/federation/v1/send/1000000/",
-                data=_expect_edu("elsewhere", "m.presence_invite",
+                data=_expect_edu("there", "m.presence_invite",
                     content={
                         "observer_user": "@apple:test",
-                        "observed_user": "@cabbage:elsewhere",
+                        "observed_user": "@rocket:there",
                     }
                 ),
                 json_data_callback=ANY,
@@ -405,10 +409,10 @@ class PresenceInvitesTestCase(PresenceTestCase):
         )
 
         yield self.handler.send_invite(
-                observer_user=self.u_apple, observed_user=self.u_cabbage)
+                observer_user=self.u_apple, observed_user=u_rocket)
 
         self.assertEquals(
-            [{"observed_user_id": "@cabbage:elsewhere", "accepted": 0}],
+            [{"observed_user_id": "@rocket:there", "accepted": 0}],
             (yield self.datastore.get_presence_list(self.u_apple.localpart))
         )
 
@@ -418,13 +422,18 @@ class PresenceInvitesTestCase(PresenceTestCase):
     def test_accept_remote(self):
         # TODO(paul): This test will likely break if/when real auth permissions
         # are added; for now the HS will always accept any invite
+
+        # Use a different destination, otherwise retry logic might fail the
+        # request
+        u_rocket = UserID.from_string("@rocket:moon")
+
         put_json = self.mock_http_client.put_json
         put_json.expect_call_and_return(
-            call("elsewhere",
+            call("moon",
                 path="/_matrix/federation/v1/send/1000000/",
-                data=_expect_edu("elsewhere", "m.presence_accept",
+                data=_expect_edu("moon", "m.presence_accept",
                     content={
-                        "observer_user": "@cabbage:elsewhere",
+                        "observer_user": "@rocket:moon",
                         "observed_user": "@apple:test",
                     }
                 ),
@@ -437,7 +446,7 @@ class PresenceInvitesTestCase(PresenceTestCase):
             "/_matrix/federation/v1/send/1000000/",
             _make_edu_json("elsewhere", "m.presence_invite",
                 content={
-                    "observer_user": "@cabbage:elsewhere",
+                    "observer_user": "@rocket:moon",
                     "observed_user": "@apple:test",
                 }
             )
@@ -446,7 +455,7 @@ class PresenceInvitesTestCase(PresenceTestCase):
         self.assertTrue(
             (yield self.datastore.is_presence_visible(
                 observed_localpart=self.u_apple.localpart,
-                observer_userid=self.u_cabbage.to_string(),
+                observer_userid=u_rocket.to_string(),
             ))
         )
 
@@ -454,13 +463,17 @@ class PresenceInvitesTestCase(PresenceTestCase):
 
     @defer.inlineCallbacks
     def test_invited_remote_nonexistant(self):
+        # Use a different destination, otherwise retry logic might fail the
+        # request
+        u_rocket = UserID.from_string("@rocket:sun")
+
         put_json = self.mock_http_client.put_json
         put_json.expect_call_and_return(
-            call("elsewhere",
+            call("sun",
                 path="/_matrix/federation/v1/send/1000000/",
-                data=_expect_edu("elsewhere", "m.presence_deny",
+                data=_expect_edu("sun", "m.presence_deny",
                     content={
-                        "observer_user": "@cabbage:elsewhere",
+                        "observer_user": "@rocket:sun",
                         "observed_user": "@durian:test",
                     }
                 ),
@@ -471,9 +484,9 @@ class PresenceInvitesTestCase(PresenceTestCase):
 
         yield self.mock_federation_resource.trigger("PUT",
             "/_matrix/federation/v1/send/1000000/",
-            _make_edu_json("elsewhere", "m.presence_invite",
+            _make_edu_json("sun", "m.presence_invite",
                 content={
-                    "observer_user": "@cabbage:elsewhere",
+                    "observer_user": "@rocket:sun",
                     "observed_user": "@durian:test",
                 }
             )