diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 6c5fcdcec3..697bc5651c 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -1193,6 +1193,7 @@ class DatabasePool:
keyvalues: Dict[str, Any],
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
+ where_clause: Optional[str] = None,
desc: str = "simple_upsert",
) -> bool:
"""Insert a row with values + insertion_values; on conflict, update with values.
@@ -1243,6 +1244,7 @@ class DatabasePool:
keyvalues: The unique key columns and their new values
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
+ where_clause: An index predicate to apply to the upsert.
desc: description of the transaction, for logging and metrics
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
@@ -1263,6 +1265,7 @@ class DatabasePool:
keyvalues,
values,
insertion_values,
+ where_clause,
db_autocommit=autocommit,
)
except self.engine.module.IntegrityError as e:
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index e4d10ff250..a074c43989 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -795,9 +795,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
now - event_ts,
)
- await self.db_pool.runInteraction(
- "insert_graph_receipt",
- self._insert_graph_receipt_txn,
+ await self._insert_graph_receipt(
room_id,
receipt_type,
user_id,
@@ -810,9 +808,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
return stream_id, max_persisted_id
- def _insert_graph_receipt_txn(
+ async def _insert_graph_receipt(
self,
- txn: LoggingTransaction,
room_id: str,
receipt_type: str,
user_id: str,
@@ -822,13 +819,6 @@ class ReceiptsWorkerStore(SQLBaseStore):
) -> None:
assert self._can_write_to_receipts
- txn.call_after(
- self._get_receipts_for_user_with_orderings.invalidate,
- (user_id, receipt_type),
- )
- # FIXME: This shouldn't invalidate the whole cache
- txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))
-
keyvalues = {
"room_id": room_id,
"receipt_type": receipt_type,
@@ -840,8 +830,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
else:
keyvalues["thread_id"] = thread_id
- self.db_pool.simple_upsert_txn(
- txn,
+ await self.db_pool.simple_upsert(
+ desc="insert_graph_receipt",
table="receipts_graph",
keyvalues=keyvalues,
values={
@@ -851,6 +841,11 @@ class ReceiptsWorkerStore(SQLBaseStore):
where_clause=where_clause,
)
+ self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type))
+
+ # FIXME: This shouldn't invalidate the whole cache
+ self._get_linearized_receipts_for_room.invalidate((room_id,))
+
class ReceiptsBackgroundUpdateStore(SQLBaseStore):
POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
|