summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
authorJonathan de Jong <jonathan@automatia.nl>2021-04-14 18:19:02 +0200
committerGitHub <noreply@github.com>2021-04-14 17:19:02 +0100
commit05e8c70c059f8ebb066e029bc3aa3e0cefef1019 (patch)
treebe614725637438d22494c9b48adf277fecc89445 /synapse/storage/databases/main
parentMove some replication processing out of generic_worker (#9796) (diff)
downloadsynapse-05e8c70c059f8ebb066e029bc3aa3e0cefef1019.tar.xz
Experimental Federation Speedup (#9702)
This basically speeds up federation by "squeezing" each individual dual database call (to destinations and destination_rooms), which previously happened per every event, into one call for an entire batch (100 max).

Signed-off-by: Jonathan de Jong <jonathan@automatia.nl>
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/transactions.py28
1 files changed, 12 insertions, 16 deletions
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 82335e7a9d..b28ca61f80 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -14,7 +14,7 @@
 
 import logging
 from collections import namedtuple
-from typing import Iterable, List, Optional, Tuple
+from typing import Dict, List, Optional, Tuple
 
 from canonicaljson import encode_canonical_json
 
@@ -295,37 +295,33 @@ class TransactionStore(TransactionWorkerStore):
                 },
             )
 
-    async def store_destination_rooms_entries(
-        self,
-        destinations: Iterable[str],
-        room_id: str,
-        stream_ordering: int,
-    ) -> None:
+    async def bulk_store_destination_rooms_entries(
+        self, room_and_destination_to_ordering: Dict[Tuple[str, str], int]
+    ):
         """
-        Updates or creates `destination_rooms` entries in batch for a single event.
+        Updates or creates `destination_rooms` entries for a number of events.
 
         Args:
-            destinations: list of destinations
-            room_id: the room_id of the event
-            stream_ordering: the stream_ordering of the event
+            room_and_destination_to_ordering: A mapping of (room, destination) -> stream_id
         """
 
         await self.db_pool.simple_upsert_many(
             table="destinations",
             key_names=("destination",),
-            key_values=[(d,) for d in destinations],
+            key_values={(d,) for _, d in room_and_destination_to_ordering.keys()},
             value_names=[],
             value_values=[],
             desc="store_destination_rooms_entries_dests",
         )
 
-        rows = [(destination, room_id) for destination in destinations]
         await self.db_pool.simple_upsert_many(
             table="destination_rooms",
-            key_names=("destination", "room_id"),
-            key_values=rows,
+            key_names=("room_id", "destination"),
+            key_values=list(room_and_destination_to_ordering.keys()),
             value_names=["stream_ordering"],
-            value_values=[(stream_ordering,)] * len(rows),
+            value_values=[
+                (stream_id,) for stream_id in room_and_destination_to_ordering.values()
+            ],
             desc="store_destination_rooms_entries_rooms",
         )