diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 98ae69e996..d644c82784 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -38,7 +38,6 @@ from synapse.logging.utils import log_function
from synapse.metrics import BucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import make_in_list_sql_clause
-from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.data_stores.main.event_federation import EventFederationStore
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
from synapse.storage.data_stores.main.state import StateGroupWorkerStore
@@ -94,10 +93,7 @@ def _retry_on_integrity_error(func):
# inherits from EventFederationStore so that we can call _update_backward_extremities
# and _handle_mult_prev_events (though arguably those could both be moved in here)
class EventsStore(
- StateGroupWorkerStore,
- EventFederationStore,
- EventsWorkerStore,
- BackgroundUpdateStore,
+ StateGroupWorkerStore, EventFederationStore, EventsWorkerStore,
):
def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
@@ -143,7 +139,7 @@ class EventsStore(
)
return txn.fetchall()
- res = yield self.runInteraction("read_forward_extremities", fetch)
+ res = yield self.db.runInteraction("read_forward_extremities", fetch)
self._current_forward_extremities_amount = c_counter(list(x[0] for x in res))
@_retry_on_integrity_error
@@ -208,7 +204,7 @@ class EventsStore(
for (event, context), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
- yield self.runInteraction(
+ yield self.db.runInteraction(
"persist_events",
self._persist_events_txn,
events_and_contexts=events_and_contexts,
@@ -281,7 +277,7 @@ class EventsStore(
results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
for chunk in batch_iter(event_ids, 100):
- yield self.runInteraction(
+ yield self.db.runInteraction(
"_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
)
@@ -345,7 +341,7 @@ class EventsStore(
existing_prevs.add(prev_event_id)
for chunk in batch_iter(event_ids, 100):
- yield self.runInteraction(
+ yield self.db.runInteraction(
"_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
)
@@ -432,7 +428,7 @@ class EventsStore(
# event's auth chain, but its easier for now just to store them (and
# it doesn't take much storage compared to storing the entire event
# anyway).
- self.simple_insert_many_txn(
+ self.db.simple_insert_many_txn(
txn,
table="event_auth",
values=[
@@ -580,12 +576,12 @@ class EventsStore(
self, txn, new_forward_extremities, max_stream_order
):
for room_id, new_extrem in iteritems(new_forward_extremities):
- self.simple_delete_txn(
+ self.db.simple_delete_txn(
txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
)
txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,))
- self.simple_insert_many_txn(
+ self.db.simple_insert_many_txn(
txn,
table="event_forward_extremities",
values=[
@@ -598,7 +594,7 @@ class EventsStore(
# new stream_ordering to new forward extremeties in the room.
# This allows us to later efficiently look up the forward extremeties
# for a room before a given stream_ordering
- self.simple_insert_many_txn(
+ self.db.simple_insert_many_txn(
txn,
table="stream_ordering_to_exterm",
values=[
@@ -722,7 +718,7 @@ class EventsStore(
# change in outlier status to our workers.
stream_order = event.internal_metadata.stream_ordering
state_group_id = context.state_group
- self.simple_insert_txn(
+ self.db.simple_insert_txn(
txn,
table="ex_outlier_stream",
values={
@@ -794,7 +790,7 @@ class EventsStore(
d.pop("redacted_because", None)
return d
- self.simple_insert_many_txn(
+ self.db.simple_insert_many_txn(
txn,
table="event_json",
values=[
@@ -811,7 +807,7 @@ class EventsStore(
],
)
- self.simple_insert_many_txn(
+ self.db.simple_insert_many_txn(
txn,
table="events",
values=[
@@ -841,7 +837,7 @@ class EventsStore(
# If we're persisting an unredacted event we go and ensure
# that we mark any redactions that reference this event as
# requiring censoring.
- self.simple_update_txn(
+ self.db.simple_update_txn(
txn,
table="redactions",
keyvalues={"redacts": event.event_id},
@@ -983,7 +979,7 @@ class EventsStore(
state_values.append(vals)
- self.simple_insert_many_txn(txn, table="state_events", values=state_values)
+ self.db.simple_insert_many_txn(txn, table="state_events", values=state_values)
# Prefill the event cache
self._add_to_cache(txn, events_and_contexts)
@@ -1014,7 +1010,7 @@ class EventsStore(
)
txn.execute(sql + clause, args)
- rows = self.cursor_to_dict(txn)
+ rows = self.db.cursor_to_dict(txn)
for row in rows:
event = ev_map[row["event_id"]]
if not row["rejects"] and not row["redacts"]:
@@ -1032,7 +1028,7 @@ class EventsStore(
# invalidate the cache for the redacted event
txn.call_after(self._invalidate_get_event_cache, event.redacts)
- self.simple_insert_txn(
+ self.db.simple_insert_txn(
txn,
table="redactions",
values={
@@ -1077,7 +1073,9 @@ class EventsStore(
LIMIT ?
"""
- rows = yield self.execute("_censor_redactions_fetch", None, sql, before_ts, 100)
+ rows = yield self.db.execute(
+ "_censor_redactions_fetch", None, sql, before_ts, 100
+ )
updates = []
@@ -1109,14 +1107,14 @@ class EventsStore(
if pruned_json:
self._censor_event_txn(txn, event_id, pruned_json)
- self.simple_update_one_txn(
+ self.db.simple_update_one_txn(
txn,
table="redactions",
keyvalues={"event_id": redaction_id},
updatevalues={"have_censored": True},
)
- yield self.runInteraction("_update_censor_txn", _update_censor_txn)
+ yield self.db.runInteraction("_update_censor_txn", _update_censor_txn)
def _censor_event_txn(self, txn, event_id, pruned_json):
"""Censor an event by replacing its JSON in the event_json table with the
@@ -1127,7 +1125,7 @@ class EventsStore(
event_id (str): The ID of the event to censor.
pruned_json (str): The pruned JSON
"""
- self.simple_update_one_txn(
+ self.db.simple_update_one_txn(
txn,
table="event_json",
keyvalues={"event_id": event_id},
@@ -1153,7 +1151,7 @@ class EventsStore(
(count,) = txn.fetchone()
return count
- ret = yield self.runInteraction("count_messages", _count_messages)
+ ret = yield self.db.runInteraction("count_messages", _count_messages)
return ret
@defer.inlineCallbacks
@@ -1174,7 +1172,7 @@ class EventsStore(
(count,) = txn.fetchone()
return count
- ret = yield self.runInteraction("count_daily_sent_messages", _count_messages)
+ ret = yield self.db.runInteraction("count_daily_sent_messages", _count_messages)
return ret
@defer.inlineCallbacks
@@ -1189,7 +1187,7 @@ class EventsStore(
(count,) = txn.fetchone()
return count
- ret = yield self.runInteraction("count_daily_active_rooms", _count)
+ ret = yield self.db.runInteraction("count_daily_active_rooms", _count)
return ret
def get_current_backfill_token(self):
@@ -1241,7 +1239,7 @@ class EventsStore(
return new_event_updates
- return self.runInteraction(
+ return self.db.runInteraction(
"get_all_new_forward_event_rows", get_all_new_forward_event_rows
)
@@ -1286,7 +1284,7 @@ class EventsStore(
return new_event_updates
- return self.runInteraction(
+ return self.db.runInteraction(
"get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
)
@@ -1379,7 +1377,7 @@ class EventsStore(
backward_ex_outliers,
)
- return self.runInteraction("get_all_new_events", get_all_new_events_txn)
+ return self.db.runInteraction("get_all_new_events", get_all_new_events_txn)
def purge_history(self, room_id, token, delete_local_events):
"""Deletes room history before a certain point
@@ -1399,7 +1397,7 @@ class EventsStore(
deleted events.
"""
- return self.runInteraction(
+ return self.db.runInteraction(
"purge_history",
self._purge_history_txn,
room_id,
@@ -1647,7 +1645,7 @@ class EventsStore(
Deferred[List[int]]: The list of state groups to delete.
"""
- return self.runInteraction("purge_room", self._purge_room_txn, room_id)
+ return self.db.runInteraction("purge_room", self._purge_room_txn, room_id)
def _purge_room_txn(self, txn, room_id):
# First we fetch all the state groups that should be deleted, before
@@ -1766,7 +1764,7 @@ class EventsStore(
to delete.
"""
- return self.runInteraction(
+ return self.db.runInteraction(
"purge_unreferenced_state_groups",
self._purge_unreferenced_state_groups,
room_id,
@@ -1778,7 +1776,7 @@ class EventsStore(
"[purge] found %i state groups to delete", len(state_groups_to_delete)
)
- rows = self.simple_select_many_txn(
+ rows = self.db.simple_select_many_txn(
txn,
table="state_group_edges",
column="prev_state_group",
@@ -1805,15 +1803,15 @@ class EventsStore(
curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
curr_state = curr_state[sg]
- self.simple_delete_txn(
+ self.db.simple_delete_txn(
txn, table="state_groups_state", keyvalues={"state_group": sg}
)
- self.simple_delete_txn(
+ self.db.simple_delete_txn(
txn, table="state_group_edges", keyvalues={"state_group": sg}
)
- self.simple_insert_many_txn(
+ self.db.simple_insert_many_txn(
txn,
table="state_groups_state",
values=[
@@ -1850,7 +1848,7 @@ class EventsStore(
state group.
"""
- rows = yield self.simple_select_many_batch(
+ rows = yield self.db.simple_select_many_batch(
table="state_group_edges",
column="prev_state_group",
iterable=state_groups,
@@ -1869,7 +1867,7 @@ class EventsStore(
state_groups_to_delete (list[int]): State groups to delete
"""
- return self.runInteraction(
+ return self.db.runInteraction(
"purge_room_state",
self._purge_room_state_txn,
room_id,
@@ -1880,7 +1878,7 @@ class EventsStore(
# first we have to delete the state groups states
logger.info("[purge] removing %s from state_groups_state", room_id)
- self.simple_delete_many_txn(
+ self.db.simple_delete_many_txn(
txn,
table="state_groups_state",
column="state_group",
@@ -1891,7 +1889,7 @@ class EventsStore(
# ... and the state group edges
logger.info("[purge] removing %s from state_group_edges", room_id)
- self.simple_delete_many_txn(
+ self.db.simple_delete_many_txn(
txn,
table="state_group_edges",
column="state_group",
@@ -1902,7 +1900,7 @@ class EventsStore(
# ... and the state groups
logger.info("[purge] removing %s from state_groups", room_id)
- self.simple_delete_many_txn(
+ self.db.simple_delete_many_txn(
txn,
table="state_groups",
column="id",
@@ -1919,7 +1917,7 @@ class EventsStore(
@cachedInlineCallbacks(max_entries=5000)
def _get_event_ordering(self, event_id):
- res = yield self.simple_select_one(
+ res = yield self.db.simple_select_one(
table="events",
retcols=["topological_ordering", "stream_ordering"],
keyvalues={"event_id": event_id},
@@ -1942,7 +1940,7 @@ class EventsStore(
txn.execute(sql, (from_token, to_token, limit))
return txn.fetchall()
- return self.runInteraction(
+ return self.db.runInteraction(
"get_all_updated_current_state_deltas",
get_all_updated_current_state_deltas_txn,
)
@@ -1960,7 +1958,7 @@ class EventsStore(
room_id (str): The ID of the room the event was sent to.
topological_ordering (int): The position of the event in the room's topology.
"""
- return self.simple_insert_many_txn(
+ return self.db.simple_insert_many_txn(
txn=txn,
table="event_labels",
values=[
@@ -1982,7 +1980,7 @@ class EventsStore(
event_id (str): The event ID the expiry timestamp is associated with.
expiry_ts (int): The timestamp at which to expire (delete) the event.
"""
- return self.simple_insert_txn(
+ return self.db.simple_insert_txn(
txn=txn,
table="event_expiry",
values={"event_id": event_id, "expiry_ts": expiry_ts},
@@ -2031,7 +2029,7 @@ class EventsStore(
txn, "_get_event_cache", (event.event_id,)
)
- yield self.runInteraction("delete_expired_event", delete_expired_event_txn)
+ yield self.db.runInteraction("delete_expired_event", delete_expired_event_txn)
def _delete_event_expiry_txn(self, txn, event_id):
"""Delete the expiry timestamp associated with an event ID without deleting the
@@ -2041,7 +2039,7 @@ class EventsStore(
txn (LoggingTransaction): The transaction to use to perform the deletion.
event_id (str): The event ID to delete the associated expiry timestamp of.
"""
- return self.simple_delete_txn(
+ return self.db.simple_delete_txn(
txn=txn, table="event_expiry", keyvalues={"event_id": event_id}
)
@@ -2065,7 +2063,7 @@ class EventsStore(
return txn.fetchone()
- return self.runInteraction(
+ return self.db.runInteraction(
desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
)