diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index a8d90456e3..956f876572 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -32,8 +32,7 @@ from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
-class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
- SQLBaseStore):
+class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBaseStore):
def get_auth_chain(self, event_ids, include_given=False):
"""Get auth events for given event_ids. The events *must* be state events.
@@ -45,7 +44,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
list of events
"""
return self.get_auth_chain_ids(
- event_ids, include_given=include_given,
+ event_ids, include_given=include_given
).addCallback(self._get_events)
def get_auth_chain_ids(self, event_ids, include_given=False):
@@ -59,9 +58,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
list of event_ids
"""
return self.runInteraction(
- "get_auth_chain_ids",
- self._get_auth_chain_ids_txn,
- event_ids, include_given
+ "get_auth_chain_ids", self._get_auth_chain_ids_txn, event_ids, include_given
)
def _get_auth_chain_ids_txn(self, txn, event_ids, include_given):
@@ -70,23 +67,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
else:
results = set()
- base_sql = (
- "SELECT auth_id FROM event_auth WHERE event_id IN (%s)"
- )
+ base_sql = "SELECT auth_id FROM event_auth WHERE event_id IN (%s)"
front = set(event_ids)
while front:
new_front = set()
front_list = list(front)
- chunks = [
- front_list[x:x + 100]
- for x in range(0, len(front), 100)
- ]
+ chunks = [front_list[x : x + 100] for x in range(0, len(front), 100)]
for chunk in chunks:
- txn.execute(
- base_sql % (",".join(["?"] * len(chunk)),),
- chunk
- )
+ txn.execute(base_sql % (",".join(["?"] * len(chunk)),), chunk)
new_front.update([r[0] for r in txn])
new_front -= results
@@ -98,9 +87,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
def get_oldest_events_in_room(self, room_id):
return self.runInteraction(
- "get_oldest_events_in_room",
- self._get_oldest_events_in_room_txn,
- room_id,
+ "get_oldest_events_in_room", self._get_oldest_events_in_room_txn, room_id
)
def get_oldest_events_with_depth_in_room(self, room_id):
@@ -121,7 +108,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
" GROUP BY b.event_id"
)
- txn.execute(sql, (room_id, False,))
+ txn.execute(sql, (room_id, False))
return dict(txn)
@@ -152,9 +139,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
return self._simple_select_onecol_txn(
txn,
table="event_backward_extremities",
- keyvalues={
- "room_id": room_id,
- },
+ keyvalues={"room_id": room_id},
retcol="event_id",
)
@@ -209,9 +194,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
def get_latest_event_ids_in_room(self, room_id):
return self._simple_select_onecol(
table="event_forward_extremities",
- keyvalues={
- "room_id": room_id,
- },
+ keyvalues={"room_id": room_id},
retcol="event_id",
desc="get_latest_event_ids_in_room",
)
@@ -225,14 +208,13 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
"WHERE f.room_id = ?"
)
- txn.execute(sql, (room_id, ))
+ txn.execute(sql, (room_id,))
results = []
for event_id, depth in txn.fetchall():
hashes = self._get_event_reference_hashes_txn(txn, event_id)
prev_hashes = {
- k: encode_base64(v) for k, v in hashes.items()
- if k == "sha256"
+ k: encode_base64(v) for k, v in hashes.items() if k == "sha256"
}
results.append((event_id, prev_hashes, depth))
@@ -242,9 +224,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
""" For hte given room, get the minimum depth we have seen for it.
"""
return self.runInteraction(
- "get_min_depth",
- self._get_min_depth_interaction,
- room_id,
+ "get_min_depth", self._get_min_depth_interaction, room_id
)
def _get_min_depth_interaction(self, txn, room_id):
@@ -300,7 +280,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
if stream_ordering <= self.stream_ordering_month_ago:
raise StoreError(400, "stream_ordering too old")
- sql = ("""
+ sql = """
SELECT event_id FROM stream_ordering_to_exterm
INNER JOIN (
SELECT room_id, MAX(stream_ordering) AS stream_ordering
@@ -308,15 +288,14 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
WHERE stream_ordering <= ? GROUP BY room_id
) AS rms USING (room_id, stream_ordering)
WHERE room_id = ?
- """)
+ """
def get_forward_extremeties_for_room_txn(txn):
txn.execute(sql, (stream_ordering, room_id))
return [event_id for event_id, in txn]
return self.runInteraction(
- "get_forward_extremeties_for_room",
- get_forward_extremeties_for_room_txn
+ "get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
)
def get_backfill_events(self, room_id, event_list, limit):
@@ -329,19 +308,21 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
event_list (list)
limit (int)
"""
- return self.runInteraction(
- "get_backfill_events",
- self._get_backfill_events, room_id, event_list, limit
- ).addCallback(
- self._get_events
- ).addCallback(
- lambda l: sorted(l, key=lambda e: -e.depth)
+ return (
+ self.runInteraction(
+ "get_backfill_events",
+ self._get_backfill_events,
+ room_id,
+ event_list,
+ limit,
+ )
+ .addCallback(self._get_events)
+ .addCallback(lambda l: sorted(l, key=lambda e: -e.depth))
)
def _get_backfill_events(self, txn, room_id, event_list, limit):
logger.debug(
- "_get_backfill_events: %s, %s, %s",
- room_id, repr(event_list), limit
+ "_get_backfill_events: %s, %s, %s", room_id, repr(event_list), limit
)
event_results = set()
@@ -364,10 +345,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
depth = self._simple_select_one_onecol_txn(
txn,
table="events",
- keyvalues={
- "event_id": event_id,
- "room_id": room_id,
- },
+ keyvalues={"event_id": event_id, "room_id": room_id},
retcol="depth",
allow_none=True,
)
@@ -386,10 +364,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
event_results.add(event_id)
- txn.execute(
- query,
- (event_id, False, limit - len(event_results))
- )
+ txn.execute(query, (event_id, False, limit - len(event_results)))
for row in txn:
if row[1] not in event_results:
@@ -398,18 +373,19 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
return event_results
@defer.inlineCallbacks
- def get_missing_events(self, room_id, earliest_events, latest_events,
- limit):
+ def get_missing_events(self, room_id, earliest_events, latest_events, limit):
ids = yield self.runInteraction(
"get_missing_events",
self._get_missing_events,
- room_id, earliest_events, latest_events, limit,
+ room_id,
+ earliest_events,
+ latest_events,
+ limit,
)
events = yield self._get_events(ids)
defer.returnValue(events)
- def _get_missing_events(self, txn, room_id, earliest_events, latest_events,
- limit):
+ def _get_missing_events(self, txn, room_id, earliest_events, latest_events, limit):
seen_events = set(earliest_events)
front = set(latest_events) - seen_events
@@ -425,8 +401,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
new_front = set()
for event_id in front:
txn.execute(
- query,
- (room_id, event_id, False, limit - len(event_results))
+ query, (room_id, event_id, False, limit - len(event_results))
)
new_results = set(t[0] for t in txn) - seen_events
@@ -457,12 +432,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
column="prev_event_id",
iterable=event_ids,
retcols=("event_id",),
- desc="get_successor_events"
+ desc="get_successor_events",
)
- defer.returnValue([
- row["event_id"] for row in rows
- ])
+ defer.returnValue([row["event_id"] for row in rows])
class EventFederationStore(EventFederationWorkerStore):
@@ -481,12 +454,11 @@ class EventFederationStore(EventFederationWorkerStore):
super(EventFederationStore, self).__init__(db_conn, hs)
self.register_background_update_handler(
- self.EVENT_AUTH_STATE_ONLY,
- self._background_delete_non_state_event_auth,
+ self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth
)
hs.get_clock().looping_call(
- self._delete_old_forward_extrem_cache, 60 * 60 * 1000,
+ self._delete_old_forward_extrem_cache, 60 * 60 * 1000
)
def _update_min_depth_for_room_txn(self, txn, room_id, depth):
@@ -498,12 +470,8 @@ class EventFederationStore(EventFederationWorkerStore):
self._simple_upsert_txn(
txn,
table="room_depth",
- keyvalues={
- "room_id": room_id,
- },
- values={
- "min_depth": depth,
- },
+ keyvalues={"room_id": room_id},
+ values={"min_depth": depth},
)
def _handle_mult_prev_events(self, txn, events):
@@ -553,11 +521,15 @@ class EventFederationStore(EventFederationWorkerStore):
" )"
)
- txn.executemany(query, [
- (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
- for ev in events for e_id in ev.prev_event_ids()
- if not ev.internal_metadata.is_outlier()
- ])
+ txn.executemany(
+ query,
+ [
+ (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+ for ev in events
+ for e_id in ev.prev_event_ids()
+ if not ev.internal_metadata.is_outlier()
+ ],
+ )
query = (
"DELETE FROM event_backward_extremities"
@@ -566,16 +538,17 @@ class EventFederationStore(EventFederationWorkerStore):
txn.executemany(
query,
[
- (ev.event_id, ev.room_id) for ev in events
+ (ev.event_id, ev.room_id)
+ for ev in events
if not ev.internal_metadata.is_outlier()
- ]
+ ],
)
def _delete_old_forward_extrem_cache(self):
def _delete_old_forward_extrem_cache_txn(txn):
# Delete entries older than a month, while making sure we don't delete
# the only entries for a room.
- sql = ("""
+ sql = """
DELETE FROM stream_ordering_to_exterm
WHERE
room_id IN (
@@ -583,11 +556,11 @@ class EventFederationStore(EventFederationWorkerStore):
FROM stream_ordering_to_exterm
WHERE stream_ordering > ?
) AND stream_ordering < ?
- """)
+ """
txn.execute(
- sql,
- (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
+ sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago)
)
+
return run_as_background_process(
"delete_old_forward_extrem_cache",
self.runInteraction,
@@ -597,9 +570,7 @@ class EventFederationStore(EventFederationWorkerStore):
def clean_room_for_join(self, room_id):
return self.runInteraction(
- "clean_room_for_join",
- self._clean_room_for_join_txn,
- room_id,
+ "clean_room_for_join", self._clean_room_for_join_txn, room_id
)
def _clean_room_for_join_txn(self, txn, room_id):
@@ -635,7 +606,7 @@ class EventFederationStore(EventFederationWorkerStore):
)
"""
- txn.execute(sql, (min_stream_id, max_stream_id,))
+ txn.execute(sql, (min_stream_id, max_stream_id))
new_progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
|