summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/federation_server.py37
-rw-r--r--synapse/storage/databases/main/event_federation.py66
-rw-r--r--synapse/storage/engines/_base.py6
-rw-r--r--synapse/storage/engines/postgres.py5
-rw-r--r--synapse/storage/engines/sqlite.py5
5 files changed, 92 insertions, 27 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 742d29291e..e93b7577fe 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -369,22 +369,21 @@ class FederationServer(FederationBase):
 
         async def process_pdu(pdu: EventBase) -> JsonDict:
             event_id = pdu.event_id
-            with pdu_process_time.time():
-                with nested_logging_context(event_id):
-                    try:
-                        await self._handle_received_pdu(origin, pdu)
-                        return {}
-                    except FederationError as e:
-                        logger.warning("Error handling PDU %s: %s", event_id, e)
-                        return {"error": str(e)}
-                    except Exception as e:
-                        f = failure.Failure()
-                        logger.error(
-                            "Failed to handle PDU %s",
-                            event_id,
-                            exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
-                        )
-                        return {"error": str(e)}
+            with nested_logging_context(event_id):
+                try:
+                    await self._handle_received_pdu(origin, pdu)
+                    return {}
+                except FederationError as e:
+                    logger.warning("Error handling PDU %s: %s", event_id, e)
+                    return {"error": str(e)}
+                except Exception as e:
+                    f = failure.Failure()
+                    logger.error(
+                        "Failed to handle PDU %s",
+                        event_id,
+                        exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
+                    )
+                    return {"error": str(e)}
 
         await concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
@@ -932,9 +931,13 @@ class FederationServer(FederationBase):
                         exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
                     )
 
-                await self.store.remove_received_event_from_staging(
+                received_ts = await self.store.remove_received_event_from_staging(
                     origin, event.event_id
                 )
+                if received_ts is not None:
+                    pdu_process_time.observe(
+                        (self._clock.time_msec() - received_ts) / 1000
+                    )
 
             # We need to do this check outside the lock to avoid a race between
             # a new event being inserted by another instance and it attempting
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index f23f8c6ecf..f2d27ee893 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1075,16 +1075,62 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         self,
         origin: str,
         event_id: str,
-    ) -> None:
-        """Remove the given event from the staging area"""
-        await self.db_pool.simple_delete(
-            table="federation_inbound_events_staging",
-            keyvalues={
-                "origin": origin,
-                "event_id": event_id,
-            },
-            desc="remove_received_event_from_staging",
-        )
+    ) -> Optional[int]:
+        """Remove the given event from the staging area.
+
+        Returns:
+            The received_ts of the row that was deleted, if any.
+        """
+        if self.db_pool.engine.supports_returning:
+
+            def _remove_received_event_from_staging_txn(txn):
+                sql = """
+                    DELETE FROM federation_inbound_events_staging
+                    WHERE origin = ? AND event_id = ?
+                    RETURNING received_ts
+                """
+
+                txn.execute(sql, (origin, event_id))
+                return txn.fetchone()
+
+            row = await self.db_pool.runInteraction(
+                "remove_received_event_from_staging",
+                _remove_received_event_from_staging_txn,
+                db_autocommit=True,
+            )
+            if row is None:
+                return None
+
+            return row[0]
+
+        else:
+
+            def _remove_received_event_from_staging_txn(txn):
+                received_ts = self.db_pool.simple_select_one_onecol_txn(
+                    txn,
+                    table="federation_inbound_events_staging",
+                    keyvalues={
+                        "origin": origin,
+                        "event_id": event_id,
+                    },
+                    retcol="received_ts",
+                    allow_none=True,
+                )
+                self.db_pool.simple_delete_txn(
+                    txn,
+                    table="federation_inbound_events_staging",
+                    keyvalues={
+                        "origin": origin,
+                        "event_id": event_id,
+                    },
+                )
+
+                return received_ts
+
+            return await self.db_pool.runInteraction(
+                "remove_received_event_from_staging",
+                _remove_received_event_from_staging_txn,
+            )
 
     async def get_next_staged_event_id_for_room(
         self,
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 1882bfd9cf..20cd63c330 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -49,6 +49,12 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
         """
         ...
 
+    @property
+    @abc.abstractmethod
+    def supports_returning(self) -> bool:
+        """Do we support the `RETURNING` clause in insert/update/delete?"""
+        ...
+
     @abc.abstractmethod
     def check_database(
         self, db_conn: ConnectionType, allow_outdated_version: bool = False
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 21411c5fea..30f948a0f7 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -133,6 +133,11 @@ class PostgresEngine(BaseDatabaseEngine):
         """Do we support using `a = ANY(?)` and passing a list"""
         return True
 
+    @property
+    def supports_returning(self) -> bool:
+        """Do we support the `RETURNING` clause in insert/update/delete?"""
+        return True
+
     def is_deadlock(self, error):
         if isinstance(error, self.module.DatabaseError):
             # https://www.postgresql.org/docs/current/static/errcodes-appendix.html
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 5fe1b205e1..70d17d4f2c 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -60,6 +60,11 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
         """Do we support using `a = ANY(?)` and passing a list"""
         return False
 
+    @property
+    def supports_returning(self) -> bool:
+        """Do we support the `RETURNING` clause in insert/update/delete?"""
+        return self.module.sqlite_version_info >= (3, 35, 0)
+
     def check_database(self, db_conn, allow_outdated_version: bool = False):
         if not allow_outdated_version:
             version = self.module.sqlite_version_info