diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 1af6d77545..98e6e42563 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -383,7 +383,7 @@ class DeviceWorkerHandler:
)
DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000
- DEVICE_MSGS_DELETE_SLEEP_MS = 1000
+ DEVICE_MSGS_DELETE_SLEEP_MS = 100
async def _delete_device_messages(
self,
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index cc34dfb322..1f6402c2da 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -305,6 +305,14 @@ class BackfillStream(Stream):
# which means we need to negate it.
return -self.store._backfill_id_gen.get_minimal_local_current_token()
+ def can_discard_position(
+ self, instance_name: str, prev_token: int, new_token: int
+ ) -> bool:
+ # Backfill stream can't go backwards, so we know we can ignore any
+ # positions where the tokens are from before the current token.
+
+ return new_token <= self.current_token(instance_name)
+
class PresenceStream(_StreamFromIdGen):
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -519,6 +527,14 @@ class CachesStream(Stream):
return self.store._cache_id_gen.get_minimal_local_current_token()
return self.current_token(self.local_instance_name)
+ def can_discard_position(
+ self, instance_name: str, prev_token: int, new_token: int
+ ) -> bool:
+ # Caches streams can't go backwards, so we know we can ignore any
+ # positions where the tokens are from before the current token.
+
+ return new_token <= self.current_token(instance_name)
+
class DeviceListsStream(_StreamFromIdGen):
"""Either a user has updated their devices or a remote server needs to be
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 39556481ff..dd8957680a 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -311,6 +311,14 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
self._background_drop_null_thread_id_indexes,
)
+ # Add a room ID index to speed up room deletion
+ self.db_pool.updates.register_background_index_update(
+ "event_push_summary_index_room_id",
+ index_name="event_push_summary_index_room_id",
+ table="event_push_summary",
+ columns=["room_id"],
+ )
+
async def _background_drop_null_thread_id_indexes(
self, progress: JsonDict, batch_size: int
) -> int:
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index a6a1671bd6..8f36cfce12 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -601,7 +601,7 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
(last_pusher_id, batch_size),
)
- rows = txn.fetchall()
+ rows = cast(List[Tuple[int, Optional[str], Optional[str]]], txn.fetchall())
if len(rows) == 0:
return 0
@@ -617,7 +617,7 @@ class PusherBackgroundUpdatesStore(SQLBaseStore):
txn=txn,
table="pushers",
key_names=("id",),
- key_values=[row[0] for row in rows],
+ key_values=[(row[0],) for row in rows],
value_names=("device_id", "access_token"),
# If there was already a device_id on the pusher, we only want to clear
# the access_token column, so we keep the existing device_id. Otherwise,
diff --git a/synapse/storage/schema/main/delta/83/06_event_push_summary_room.sql b/synapse/storage/schema/main/delta/83/06_event_push_summary_room.sql
new file mode 100644
index 0000000000..1aae1b7557
--- /dev/null
+++ b/synapse/storage/schema/main/delta/83/06_event_push_summary_room.sql
@@ -0,0 +1,17 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (8306, 'event_push_summary_index_room_id', '{}');
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index 29c561e555..b254d3f84c 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -71,7 +71,7 @@ class TaskScheduler:
# Time before a complete or failed task is deleted from the DB
KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week
# Maximum number of tasks that can run at the same time
- MAX_CONCURRENT_RUNNING_TASKS = 10
+ MAX_CONCURRENT_RUNNING_TASKS = 5
# Time from the last task update after which we will log a warning
LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs
@@ -377,7 +377,7 @@ class TaskScheduler:
self._running_tasks.remove(task.id)
# Try launch a new task since we've finished with this one.
- self._clock.call_later(1, self._launch_scheduled_tasks)
+ self._clock.call_later(0.1, self._launch_scheduled_tasks)
if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
return
|