From 329ef5c715d81b538e8b071de046c698a82eae10 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Jun 2021 12:07:16 +0100 Subject: Fix the inbound PDU metric (#10279) This broke in #10272 --- synapse/federation/federation_server.py | 37 ++++++------ synapse/storage/databases/main/event_federation.py | 66 ++++++++++++++++++---- synapse/storage/engines/_base.py | 6 ++ synapse/storage/engines/postgres.py | 5 ++ synapse/storage/engines/sqlite.py | 5 ++ 5 files changed, 92 insertions(+), 27 deletions(-) (limited to 'synapse') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 742d29291e..e93b7577fe 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -369,22 +369,21 @@ class FederationServer(FederationBase): async def process_pdu(pdu: EventBase) -> JsonDict: event_id = pdu.event_id - with pdu_process_time.time(): - with nested_logging_context(event_id): - try: - await self._handle_received_pdu(origin, pdu) - return {} - except FederationError as e: - logger.warning("Error handling PDU %s: %s", event_id, e) - return {"error": str(e)} - except Exception as e: - f = failure.Failure() - logger.error( - "Failed to handle PDU %s", - event_id, - exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore - ) - return {"error": str(e)} + with nested_logging_context(event_id): + try: + await self._handle_received_pdu(origin, pdu) + return {} + except FederationError as e: + logger.warning("Error handling PDU %s: %s", event_id, e) + return {"error": str(e)} + except Exception as e: + f = failure.Failure() + logger.error( + "Failed to handle PDU %s", + event_id, + exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore + ) + return {"error": str(e)} await concurrently_execute( process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT @@ -932,9 +931,13 @@ class FederationServer(FederationBase): exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore ) - await self.store.remove_received_event_from_staging( + received_ts = await self.store.remove_received_event_from_staging( origin, event.event_id ) + if received_ts is not None: + pdu_process_time.observe( + (self._clock.time_msec() - received_ts) / 1000 + ) # We need to do this check outside the lock to avoid a race between # a new event being inserted by another instance and it attempting diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index f23f8c6ecf..f2d27ee893 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1075,16 +1075,62 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas self, origin: str, event_id: str, - ) -> None: - """Remove the given event from the staging area""" - await self.db_pool.simple_delete( - table="federation_inbound_events_staging", - keyvalues={ - "origin": origin, - "event_id": event_id, - }, - desc="remove_received_event_from_staging", - ) + ) -> Optional[int]: + """Remove the given event from the staging area. + + Returns: + The received_ts of the row that was deleted, if any. + """ + if self.db_pool.engine.supports_returning: + + def _remove_received_event_from_staging_txn(txn): + sql = """ + DELETE FROM federation_inbound_events_staging + WHERE origin = ? AND event_id = ? + RETURNING received_ts + """ + + txn.execute(sql, (origin, event_id)) + return txn.fetchone() + + row = await self.db_pool.runInteraction( + "remove_received_event_from_staging", + _remove_received_event_from_staging_txn, + db_autocommit=True, + ) + if row is None: + return None + + return row[0] + + else: + + def _remove_received_event_from_staging_txn(txn): + received_ts = self.db_pool.simple_select_one_onecol_txn( + txn, + table="federation_inbound_events_staging", + keyvalues={ + "origin": origin, + "event_id": event_id, + }, + retcol="received_ts", + allow_none=True, + ) + self.db_pool.simple_delete_txn( + txn, + table="federation_inbound_events_staging", + keyvalues={ + "origin": origin, + "event_id": event_id, + }, + ) + + return received_ts + + return await self.db_pool.runInteraction( + "remove_received_event_from_staging", + _remove_received_event_from_staging_txn, + ) async def get_next_staged_event_id_for_room( self, diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 1882bfd9cf..20cd63c330 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -49,6 +49,12 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta): """ ... + @property + @abc.abstractmethod + def supports_returning(self) -> bool: + """Do we support the `RETURNING` clause in insert/update/delete?""" + ... + @abc.abstractmethod def check_database( self, db_conn: ConnectionType, allow_outdated_version: bool = False diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 21411c5fea..30f948a0f7 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -133,6 +133,11 @@ class PostgresEngine(BaseDatabaseEngine): """Do we support using `a = ANY(?)` and passing a list""" return True + @property + def supports_returning(self) -> bool: + """Do we support the `RETURNING` clause in insert/update/delete?""" + return True + def is_deadlock(self, error): if isinstance(error, self.module.DatabaseError): # https://www.postgresql.org/docs/current/static/errcodes-appendix.html diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 5fe1b205e1..70d17d4f2c 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -60,6 +60,11 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]): """Do we support using `a = ANY(?)` and passing a list""" return False + @property + def supports_returning(self) -> bool: + """Do we support the `RETURNING` clause in insert/update/delete?""" + return self.module.sqlite_version_info >= (3, 35, 0) + def check_database(self, db_conn, allow_outdated_version: bool = False): if not allow_outdated_version: version = self.module.sqlite_version_info -- cgit 1.4.1