diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index e284454b66..64b70a7b28 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -371,52 +371,30 @@ class ApplicationServiceTransactionWorkerStore(
device_list_summary=DeviceListUpdates(),
)
- async def set_appservice_last_pos(self, pos: int) -> None:
- def set_appservice_last_pos_txn(txn: LoggingTransaction) -> None:
- txn.execute(
- "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
- )
+ async def get_appservice_last_pos(self) -> int:
+ """
+ Get the last stream ordering position for the appservice process.
+ """
- await self.db_pool.runInteraction(
- "set_appservice_last_pos", set_appservice_last_pos_txn
+ return await self.db_pool.simple_select_one_onecol(
+ table="appservice_stream_position",
+ retcol="stream_ordering",
+ keyvalues={},
+ desc="get_appservice_last_pos",
)
- async def get_new_events_for_appservice(
- self, current_id: int, limit: int
- ) -> Tuple[int, List[EventBase]]:
- """Get all new events for an appservice"""
-
- def get_new_events_for_appservice_txn(
- txn: LoggingTransaction,
- ) -> Tuple[int, List[str]]:
- sql = (
- "SELECT e.stream_ordering, e.event_id"
- " FROM events AS e"
- " WHERE"
- " (SELECT stream_ordering FROM appservice_stream_position)"
- " < e.stream_ordering"
- " AND e.stream_ordering <= ?"
- " ORDER BY e.stream_ordering ASC"
- " LIMIT ?"
- )
-
- txn.execute(sql, (current_id, limit))
- rows = txn.fetchall()
-
- upper_bound = current_id
- if len(rows) == limit:
- upper_bound = rows[-1][0]
-
- return upper_bound, [row[1] for row in rows]
+ async def set_appservice_last_pos(self, pos: int) -> None:
+ """
+ Set the last stream ordering position for the appservice process.
+ """
- upper_bound, event_ids = await self.db_pool.runInteraction(
- "get_new_events_for_appservice", get_new_events_for_appservice_txn
+ await self.db_pool.simple_update_one(
+ table="appservice_stream_position",
+ keyvalues={},
+ updatevalues={"stream_ordering": pos},
+ desc="set_appservice_last_pos",
)
- events = await self.get_events_as_list(event_ids, get_prev_content=True)
-
- return upper_bound, events
-
async def get_type_stream_id_for_appservice(
self, service: ApplicationService, type: str
) -> int:
|