diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 8f70eff809..fecddb4144 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -211,18 +211,28 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
async def get_destination_retry_timings_batch(
self, destinations: StrCollection
) -> Mapping[str, Optional[DestinationRetryTimings]]:
- rows = await self.db_pool.simple_select_many_batch(
- table="destinations",
- iterable=destinations,
- column="destination",
- retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"),
- desc="get_destination_retry_timings_batch",
+ rows = cast(
+ List[Tuple[str, Optional[int], Optional[int], Optional[int]]],
+ await self.db_pool.simple_select_many_batch(
+ table="destinations",
+ iterable=destinations,
+ column="destination",
+ retcols=(
+ "destination",
+ "failure_ts",
+ "retry_last_ts",
+ "retry_interval",
+ ),
+ desc="get_destination_retry_timings_batch",
+ ),
)
return {
- row.pop("destination"): DestinationRetryTimings(**row)
- for row in rows
- if row["retry_last_ts"] and row["failure_ts"] and row["retry_interval"]
+ destination: DestinationRetryTimings(
+ failure_ts, retry_last_ts, retry_interval
+ )
+ for destination, failure_ts, retry_last_ts, retry_interval in rows
+ if retry_last_ts and failure_ts and retry_interval
}
async def set_destination_retry_timings(
@@ -468,7 +478,10 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
destination: Optional[str] = None,
order_by: str = DestinationSortOrder.DESTINATION.value,
direction: Direction = Direction.FORWARDS,
- ) -> Tuple[List[JsonDict], int]:
+ ) -> Tuple[
+ List[Tuple[str, Optional[int], Optional[int], Optional[int], Optional[int]]],
+ int,
+ ]:
"""Function to retrieve a paginated list of destinations.
This will return a json list of destinations and the
total number of destinations matching the filter criteria.
@@ -480,13 +493,23 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
order_by: the sort order of the returned list
direction: sort ascending or descending
Returns:
- A tuple of a list of mappings from destination to information
+ A tuple of a list of tuples of destination information:
+ * destination
+ * retry_last_ts
+ * retry_interval
+ * failure_ts
+ * last_successful_stream_ordering
and a count of total destinations.
"""
def get_destinations_paginate_txn(
txn: LoggingTransaction,
- ) -> Tuple[List[JsonDict], int]:
+ ) -> Tuple[
+ List[
+ Tuple[str, Optional[int], Optional[int], Optional[int], Optional[int]]
+ ],
+ int,
+ ]:
order_by_column = DestinationSortOrder(order_by).value
if direction == Direction.BACKWARDS:
@@ -513,7 +536,14 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
LIMIT ? OFFSET ?
"""
txn.execute(sql, args + [limit, start])
- destinations = self.db_pool.cursor_to_dict(txn)
+ destinations = cast(
+ List[
+ Tuple[
+ str, Optional[int], Optional[int], Optional[int], Optional[int]
+ ]
+ ],
+ txn.fetchall(),
+ )
return destinations, count
return await self.db_pool.runInteraction(
@@ -526,7 +556,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
start: int,
limit: int,
direction: Direction = Direction.FORWARDS,
- ) -> Tuple[List[JsonDict], int]:
+ ) -> Tuple[List[Tuple[str, int]], int]:
"""Function to retrieve a paginated list of destination's rooms.
This will return a json list of rooms and the
total number of rooms.
@@ -537,12 +567,14 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
limit: number of rows to retrieve
direction: sort ascending or descending by room_id
Returns:
- A tuple of a dict of rooms and a count of total rooms.
+ A tuple of a list of room tuples and a count of total rooms.
+
+ Each room tuple is room_id, stream_ordering.
"""
def get_destination_rooms_paginate_txn(
txn: LoggingTransaction,
- ) -> Tuple[List[JsonDict], int]:
+ ) -> Tuple[List[Tuple[str, int]], int]:
if direction == Direction.BACKWARDS:
order = "DESC"
else:
@@ -556,14 +588,17 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
txn.execute(sql, [destination])
count = cast(Tuple[int], txn.fetchone())[0]
- rooms = self.db_pool.simple_select_list_paginate_txn(
- txn=txn,
- table="destination_rooms",
- orderby="room_id",
- start=start,
- limit=limit,
- retcols=("room_id", "stream_ordering"),
- order_direction=order,
+ rooms = cast(
+ List[Tuple[str, int]],
+ self.db_pool.simple_select_list_paginate_txn(
+ txn=txn,
+ table="destination_rooms",
+ orderby="room_id",
+ start=start,
+ limit=limit,
+ retcols=("room_id", "stream_ordering"),
+ order_direction=order,
+ ),
)
return rooms, count
|