From bf69b574226192b946aeaf7a72a39457a3792e41 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 2 Nov 2023 10:00:18 -0400 Subject: Fix "'int' object is not iterable" error in set_device_id_for_pushers background update (#16594) A regression from removing the cursor_to_dict call, adds back the wrapping into a tuple. --- synapse/storage/databases/main/pusher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index a6a1671bd6..8f36cfce12 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -601,7 +601,7 @@ class PusherBackgroundUpdatesStore(SQLBaseStore): (last_pusher_id, batch_size), ) - rows = txn.fetchall() + rows = cast(List[Tuple[int, Optional[str], Optional[str]]], txn.fetchall()) if len(rows) == 0: return 0 @@ -617,7 +617,7 @@ class PusherBackgroundUpdatesStore(SQLBaseStore): txn=txn, table="pushers", key_names=("id",), - key_values=[row[0] for row in rows], + key_values=[(row[0],) for row in rows], value_names=("device_id", "access_token"), # If there was already a device_id on the pusher, we only want to clear # the access_token column, so we keep the existing device_id. Otherwise, -- cgit 1.5.1 From 700c8a0de5df7815949b8c792e8e471a91bb8721 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Nov 2023 13:14:26 +0000 Subject: Reduce task concurrency (#16656) --- changelog.d/16656.misc | 1 + synapse/handlers/device.py | 2 +- synapse/util/task_scheduler.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 changelog.d/16656.misc (limited to 'synapse') diff --git a/changelog.d/16656.misc b/changelog.d/16656.misc new file mode 100644 index 0000000000..6763685b9d --- /dev/null +++ b/changelog.d/16656.misc @@ -0,0 +1 @@ +Reduce max concurrency of background tasks, reducing potential max DB load. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 1af6d77545..98e6e42563 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -383,7 +383,7 @@ class DeviceWorkerHandler: ) DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000 - DEVICE_MSGS_DELETE_SLEEP_MS = 1000 + DEVICE_MSGS_DELETE_SLEEP_MS = 100 async def _delete_device_messages( self, diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 29c561e555..8c2df233d3 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -71,7 +71,7 @@ class TaskScheduler: # Time before a complete or failed task is deleted from the DB KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week # Maximum number of tasks that can run at the same time - MAX_CONCURRENT_RUNNING_TASKS = 10 + MAX_CONCURRENT_RUNNING_TASKS = 5 # Time from the last task update after which we will log a warning LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs -- cgit 1.5.1 From 6fec2d035f5584335af7c1b115ca48a13a8da5fa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Nov 2023 14:14:29 +0000 Subject: Also discard 'caches' and 'backfill' stream POSITIONS (#16655) Follow on from #16640 --- changelog.d/16655.misc | 1 + synapse/replication/tcp/streams/_base.py | 16 ++++++++++++++++ 2 files changed, 17 insertions(+) create mode 100644 changelog.d/16655.misc (limited to 'synapse') diff --git a/changelog.d/16655.misc b/changelog.d/16655.misc new file mode 100644 index 0000000000..3b1cc2185d --- /dev/null +++ b/changelog.d/16655.misc @@ -0,0 +1 @@ +More efficiently handle no-op `POSITION` over replication. diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index cc34dfb322..1f6402c2da 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -305,6 +305,14 @@ class BackfillStream(Stream): # which means we need to negate it. return -self.store._backfill_id_gen.get_minimal_local_current_token() + def can_discard_position( + self, instance_name: str, prev_token: int, new_token: int + ) -> bool: + # Backfill stream can't go backwards, so we know we can ignore any + # positions where the tokens are from before the current token. + + return new_token <= self.current_token(instance_name) + class PresenceStream(_StreamFromIdGen): @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -519,6 +527,14 @@ class CachesStream(Stream): return self.store._cache_id_gen.get_minimal_local_current_token() return self.current_token(self.local_instance_name) + def can_discard_position( + self, instance_name: str, prev_token: int, new_token: int + ) -> bool: + # Caches streams can't go backwards, so we know we can ignore any + # positions where the tokens are from before the current token. + + return new_token <= self.current_token(instance_name) + class DeviceListsStream(_StreamFromIdGen): """Either a user has updated their devices or a remote server needs to be -- cgit 1.5.1 From 9c02ef21e05a54b09a49ab8200160b578cdb8920 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Nov 2023 14:15:44 +0000 Subject: Speed up purge room by adding index (#16657) What it says on the tin --- changelog.d/16657.misc | 1 + synapse/storage/databases/main/event_push_actions.py | 8 ++++++++ .../schema/main/delta/83/06_event_push_summary_room.sql | 17 +++++++++++++++++ 3 files changed, 26 insertions(+) create mode 100644 changelog.d/16657.misc create mode 100644 synapse/storage/schema/main/delta/83/06_event_push_summary_room.sql (limited to 'synapse') diff --git a/changelog.d/16657.misc b/changelog.d/16657.misc new file mode 100644 index 0000000000..c444aa15e4 --- /dev/null +++ b/changelog.d/16657.misc @@ -0,0 +1 @@ +Speed up purge room by adding an index to `event_push_summary`. diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 39556481ff..dd8957680a 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -311,6 +311,14 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas self._background_drop_null_thread_id_indexes, ) + # Add a room ID index to speed up room deletion + self.db_pool.updates.register_background_index_update( + "event_push_summary_index_room_id", + index_name="event_push_summary_index_room_id", + table="event_push_summary", + columns=["room_id"], + ) + async def _background_drop_null_thread_id_indexes( self, progress: JsonDict, batch_size: int ) -> int: diff --git a/synapse/storage/schema/main/delta/83/06_event_push_summary_room.sql b/synapse/storage/schema/main/delta/83/06_event_push_summary_room.sql new file mode 100644 index 0000000000..1aae1b7557 --- /dev/null +++ b/synapse/storage/schema/main/delta/83/06_event_push_summary_room.sql @@ -0,0 +1,17 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8306, 'event_push_summary_index_room_id', '{}'); -- cgit 1.5.1 From 6088303efb0f1498401718797941542089c2d432 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Nov 2023 16:36:02 +0000 Subject: Speed up how quickly we launch new tasks (#16660) Now that we're reducing concurrency (#16656), this is more important. --- changelog.d/16660.misc | 1 + synapse/util/task_scheduler.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/16660.misc (limited to 'synapse') diff --git a/changelog.d/16660.misc b/changelog.d/16660.misc new file mode 100644 index 0000000000..6763685b9d --- /dev/null +++ b/changelog.d/16660.misc @@ -0,0 +1 @@ +Reduce max concurrency of background tasks, reducing potential max DB load. diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 8c2df233d3..b254d3f84c 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -377,7 +377,7 @@ class TaskScheduler: self._running_tasks.remove(task.id) # Try launch a new task since we've finished with this one. - self._clock.call_later(1, self._launch_scheduled_tasks) + self._clock.call_later(0.1, self._launch_scheduled_tasks) if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS: return -- cgit 1.5.1