From 9084cdd70f0b9c03de4e72897cabd30f545b6cb5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 May 2015 16:34:31 +0100 Subject: Ensure event_results is a set --- synapse/storage/event_federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 2880850506..2f913adf20 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -378,7 +378,7 @@ class EventFederationStore(SQLBaseStore): room_id, repr(event_list), limit ) - event_results = event_list + event_results = set(event_list) front = event_list -- cgit 1.4.1 From 20814fabdd001ee6a04efc5277d71e80fdbf5a14 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 May 2015 11:59:02 +0100 Subject: Actually fetch state for new backwards extremities when backfilling. --- synapse/federation/federation_client.py | 6 +- synapse/handlers/federation.py | 164 ++++++++++++++++++++------------ 2 files changed, 108 insertions(+), 62 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 4b3bf97835..6febc8618c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -168,7 +168,11 @@ class FederationClient(FederationBase): for i, pdu in enumerate(pdus): pdus[i] = yield self._check_sigs_and_hash(pdu) - # FIXME: We should handle signature failures more gracefully. 
+ pdus[:] = yield defer.gatherResults( + [self._check_sigs_and_hash(pdu) for pdu in pdus], + consumeErrors=True, + ).addErrback(unwrapFirstError) defer.returnValue(pdus) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d85b1cf5de..46ce3699d7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -230,27 +230,65 @@ class FederationHandler(BaseHandler): if not extremities: extremities = yield self.store.get_oldest_events_in_room(room_id) - pdus = yield self.replication_layer.backfill( + events = yield self.replication_layer.backfill( dest, room_id, - limit, + limit=limit, extremities=extremities, ) - events = [] + event_map = {e.event_id: e for e in events} - for pdu in pdus: - event = pdu + event_ids = set(e.event_id for e in events) - # FIXME (erikj): Not sure this actually works :/ - context = yield self.state_handler.compute_event_context(event) + edges = [ + ev.event_id + for ev in events + if set(e_id for e_id, _ in ev.prev_events) - event_ids + ] - events.append((event, context)) + # For each edge get the current state. 
- yield self.store.persist_event( - event, - context=context, - backfilled=True + auth_events = {} + events_to_state = {} + for e_id in edges: + state, auth = yield self.replication_layer.get_state_for_room( + destination=dest, + room_id=room_id, + event_id=e_id + ) + auth_events.update({a.event_id: a for a in auth}) + events_to_state[e_id] = state + + yield defer.gatherResults( + [ + self._handle_new_event(dest, a) + for a in auth_events.values() + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) + + yield defer.gatherResults( + [ + self._handle_new_event( + dest, event_map[e_id], + state=events_to_state[e_id], + backfilled=True, + ) + for e_id in events_to_state + ], + consumeErrors=True + ).addErrback(unwrapFirstError) + + events.sort(key=lambda e: e.depth) + + for event in events: + if event in events_to_state: + continue + + yield self._handle_new_event( + dest, event, + backfilled=True, ) defer.returnValue(events) @@ -347,7 +385,7 @@ class FederationHandler(BaseHandler): logger.info(e.message) continue except Exception as e: - logger.warn( + logger.exception( "Failed to backfill from %s because %s", dom, e, ) @@ -517,54 +555,9 @@ class FederationHandler(BaseHandler): # FIXME pass - auth_ids_to_deferred = {} - - def process_auth_ev(ev): - auth_ids = [e_id for e_id, _ in ev.auth_events] - - prev_ds = [ - auth_ids_to_deferred[i] - for i in auth_ids - if i in auth_ids_to_deferred - ] - - d = defer.Deferred() - - auth_ids_to_deferred[ev.event_id] = d - - @defer.inlineCallbacks - def f(*_): - ev.internal_metadata.outlier = True - - try: - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - - yield self._handle_new_event( - origin, ev, auth_events=auth - ) - except: - logger.exception( - "Failed to handle auth event %s", - ev.event_id, - ) - - d.callback(None) - - if prev_ds: - dx = defer.DeferredList(prev_ds) - dx.addBoth(f) - else: - f() - - for e in auth_chain: - if e.event_id == event.event_id: - return - 
process_auth_ev(e) - - yield defer.DeferredList(auth_ids_to_deferred.values()) + yield self._handle_auth_events( + origin, [e for e in auth_chain if e.event_id != event.event_id] + ) @defer.inlineCallbacks def handle_state(e): @@ -1348,3 +1341,52 @@ class FederationHandler(BaseHandler): }, "missing": [e.event_id for e in missing_locals], }) + + @defer.inlineCallbacks + def _handle_auth_events(self, origin, auth_events): + auth_ids_to_deferred = {} + + def process_auth_ev(ev): + auth_ids = [e_id for e_id, _ in ev.auth_events] + + prev_ds = [ + auth_ids_to_deferred[i] + for i in auth_ids + if i in auth_ids_to_deferred + ] + + d = defer.Deferred() + + auth_ids_to_deferred[ev.event_id] = d + + @defer.inlineCallbacks + def f(*_): + ev.internal_metadata.outlier = True + + try: + auth = { + (e.type, e.state_key): e for e in auth_events + if e.event_id in auth_ids + } + + yield self._handle_new_event( + origin, ev, auth_events=auth + ) + except: + logger.exception( + "Failed to handle auth event %s", + ev.event_id, + ) + + d.callback(None) + + if prev_ds: + dx = defer.DeferredList(prev_ds) + dx.addBoth(f) + else: + f() + + for e in auth_events: + process_auth_ev(e) + + yield defer.DeferredList(auth_ids_to_deferred.values()) -- cgit 1.4.1 From 2bc60c55af8a77d4cf229c4c1d3a05f488c4af6d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 May 2015 12:57:00 +0100 Subject: Fix _get_backfill_events to return events in the correct order --- synapse/storage/event_federation.py | 55 +++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 2f913adf20..34948c30cf 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -17,6 +17,7 @@ from ._base import SQLBaseStore, cached from syutil.base64util import encode_base64 import logging +from Queue import PriorityQueue logger = logging.getLogger(__name__) @@ -380,41 +381,41 @@ 
class EventFederationStore(SQLBaseStore): event_results = set(event_list) - front = event_list + # We want to make sure that we do a breadth-first, "depth" ordered + # search. query = ( - "SELECT prev_event_id FROM event_edges " - "WHERE room_id = ? AND event_id = ? " - "LIMIT ?" + "SELECT depth, prev_event_id FROM event_edges" + " INNER JOIN events" + " ON prev_event_id = events.event_id" + " AND event_edges.room_id = events.room_id" + " WHERE event_edges.room_id = ? AND event_edges.event_id = ?" + " LIMIT ?" ) - # We iterate through all event_ids in `front` to select their previous - # events. These are dumped in `new_front`. - # We continue until we reach the limit *or* new_front is empty (i.e., - # we've run out of things to select - while front and len(event_results) < limit: + queue = PriorityQueue() - new_front = [] - for event_id in front: - logger.debug( - "_backfill_interaction: id=%s", - event_id - ) + for event_id in event_list: + txn.execute( + query, + (room_id, event_id, limit - len(event_results)) + ) - txn.execute( - query, - (room_id, event_id, limit - len(event_results)) - ) + for row in txn.fetchall(): + queue.put(row) - for row in txn.fetchall(): - logger.debug( - "_backfill_interaction: got id=%s", - *row - ) - new_front.append(row[0]) + while not queue.empty() and len(event_results) < limit: + _, event_id = queue.get_nowait() - front = new_front - event_results += new_front + event_results.add(event_id) + + txn.execute( + query, + (room_id, event_id, limit - len(event_results)) + ) + + for row in txn.fetchall(): + queue.put(row) return self._get_events_txn(txn, event_results) -- cgit 1.4.1 From 115ef3ddac9dbb1c49c31257190e77062b5a10a8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 15:37:43 +0100 Subject: Correctly capture Queue.Empty exception --- synapse/storage/event_federation.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/synapse/storage/event_federation.py 
b/synapse/storage/event_federation.py index 80eff8e6f2..e171cbcdb4 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -19,7 +19,7 @@ from ._base import SQLBaseStore, cached from syutil.base64util import encode_base64 import logging -from Queue import PriorityQueue +from Queue import PriorityQueue, Empty logger = logging.getLogger(__name__) @@ -398,7 +398,10 @@ class EventFederationStore(SQLBaseStore): queue.put(row) while not queue.empty() and len(event_results) < limit: - _, event_id = queue.get_nowait() + try: + _, event_id = queue.get_nowait() + except Empty: + break event_results.add(event_id) -- cgit 1.4.1 From 6189d8e54d7f7d55cd6cd2e9d7f866e895c6fe44 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 15:38:08 +0100 Subject: PriorityQueue gives lowest first --- synapse/storage/event_federation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index e171cbcdb4..03942fec7a 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -395,7 +395,7 @@ class EventFederationStore(SQLBaseStore): ) for row in txn.fetchall(): - queue.put(row) + queue.put((-row[0], row[1])) while not queue.empty() and len(event_results) < limit: try: @@ -411,7 +411,7 @@ class EventFederationStore(SQLBaseStore): ) for row in txn.fetchall(): - queue.put(row) + queue.put((-row[0], row[1])) return event_results -- cgit 1.4.1 From 73d23c6ae85b180dbeca070fc9149692ee2fbcfe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 15:40:22 +0100 Subject: Don't readd things that are already in event_results --- synapse/storage/event_federation.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 03942fec7a..a3da117799 100644 --- a/synapse/storage/event_federation.py +++ 
b/synapse/storage/event_federation.py @@ -411,7 +411,8 @@ class EventFederationStore(SQLBaseStore): ) for row in txn.fetchall(): - queue.put((-row[0], row[1])) + if row[1] not in event_results: + queue.put((-row[0], row[1])) return event_results -- cgit 1.4.1 From dc085ddf8cfa887672f5c505b3bf9c2ce7fc0d58 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 15:44:05 +0100 Subject: Don't prepopulate event_results --- synapse/storage/event_federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index a3da117799..26d570cf2d 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -372,7 +372,7 @@ class EventFederationStore(SQLBaseStore): room_id, repr(event_list), limit ) - event_results = set(event_list) + event_results = set() # We want to make sure that we do a breadth-first, "depth" ordered # search. -- cgit 1.4.1 From ae3bff349151d8f309bdf29fd258b215cb792e90 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 15:46:07 +0100 Subject: Correctly prepopulate queue --- synapse/storage/event_federation.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 26d570cf2d..91d19857b1 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -394,8 +394,16 @@ class EventFederationStore(SQLBaseStore): (room_id, event_id, limit - len(event_results)) ) - for row in txn.fetchall(): - queue.put((-row[0], row[1])) + depth = self._simple_select_one_onecol_txn( + txn, + table="events", + keyvalues={ + "event_id": event_id, + }, + retcol="depth" + ) + + queue.put((-depth, event_id)) while not queue.empty() and len(event_results) < limit: try: -- cgit 1.4.1 From 39a3340f738f37868b7034cb019fb410f6b1d48b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 15:48:56 
+0100 Subject: Skip events we've already seen --- synapse/storage/event_federation.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 91d19857b1..823a4998c3 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -411,6 +411,9 @@ class EventFederationStore(SQLBaseStore): except Empty: break + if event_id in event_results: + continue + event_results.add(event_id) txn.execute( -- cgit 1.4.1 From 1f3d1d85a9a12cab1423d39f82c8dbe339e69269 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 15:52:29 +0100 Subject: Only get non-state --- synapse/storage/event_federation.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 823a4998c3..8188b7cbc4 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -383,6 +383,7 @@ class EventFederationStore(SQLBaseStore): " ON prev_event_id = events.event_id" " AND event_edges.room_id = events.room_id" " WHERE event_edges.room_id = ? AND event_edges.event_id = ?" + " AND event_edges.is_state = ?" " LIMIT ?" 
) @@ -418,7 +419,7 @@ class EventFederationStore(SQLBaseStore): txn.execute( query, - (room_id, event_id, limit - len(event_results)) + (room_id, event_id, False, limit - len(event_results)) ) for row in txn.fetchall(): -- cgit 1.4.1 From 0180bfe4aa97464bd86efd702348fcd412f3006f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 15:53:41 +0100 Subject: Remove dead code --- synapse/storage/event_federation.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 8188b7cbc4..8a56476f56 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -390,11 +390,6 @@ class EventFederationStore(SQLBaseStore): queue = PriorityQueue() for event_id in event_list: - txn.execute( - query, - (room_id, event_id, limit - len(event_results)) - ) - depth = self._simple_select_one_onecol_txn( txn, table="events", -- cgit 1.4.1 From e309b1045db036174b66364740b645466e459454 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 15:57:35 +0100 Subject: Sort backfill events --- synapse/storage/event_federation.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 8a56476f56..5fd126cdb9 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -364,7 +364,11 @@ class EventFederationStore(SQLBaseStore): return self.runInteraction( "get_backfill_events", self._get_backfill_events, room_id, event_list, limit - ).addCallback(self._get_events) + ).addCallback( + self._get_events + ).addCallback( + lambda l: l.sort(key=lambda e: -e.depth) + ) def _get_backfill_events(self, txn, room_id, event_list, limit): logger.debug( -- cgit 1.4.1 From a910984b58b1653394d84402bb61fb75a88e9bc7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 15:58:41 +0100 Subject: Actually return something from lambda --- 
synapse/storage/event_federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 5fd126cdb9..4655c8e548 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -367,7 +367,7 @@ class EventFederationStore(SQLBaseStore): ).addCallback( self._get_events ).addCallback( - lambda l: l.sort(key=lambda e: -e.depth) + lambda l: sorted(l, key=lambda e: -e.depth) ) def _get_backfill_events(self, txn, room_id, event_list, limit): -- cgit 1.4.1 From 6eadbfbea0f8eb742f94d73e262631b0877e3dee Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2015 16:12:20 +0100 Subject: Remove redundant for loop --- synapse/federation/federation_client.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index cbb9d354b6..d3b46b24c1 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -165,9 +165,6 @@ class FederationClient(FederationBase): for p in transaction_data["pdus"] ] - for i, pdu in enumerate(pdus): - pdus[i] = yield self._check_sigs_and_hash(pdu) - # FIXME: We should handle signature failures more gracefully. pdus[:] = yield defer.gatherResults( [self._check_sigs_and_hash(pdu) for pdu in pdus], -- cgit 1.4.1