summary refs log tree commit diff
path: root/synapse/handlers/device.py
diff options
context:
space:
mode:
authorSean Quah <8349537+squahtx@users.noreply.github.com>2022-11-22 16:46:52 +0000
committerGitHub <noreply@github.com>2022-11-22 16:46:52 +0000
commit9cae44f49e6bf4f6b8a20ab11a65da417bb1565f (patch)
treed24ffa5f61fb9b6189d60a8fc2e4d62dd65631ed /synapse/handlers/device.py
parentParallelize calls to fetch bundled aggregations. (#14510) (diff)
downloadsynapse-9cae44f49e6bf4f6b8a20ab11a65da417bb1565f.tar.xz
Track unconverted device list outbound pokes using a position instead (#14516)
When a local device list change is added to
`device_lists_changes_in_room`, the `converted_to_destinations` flag is
set to `FALSE` and the `_handle_new_device_update_async` background
process is started. This background process looks for unconverted rows
in `device_lists_changes_in_room`, copies them to
`device_lists_outbound_pokes` and updates the flag.

To update the `converted_to_destinations` flag, the database performs a
`DELETE` and `INSERT` internally, which fragments the table. To avoid
this, track unconverted rows using a `(stream ID, room ID)` position
instead of the flag.

From now on, the `converted_to_destinations` column indicates rows that
need converting to outbound pokes, but does not indicate whether the
conversion has already taken place.

Closes #14037.

Signed-off-by: Sean Quah <seanq@matrix.org>
Diffstat (limited to 'synapse/handlers/device.py')
-rw-r--r--synapse/handlers/device.py30
1 files changed, 27 insertions, 3 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index c597639a7f..da3ddafeae 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -682,13 +682,33 @@ class DeviceHandler(DeviceWorkerHandler):
         hosts_already_sent_to: Set[str] = set()
 
         try:
+            stream_id, room_id = await self.store.get_device_change_last_converted_pos()
+
             while True:
                 self._handle_new_device_update_new_data = False
-                rows = await self.store.get_uncoverted_outbound_room_pokes()
+                max_stream_id = self.store.get_device_stream_token()
+                rows = await self.store.get_uncoverted_outbound_room_pokes(
+                    stream_id, room_id
+                )
                 if not rows:
                     # If the DB returned nothing then there is nothing left to
                     # do, *unless* a new device list update happened during the
                     # DB query.
+
+                    # Advance `(stream_id, room_id)`.
+                    # `max_stream_id` comes from *before* the query for unconverted
+                    # rows, which means that any unconverted rows must have a larger
+                    # stream ID.
+                    if max_stream_id > stream_id:
+                        stream_id, room_id = max_stream_id, ""
+                        await self.store.set_device_change_last_converted_pos(
+                            stream_id, room_id
+                        )
+                    else:
+                        assert max_stream_id == stream_id
+                        # Avoid moving `room_id` backwards.
+                        pass
+
                     if self._handle_new_device_update_new_data:
                         continue
                     else:
@@ -718,7 +738,6 @@ class DeviceHandler(DeviceWorkerHandler):
                         user_id=user_id,
                         device_id=device_id,
                         room_id=room_id,
-                        stream_id=stream_id,
                         hosts=hosts,
                         context=opentracing_context,
                     )
@@ -752,6 +771,12 @@ class DeviceHandler(DeviceWorkerHandler):
                     hosts_already_sent_to.update(hosts)
                     current_stream_id = stream_id
 
+                # Advance `(stream_id, room_id)`.
+                _, _, room_id, stream_id, _ = rows[-1]
+                await self.store.set_device_change_last_converted_pos(
+                    stream_id, room_id
+                )
+
         finally:
             self._handle_new_device_update_is_processing = False
 
@@ -834,7 +859,6 @@ class DeviceHandler(DeviceWorkerHandler):
                 user_id=user_id,
                 device_id=device_id,
                 room_id=room_id,
-                stream_id=None,
                 hosts=potentially_changed_hosts,
                 context=None,
             )