diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index f23f8c6ecf..f2d27ee893 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1075,16 +1075,62 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
self,
origin: str,
event_id: str,
- ) -> None:
- """Remove the given event from the staging area"""
- await self.db_pool.simple_delete(
- table="federation_inbound_events_staging",
- keyvalues={
- "origin": origin,
- "event_id": event_id,
- },
- desc="remove_received_event_from_staging",
- )
+ ) -> Optional[int]:
+ """Remove the given event from the staging area.
+
+ Returns:
+ The received_ts of the row that was deleted, if any.
+ """
+ if self.db_pool.engine.supports_returning:
+
+ def _remove_received_event_from_staging_txn(txn):
+ sql = """
+ DELETE FROM federation_inbound_events_staging
+ WHERE origin = ? AND event_id = ?
+ RETURNING received_ts
+ """
+
+ txn.execute(sql, (origin, event_id))
+ return txn.fetchone()
+
+ row = await self.db_pool.runInteraction(
+ "remove_received_event_from_staging",
+ _remove_received_event_from_staging_txn,
+ db_autocommit=True,
+ )
+ if row is None:
+ return None
+
+ return row[0]
+
+ else:
+
+ def _remove_received_event_from_staging_txn(txn):
+ received_ts = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="federation_inbound_events_staging",
+ keyvalues={
+ "origin": origin,
+ "event_id": event_id,
+ },
+ retcol="received_ts",
+ allow_none=True,
+ )
+ self.db_pool.simple_delete_txn(
+ txn,
+ table="federation_inbound_events_staging",
+ keyvalues={
+ "origin": origin,
+ "event_id": event_id,
+ },
+ )
+
+ return received_ts
+
+ return await self.db_pool.runInteraction(
+ "remove_received_event_from_staging",
+ _remove_received_event_from_staging_txn,
+ )
async def get_next_staged_event_id_for_room(
self,
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 1882bfd9cf..20cd63c330 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -49,6 +49,12 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
"""
...
+ @property
+ @abc.abstractmethod
+ def supports_returning(self) -> bool:
+ """Do we support the `RETURNING` clause in insert/update/delete?"""
+ ...
+
@abc.abstractmethod
def check_database(
self, db_conn: ConnectionType, allow_outdated_version: bool = False
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 21411c5fea..30f948a0f7 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -133,6 +133,11 @@ class PostgresEngine(BaseDatabaseEngine):
"""Do we support using `a = ANY(?)` and passing a list"""
return True
+ @property
+ def supports_returning(self) -> bool:
+ """Do we support the `RETURNING` clause in insert/update/delete?"""
+ return True
+
def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 5fe1b205e1..70d17d4f2c 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -60,6 +60,11 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
"""Do we support using `a = ANY(?)` and passing a list"""
return False
+ @property
+ def supports_returning(self) -> bool:
+ """Do we support the `RETURNING` clause in insert/update/delete?"""
+ return self.module.sqlite_version_info >= (3, 35, 0)
+
def check_database(self, db_conn, allow_outdated_version: bool = False):
if not allow_outdated_version:
version = self.module.sqlite_version_info
|