diff options
-rw-r--r-- | changelog.d/16299.misc | 1 | ||||
-rw-r--r-- | synapse/storage/database.py | 3 | ||||
-rw-r--r-- | synapse/storage/databases/main/receipts.py | 23 |
3 files changed, 13 insertions, 14 deletions
diff --git a/changelog.d/16299.misc b/changelog.d/16299.misc new file mode 100644 index 0000000000..d454669151 --- /dev/null +++ b/changelog.d/16299.misc @@ -0,0 +1 @@ +Refactor `receipts_graph` Postgres transactions to stop error messages. diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 6c5fcdcec3..697bc5651c 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1193,6 +1193,7 @@ class DatabasePool: keyvalues: Dict[str, Any], values: Dict[str, Any], insertion_values: Optional[Dict[str, Any]] = None, + where_clause: Optional[str] = None, desc: str = "simple_upsert", ) -> bool: """Insert a row with values + insertion_values; on conflict, update with values. @@ -1243,6 +1244,7 @@ class DatabasePool: keyvalues: The unique key columns and their new values values: The nonunique columns and their new values insertion_values: additional key/values to use only when inserting + where_clause: An index predicate to apply to the upsert. desc: description of the transaction, for logging and metrics Returns: Returns True if a row was inserted or updated (i.e. if `values` is @@ -1263,6 +1265,7 @@ class DatabasePool: keyvalues, values, insertion_values, + where_clause, db_autocommit=autocommit, ) except self.engine.module.IntegrityError as e: diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index e4d10ff250..a074c43989 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -795,9 +795,7 @@ class ReceiptsWorkerStore(SQLBaseStore): now - event_ts, ) - await self.db_pool.runInteraction( - "insert_graph_receipt", - self._insert_graph_receipt_txn, + await self._insert_graph_receipt( room_id, receipt_type, user_id, @@ -810,9 +808,8 @@ class ReceiptsWorkerStore(SQLBaseStore): return stream_id, max_persisted_id - def _insert_graph_receipt_txn( + async def _insert_graph_receipt( self, - txn: LoggingTransaction, room_id: str, receipt_type: str, user_id: str, @@ -822,13 +819,6 @@ class ReceiptsWorkerStore(SQLBaseStore): ) -> None: assert self._can_write_to_receipts - txn.call_after( - self._get_receipts_for_user_with_orderings.invalidate, - (user_id, receipt_type), - ) - # FIXME: This shouldn't invalidate the whole cache - txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,)) - keyvalues = { "room_id": room_id, "receipt_type": receipt_type, @@ -840,8 +830,8 @@ class ReceiptsWorkerStore(SQLBaseStore): else: keyvalues["thread_id"] = thread_id - self.db_pool.simple_upsert_txn( - txn, + await self.db_pool.simple_upsert( + desc="insert_graph_receipt", table="receipts_graph", keyvalues=keyvalues, values={ @@ -851,6 +841,11 @@ class ReceiptsWorkerStore(SQLBaseStore): where_clause=where_clause, ) + self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type)) + + # FIXME: This shouldn't invalidate the whole cache + self._get_linearized_receipts_for_room.invalidate((room_id,)) + class ReceiptsBackgroundUpdateStore(SQLBaseStore): POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering" |