diff options
author | Nick Mills-Barrett <nick@beeper.com> | 2022-07-15 10:36:56 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-15 09:36:56 +0100 |
commit | 21eeacc99551febcddcef21db96a2bd82166fc7e (patch) | |
tree | 10cc25c6bd1c20ac1c206ee46e343aa4487cf5ce /synapse/storage/databases/main/appservice.py | |
parent | Rip out auth-event reconciliation code (#12943) (diff) | |
download | synapse-21eeacc99551febcddcef21db96a2bd82166fc7e.tar.xz |
Federation Sender & Appservice Pusher Stream Optimisations (#13251)
* Replace `get_new_events_for_appservice` with `get_all_new_events_stream` The functions were near identical and this brings the AS worker closer to the way federation senders work which can allow for multiple workers to handle AS traffic. * Pull received TS alongside events when processing the stream This avoids an extra query -per event- when both federation sender and appservice pusher process events.
Diffstat (limited to 'synapse/storage/databases/main/appservice.py')
-rw-r--r-- | synapse/storage/databases/main/appservice.py | 58 |
1 files changed, 18 insertions, 40 deletions
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index e284454b66..64b70a7b28 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -371,52 +371,30 @@ class ApplicationServiceTransactionWorkerStore( device_list_summary=DeviceListUpdates(), ) - async def set_appservice_last_pos(self, pos: int) -> None: - def set_appservice_last_pos_txn(txn: LoggingTransaction) -> None: - txn.execute( - "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,) - ) + async def get_appservice_last_pos(self) -> int: + """ + Get the last stream ordering position for the appservice process. + """ - await self.db_pool.runInteraction( - "set_appservice_last_pos", set_appservice_last_pos_txn + return await self.db_pool.simple_select_one_onecol( + table="appservice_stream_position", + retcol="stream_ordering", + keyvalues={}, + desc="get_appservice_last_pos", ) - async def get_new_events_for_appservice( - self, current_id: int, limit: int - ) -> Tuple[int, List[EventBase]]: - """Get all new events for an appservice""" - - def get_new_events_for_appservice_txn( - txn: LoggingTransaction, - ) -> Tuple[int, List[str]]: - sql = ( - "SELECT e.stream_ordering, e.event_id" - " FROM events AS e" - " WHERE" - " (SELECT stream_ordering FROM appservice_stream_position)" - " < e.stream_ordering" - " AND e.stream_ordering <= ?" - " ORDER BY e.stream_ordering ASC" - " LIMIT ?" - ) - - txn.execute(sql, (current_id, limit)) - rows = txn.fetchall() - - upper_bound = current_id - if len(rows) == limit: - upper_bound = rows[-1][0] - - return upper_bound, [row[1] for row in rows] + async def set_appservice_last_pos(self, pos: int) -> None: + """ + Set the last stream ordering position for the appservice process. + """ - upper_bound, event_ids = await self.db_pool.runInteraction( - "get_new_events_for_appservice", get_new_events_for_appservice_txn + await self.db_pool.simple_update_one( + table="appservice_stream_position", + keyvalues={}, + updatevalues={"stream_ordering": pos}, + desc="set_appservice_last_pos", ) - events = await self.get_events_as_list(event_ids, get_prev_content=True) - - return upper_bound, events - async def get_type_stream_id_for_appservice( self, service: ApplicationService, type: str ) -> int: |