summary refs log tree commit diff
path: root/synapse/storage/databases/main/events_bg_updates.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/events_bg_updates.py')
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py69
1 files changed, 37 insertions, 32 deletions
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py

index daef3685b0..0061805150 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -369,18 +369,20 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)] for chunk in chunks: - ev_rows = self.db_pool.simple_select_many_txn( - txn, - table="event_json", - column="event_id", - iterable=chunk, - retcols=["event_id", "json"], - keyvalues={}, + ev_rows = cast( + List[Tuple[str, str]], + self.db_pool.simple_select_many_txn( + txn, + table="event_json", + column="event_id", + iterable=chunk, + retcols=["event_id", "json"], + keyvalues={}, + ), ) - for row in ev_rows: - event_id = row["event_id"] - event_json = db_to_json(row["json"]) + for event_id, json in ev_rows: + event_json = db_to_json(json) try: origin_server_ts = event_json["origin_server_ts"] except (KeyError, AttributeError): @@ -563,15 +565,18 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): if deleted: # We now need to invalidate the caches of these rooms - rows = self.db_pool.simple_select_many_txn( - txn, - table="events", - column="event_id", - iterable=to_delete, - keyvalues={}, - retcols=("room_id",), + rows = cast( + List[Tuple[str]], + self.db_pool.simple_select_many_txn( + txn, + table="events", + column="event_id", + iterable=to_delete, + keyvalues={}, + retcols=("room_id",), + ), ) - room_ids = {row["room_id"] for row in rows} + room_ids = {row[0] for row in rows} for room_id in room_ids: txn.call_after( self.get_latest_event_ids_in_room.invalidate, (room_id,) # type: ignore[attr-defined] @@ -1038,18 +1043,21 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): count = len(rows) # We also need to fetch the auth events for them. - auth_events = self.db_pool.simple_select_many_txn( - txn, - table="event_auth", - column="event_id", - iterable=event_to_room_id, - keyvalues={}, - retcols=("event_id", "auth_id"), + auth_events = cast( + List[Tuple[str, str]], + self.db_pool.simple_select_many_txn( + txn, + table="event_auth", + column="event_id", + iterable=event_to_room_id, + keyvalues={}, + retcols=("event_id", "auth_id"), + ), ) event_to_auth_chain: Dict[str, List[str]] = {} - for row in auth_events: - event_to_auth_chain.setdefault(row["event_id"], []).append(row["auth_id"]) + for event_id, auth_id in auth_events: + event_to_auth_chain.setdefault(event_id, []).append(auth_id) # Calculate and persist the chain cover index for this set of events. # @@ -1302,12 +1310,9 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): # ANALYZE the new column to build stats on it, to encourage PostgreSQL to use the # indexes on it. - # We need to pass execute a dummy function to handle the txn's result otherwise - # it tries to call fetchall() on it and fails because there's no result to fetch. - await self.db_pool.execute( + await self.db_pool.runInteraction( "background_analyze_new_stream_ordering_column", - lambda txn: None, - "ANALYZE events(stream_ordering2)", + lambda txn: txn.execute("ANALYZE events(stream_ordering2)"), ) await self.db_pool.runInteraction(