diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index abfc56b061..b15c37679b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -36,6 +36,10 @@ class SQLBaseStore(metaclass=ABCMeta):
per data store (and not one per physical database).
"""
+    # If set to False, we will query the `state_events` and `rejections` tables when
+ # fetching event data. When True, we rely on it all being in the `events` table.
+ STATE_KEY_IN_EVENTS = False
+
def __init__(
self,
database: DatabasePool,
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 2ff3d21305..6456a951bf 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -257,17 +257,30 @@ class PersistEventsStore:
def _get_events_which_are_prevs_txn(
txn: LoggingTransaction, batch: Collection[str]
) -> None:
- sql = """
- SELECT prev_event_id, internal_metadata
- FROM event_edges
- INNER JOIN events USING (event_id)
- LEFT JOIN rejections USING (event_id)
- LEFT JOIN event_json USING (event_id)
- WHERE
- NOT events.outlier
- AND rejections.event_id IS NULL
- AND
- """
+ if self.store.STATE_KEY_IN_EVENTS:
+ sql = """
+ SELECT prev_event_id, internal_metadata
+ FROM event_edges
+ INNER JOIN events USING (event_id)
+ LEFT JOIN event_json USING (event_id)
+ WHERE
+ NOT events.outlier
+ AND events.rejection_reason IS NULL
+ AND
+ """
+
+ else:
+ sql = """
+ SELECT prev_event_id, internal_metadata
+ FROM event_edges
+ INNER JOIN events USING (event_id)
+ LEFT JOIN rejections USING (event_id)
+ LEFT JOIN event_json USING (event_id)
+ WHERE
+ NOT events.outlier
+ AND rejections.event_id IS NULL
+ AND
+ """
clause, args = make_in_list_sql_clause(
self.database_engine, "prev_event_id", batch
@@ -311,7 +324,19 @@ class PersistEventsStore:
) -> None:
to_recursively_check = batch
- while to_recursively_check:
+ if self.store.STATE_KEY_IN_EVENTS:
+ sql = """
+ SELECT
+ event_id, prev_event_id, internal_metadata,
+ events.rejection_reason IS NOT NULL
+ FROM event_edges
+ INNER JOIN events USING (event_id)
+ LEFT JOIN event_json USING (event_id)
+ WHERE
+ NOT events.outlier
+ AND
+ """
+ else:
sql = """
SELECT
event_id, prev_event_id, internal_metadata,
@@ -325,6 +350,7 @@ class PersistEventsStore:
AND
"""
+ while to_recursively_check:
clause, args = make_in_list_sql_clause(
self.database_engine, "event_id", to_recursively_check
)
@@ -530,6 +556,7 @@ class PersistEventsStore:
event_to_room_id = {e.event_id: e.room_id for e in state_events.values()}
self._add_chain_cover_index(
+ self.store.STATE_KEY_IN_EVENTS,
txn,
self.db_pool,
self.store.event_chain_id_gen,
@@ -541,6 +568,7 @@ class PersistEventsStore:
@classmethod
def _add_chain_cover_index(
cls,
+ state_key_in_events: bool,
txn: LoggingTransaction,
db_pool: DatabasePool,
event_chain_id_gen: SequenceGenerator,
@@ -551,6 +579,8 @@ class PersistEventsStore:
"""Calculate the chain cover index for the given events.
Args:
+ state_key_in_events: whether to use the `state_key` column in the `events`
+ table in preference to the `state_events` table
event_to_room_id: Event ID to the room ID of the event
event_to_types: Event ID to type and state_key of the event
event_to_auth_chain: Event ID to list of auth event IDs of the
@@ -610,7 +640,15 @@ class PersistEventsStore:
# We loop here in case we find an out of band membership and need to
# fetch their auth event info.
- while missing_auth_chains:
+ if state_key_in_events:
+ sql = """
+ SELECT event_id, events.type, events.state_key, chain_id, sequence_number
+ FROM events
+ LEFT JOIN event_auth_chains USING (event_id)
+ WHERE
+ events.state_key IS NOT NULL AND
+ """
+ else:
sql = """
SELECT event_id, events.type, se.state_key, chain_id, sequence_number
FROM events
@@ -618,6 +656,8 @@ class PersistEventsStore:
LEFT JOIN event_auth_chains USING (event_id)
WHERE
"""
+
+ while missing_auth_chains:
clause, args = make_in_list_sql_clause(
txn.database_engine,
"event_id",
@@ -1641,22 +1681,31 @@ class PersistEventsStore:
) -> None:
to_prefill = []
- rows = []
-
ev_map = {e.event_id: e for e, _ in events_and_contexts}
if not ev_map:
return
- sql = (
- "SELECT "
- " e.event_id as event_id, "
- " r.redacts as redacts,"
- " rej.event_id as rejects "
- " FROM events as e"
- " LEFT JOIN rejections as rej USING (event_id)"
- " LEFT JOIN redactions as r ON e.event_id = r.redacts"
- " WHERE "
- )
+ if self.store.STATE_KEY_IN_EVENTS:
+ sql = (
+ "SELECT "
+ " e.event_id as event_id, "
+ " r.redacts as redacts,"
+ " e.rejection_reason as rejects "
+ " FROM events as e"
+ " LEFT JOIN redactions as r ON e.event_id = r.redacts"
+ " WHERE "
+ )
+ else:
+ sql = (
+ "SELECT "
+ " e.event_id as event_id, "
+ " r.redacts as redacts,"
+ " rej.event_id as rejects "
+ " FROM events as e"
+ " LEFT JOIN rejections as rej USING (event_id)"
+ " LEFT JOIN redactions as r ON e.event_id = r.redacts"
+ " WHERE "
+ )
clause, args = make_in_list_sql_clause(
self.database_engine, "e.event_id", list(ev_map)
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 7d951d85d6..8746bc493e 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -448,6 +448,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
# First, we get `batch_size` events from the table, pulling out
# their successor events, if any, and the successor events'
# rejection status.
+
+        # This should happen before the bg update which drops the 'rejections' table.
+ assert not self.STATE_KEY_IN_EVENTS
+
txn.execute(
"""SELECT prev_event_id, event_id, internal_metadata,
rejections.event_id IS NOT NULL, events.outlier
@@ -973,6 +977,9 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
extra_clause = "AND events.room_id = ?"
tuple_args.append(last_room_id)
+        # This should happen before the bg update which drops the 'state_events' table.
+ assert not self.STATE_KEY_IN_EVENTS
+
sql = """
SELECT
event_id, state_events.type, state_events.state_key,
@@ -1041,9 +1048,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
# Calculate and persist the chain cover index for this set of events.
#
- # Annoyingly we need to gut wrench into the persit event store so that
+ # Annoyingly we need to gut wrench into the persist event store so that
# we can reuse the function to calculate the chain cover for rooms.
PersistEventsStore._add_chain_cover_index(
+ False,
txn,
self.db_pool,
self.event_chain_id_gen, # type: ignore[attr-defined]
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index b99b107784..7cd94f3964 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1482,20 +1482,35 @@ class EventsWorkerStore(SQLBaseStore):
def get_all_new_forward_event_rows(
txn: LoggingTransaction,
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
- sql = (
- "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
- " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
- " FROM events AS e"
- " LEFT JOIN redactions USING (event_id)"
- " LEFT JOIN state_events AS se USING (event_id)"
- " LEFT JOIN event_relations USING (event_id)"
- " LEFT JOIN room_memberships USING (event_id)"
- " LEFT JOIN rejections USING (event_id)"
- " WHERE ? < stream_ordering AND stream_ordering <= ?"
- " AND instance_name = ?"
- " ORDER BY stream_ordering ASC"
- " LIMIT ?"
- )
+ if self.STATE_KEY_IN_EVENTS:
+ sql = (
+ "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
+ " e.state_key, redacts, relates_to_id, membership, e.rejection_reason IS NOT NULL"
+ " FROM events AS e"
+ " LEFT JOIN redactions USING (event_id)"
+ " LEFT JOIN event_relations USING (event_id)"
+ " LEFT JOIN room_memberships USING (event_id)"
+ " WHERE ? < stream_ordering AND stream_ordering <= ?"
+ " AND instance_name = ?"
+ " ORDER BY stream_ordering ASC"
+ " LIMIT ?"
+ )
+ else:
+ sql = (
+ "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
+ " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
+ " FROM events AS e"
+ " LEFT JOIN redactions USING (event_id)"
+ " LEFT JOIN state_events AS se USING (event_id)"
+ " LEFT JOIN event_relations USING (event_id)"
+ " LEFT JOIN room_memberships USING (event_id)"
+ " LEFT JOIN rejections USING (event_id)"
+ " WHERE ? < stream_ordering AND stream_ordering <= ?"
+ " AND instance_name = ?"
+ " ORDER BY stream_ordering ASC"
+ " LIMIT ?"
+ )
+
txn.execute(sql, (last_id, current_id, instance_name, limit))
return cast(
List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
@@ -1523,21 +1538,36 @@ class EventsWorkerStore(SQLBaseStore):
def get_ex_outlier_stream_rows_txn(
txn: LoggingTransaction,
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
- sql = (
- "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
- " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
- " FROM events AS e"
- " INNER JOIN ex_outlier_stream AS out USING (event_id)"
- " LEFT JOIN redactions USING (event_id)"
- " LEFT JOIN state_events AS se USING (event_id)"
- " LEFT JOIN event_relations USING (event_id)"
- " LEFT JOIN room_memberships USING (event_id)"
- " LEFT JOIN rejections USING (event_id)"
- " WHERE ? < event_stream_ordering"
- " AND event_stream_ordering <= ?"
- " AND out.instance_name = ?"
- " ORDER BY event_stream_ordering ASC"
- )
+ if self.STATE_KEY_IN_EVENTS:
+ sql = (
+ "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
+ " e.state_key, redacts, relates_to_id, membership, e.rejection_reason IS NOT NULL"
+ " FROM events AS e"
+ " INNER JOIN ex_outlier_stream AS out USING (event_id)"
+ " LEFT JOIN redactions USING (event_id)"
+ " LEFT JOIN event_relations USING (event_id)"
+ " LEFT JOIN room_memberships USING (event_id)"
+ " WHERE ? < event_stream_ordering"
+ " AND event_stream_ordering <= ?"
+ " AND out.instance_name = ?"
+ " ORDER BY event_stream_ordering ASC"
+ )
+ else:
+ sql = (
+ "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
+ " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
+ " FROM events AS e"
+ " INNER JOIN ex_outlier_stream AS out USING (event_id)"
+ " LEFT JOIN redactions USING (event_id)"
+ " LEFT JOIN state_events AS se USING (event_id)"
+ " LEFT JOIN event_relations USING (event_id)"
+ " LEFT JOIN room_memberships USING (event_id)"
+ " LEFT JOIN rejections USING (event_id)"
+ " WHERE ? < event_stream_ordering"
+ " AND event_stream_ordering <= ?"
+ " AND out.instance_name = ?"
+ " ORDER BY event_stream_ordering ASC"
+ )
txn.execute(sql, (last_id, current_id, instance_name))
return cast(
@@ -1581,18 +1611,32 @@ class EventsWorkerStore(SQLBaseStore):
def get_all_new_backfill_event_rows(
txn: LoggingTransaction,
) -> Tuple[List[Tuple[int, Tuple[str, str, str, str, str, str]]], int, bool]:
- sql = (
- "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
- " se.state_key, redacts, relates_to_id"
- " FROM events AS e"
- " LEFT JOIN redactions USING (event_id)"
- " LEFT JOIN state_events AS se USING (event_id)"
- " LEFT JOIN event_relations USING (event_id)"
- " WHERE ? > stream_ordering AND stream_ordering >= ?"
- " AND instance_name = ?"
- " ORDER BY stream_ordering ASC"
- " LIMIT ?"
- )
+ if self.STATE_KEY_IN_EVENTS:
+ sql = (
+ "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
+ " e.state_key, redacts, relates_to_id"
+ " FROM events AS e"
+ " LEFT JOIN redactions USING (event_id)"
+ " LEFT JOIN event_relations USING (event_id)"
+ " WHERE ? > stream_ordering AND stream_ordering >= ?"
+ " AND instance_name = ?"
+ " ORDER BY stream_ordering ASC"
+ " LIMIT ?"
+ )
+ else:
+ sql = (
+ "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
+ " se.state_key, redacts, relates_to_id"
+ " FROM events AS e"
+ " LEFT JOIN redactions USING (event_id)"
+ " LEFT JOIN state_events AS se USING (event_id)"
+ " LEFT JOIN event_relations USING (event_id)"
+ " WHERE ? > stream_ordering AND stream_ordering >= ?"
+ " AND instance_name = ?"
+ " ORDER BY stream_ordering ASC"
+ " LIMIT ?"
+ )
+
txn.execute(sql, (-last_id, -current_id, instance_name, limit))
new_event_updates: List[
Tuple[int, Tuple[str, str, str, str, str, str]]
@@ -1611,19 +1655,34 @@ class EventsWorkerStore(SQLBaseStore):
else:
upper_bound = current_id
- sql = (
- "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
- " se.state_key, redacts, relates_to_id"
- " FROM events AS e"
- " INNER JOIN ex_outlier_stream AS out USING (event_id)"
- " LEFT JOIN redactions USING (event_id)"
- " LEFT JOIN state_events AS se USING (event_id)"
- " LEFT JOIN event_relations USING (event_id)"
- " WHERE ? > event_stream_ordering"
- " AND event_stream_ordering >= ?"
- " AND out.instance_name = ?"
- " ORDER BY event_stream_ordering DESC"
- )
+ if self.STATE_KEY_IN_EVENTS:
+ sql = (
+ "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
+ " e.state_key, redacts, relates_to_id"
+ " FROM events AS e"
+ " INNER JOIN ex_outlier_stream AS out USING (event_id)"
+ " LEFT JOIN redactions USING (event_id)"
+ " LEFT JOIN event_relations USING (event_id)"
+ " WHERE ? > event_stream_ordering"
+ " AND event_stream_ordering >= ?"
+ " AND out.instance_name = ?"
+ " ORDER BY event_stream_ordering DESC"
+ )
+ else:
+ sql = (
+ "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
+ " se.state_key, redacts, relates_to_id"
+ " FROM events AS e"
+ " INNER JOIN ex_outlier_stream AS out USING (event_id)"
+ " LEFT JOIN redactions USING (event_id)"
+ " LEFT JOIN state_events AS se USING (event_id)"
+ " LEFT JOIN event_relations USING (event_id)"
+ " WHERE ? > event_stream_ordering"
+ " AND event_stream_ordering >= ?"
+ " AND out.instance_name = ?"
+ " ORDER BY event_stream_ordering DESC"
+ )
+
txn.execute(sql, (-last_id, -upper_bound, instance_name))
# Type safety: iterating over `txn` yields `Tuple`, i.e.
# `Tuple[Any, ...]` of arbitrary length. Mypy detects assigning a
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 87b0d09039..37b53de971 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -122,7 +122,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
logger.info("[purge] looking for events to delete")
- should_delete_expr = "state_events.state_key IS NULL"
+ should_delete_expr = (
+ "e.state_key IS NULL"
+ if self.STATE_KEY_IN_EVENTS
+ else "state_events.state_key IS NULL"
+ )
should_delete_params: Tuple[Any, ...] = ()
if not delete_local_events:
should_delete_expr += " AND event_id NOT LIKE ?"
@@ -134,12 +138,23 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# Note that we insert events that are outliers and aren't going to be
# deleted, as nothing will happen to them.
+ if self.STATE_KEY_IN_EVENTS:
+ sqlf = """
+ INSERT INTO events_to_purge
+ SELECT event_id, %s
+ FROM events AS e
+ WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?
+ """
+ else:
+ sqlf = """
+ INSERT INTO events_to_purge
+ SELECT event_id, %s
+ FROM events AS e LEFT JOIN state_events USING (event_id)
+ WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?
+ """
+
txn.execute(
- "INSERT INTO events_to_purge"
- " SELECT event_id, %s"
- " FROM events AS e LEFT JOIN state_events USING (event_id)"
- " WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?"
- % (should_delete_expr, should_delete_expr),
+ sqlf % (should_delete_expr, should_delete_expr),
should_delete_params,
)
diff --git a/synapse/storage/databases/main/rejections.py b/synapse/storage/databases/main/rejections.py
index 167318b314..f57262c51a 100644
--- a/synapse/storage/databases/main/rejections.py
+++ b/synapse/storage/databases/main/rejections.py
@@ -22,10 +22,19 @@ logger = logging.getLogger(__name__)
class RejectionsStore(SQLBaseStore):
async def get_rejection_reason(self, event_id: str) -> Optional[str]:
- return await self.db_pool.simple_select_one_onecol(
- table="rejections",
- retcol="reason",
- keyvalues={"event_id": event_id},
- allow_none=True,
- desc="get_rejection_reason",
- )
+ if self.STATE_KEY_IN_EVENTS:
+ return await self.db_pool.simple_select_one_onecol(
+ table="events",
+ retcol="rejection_reason",
+ keyvalues={"event_id": event_id},
+ allow_none=True,
+ desc="get_rejection_reason",
+ )
+ else:
+ return await self.db_pool.simple_select_one_onecol(
+ table="rejections",
+ retcol="reason",
+ keyvalues={"event_id": event_id},
+ allow_none=True,
+ desc="get_rejection_reason",
+ )
|