summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-06-25 11:17:39 +0100
committerGitHub <noreply@github.com>2024-06-25 11:17:39 +0100
commitc89fea3fd1f47b43c4d500dd7d024b2f9b24d2ad (patch)
tree095015a5ba986bee1982e3c4f345a899fd712ac2
parentReintroduce "Reduce device lists replication traffic."" (#17361) (diff)
downloadsynapse-c89fea3fd1f47b43c4d500dd7d024b2f9b24d2ad.tar.xz
Limit amount of replication we send (#17358)
Fixes up #17333, where we failed to actually send less data (the
`DISTINCT` didn't work due to `stream_id` being different).

We fix this by making it so that every device list outbound poke for a
given user ID has the same stream ID. We can't change the query to only
return e.g. max stream ID as the receivers look up the destinations to
send to by doing `SELECT WHERE stream_id = ?`
-rw-r--r--changelog.d/17358.misc1
-rw-r--r--synapse/storage/databases/main/devices.py15
2 files changed, 8 insertions, 8 deletions
diff --git a/changelog.d/17358.misc b/changelog.d/17358.misc
new file mode 100644
index 0000000000..d3ef0b3777
--- /dev/null
+++ b/changelog.d/17358.misc
@@ -0,0 +1 @@
+Handle device lists notifications for large accounts more efficiently in worker mode.
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 5eeca6165d..59a035dd62 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -2131,7 +2131,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         user_id: str,
         device_id: str,
         hosts: Collection[str],
-        stream_ids: List[int],
+        stream_id: int,
         context: Optional[Dict[str, str]],
     ) -> None:
         if self._device_list_federation_stream_cache:
@@ -2139,11 +2139,10 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
                 txn.call_after(
                     self._device_list_federation_stream_cache.entity_has_changed,
                     host,
-                    stream_ids[-1],
+                    stream_id,
                 )
 
         now = self._clock.time_msec()
-        stream_id_iterator = iter(stream_ids)
 
         encoded_context = json_encoder.encode(context)
         mark_sent = not self.hs.is_mine_id(user_id)
@@ -2152,7 +2151,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             (
                 destination,
                 self._instance_name,
-                next(stream_id_iterator),
+                stream_id,
                 user_id,
                 device_id,
                 mark_sent,
@@ -2337,22 +2336,22 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             return
 
         def add_device_list_outbound_pokes_txn(
-            txn: LoggingTransaction, stream_ids: List[int]
+            txn: LoggingTransaction, stream_id: int
         ) -> None:
             self._add_device_outbound_poke_to_stream_txn(
                 txn,
                 user_id=user_id,
                 device_id=device_id,
                 hosts=hosts,
-                stream_ids=stream_ids,
+                stream_id=stream_id,
                 context=context,
             )
 
-        async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids:
+        async with self._device_list_id_gen.get_next() as stream_id:
             return await self.db_pool.runInteraction(
                 "add_device_list_outbound_pokes",
                 add_device_list_outbound_pokes_txn,
-                stream_ids,
+                stream_id,
             )
 
     async def add_remote_device_list_to_pending(