diff options
author | Erik Johnston <erik@matrix.org> | 2021-06-30 12:07:16 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-06-30 12:07:16 +0100 |
commit | 329ef5c715d81b538e8b071de046c698a82eae10 (patch) | |
tree | 69b4853a47342f7a42c3ef493cc99be7ccbd044d /synapse/storage/databases | |
parent | Merge branch 'release-v1.37' into develop (diff) | |
download | synapse-329ef5c715d81b538e8b071de046c698a82eae10.tar.xz |
Fix the inbound PDU metric (#10279)
This broke in #10272
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r-- | synapse/storage/databases/main/event_federation.py | 66 |
1 files changed, 56 insertions, 10 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index f23f8c6ecf..f2d27ee893 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1075,16 +1075,62 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas self, origin: str, event_id: str, - ) -> None: - """Remove the given event from the staging area""" - await self.db_pool.simple_delete( - table="federation_inbound_events_staging", - keyvalues={ - "origin": origin, - "event_id": event_id, - }, - desc="remove_received_event_from_staging", - ) + ) -> Optional[int]: + """Remove the given event from the staging area. + + Returns: + The received_ts of the row that was deleted, if any. + """ + if self.db_pool.engine.supports_returning: + + def _remove_received_event_from_staging_txn(txn): + sql = """ + DELETE FROM federation_inbound_events_staging + WHERE origin = ? AND event_id = ? + RETURNING received_ts + """ + + txn.execute(sql, (origin, event_id)) + return txn.fetchone() + + row = await self.db_pool.runInteraction( + "remove_received_event_from_staging", + _remove_received_event_from_staging_txn, + db_autocommit=True, + ) + if row is None: + return None + + return row[0] + + else: + + def _remove_received_event_from_staging_txn(txn): + received_ts = self.db_pool.simple_select_one_onecol_txn( + txn, + table="federation_inbound_events_staging", + keyvalues={ + "origin": origin, + "event_id": event_id, + }, + retcol="received_ts", + allow_none=True, + ) + self.db_pool.simple_delete_txn( + txn, + table="federation_inbound_events_staging", + keyvalues={ + "origin": origin, + "event_id": event_id, + }, + ) + + return received_ts + + return await self.db_pool.runInteraction( + "remove_received_event_from_staging", + _remove_received_event_from_staging_txn, + ) async def get_next_staged_event_id_for_room( self, |