diff options
Diffstat (limited to 'synapse/storage/event_federation.py')
-rw-r--r-- | synapse/storage/event_federation.py | 149 |
1 files changed, 60 insertions, 89 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index a8d90456e3..956f876572 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -32,8 +32,7 @@ from synapse.util.caches.descriptors import cached logger = logging.getLogger(__name__) -class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, - SQLBaseStore): +class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBaseStore): def get_auth_chain(self, event_ids, include_given=False): """Get auth events for given event_ids. The events *must* be state events. @@ -45,7 +44,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, list of events """ return self.get_auth_chain_ids( - event_ids, include_given=include_given, + event_ids, include_given=include_given ).addCallback(self._get_events) def get_auth_chain_ids(self, event_ids, include_given=False): @@ -59,9 +58,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, list of event_ids """ return self.runInteraction( - "get_auth_chain_ids", - self._get_auth_chain_ids_txn, - event_ids, include_given + "get_auth_chain_ids", self._get_auth_chain_ids_txn, event_ids, include_given ) def _get_auth_chain_ids_txn(self, txn, event_ids, include_given): @@ -70,23 +67,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, else: results = set() - base_sql = ( - "SELECT auth_id FROM event_auth WHERE event_id IN (%s)" - ) + base_sql = "SELECT auth_id FROM event_auth WHERE event_id IN (%s)" front = set(event_ids) while front: new_front = set() front_list = list(front) - chunks = [ - front_list[x:x + 100] - for x in range(0, len(front), 100) - ] + chunks = [front_list[x : x + 100] for x in range(0, len(front), 100)] for chunk in chunks: - txn.execute( - base_sql % (",".join(["?"] * len(chunk)),), - chunk - ) + txn.execute(base_sql % (",".join(["?"] * len(chunk)),), chunk) new_front.update([r[0] for r in txn]) new_front -= results @@ -98,9 +87,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, def get_oldest_events_in_room(self, room_id): return self.runInteraction( - "get_oldest_events_in_room", - self._get_oldest_events_in_room_txn, - room_id, + "get_oldest_events_in_room", self._get_oldest_events_in_room_txn, room_id ) def get_oldest_events_with_depth_in_room(self, room_id): @@ -121,7 +108,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, " GROUP BY b.event_id" ) - txn.execute(sql, (room_id, False,)) + txn.execute(sql, (room_id, False)) return dict(txn) @@ -152,9 +139,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, return self._simple_select_onecol_txn( txn, table="event_backward_extremities", - keyvalues={ - "room_id": room_id, - }, + keyvalues={"room_id": room_id}, retcol="event_id", ) @@ -209,9 +194,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, def get_latest_event_ids_in_room(self, room_id): return self._simple_select_onecol( table="event_forward_extremities", - keyvalues={ - "room_id": room_id, - }, + keyvalues={"room_id": room_id}, retcol="event_id", desc="get_latest_event_ids_in_room", ) @@ -225,14 +208,13 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, "WHERE f.room_id = ?" ) - txn.execute(sql, (room_id, )) + txn.execute(sql, (room_id,)) results = [] for event_id, depth in txn.fetchall(): hashes = self._get_event_reference_hashes_txn(txn, event_id) prev_hashes = { - k: encode_base64(v) for k, v in hashes.items() - if k == "sha256" + k: encode_base64(v) for k, v in hashes.items() if k == "sha256" } results.append((event_id, prev_hashes, depth)) @@ -242,9 +224,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, """ For hte given room, get the minimum depth we have seen for it. """ return self.runInteraction( - "get_min_depth", - self._get_min_depth_interaction, - room_id, + "get_min_depth", self._get_min_depth_interaction, room_id ) def _get_min_depth_interaction(self, txn, room_id): @@ -300,7 +280,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, if stream_ordering <= self.stream_ordering_month_ago: raise StoreError(400, "stream_ordering too old") - sql = (""" + sql = """ SELECT event_id FROM stream_ordering_to_exterm INNER JOIN ( SELECT room_id, MAX(stream_ordering) AS stream_ordering @@ -308,15 +288,14 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, WHERE stream_ordering <= ? GROUP BY room_id ) AS rms USING (room_id, stream_ordering) WHERE room_id = ? - """) + """ def get_forward_extremeties_for_room_txn(txn): txn.execute(sql, (stream_ordering, room_id)) return [event_id for event_id, in txn] return self.runInteraction( - "get_forward_extremeties_for_room", - get_forward_extremeties_for_room_txn + "get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn ) def get_backfill_events(self, room_id, event_list, limit): @@ -329,19 +308,21 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, event_list (list) limit (int) """ - return self.runInteraction( - "get_backfill_events", - self._get_backfill_events, room_id, event_list, limit - ).addCallback( - self._get_events - ).addCallback( - lambda l: sorted(l, key=lambda e: -e.depth) + return ( + self.runInteraction( + "get_backfill_events", + self._get_backfill_events, + room_id, + event_list, + limit, + ) + .addCallback(self._get_events) + .addCallback(lambda l: sorted(l, key=lambda e: -e.depth)) ) def _get_backfill_events(self, txn, room_id, event_list, limit): logger.debug( - "_get_backfill_events: %s, %s, %s", - room_id, repr(event_list), limit + "_get_backfill_events: %s, %s, %s", room_id, repr(event_list), limit ) event_results = set() @@ -364,10 +345,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, depth = self._simple_select_one_onecol_txn( txn, table="events", - keyvalues={ - "event_id": event_id, - "room_id": room_id, - }, + keyvalues={"event_id": event_id, "room_id": room_id}, retcol="depth", allow_none=True, ) @@ -386,10 +364,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, event_results.add(event_id) - txn.execute( - query, - (event_id, False, limit - len(event_results)) - ) + txn.execute(query, (event_id, False, limit - len(event_results))) for row in txn: if row[1] not in event_results: @@ -398,18 +373,19 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, return event_results @defer.inlineCallbacks - def get_missing_events(self, room_id, earliest_events, latest_events, - limit): + def get_missing_events(self, room_id, earliest_events, latest_events, limit): ids = yield self.runInteraction( "get_missing_events", self._get_missing_events, - room_id, earliest_events, latest_events, limit, + room_id, + earliest_events, + latest_events, + limit, ) events = yield self._get_events(ids) defer.returnValue(events) - def _get_missing_events(self, txn, room_id, earliest_events, latest_events, - limit): + def _get_missing_events(self, txn, room_id, earliest_events, latest_events, limit): seen_events = set(earliest_events) front = set(latest_events) - seen_events @@ -425,8 +401,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, new_front = set() for event_id in front: txn.execute( - query, - (room_id, event_id, False, limit - len(event_results)) + query, (room_id, event_id, False, limit - len(event_results)) ) new_results = set(t[0] for t in txn) - seen_events @@ -457,12 +432,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, column="prev_event_id", iterable=event_ids, retcols=("event_id",), - desc="get_successor_events" + desc="get_successor_events", ) - defer.returnValue([ - row["event_id"] for row in rows - ]) + defer.returnValue([row["event_id"] for row in rows]) class EventFederationStore(EventFederationWorkerStore): @@ -481,12 +454,11 @@ class EventFederationStore(EventFederationWorkerStore): super(EventFederationStore, self).__init__(db_conn, hs) self.register_background_update_handler( - self.EVENT_AUTH_STATE_ONLY, - self._background_delete_non_state_event_auth, + self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth ) hs.get_clock().looping_call( - self._delete_old_forward_extrem_cache, 60 * 60 * 1000, + self._delete_old_forward_extrem_cache, 60 * 60 * 1000 ) def _update_min_depth_for_room_txn(self, txn, room_id, depth): @@ -498,12 +470,8 @@ class EventFederationStore(EventFederationWorkerStore): self._simple_upsert_txn( txn, table="room_depth", - keyvalues={ - "room_id": room_id, - }, - values={ - "min_depth": depth, - }, + keyvalues={"room_id": room_id}, + values={"min_depth": depth}, ) def _handle_mult_prev_events(self, txn, events): @@ -553,11 +521,15 @@ class EventFederationStore(EventFederationWorkerStore): " )" ) - txn.executemany(query, [ - (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False) - for ev in events for e_id in ev.prev_event_ids() - if not ev.internal_metadata.is_outlier() - ]) + txn.executemany( + query, + [ + (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False) + for ev in events + for e_id in ev.prev_event_ids() + if not ev.internal_metadata.is_outlier() + ], + ) query = ( "DELETE FROM event_backward_extremities" @@ -566,16 +538,17 @@ class EventFederationStore(EventFederationWorkerStore): txn.executemany( query, [ - (ev.event_id, ev.room_id) for ev in events + (ev.event_id, ev.room_id) + for ev in events if not ev.internal_metadata.is_outlier() - ] + ], ) def _delete_old_forward_extrem_cache(self): def _delete_old_forward_extrem_cache_txn(txn): # Delete entries older than a month, while making sure we don't delete # the only entries for a room. - sql = (""" + sql = """ DELETE FROM stream_ordering_to_exterm WHERE room_id IN ( @@ -583,11 +556,11 @@ class EventFederationStore(EventFederationWorkerStore): FROM stream_ordering_to_exterm WHERE stream_ordering > ? ) AND stream_ordering < ? - """) + """ txn.execute( - sql, - (self.stream_ordering_month_ago, self.stream_ordering_month_ago,) + sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago) ) + return run_as_background_process( "delete_old_forward_extrem_cache", self.runInteraction, @@ -597,9 +570,7 @@ class EventFederationStore(EventFederationWorkerStore): def clean_room_for_join(self, room_id): return self.runInteraction( - "clean_room_for_join", - self._clean_room_for_join_txn, - room_id, + "clean_room_for_join", self._clean_room_for_join_txn, room_id ) def _clean_room_for_join_txn(self, txn, room_id): @@ -635,7 +606,7 @@ class EventFederationStore(EventFederationWorkerStore): ) """ - txn.execute(sql, (min_stream_id, max_stream_id,)) + txn.execute(sql, (min_stream_id, max_stream_id)) new_progress = { "target_min_stream_id_inclusive": target_min_stream_id, |