diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 0090c9f225..a580e4bdda 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -27,7 +27,6 @@ from typing import (
)
from synapse.api.constants import EduTypes
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import ReceiptsStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
@@ -61,6 +60,9 @@ class ReceiptsWorkerStore(SQLBaseStore):
hs: "HomeServer",
):
self._instance_name = hs.get_instance_name()
+
+        # On workers this is a read-only ID tracker that is advanced over
+        # replication; on instances that may write receipts it is a writable
+        # ID generator.
self._receipts_id_gen: AbstractStreamIdTracker
if isinstance(database.engine, PostgresEngine):
@@ -87,14 +89,12 @@ class ReceiptsWorkerStore(SQLBaseStore):
-            # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
-            # updated over replication. (Multiple writers are not supported for
-            # SQLite).
+            # `StreamIdGenerator`, with `is_writer` determining whether this
+            # instance may advance the stream or only follow it over replication.
+            # (Multiple writers are not supported for SQLite.)
- if hs.get_instance_name() in hs.config.worker.writers.receipts:
- self._receipts_id_gen = StreamIdGenerator(
- db_conn, "receipts_linearized", "stream_id"
- )
- else:
- self._receipts_id_gen = SlavedIdTracker(
- db_conn, "receipts_linearized", "stream_id"
- )
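+            # When `is_writer` is False the generator only follows the stream
+            # position as it is advanced over replication; it cannot allocate
+            # new stream IDs.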
+ self._receipts_id_gen = StreamIdGenerator(
+ db_conn,
+ "receipts_linearized",
+ "stream_id",
+ is_writer=hs.get_instance_name() in hs.config.worker.writers.receipts,
+ )
super().__init__(database, db_conn, hs)
@@ -117,34 +117,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
"""Get the current max stream ID for receipts stream"""
return self._receipts_id_gen.get_current_token()
- async def get_last_receipt_event_id_for_user(
- self, user_id: str, room_id: str, receipt_types: Collection[str]
- ) -> Optional[str]:
- """
- Fetch the event ID for the latest receipt in a room with one of the given receipt types.
-
- Args:
- user_id: The user to fetch receipts for.
- room_id: The room ID to fetch the receipt for.
- receipt_type: The receipt types to fetch.
-
- Returns:
- The latest receipt, if one exists.
- """
- result = await self.db_pool.runInteraction(
- "get_last_receipt_event_id_for_user",
- self.get_last_receipt_for_user_txn,
- user_id,
- room_id,
- receipt_types,
- )
- if not result:
- return None
-
- event_id, _ = result
- return event_id
-
- def get_last_receipt_for_user_txn(
+ def get_last_unthreaded_receipt_for_user_txn(
self,
txn: LoggingTransaction,
user_id: str,
@@ -152,16 +125,16 @@ class ReceiptsWorkerStore(SQLBaseStore):
receipt_types: Collection[str],
) -> Optional[Tuple[str, int]]:
"""
- Fetch the event ID and stream_ordering for the latest receipt in a room
- with one of the given receipt types.
+ Fetch the event ID and stream_ordering for the latest unthreaded receipt
+ in a room with one of the given receipt types.
Args:
user_id: The user to fetch receipts for.
room_id: The room ID to fetch the receipt for.
- receipt_type: The receipt types to fetch.
+ receipt_types: The receipt types to fetch.
Returns:
- The latest receipt, if one exists.
+ The event ID and stream ordering of the latest receipt, if one exists.
"""
clause, args = make_in_list_sql_clause(
@@ -175,6 +148,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
WHERE {clause}
AND user_id = ?
AND room_id = ?
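+            -- Unthreaded receipts are stored with a NULL thread_id.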
+ AND thread_id IS NULL
ORDER BY stream_ordering DESC
LIMIT 1
"""
@@ -426,6 +400,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
receipt_type = event_entry.setdefault(row["receipt_type"], {})
receipt_type[row["user_id"]] = db_to_json(row["data"])
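+                # Expose the thread ID in the receipt data returned to clients
+                # (threaded read receipts, MSC3771).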
+ if row["thread_id"]:
+ receipt_type[row["user_id"]]["thread_id"] = row["thread_id"]
results = {
room_id: [results[room_id]] if room_id in results else []
@@ -522,7 +498,9 @@ class ReceiptsWorkerStore(SQLBaseStore):
async def get_all_updated_receipts(
self, instance_name: str, last_id: int, current_id: int, limit: int
- ) -> Tuple[List[Tuple[int, list]], int, bool]:
+ ) -> Tuple[
+ List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]], int, bool
+ ]:
"""Get updates for receipts replication stream.
Args:
@@ -549,9 +527,13 @@ class ReceiptsWorkerStore(SQLBaseStore):
def get_all_updated_receipts_txn(
txn: LoggingTransaction,
- ) -> Tuple[List[Tuple[int, list]], int, bool]:
+ ) -> Tuple[
+ List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]],
+ int,
+ bool,
+ ]:
sql = """
- SELECT stream_id, room_id, receipt_type, user_id, event_id, data
+ SELECT stream_id, room_id, receipt_type, user_id, event_id, thread_id, data
FROM receipts_linearized
WHERE ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC
@@ -560,8 +542,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
txn.execute(sql, (last_id, current_id, limit))
updates = cast(
- List[Tuple[int, list]],
- [(r[0], r[1:5] + (db_to_json(r[5]),)) for r in txn],
+ List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]],
+ [(r[0], r[1:6] + (db_to_json(r[6]),)) for r in txn],
)
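+            # Each update row is (stream_id, (room_id, receipt_type, user_id,
+            # event_id, thread_id, data)), matching the SELECT above.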
limited = False
@@ -613,6 +595,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
receipt_type: str,
user_id: str,
event_id: str,
+ thread_id: Optional[str],
data: JsonDict,
stream_id: int,
) -> Optional[int]:
@@ -639,12 +622,27 @@ class ReceiptsWorkerStore(SQLBaseStore):
# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
if stream_ordering is not None:
- sql = (
- "SELECT stream_ordering, event_id FROM events"
- " INNER JOIN receipts_linearized AS r USING (event_id, room_id)"
- " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
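+            # SQL's `= NULL` comparison never matches, so unthreaded receipts
+            # (stored with a NULL thread_id) need an explicit IS NULL clause
+            # rather than a bound parameter.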
+ if thread_id is None:
+ thread_clause = "r.thread_id IS NULL"
+ thread_args: Tuple[str, ...] = ()
+ else:
+ thread_clause = "r.thread_id = ?"
+ thread_args = (thread_id,)
+
+ sql = f"""
+ SELECT stream_ordering, event_id FROM events
+ INNER JOIN receipts_linearized AS r USING (event_id, room_id)
+ WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? AND {thread_clause}
+ """
+ txn.execute(
+ sql,
+ (
+ room_id,
+ receipt_type,
+ user_id,
+ )
+ + thread_args,
)
- txn.execute(sql, (room_id, receipt_type, user_id))
for so, eid in txn:
if int(so) >= stream_ordering:
@@ -664,22 +662,28 @@ class ReceiptsWorkerStore(SQLBaseStore):
self._receipts_stream_cache.entity_has_changed, room_id, stream_id
)
+ keyvalues = {
+ "room_id": room_id,
+ "receipt_type": receipt_type,
+ "user_id": user_id,
+ }
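+        # A NULL thread_id can never be matched via `keyvalues` (NULL is not
+        # equal to NULL in SQL), so unthreaded receipts are pinned down with an
+        # explicit where_clause on the upsert instead.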
+ where_clause = ""
+ if thread_id is None:
+ where_clause = "thread_id IS NULL"
+ else:
+ keyvalues["thread_id"] = thread_id
+
self.db_pool.simple_upsert_txn(
txn,
table="receipts_linearized",
- keyvalues={
- "room_id": room_id,
- "receipt_type": receipt_type,
- "user_id": user_id,
- },
+ keyvalues=keyvalues,
values={
"stream_id": stream_id,
"event_id": event_id,
+ "event_stream_ordering": stream_ordering,
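+                # `event_stream_ordering` is a denormalised copy of the event's
+                # stream ordering; existing rows are backfilled by the
+                # `populate_event_stream_ordering` background update below.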
"data": json_encoder.encode(data),
},
- # receipts_linearized has a unique constraint on
- # (user_id, room_id, receipt_type), so no need to lock
- lock=False,
+ where_clause=where_clause,
)
return rx_ts
@@ -728,6 +732,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
receipt_type: str,
user_id: str,
event_ids: List[str],
+ thread_id: Optional[str],
data: dict,
) -> Optional[Tuple[int, int]]:
"""Insert a receipt, either from local client or remote server.
@@ -760,6 +765,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
receipt_type,
user_id,
linearized_event_id,
+ thread_id,
data,
stream_id=stream_id,
# Read committed is actually beneficial here because we check for a receipt with
@@ -774,7 +780,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
now = self._clock.time_msec()
logger.debug(
- "RR for event %s in %s (%i ms old)",
+ "Receipt %s for event %s in %s (%i ms old)",
+ receipt_type,
linearized_event_id,
room_id,
now - event_ts,
@@ -787,6 +794,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
receipt_type,
user_id,
event_ids,
+ thread_id,
data,
)
@@ -801,6 +809,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
receipt_type: str,
user_id: str,
event_ids: List[str],
+ thread_id: Optional[str],
data: JsonDict,
) -> None:
assert self._can_write_to_receipts
@@ -812,27 +821,246 @@ class ReceiptsWorkerStore(SQLBaseStore):
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))
- self.db_pool.simple_delete_txn(
- txn,
- table="receipts_graph",
- keyvalues={
- "room_id": room_id,
- "receipt_type": receipt_type,
- "user_id": user_id,
- },
- )
- self.db_pool.simple_insert_txn(
+ keyvalues = {
+ "room_id": room_id,
+ "receipt_type": receipt_type,
+ "user_id": user_id,
+ }
+ where_clause = ""
+ if thread_id is None:
+ where_clause = "thread_id IS NULL"
+ else:
+ keyvalues["thread_id"] = thread_id
+
+ self.db_pool.simple_upsert_txn(
txn,
table="receipts_graph",
+ keyvalues=keyvalues,
values={
- "room_id": room_id,
- "receipt_type": receipt_type,
- "user_id": user_id,
"event_ids": json_encoder.encode(event_ids),
"data": json_encoder.encode(data),
},
+ where_clause=where_clause,
+ )
+
+
+class ReceiptsBackgroundUpdateStore(SQLBaseStore):
+ POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
+ RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME = "receipts_linearized_unique_index"
+ RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME = "receipts_graph_unique_index"
+
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: LoggingDatabaseConnection,
+ hs: "HomeServer",
+ ):
+ super().__init__(database, db_conn, hs)
+
+ self.db_pool.updates.register_background_update_handler(
+ self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
+ self._populate_receipt_event_stream_ordering,
+ )
+ self.db_pool.updates.register_background_update_handler(
+ self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME,
+ self._background_receipts_linearized_unique_index,
+ )
+ self.db_pool.updates.register_background_update_handler(
+ self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME,
+ self._background_receipts_graph_unique_index,
+ )
+
+ async def _populate_receipt_event_stream_ordering(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ def _populate_receipt_event_stream_ordering_txn(
+ txn: LoggingTransaction,
+ ) -> bool:
+ if "max_stream_id" in progress:
+ max_stream_id = progress["max_stream_id"]
+ else:
+ txn.execute("SELECT max(stream_id) FROM receipts_linearized")
+ res = txn.fetchone()
+ if res is None or res[0] is None:
+ return True
+ else:
+ max_stream_id = res[0]
+
+ start = progress.get("stream_id", 0)
+ stop = start + batch_size
+
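+            # Backfill the denormalised `event_stream_ordering` column from the
+            # events table for this batch of receipt rows.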
+ sql = """
+ UPDATE receipts_linearized
+ SET event_stream_ordering = (
+ SELECT stream_ordering
+ FROM events
+ WHERE event_id = receipts_linearized.event_id
+ )
+ WHERE stream_id >= ? AND stream_id < ?
+ """
+ txn.execute(sql, (start, stop))
+
+ self.db_pool.updates._background_update_progress_txn(
+ txn,
+ self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
+ {
+ "stream_id": stop,
+ "max_stream_id": max_stream_id,
+ },
+ )
+
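+            # The update is finished once the batch window has moved past the
+            # highest stream ID that existed when the update began.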
+ return stop > max_stream_id
+
+ finished = await self.db_pool.runInteraction(
+            "_populate_receipt_event_stream_ordering_txn",
+ _populate_receipt_event_stream_ordering_txn,
+ )
+
+ if finished:
+ await self.db_pool.updates._end_background_update(
+ self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING
+ )
+
+ return batch_size
+
+ async def _create_receipts_index(self, index_name: str, table: str) -> None:
+        """Adds a unique index on `(room_id, receipt_type, user_id)` to the given
+        receipts table, for unthreaded receipts."""
+
+ def _create_index(conn: LoggingDatabaseConnection) -> None:
+ conn.rollback()
+
+ # we have to set autocommit, because postgres refuses to
+ # CREATE INDEX CONCURRENTLY without it.
+ if isinstance(self.database_engine, PostgresEngine):
+ conn.set_session(autocommit=True)
+
+ try:
+ c = conn.cursor()
+
+ # Now that the duplicates are gone, we can create the index.
+ concurrently = (
+ "CONCURRENTLY"
+ if isinstance(self.database_engine, PostgresEngine)
+ else ""
+ )
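+            # `WHERE thread_id IS NULL` makes this a partial index: it enforces
+            # at most one unthreaded receipt per (room_id, receipt_type, user_id)
+            # while leaving threaded receipts unconstrained.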
+ sql = f"""
+ CREATE UNIQUE INDEX {concurrently} {index_name}
+ ON {table}(room_id, receipt_type, user_id)
+ WHERE thread_id IS NULL
+ """
+ c.execute(sql)
+ finally:
+ if isinstance(self.database_engine, PostgresEngine):
+ conn.set_session(autocommit=False)
+
+ await self.db_pool.runWithConnection(_create_index)
+
+ async def _background_receipts_linearized_unique_index(
+ self, progress: dict, batch_size: int
+ ) -> int:
+        """Removes duplicate receipts and adds a unique index on
+        `(room_id, receipt_type, user_id)` to `receipts_linearized`, for
+        unthreaded receipts."""
+
+        def _remove_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
+ # Identify any duplicate receipts arising from
+ # https://github.com/matrix-org/synapse/issues/14406.
+ # We expect the following query to use the per-thread receipt index and take
+ # less than a minute.
+ sql = """
+ SELECT MAX(stream_id), room_id, receipt_type, user_id
+ FROM receipts_linearized
+ WHERE thread_id IS NULL
+ GROUP BY room_id, receipt_type, user_id
+ HAVING COUNT(*) > 1
+ """
+ txn.execute(sql)
+ duplicate_keys = cast(List[Tuple[int, str, str, str]], list(txn))
+
+ # Then remove duplicate receipts, keeping the one with the highest
+ # `stream_id`. There should only be a single receipt with any given
+ # `stream_id`.
+ for max_stream_id, room_id, receipt_type, user_id in duplicate_keys:
+ sql = """
+ DELETE FROM receipts_linearized
+ WHERE
+ room_id = ? AND
+ receipt_type = ? AND
+ user_id = ? AND
+ thread_id IS NULL AND
+ stream_id < ?
+ """
+ txn.execute(sql, (room_id, receipt_type, user_id, max_stream_id))
+
+ await self.db_pool.runInteraction(
+ self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME,
+            _remove_duplicate_receipts_txn,
+ )
+
+ await self._create_receipts_index(
+ "receipts_linearized_unique_index",
+ "receipts_linearized",
+ )
+
+ await self.db_pool.updates._end_background_update(
+ self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME
+ )
+
+ return 1
+
+ async def _background_receipts_graph_unique_index(
+ self, progress: dict, batch_size: int
+ ) -> int:
+        """Removes duplicate receipts and adds a unique index on
+        `(room_id, receipt_type, user_id)` to `receipts_graph`, for
+        unthreaded receipts."""
+
+        def _remove_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
+ # Identify any duplicate receipts arising from
+ # https://github.com/matrix-org/synapse/issues/14406.
+ # We expect the following query to use the per-thread receipt index and take
+ # less than a minute.
+ sql = """
+ SELECT room_id, receipt_type, user_id FROM receipts_graph
+ WHERE thread_id IS NULL
+ GROUP BY room_id, receipt_type, user_id
+ HAVING COUNT(*) > 1
+ """
+ txn.execute(sql)
+ duplicate_keys = cast(List[Tuple[str, str, str]], list(txn))
+
+ # Then remove all duplicate receipts.
+ # We could be clever and try to keep the latest receipt out of every set of
+ # duplicates, but it's far simpler to remove them all.
+ for room_id, receipt_type, user_id in duplicate_keys:
+ sql = """
+ DELETE FROM receipts_graph
+ WHERE
+ room_id = ? AND
+ receipt_type = ? AND
+ user_id = ? AND
+ thread_id IS NULL
+ """
+ txn.execute(sql, (room_id, receipt_type, user_id))
+
+ await self.db_pool.runInteraction(
+ self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME,
+            _remove_duplicate_receipts_txn,
+ )
+
+ await self._create_receipts_index(
+ "receipts_graph_unique_index",
+ "receipts_graph",
)
+ await self.db_pool.updates._end_background_update(
+ self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME
+ )
+
+ return 1
+
-class ReceiptsStore(ReceiptsWorkerStore):
+class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore):
pass
|