summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-11-20 10:09:33 +0000
committerErik Johnston <erik@matrix.org>2023-11-20 10:09:33 +0000
commit9c3b906b3aa13d83f3dc5eb080dc7e77d6d00511 (patch)
tree69de0f066bc3f30c6d70aedb08f8f8d612a44d38 /synapse
parentMerge remote-tracking branch 'origin/develop' into matrix-org-hotfixes (diff)
parentSpeed up how quickly we launch new tasks (#16660) (diff)
downloadsynapse-9c3b906b3aa13d83f3dc5eb080dc7e77d6d00511.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/device.py2
-rw-r--r--synapse/replication/tcp/streams/_base.py16
-rw-r--r--synapse/storage/databases/main/event_push_actions.py8
-rw-r--r--synapse/storage/databases/main/pusher.py4
-rw-r--r--synapse/storage/schema/main/delta/83/06_event_push_summary_room.sql17
-rw-r--r--synapse/util/task_scheduler.py4
6 files changed, 46 insertions, 5 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py

index 1af6d77545..98e6e42563 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py
@@ -383,7 +383,7 @@ class DeviceWorkerHandler: ) DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000 - DEVICE_MSGS_DELETE_SLEEP_MS = 1000 + DEVICE_MSGS_DELETE_SLEEP_MS = 100 async def _delete_device_messages( self, diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index cc34dfb322..1f6402c2da 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py
@@ -305,6 +305,14 @@ class BackfillStream(Stream): # which means we need to negate it. return -self.store._backfill_id_gen.get_minimal_local_current_token() + def can_discard_position( + self, instance_name: str, prev_token: int, new_token: int + ) -> bool: + # Backfill stream can't go backwards, so we know we can ignore any + # positions where the tokens are from before the current token. + + return new_token <= self.current_token(instance_name) + class PresenceStream(_StreamFromIdGen): @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -519,6 +527,14 @@ class CachesStream(Stream): return self.store._cache_id_gen.get_minimal_local_current_token() return self.current_token(self.local_instance_name) + def can_discard_position( + self, instance_name: str, prev_token: int, new_token: int + ) -> bool: + # Caches streams can't go backwards, so we know we can ignore any + # positions where the tokens are from before the current token. + + return new_token <= self.current_token(instance_name) + class DeviceListsStream(_StreamFromIdGen): """Either a user has updated their devices or a remote server needs to be diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 39556481ff..dd8957680a 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py
@@ -311,6 +311,14 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas self._background_drop_null_thread_id_indexes, ) + # Add a room ID index to speed up room deletion + self.db_pool.updates.register_background_index_update( + "event_push_summary_index_room_id", + index_name="event_push_summary_index_room_id", + table="event_push_summary", + columns=["room_id"], + ) + async def _background_drop_null_thread_id_indexes( self, progress: JsonDict, batch_size: int ) -> int: diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index a6a1671bd6..8f36cfce12 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py
@@ -601,7 +601,7 @@ class PusherBackgroundUpdatesStore(SQLBaseStore): (last_pusher_id, batch_size), ) - rows = txn.fetchall() + rows = cast(List[Tuple[int, Optional[str], Optional[str]]], txn.fetchall()) if len(rows) == 0: return 0 @@ -617,7 +617,7 @@ class PusherBackgroundUpdatesStore(SQLBaseStore): txn=txn, table="pushers", key_names=("id",), - key_values=[row[0] for row in rows], + key_values=[(row[0],) for row in rows], value_names=("device_id", "access_token"), # If there was already a device_id on the pusher, we only want to clear # the access_token column, so we keep the existing device_id. Otherwise, diff --git a/synapse/storage/schema/main/delta/83/06_event_push_summary_room.sql b/synapse/storage/schema/main/delta/83/06_event_push_summary_room.sql new file mode 100644
index 0000000000..1aae1b7557 --- /dev/null +++ b/synapse/storage/schema/main/delta/83/06_event_push_summary_room.sql
@@ -0,0 +1,17 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8306, 'event_push_summary_index_room_id', '{}'); diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index 29c561e555..b254d3f84c 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py
@@ -71,7 +71,7 @@ class TaskScheduler: # Time before a complete or failed task is deleted from the DB KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week # Maximum number of tasks that can run at the same time - MAX_CONCURRENT_RUNNING_TASKS = 10 + MAX_CONCURRENT_RUNNING_TASKS = 5 # Time from the last task update after which we will log a warning LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs @@ -377,7 +377,7 @@ class TaskScheduler: self._running_tasks.remove(task.id) # Try launch a new task since we've finished with this one. - self._clock.call_later(1, self._launch_scheduled_tasks) + self._clock.call_later(0.1, self._launch_scheduled_tasks) if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS: return