| field | value | date |
|---|---|---|
| author | Jonathan de Jong <jonathan@automatia.nl> | 2021-04-14 18:19:02 +0200 |
| committer | GitHub <noreply@github.com> | 2021-04-14 17:19:02 +0100 |
| commit | 05e8c70c059f8ebb066e029bc3aa3e0cefef1019 (patch) | |
| tree | be614725637438d22494c9b48adf277fecc89445 /synapse/storage/databases/main/transactions.py | |
| parent | Move some replication processing out of generic_worker (#9796) (diff) | |
| download | synapse-05e8c70c059f8ebb066e029bc3aa3e0cefef1019.tar.xz | |
Experimental Federation Speedup (#9702)
This speeds up federation by "squeezing" the pair of database calls (to `destinations` and `destination_rooms`) that previously happened once per event into a single pair of calls for an entire batch of up to 100 events.

Signed-off-by: Jonathan de Jong <jonathan@automatia.nl>
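For illustration, here is a minimal sketch of the calling pattern this change enables. The helper name `record_destination_rooms`, the `batch` shape, and the `store` parameter are invented for this example; the real call site in Synapse's federation sender is not shown here.

```python
from typing import Dict, Iterable, Tuple


async def record_destination_rooms(
    store,  # an object exposing the new bulk_store_destination_rooms_entries method
    batch: Iterable[Tuple[str, int, Iterable[str]]],  # (room_id, stream_ordering, destinations)
) -> None:
    """Collapse a batch of events into one pair of upserts instead of one pair per event."""
    room_and_destination_to_ordering: Dict[Tuple[str, str], int] = {}
    for room_id, stream_ordering, destinations in batch:
        for destination in destinations:
            # A later event in the batch overwrites the entry for the same
            # (room_id, destination) pair, so only the newest stream_ordering is kept.
            room_and_destination_to_ordering[(room_id, destination)] = stream_ordering

    # Previously: two database upserts per event.  Now: two upserts per batch.
    await store.bulk_store_destination_rooms_entries(room_and_destination_to_ordering)
```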
Diffstat (limited to 'synapse/storage/databases/main/transactions.py')
-rw-r--r-- | synapse/storage/databases/main/transactions.py | 28 |
1 file changed, 12 insertions(+), 16 deletions(-)
```diff
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 82335e7a9d..b28ca61f80 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -14,7 +14,7 @@
 
 import logging
 from collections import namedtuple
-from typing import Iterable, List, Optional, Tuple
+from typing import Dict, List, Optional, Tuple
 
 from canonicaljson import encode_canonical_json
 
@@ -295,37 +295,33 @@ class TransactionStore(TransactionWorkerStore):
             },
         )
 
-    async def store_destination_rooms_entries(
-        self,
-        destinations: Iterable[str],
-        room_id: str,
-        stream_ordering: int,
-    ) -> None:
+    async def bulk_store_destination_rooms_entries(
+        self, room_and_destination_to_ordering: Dict[Tuple[str, str], int]
+    ):
         """
-        Updates or creates `destination_rooms` entries in batch for a single event.
+        Updates or creates `destination_rooms` entries for a number of events.
 
         Args:
-            destinations: list of destinations
-            room_id: the room_id of the event
-            stream_ordering: the stream_ordering of the event
+            room_and_destination_to_ordering: A mapping of (room, destination) -> stream_id
         """
 
         await self.db_pool.simple_upsert_many(
             table="destinations",
             key_names=("destination",),
-            key_values=[(d,) for d in destinations],
+            key_values={(d,) for _, d in room_and_destination_to_ordering.keys()},
             value_names=[],
             value_values=[],
             desc="store_destination_rooms_entries_dests",
         )
 
-        rows = [(destination, room_id) for destination in destinations]
         await self.db_pool.simple_upsert_many(
             table="destination_rooms",
-            key_names=("destination", "room_id"),
-            key_values=rows,
+            key_names=("room_id", "destination"),
+            key_values=list(room_and_destination_to_ordering.keys()),
             value_names=["stream_ordering"],
-            value_values=[(stream_ordering,)] * len(rows),
+            value_values=[
+                (stream_id,) for stream_id in room_and_destination_to_ordering.values()
+            ],
             desc="store_destination_rooms_entries_rooms",
         )
 
```
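As a stand-alone illustration of the row shapes the new method passes to `simple_upsert_many`, here is a small example with made-up room IDs, destinations, and stream orderings; only the comprehensions mirror the diff, everything else is invented. It also shows why a single dict is safe here: Python dicts preserve insertion order, so `keys()` and `values()` stay aligned.

```python
from typing import Dict, Tuple

# Hypothetical batch: (room_id, destination) -> stream_ordering of the newest event.
room_and_destination_to_ordering: Dict[Tuple[str, str], int] = {
    ("!abc:example.com", "remote-a.example.org"): 105,
    ("!abc:example.com", "remote-b.example.org"): 105,
    ("!def:example.com", "remote-a.example.org"): 107,
}

# Rows for the `destinations` upsert: a set comprehension, so each destination
# appears once even when it occurs in several rooms.
destination_rows = {(d,) for _, d in room_and_destination_to_ordering.keys()}
assert destination_rows == {("remote-a.example.org",), ("remote-b.example.org",)}

# Rows for the `destination_rooms` upsert: keys and values come from the same dict,
# so each (room_id, destination) key lines up with its stream_ordering value.
key_rows = list(room_and_destination_to_ordering.keys())
value_rows = [(stream_id,) for stream_id in room_and_destination_to_ordering.values()]
assert value_rows == [(105,), (105,), (107,)]
```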