From 0ac2a79faa918280767c18e4db7ec29d7d3a3afb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Feb 2015 17:24:14 +0000 Subject: Initial stab at implementing a batched get_missing_pdus request --- synapse/federation/federation_server.py | 72 +++++++++++++++++++++++++++++++++ synapse/handlers/federation.py | 9 +++-- synapse/storage/event_federation.py | 63 ++++++++++++++++++++++++++--- 3 files changed, 135 insertions(+), 9 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 22b9663831..34bc397e8a 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -305,6 +305,78 @@ class FederationServer(FederationBase): (200, send_content) ) + @defer.inlineCallbacks + def get_missing_events(self, origin, room_id, earliest_events, + latest_events, limit, min_depth): + limit = max(limit, 50) + min_depth = max(min_depth, 0) + + missing_events = yield self.store.get_missing_events( + room_id=room_id, + earliest_events=earliest_events, + latest_events=latest_events, + limit=limit, + min_depth=min_depth, + ) + + known_ids = {e.event_id for e in missing_events} | {earliest_events} + + back_edges = { + e for e in missing_events + if {i for i, h in e.prev_events.items()} <= known_ids + } + + decoded_auth_events = set() + state = {} + auth_events = set() + auth_and_state = {} + for event in back_edges: + state_pdus = yield self.handler.get_state_for_pdu( + origin, room_id, event.event_id, + do_auth=False, + ) + + state[event.event_id] = [s.event_id for s in state_pdus] + + auth_and_state.update({ + s.event_id: s for s in state_pdus + }) + + state_ids = {pdu.event_id for pdu in state_pdus} + prev_ids = {i for i, h in event.prev_events.items()} + partial_auth_chain = yield self.store.get_auth_chain( + state_ids | prev_ids, have_ids=decoded_auth_events.keys() + ) + + for p in partial_auth_chain: + p.signatures.update( + compute_event_signature( + p, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) + + auth_events.update( + a.event_id for a in partial_auth_chain + ) + + auth_and_state.update({ + a.event_id: a for a in partial_auth_chain + }) + + time_now = self._clock.time_msec() + + defer.returnValue({ + "events": [ev.get_pdu_json(time_now) for ev in missing_events], + "state_for_events": state, + "auth_events": auth_events, + "event_map": { + k: ev.get_pdu_json(time_now) + for k, ev in auth_and_state.items() + }, + }) + @log_function def _get_persisted_pdu(self, origin, event_id, do_auth=True): """ Get a PDU from the database with given origin and id. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0eb2ff95ca..26bdc6d1a7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -581,12 +581,13 @@ class FederationHandler(BaseHandler): defer.returnValue(event) @defer.inlineCallbacks - def get_state_for_pdu(self, origin, room_id, event_id): + def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True): yield run_on_reactor() - in_room = yield self.auth.check_host_in_room(room_id, origin) - if not in_room: - raise AuthError(403, "Host not in room.") + if do_auth: + in_room = yield self.auth.check_host_in_room(room_id, origin) + if not in_room: + raise AuthError(403, "Host not in room.") state_groups = yield self.store.get_state_groups( [event_id] diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 3fbc090224..22bf7ad832 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -32,15 +32,15 @@ class EventFederationStore(SQLBaseStore): and backfilling from another server respectively. """ - def get_auth_chain(self, event_ids): + def get_auth_chain(self, event_ids, have_ids=set()): return self.runInteraction( "get_auth_chain", self._get_auth_chain_txn, - event_ids + event_ids, have_ids ) - def _get_auth_chain_txn(self, txn, event_ids): - results = self._get_auth_chain_ids_txn(txn, event_ids) + def _get_auth_chain_txn(self, txn, event_ids, have_ids): + results = self._get_auth_chain_ids_txn(txn, event_ids, have_ids) return self._get_events_txn(txn, results) @@ -51,8 +51,9 @@ class EventFederationStore(SQLBaseStore): event_ids ) - def _get_auth_chain_ids_txn(self, txn, event_ids): + def _get_auth_chain_ids_txn(self, txn, event_ids, have_ids): results = set() + have_ids = set(have_ids) base_sql = ( "SELECT auth_id FROM event_auth WHERE event_id = ?" @@ -64,6 +65,10 @@ class EventFederationStore(SQLBaseStore): for f in front: txn.execute(base_sql, (f,)) new_front.update([r[0] for r in txn.fetchall()]) + + new_front -= results + new_front -= have_ids + front = new_front results.update(front) @@ -378,3 +383,51 @@ class EventFederationStore(SQLBaseStore): event_results += new_front return self._get_events_txn(txn, event_results) + + def get_missing_events(self, room_id, earliest_events, latest_events, + limit, min_depth): + return self.runInteraction( + "get_missing_events", + self._get_missing_events, + room_id, earliest_events, latest_events, limit, min_depth + ) + + def _get_missing_events(self, txn, room_id, earliest_events, latest_events, + limit, min_depth): + + earliest_events = set(earliest_events) + front = set(latest_events) - earliest_events + + event_results = set() + + query = ( + "SELECT prev_event_id FROM event_edges " + "WHERE room_id = ? AND event_id = ? AND is_state = 0 " + "LIMIT ?" + ) + + while front and len(event_results) < limit: + new_front = set() + for event_id in front: + txn.execute( + query, + (room_id, event_id, limit - len(event_results)) + ) + + for e_id, in txn.fetchall(): + new_front.add(e_id) + + new_front -= earliest_events + new_front -= event_results + + front = new_front + event_results |= new_front + + events = self._get_events_txn(txn, event_results) + + events = sorted( + [ev for ev in events if ev.depth >= min_depth], + key=lambda e: e.depth, + ) + + return events[:limit] -- cgit 1.4.1 From db215b7e0007a207b8775d78c6693153e16f2731 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Feb 2015 13:58:02 +0000 Subject: Implement and use new batched get missing pdu --- synapse/federation/federation_client.py | 19 ++++ synapse/federation/federation_server.py | 150 +++++++++++--------------------- synapse/federation/transaction_queue.py | 2 +- synapse/federation/transport/client.py | 19 ++++ synapse/federation/transport/server.py | 31 +++++++ synapse/handlers/federation.py | 23 +++++ 6 files changed, 144 insertions(+), 100 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index cd3c962d50..ca89a0787c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -439,6 +439,25 @@ class FederationClient(FederationBase): defer.returnValue(ret) + @defer.inlineCallbacks + def get_missing_events(self, destination, room_id, earliest_events, + latest_events, limit, min_depth): + content = yield self.transport_layer.get_missing_events( + destination, room_id, earliest_events, latest_events, limit, + min_depth, + ) + + events = [ + self.event_from_pdu_json(e) + for e in content.get("events", []) + ] + + signed_events = yield self._check_sigs_and_hash_and_fetch( + destination, events, outlier=True + ) + + defer.returnValue(signed_events) + def event_from_pdu_json(self, pdu_json, outlier=False): event = FrozenEvent( pdu_json diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 34bc397e8a..f74e16abd5 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -142,7 +142,15 @@ class FederationServer(FederationBase): if r[0]: ret.append({}) else: - logger.exception(r[1]) + failure = r[1] + logger.error( + "Failed to handle PDU", + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject() + ) + ) ret.append({"error": str(r[1].value)}) logger.debug("Returning: %s", str(ret)) @@ -306,75 +314,17 @@ class FederationServer(FederationBase): ) @defer.inlineCallbacks - def get_missing_events(self, origin, room_id, earliest_events, - latest_events, limit, min_depth): - limit = max(limit, 50) - min_depth = max(min_depth, 0) - - missing_events = yield self.store.get_missing_events( - room_id=room_id, - earliest_events=earliest_events, - latest_events=latest_events, - limit=limit, - min_depth=min_depth, + @log_function + def on_get_missing_events(self, origin, room_id, earliest_events, + latest_events, limit, min_depth): + missing_events = yield self.handler.on_get_missing_events( + origin, room_id, earliest_events, latest_events, limit, min_depth ) - known_ids = {e.event_id for e in missing_events} | {earliest_events} - - back_edges = { - e for e in missing_events - if {i for i, h in e.prev_events.items()} <= known_ids - } - - decoded_auth_events = set() - state = {} - auth_events = set() - auth_and_state = {} - for event in back_edges: - state_pdus = yield self.handler.get_state_for_pdu( - origin, room_id, event.event_id, - do_auth=False, - ) - - state[event.event_id] = [s.event_id for s in state_pdus] - - auth_and_state.update({ - s.event_id: s for s in state_pdus - }) - - state_ids = {pdu.event_id for pdu in state_pdus} - prev_ids = {i for i, h in event.prev_events.items()} - partial_auth_chain = yield self.store.get_auth_chain( - state_ids | prev_ids, have_ids=decoded_auth_events.keys() - ) - - for p in partial_auth_chain: - p.signatures.update( - compute_event_signature( - p, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - - auth_events.update( - a.event_id for a in partial_auth_chain - ) - - auth_and_state.update({ - a.event_id: a for a in partial_auth_chain - }) - time_now = self._clock.time_msec() defer.returnValue({ "events": [ev.get_pdu_json(time_now) for ev in missing_events], - "state_for_events": state, - "auth_events": auth_events, - "event_map": { - k: ev.get_pdu_json(time_now) - for k, ev in auth_and_state.items() - }, }) @log_function @@ -403,7 +353,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function - def _handle_new_pdu(self, origin, pdu, max_recursion=10): + def _handle_new_pdu(self, origin, pdu, get_missing=True): # We reprocess pdus when we have seen them only as outliers existing = yield self._get_persisted_pdu( origin, pdu.event_id, do_auth=False @@ -455,48 +405,50 @@ class FederationServer(FederationBase): pdu.room_id, min_depth ) + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + if min_depth and pdu.depth < min_depth: # This is so that we don't notify the user about this # message, to work around the fact that some events will # reference really really old events we really don't want to # send to the clients. pdu.internal_metadata.outlier = True - elif min_depth and pdu.depth > min_depth and max_recursion > 0: - for event_id, hashes in pdu.prev_events: - if event_id not in have_seen: - logger.debug( - "_handle_new_pdu requesting pdu %s", - event_id + elif min_depth and pdu.depth > min_depth: + if get_missing and prevs - seen: + latest_tuples = yield self.store.get_latest_events_in_room( + pdu.room_id + ) + + # We add the prev events that we have seen to the latest + # list to ensure the remote server doesn't give them to us + latest = set(e_id for e_id, _, _ in latest_tuples) + latest |= seen + + missing_events = yield self.get_missing_events( + origin, + pdu.room_id, + earliest_events=list(latest), + latest_events=[pdu.event_id], + limit=10, + min_depth=min_depth, + ) + + for e in missing_events: + yield self._handle_new_pdu( + origin, + e, + get_missing=False ) - try: - new_pdu = yield self.federation_client.get_pdu( - [origin, pdu.origin], - event_id=event_id, - ) - - if new_pdu: - yield self._handle_new_pdu( - origin, - new_pdu, - max_recursion=max_recursion-1 - ) - - logger.debug("Processed pdu %s", event_id) - else: - logger.warn("Failed to get PDU %s", event_id) - fetch_state = True - except: - # TODO(erikj): Do some more intelligent retries. - logger.exception("Failed to get PDU") - fetch_state = True - else: - prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) - if prevs - seen: - fetch_state = True - else: - fetch_state = True + have_seen = yield self.store.have_events( + [ev for ev, _ in pdu.prev_events] + ) + + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + if prevs - seen: + fetch_state = True if fetch_state: # We need to get the state at this event, since we haven't diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 7d30c924d1..8f1acbe590 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -287,7 +287,7 @@ class TransactionQueue(object): code = 200 if response: - for e_id, r in getattr(response, "pdus", {}).items(): + for e_id, r in response.get("pdus", {}).items(): if "error" in r: logger.warn( "Transaction returned error for %s: %s", diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 8b137e7128..80d03012b7 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -219,3 +219,22 @@ class TransportLayerClient(object): ) defer.returnValue(content) + + @defer.inlineCallbacks + @log_function + def get_missing_events(self, destination, room_id, earliest_events, + latest_events, limit, min_depth): + path = PREFIX + "/get_missing_events/%s" % (room_id,) + + content = yield self.client.post_json( + destination=destination, + path=path, + data={ + "limit": int(limit), + "min_depth": int(min_depth), + "earliest_events": earliest_events, + "latest_events": latest_events, + } + ) + + defer.returnValue(content) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 2ffb37aa18..ad75c8ddb7 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -234,6 +234,7 @@ class TransportLayerServer(object): ) ) ) + self.server.register_path( "POST", re.compile("^" + PREFIX + "/query_auth/([^/]*)/([^/]*)$"), @@ -245,6 +246,17 @@ class TransportLayerServer(object): ) ) + self.server.register_path( + "POST", + re.compile("^" + PREFIX + "/get_missing_events/([^/]*)/?$"), + self._with_authentication( + lambda origin, content, query, room_id: + self._get_missing_events( + origin, content, room_id, + ) + ) + ) + @defer.inlineCallbacks @log_function def _on_send_request(self, origin, content, query, transaction_id): @@ -344,3 +356,22 @@ class TransportLayerServer(object): ) defer.returnValue((200, new_content)) + + @defer.inlineCallbacks + @log_function + def _get_missing_events(self, origin, content, room_id): + limit = int(content.get("limit", 10)) + min_depth = int(content.get("min_depth", 0)) + earliest_events = content.get("earliest_events", []) + latest_events = content.get("latest_events", []) + + content = yield self.request_handler.on_get_missing_events( + origin, + room_id=room_id, + earliest_events=earliest_events, + latest_events=latest_events, + min_depth=min_depth, + limit=limit, + ) + + defer.returnValue((200, content)) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 26bdc6d1a7..628e62f8b1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -789,6 +789,29 @@ class FederationHandler(BaseHandler): defer.returnValue(ret) + @defer.inlineCallbacks + def on_get_missing_events(self, origin, room_id, earliest_events, + latest_events, limit, min_depth): + in_room = yield self.auth.check_host_in_room( + room_id, + origin + ) + if not in_room: + raise AuthError(403, "Host not in room.") + + limit = min(limit, 20) + min_depth = max(min_depth, 0) + + missing_events = yield self.store.get_missing_events( + room_id=room_id, + earliest_events=earliest_events, + latest_events=latest_events, + limit=limit, + min_depth=min_depth, + ) + + defer.returnValue(missing_events) + @defer.inlineCallbacks @log_function def do_auth(self, origin, event, context, auth_events): -- cgit 1.4.1 From 42b972bccd0cf7d903befb498f9c1bbd5c4e6583 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Feb 2015 14:35:23 +0000 Subject: Revert get_auth_chain changes --- synapse/storage/event_federation.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 22bf7ad832..2deda8ac50 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -32,15 +32,15 @@ class EventFederationStore(SQLBaseStore): and backfilling from another server respectively. """ - def get_auth_chain(self, event_ids, have_ids=set()): + def get_auth_chain(self, event_ids): return self.runInteraction( "get_auth_chain", self._get_auth_chain_txn, - event_ids, have_ids + event_ids ) - def _get_auth_chain_txn(self, txn, event_ids, have_ids): - results = self._get_auth_chain_ids_txn(txn, event_ids, have_ids) + def _get_auth_chain_txn(self, txn, event_ids): + results = self._get_auth_chain_ids_txn(txn, event_ids) return self._get_events_txn(txn, results) @@ -51,9 +51,8 @@ class EventFederationStore(SQLBaseStore): event_ids ) - def _get_auth_chain_ids_txn(self, txn, event_ids, have_ids): + def _get_auth_chain_ids_txn(self, txn, event_ids): results = set() - have_ids = set(have_ids) base_sql = ( "SELECT auth_id FROM event_auth WHERE event_id = ?" @@ -67,7 +66,6 @@ class EventFederationStore(SQLBaseStore): new_front.update([r[0] for r in txn.fetchall()]) new_front -= results - new_front -= have_ids front = new_front results.update(front) -- cgit 1.4.1 From 852816befe678f0061dc153fab20e1d4c70a9094 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Feb 2015 15:14:09 +0000 Subject: Fix presence tests --- tests/handlers/test_presence.py | 43 +++++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index d88a977be4..6ffc3c99cc 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -389,14 +389,18 @@ class PresenceInvitesTestCase(PresenceTestCase): @defer.inlineCallbacks def test_invite_remote(self): + # Use a different destination, otherwise retry logic might fail the + # request + u_rocket = UserID.from_string("@rocket:there") + put_json = self.mock_http_client.put_json put_json.expect_call_and_return( - call("elsewhere", + call("there", path="/_matrix/federation/v1/send/1000000/", - data=_expect_edu("elsewhere", "m.presence_invite", + data=_expect_edu("there", "m.presence_invite", content={ "observer_user": "@apple:test", - "observed_user": "@cabbage:elsewhere", + "observed_user": "@rocket:there", } ), json_data_callback=ANY, @@ -405,10 +409,10 @@ class PresenceInvitesTestCase(PresenceTestCase): ) yield self.handler.send_invite( - observer_user=self.u_apple, observed_user=self.u_cabbage) + observer_user=self.u_apple, observed_user=u_rocket) self.assertEquals( - [{"observed_user_id": "@cabbage:elsewhere", "accepted": 0}], + [{"observed_user_id": "@rocket:there", "accepted": 0}], (yield self.datastore.get_presence_list(self.u_apple.localpart)) ) @@ -418,13 +422,18 @@ class PresenceInvitesTestCase(PresenceTestCase): def test_accept_remote(self): # TODO(paul): This test will likely break if/when real auth permissions # are added; for now the HS will always accept any invite + + # Use a different destination, otherwise retry logic might fail the + # request + u_rocket = UserID.from_string("@rocket:moon") + put_json = self.mock_http_client.put_json put_json.expect_call_and_return( - call("elsewhere", + call("moon", path="/_matrix/federation/v1/send/1000000/", - data=_expect_edu("elsewhere", "m.presence_accept", + data=_expect_edu("moon", "m.presence_accept", content={ - "observer_user": "@cabbage:elsewhere", + "observer_user": "@rocket:moon", "observed_user": "@apple:test", } ), @@ -437,7 +446,7 @@ class PresenceInvitesTestCase(PresenceTestCase): "/_matrix/federation/v1/send/1000000/", _make_edu_json("elsewhere", "m.presence_invite", content={ - "observer_user": "@cabbage:elsewhere", + "observer_user": "@rocket:moon", "observed_user": "@apple:test", } ) @@ -446,7 +455,7 @@ class PresenceInvitesTestCase(PresenceTestCase): self.assertTrue( (yield self.datastore.is_presence_visible( observed_localpart=self.u_apple.localpart, - observer_userid=self.u_cabbage.to_string(), + observer_userid=u_rocket.to_string(), )) ) @@ -454,13 +463,17 @@ class PresenceInvitesTestCase(PresenceTestCase): @defer.inlineCallbacks def test_invited_remote_nonexistant(self): + # Use a different destination, otherwise retry logic might fail the + # request + u_rocket = UserID.from_string("@rocket:sun") + put_json = self.mock_http_client.put_json put_json.expect_call_and_return( - call("elsewhere", + call("sun", path="/_matrix/federation/v1/send/1000000/", - data=_expect_edu("elsewhere", "m.presence_deny", + data=_expect_edu("sun", "m.presence_deny", content={ - "observer_user": "@cabbage:elsewhere", + "observer_user": "@rocket:sun", "observed_user": "@durian:test", } ), @@ -471,9 +484,9 @@ class PresenceInvitesTestCase(PresenceTestCase): yield self.mock_federation_resource.trigger("PUT", "/_matrix/federation/v1/send/1000000/", - _make_edu_json("elsewhere", "m.presence_invite", + _make_edu_json("sun", "m.presence_invite", content={ - "observer_user": "@cabbage:elsewhere", + "observer_user": "@rocket:sun", "observed_user": "@durian:test", } ) -- cgit 1.4.1 From 59362454ddbcb3adf64d81da00dcd85cdb59a2e6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Feb 2015 15:47:35 +0000 Subject: Must update pending_transactions map before yield'ing --- synapse/federation/transaction_queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 8f1acbe590..741a4e7a1a 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -224,6 +224,8 @@ class TransactionQueue(object): ] try: + self.pending_transactions[destination] = 1 + limiter = yield get_retry_limiter( destination, self._clock, @@ -239,8 +241,6 @@ class TransactionQueue(object): len(pending_failures) ) - self.pending_transactions[destination] = 1 - logger.debug("TX [%s] Persisting transaction...", destination) transaction = Transaction.create_new( -- cgit 1.4.1