diff --git a/changelog.d/10279.bugfix b/changelog.d/10279.bugfix
new file mode 100644
index 0000000000..ac8b64ead9
--- /dev/null
+++ b/changelog.d/10279.bugfix
@@ -0,0 +1 @@
+Fix the prometheus `synapse_federation_server_pdu_process_time` metric. Broke in v1.37.1.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 742d29291e..e93b7577fe 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -369,22 +369,21 @@ class FederationServer(FederationBase):
async def process_pdu(pdu: EventBase) -> JsonDict:
event_id = pdu.event_id
- with pdu_process_time.time():
- with nested_logging_context(event_id):
- try:
- await self._handle_received_pdu(origin, pdu)
- return {}
- except FederationError as e:
- logger.warning("Error handling PDU %s: %s", event_id, e)
- return {"error": str(e)}
- except Exception as e:
- f = failure.Failure()
- logger.error(
- "Failed to handle PDU %s",
- event_id,
- exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
- )
- return {"error": str(e)}
+ with nested_logging_context(event_id):
+ try:
+ await self._handle_received_pdu(origin, pdu)
+ return {}
+ except FederationError as e:
+ logger.warning("Error handling PDU %s: %s", event_id, e)
+ return {"error": str(e)}
+ except Exception as e:
+ f = failure.Failure()
+ logger.error(
+ "Failed to handle PDU %s",
+ event_id,
+ exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
+ )
+ return {"error": str(e)}
await concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
@@ -932,9 +931,13 @@ class FederationServer(FederationBase):
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
)
- await self.store.remove_received_event_from_staging(
+ received_ts = await self.store.remove_received_event_from_staging(
origin, event.event_id
)
+ if received_ts is not None:
+ pdu_process_time.observe(
+ (self._clock.time_msec() - received_ts) / 1000
+ )
# We need to do this check outside the lock to avoid a race between
# a new event being inserted by another instance and it attempting
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index f23f8c6ecf..f2d27ee893 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1075,16 +1075,62 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
self,
origin: str,
event_id: str,
- ) -> None:
- """Remove the given event from the staging area"""
- await self.db_pool.simple_delete(
- table="federation_inbound_events_staging",
- keyvalues={
- "origin": origin,
- "event_id": event_id,
- },
- desc="remove_received_event_from_staging",
- )
+ ) -> Optional[int]:
+ """Remove the given event from the staging area.
+
+ Returns:
+ The received_ts of the row that was deleted, if any.
+ """
+ if self.db_pool.engine.supports_returning:
+
+ def _remove_received_event_from_staging_txn(txn):
+ sql = """
+ DELETE FROM federation_inbound_events_staging
+ WHERE origin = ? AND event_id = ?
+ RETURNING received_ts
+ """
+
+ txn.execute(sql, (origin, event_id))
+ return txn.fetchone()
+
+ row = await self.db_pool.runInteraction(
+ "remove_received_event_from_staging",
+ _remove_received_event_from_staging_txn,
+ db_autocommit=True,
+ )
+ if row is None:
+ return None
+
+ return row[0]
+
+ else:
+
+ def _remove_received_event_from_staging_txn(txn):
+ received_ts = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="federation_inbound_events_staging",
+ keyvalues={
+ "origin": origin,
+ "event_id": event_id,
+ },
+ retcol="received_ts",
+ allow_none=True,
+ )
+ self.db_pool.simple_delete_txn(
+ txn,
+ table="federation_inbound_events_staging",
+ keyvalues={
+ "origin": origin,
+ "event_id": event_id,
+ },
+ )
+
+ return received_ts
+
+ return await self.db_pool.runInteraction(
+ "remove_received_event_from_staging",
+ _remove_received_event_from_staging_txn,
+ )
async def get_next_staged_event_id_for_room(
self,
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 1882bfd9cf..20cd63c330 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -49,6 +49,12 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
"""
...
+ @property
+ @abc.abstractmethod
+ def supports_returning(self) -> bool:
+ """Do we support the `RETURNING` clause in insert/update/delete?"""
+ ...
+
@abc.abstractmethod
def check_database(
self, db_conn: ConnectionType, allow_outdated_version: bool = False
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 21411c5fea..30f948a0f7 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -133,6 +133,11 @@ class PostgresEngine(BaseDatabaseEngine):
"""Do we support using `a = ANY(?)` and passing a list"""
return True
+ @property
+ def supports_returning(self) -> bool:
+ """Do we support the `RETURNING` clause in insert/update/delete?"""
+ return True
+
def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 5fe1b205e1..70d17d4f2c 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -60,6 +60,11 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
"""Do we support using `a = ANY(?)` and passing a list"""
return False
+ @property
+ def supports_returning(self) -> bool:
+ """Do we support the `RETURNING` clause in insert/update/delete?"""
+ return self.module.sqlite_version_info >= (3, 35, 0)
+
def check_database(self, db_conn, allow_outdated_version: bool = False):
if not allow_outdated_version:
version = self.module.sqlite_version_info
|