diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index ad17123915..fd25c8112d 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018-2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
@@ -171,7 +170,7 @@ class PersistEventsStore:
)
async with stream_ordering_manager as stream_orderings:
- for (event, context), stream in zip(events_and_contexts, stream_orderings):
+ for (event, _), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
await self.db_pool.runInteraction(
@@ -298,7 +297,7 @@ class PersistEventsStore:
txn.execute(sql + clause, args)
to_recursively_check = []
- for event_id, prev_event_id, metadata, rejected in txn:
+ for _, prev_event_id, metadata, rejected in txn:
if prev_event_id in existing_prevs:
continue
@@ -1128,7 +1127,7 @@ class PersistEventsStore:
def _update_forward_extremities_txn(
self, txn, new_forward_extremities, max_stream_order
):
- for room_id, new_extrem in new_forward_extremities.items():
+ for room_id in new_forward_extremities.keys():
self.db_pool.simple_delete_txn(
txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
)
@@ -1379,24 +1378,28 @@ class PersistEventsStore:
],
)
- for event, _ in events_and_contexts:
- if not event.internal_metadata.is_redacted():
- # If we're persisting an unredacted event we go and ensure
- # that we mark any redactions that reference this event as
- # requiring censoring.
- self.db_pool.simple_update_txn(
- txn,
- table="redactions",
- keyvalues={"redacts": event.event_id},
- updatevalues={"have_censored": False},
+ # If we're persisting an unredacted event we go and ensure
+ # that we mark any redactions that reference this event as
+ # requiring censoring.
+ sql = "UPDATE redactions SET have_censored = ? WHERE redacts = ?"
+ txn.execute_batch(
+ sql,
+ (
+ (
+ False,
+ event.event_id,
)
+ for event, _ in events_and_contexts
+ if not event.internal_metadata.is_redacted()
+ ),
+ )
state_events_and_contexts = [
ec for ec in events_and_contexts if ec[0].is_state()
]
state_values = []
- for event, context in state_events_and_contexts:
+ for event, _ in state_events_and_contexts:
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
@@ -1465,7 +1468,7 @@ class PersistEventsStore:
# nothing to do here
return
- for event, context in events_and_contexts:
+ for event, _ in events_and_contexts:
if event.type == EventTypes.Redaction and event.redacts is not None:
# Remove the entries in the event_push_actions table for the
# redacted event.
@@ -1882,20 +1885,28 @@ class PersistEventsStore:
),
)
- for event, _ in events_and_contexts:
- user_ids = self.db_pool.simple_select_onecol_txn(
- txn,
- table="event_push_actions_staging",
- keyvalues={"event_id": event.event_id},
- retcol="user_id",
- )
+ room_to_event_ids = {} # type: Dict[str, List[str]]
+ for e, _ in events_and_contexts:
+ room_to_event_ids.setdefault(e.room_id, []).append(e.event_id)
- for uid in user_ids:
- txn.call_after(
- self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
- (event.room_id, uid),
+ for room_id, event_ids in room_to_event_ids.items():
+ rows = self.db_pool.simple_select_many_txn(
+ txn,
+ table="event_push_actions_staging",
+ column="event_id",
+ iterable=event_ids,
+ keyvalues={},
+ retcols=("user_id",),
)
+ user_ids = {row["user_id"] for row in rows}
+
+ for user_id in user_ids:
+ txn.call_after(
+ self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+ (room_id, user_id),
+ )
+
# Now we delete the staging area for *all* events that were being
# persisted.
txn.execute_batch(
|