summary refs log tree commit diff
path: root/synapse/storage/databases/main/transactions.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/transactions.py')
-rw-r--r--synapse/storage/databases/main/transactions.py83
1 files changed, 59 insertions, 24 deletions
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py

index 8f70eff809..fecddb4144 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py
@@ -211,18 +211,28 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): async def get_destination_retry_timings_batch( self, destinations: StrCollection ) -> Mapping[str, Optional[DestinationRetryTimings]]: - rows = await self.db_pool.simple_select_many_batch( - table="destinations", - iterable=destinations, - column="destination", - retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"), - desc="get_destination_retry_timings_batch", + rows = cast( + List[Tuple[str, Optional[int], Optional[int], Optional[int]]], + await self.db_pool.simple_select_many_batch( + table="destinations", + iterable=destinations, + column="destination", + retcols=( + "destination", + "failure_ts", + "retry_last_ts", + "retry_interval", + ), + desc="get_destination_retry_timings_batch", + ), ) return { - row.pop("destination"): DestinationRetryTimings(**row) - for row in rows - if row["retry_last_ts"] and row["failure_ts"] and row["retry_interval"] + destination: DestinationRetryTimings( + failure_ts, retry_last_ts, retry_interval + ) + for destination, failure_ts, retry_last_ts, retry_interval in rows + if retry_last_ts and failure_ts and retry_interval } async def set_destination_retry_timings( @@ -468,7 +478,10 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): destination: Optional[str] = None, order_by: str = DestinationSortOrder.DESTINATION.value, direction: Direction = Direction.FORWARDS, - ) -> Tuple[List[JsonDict], int]: + ) -> Tuple[ + List[Tuple[str, Optional[int], Optional[int], Optional[int], Optional[int]]], + int, + ]: """Function to retrieve a paginated list of destinations. This will return a json list of destinations and the total number of destinations matching the filter criteria. @@ -480,13 +493,23 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): order_by: the sort order of the returned list direction: sort ascending or descending Returns: - A tuple of a list of mappings from destination to information + A tuple of a list of tuples of destination information: + * destination + * retry_last_ts + * retry_interval + * failure_ts + * last_successful_stream_ordering and a count of total destinations. """ def get_destinations_paginate_txn( txn: LoggingTransaction, - ) -> Tuple[List[JsonDict], int]: + ) -> Tuple[ + List[ + Tuple[str, Optional[int], Optional[int], Optional[int], Optional[int]] + ], + int, + ]: order_by_column = DestinationSortOrder(order_by).value if direction == Direction.BACKWARDS: @@ -513,7 +536,14 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): LIMIT ? OFFSET ? """ txn.execute(sql, args + [limit, start]) - destinations = self.db_pool.cursor_to_dict(txn) + destinations = cast( + List[ + Tuple[ + str, Optional[int], Optional[int], Optional[int], Optional[int] + ] + ], + txn.fetchall(), + ) return destinations, count return await self.db_pool.runInteraction( @@ -526,7 +556,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): start: int, limit: int, direction: Direction = Direction.FORWARDS, - ) -> Tuple[List[JsonDict], int]: + ) -> Tuple[List[Tuple[str, int]], int]: """Function to retrieve a paginated list of destination's rooms. This will return a json list of rooms and the total number of rooms. @@ -537,12 +567,14 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): limit: number of rows to retrieve direction: sort ascending or descending by room_id Returns: - A tuple of a dict of rooms and a count of total rooms. + A tuple of a list of room tuples and a count of total rooms. + + Each room tuple is room_id, stream_ordering. """ def get_destination_rooms_paginate_txn( txn: LoggingTransaction, - ) -> Tuple[List[JsonDict], int]: + ) -> Tuple[List[Tuple[str, int]], int]: if direction == Direction.BACKWARDS: order = "DESC" else: @@ -556,14 +588,17 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): txn.execute(sql, [destination]) count = cast(Tuple[int], txn.fetchone())[0] - rooms = self.db_pool.simple_select_list_paginate_txn( - txn=txn, - table="destination_rooms", - orderby="room_id", - start=start, - limit=limit, - retcols=("room_id", "stream_ordering"), - order_direction=order, + rooms = cast( + List[Tuple[str, int]], + self.db_pool.simple_select_list_paginate_txn( + txn=txn, + table="destination_rooms", + orderby="room_id", + start=start, + limit=limit, + retcols=("room_id", "stream_ordering"), + order_direction=order, + ), ) return rooms, count