diff --git a/changelog.d/16133.bugfix b/changelog.d/16133.bugfix
new file mode 100644
index 0000000000..ed8830692f
--- /dev/null
+++ b/changelog.d/16133.bugfix
@@ -0,0 +1 @@
+Fix a rare race that could block new events from being sent for up to two minutes. Introduced in v1.90.0.
diff --git a/changelog.d/16150.misc b/changelog.d/16150.misc
index 97861282fd..41059378c5 100644
--- a/changelog.d/16150.misc
+++ b/changelog.d/16150.misc
@@ -1 +1 @@
-Clean-up calling `setup_background_tasks` in unit tests.
+Improve presence tests.
diff --git a/changelog.d/16151.misc b/changelog.d/16151.misc
new file mode 100644
index 0000000000..41059378c5
--- /dev/null
+++ b/changelog.d/16151.misc
@@ -0,0 +1 @@
+Improve presence tests.
diff --git a/changelog.d/16160.misc b/changelog.d/16160.misc
new file mode 100644
index 0000000000..78803b7bcd
--- /dev/null
+++ b/changelog.d/16160.misc
@@ -0,0 +1 @@
+Reduce DB contention on worker locks.
diff --git a/changelog.d/16164.bugfix b/changelog.d/16164.bugfix
new file mode 100644
index 0000000000..17284297cf
--- /dev/null
+++ b/changelog.d/16164.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse 1.87 where Synapse would send an excessive number of federation requests to servers which have been offline for a long time. Contributed by Nico.
diff --git a/changelog.d/16165.misc b/changelog.d/16165.misc
new file mode 100644
index 0000000000..b4d514d249
--- /dev/null
+++ b/changelog.d/16165.misc
@@ -0,0 +1 @@
+Task scheduler: mark a task as active when it is scheduled to run as soon as possible.
diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py
index 1fb3180c3c..9ab120eea9 100644
--- a/synapse/storage/databases/main/task_scheduler.py
+++ b/synapse/storage/databases/main/task_scheduler.py
@@ -92,7 +92,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
if clauses:
sql = sql + " WHERE " + " AND ".join(clauses)
- sql = sql + "ORDER BY timestamp"
+ sql = sql + " ORDER BY timestamp"
txn.execute(sql, args)
return self.db_pool.cursor_to_dict(txn)
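
The one-character change above matters because the SQL string is built by
concatenation. A minimal sketch, in plain Python with hypothetical table and
clause values, of what the query looks like with and without the space:

    # Illustration only, not Synapse code: hypothetical table/clause names.
    sql = "SELECT * FROM scheduled_tasks"
    clauses = ["status = ?"]
    if clauses:
        sql = sql + " WHERE " + " AND ".join(clauses)

    sql + "ORDER BY timestamp"   # "... WHERE status = ?ORDER BY timestamp" -> invalid SQL
    sql + " ORDER BY timestamp"  # "... WHERE status = ? ORDER BY timestamp"
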
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 48e4b0ba3c..860bbf7c0f 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -242,8 +242,6 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
) -> None:
# Upsert retry time interval if retry_interval is zero (i.e. we're
# resetting it) or greater than the existing retry interval.
- # We also upsert when the new retry interval is the same as the existing one,
- # since it will be the case when `destination_max_retry_interval` is reached.
#
# WARNING: This is executed in autocommit, so we shouldn't add any more
# SQL calls in here (without being very careful).
@@ -258,8 +256,10 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
retry_interval = EXCLUDED.retry_interval
WHERE
EXCLUDED.retry_interval = 0
+ OR EXCLUDED.retry_last_ts = 0
OR destinations.retry_interval IS NULL
- OR destinations.retry_interval <= EXCLUDED.retry_interval
+ OR destinations.retry_interval < EXCLUDED.retry_interval
+ OR destinations.retry_last_ts < EXCLUDED.retry_last_ts
"""
txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
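
The reworked `WHERE` clause above changes exactly when the upsert overwrites an
existing `destinations` row. A rough Python predicate mirroring it (a sketch for
illustration only: the helper name and dict-style row are hypothetical, and
NULL/None handling is simplified):

    # Mirrors the SQL condition above; not part of Synapse itself.
    def should_overwrite(existing_row, new_retry_last_ts, new_retry_interval):
        return (
            new_retry_interval == 0                 # backoff is being reset
            or new_retry_last_ts == 0               # last-attempt timestamp is being reset
            or existing_row["retry_interval"] is None
            or existing_row["retry_interval"] < new_retry_interval  # strictly longer backoff
            or existing_row["retry_last_ts"] < new_retry_last_ts    # more recent attempt
        )
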
diff --git a/synapse/storage/schema/main/delta/80/02_read_write_locks_deadlock.sql.postgres b/synapse/storage/schema/main/delta/80/02_read_write_locks_deadlock.sql.postgres
new file mode 100644
index 0000000000..401c42e18a
--- /dev/null
+++ b/synapse/storage/schema/main/delta/80/02_read_write_locks_deadlock.sql.postgres
@@ -0,0 +1,37 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- To avoid the possibility of a deadlock, take a row-level lock on the
+-- matching `worker_read_write_locks_mode` row so that inserts/deletes for a
+-- given lock name/key are serialized.
+
+CREATE OR REPLACE FUNCTION delete_read_write_lock_parent_before() RETURNS trigger AS $$
+BEGIN
+ -- `PERFORM` is a `SELECT` which discards the rows.
+ PERFORM * FROM worker_read_write_locks_mode
+ WHERE
+ lock_name = OLD.lock_name
+ AND lock_key = OLD.lock_key
+ FOR UPDATE;
+
+ RETURN OLD;
+END
+$$
+LANGUAGE plpgsql;
+
+DROP TRIGGER IF EXISTS delete_read_write_lock_parent_before_trigger ON worker_read_write_locks;
+CREATE TRIGGER delete_read_write_lock_parent_before_trigger BEFORE DELETE ON worker_read_write_locks
+ FOR EACH ROW
+ EXECUTE PROCEDURE delete_read_write_lock_parent_before();
diff --git a/synapse/storage/schema/main/delta/80/03_read_write_locks_triggers.sql.postgres b/synapse/storage/schema/main/delta/80/03_read_write_locks_triggers.sql.postgres
new file mode 100644
index 0000000000..31de5bfa18
--- /dev/null
+++ b/synapse/storage/schema/main/delta/80/03_read_write_locks_triggers.sql.postgres
@@ -0,0 +1,37 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Fix up the triggers that were in `78/04_read_write_locks_triggers.sql`
+
+-- Reduce the number of writes we do on this table.
+--
+-- Note that we still want to lock the row here (i.e. still do a `DO UPDATE
+-- SET`) so that updates are serialized.
+CREATE OR REPLACE FUNCTION upsert_read_write_lock_parent() RETURNS trigger AS $$
+BEGIN
+ INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token)
+ VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token)
+ ON CONFLICT (lock_name, lock_key)
+ DO UPDATE SET write_lock = NEW.write_lock
+ WHERE OLD.write_lock != NEW.write_lock;
+ RETURN NEW;
+END
+$$
+LANGUAGE plpgsql;
+
+DROP TRIGGER IF EXISTS upsert_read_write_lock_parent_trigger ON worker_read_write_locks;
+CREATE TRIGGER upsert_read_write_lock_parent_trigger BEFORE INSERT ON worker_read_write_locks
+ FOR EACH ROW
+ EXECUTE PROCEDURE upsert_read_write_lock_parent();
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index 773a8327f6..4aea64b338 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -154,13 +154,15 @@ class TaskScheduler:
f"No function associated with action {action} of the scheduled task"
)
+ status = TaskStatus.SCHEDULED
if timestamp is None or timestamp < self._clock.time_msec():
timestamp = self._clock.time_msec()
+ status = TaskStatus.ACTIVE
task = ScheduledTask(
random_string(16),
action,
- TaskStatus.SCHEDULED,
+ status,
timestamp,
resource_id,
params,
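
With this change, a task whose timestamp is absent or already in the past is
created as `ACTIVE` rather than `SCHEDULED`, reflecting that it is expected to
run straight away. A hedged usage sketch, assuming a `schedule_task`/`get_task`
style API on the scheduler (names are illustrative and not verified against the
rest of the module):

    # Hypothetical sketch: scheduling with no timestamp ("as soon as possible")
    # should now yield a task that is already ACTIVE.
    task_id = await scheduler.schedule_task("my_action")
    task = await scheduler.get_task(task_id)
    assert task is not None and task.status == TaskStatus.ACTIVE
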
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 1f483eb75a..1aebcc16ad 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -38,6 +38,7 @@ from synapse.handlers.presence import (
from synapse.rest import admin
from synapse.rest.client import room
from synapse.server import HomeServer
+from synapse.storage.database import LoggingDatabaseConnection
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util import Clock
@@ -513,6 +514,121 @@ class PresenceTimeoutTestCase(unittest.TestCase):
self.assertEqual(state, new_state)
+class PresenceHandlerInitTestCase(unittest.HomeserverTestCase):
+ def default_config(self) -> JsonDict:
+ config = super().default_config()
+ # Disable background tasks on this worker so that the PresenceHandler isn't
+ # loaded until we request it.
+ config["run_background_tasks_on"] = "other"
+ return config
+
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+ self.user_id = f"@test:{self.hs.config.server.server_name}"
+
+ # Move the reactor to the initial time.
+ self.reactor.advance(1000)
+ now = self.clock.time_msec()
+
+ main_store = hs.get_datastores().main
+ self.get_success(
+ main_store.update_presence(
+ [
+ UserPresenceState(
+ user_id=self.user_id,
+ state=PresenceState.ONLINE,
+ last_active_ts=now,
+ last_federation_update_ts=now,
+ last_user_sync_ts=now,
+ status_msg=None,
+ currently_active=True,
+ )
+ ]
+ )
+ )
+
+ # Regenerate the preloaded presence information on PresenceStore.
+ def refill_presence(db_conn: LoggingDatabaseConnection) -> None:
+ main_store._presence_on_startup = main_store._get_active_presence(db_conn)
+
+ self.get_success(main_store.db_pool.runWithConnection(refill_presence))
+
+ def test_restored_presence_idles(self) -> None:
+ """The presence state restored from the database should not persist forever."""
+
+ # Get the handler (which kicks off a bunch of timers).
+ presence_handler = self.hs.get_presence_handler()
+
+ # Assert the user is online.
+ state = self.get_success(
+ presence_handler.get_state(UserID.from_string(self.user_id))
+ )
+ self.assertEqual(state.state, PresenceState.ONLINE)
+
+ # Advance such that the user should timeout.
+ self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000)
+ self.reactor.pump([5])
+
+ # Check that the user is now offline.
+ state = self.get_success(
+ presence_handler.get_state(UserID.from_string(self.user_id))
+ )
+ self.assertEqual(state.state, PresenceState.OFFLINE)
+
+ @parameterized.expand(
+ [
+ (PresenceState.BUSY, PresenceState.BUSY),
+ (PresenceState.ONLINE, PresenceState.ONLINE),
+ (PresenceState.UNAVAILABLE, PresenceState.UNAVAILABLE),
+ # Offline syncs don't update the state.
+ (PresenceState.OFFLINE, PresenceState.ONLINE),
+ ]
+ )
+ @unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
+ def test_restored_presence_online_after_sync(
+ self, sync_state: str, expected_state: str
+ ) -> None:
+ """
+ The presence state restored from the database should be overridden by a new sync after a timeout.
+
+ Args:
+ sync_state: The presence state of the new sync.
+ expected_state: The expected presence right after the sync.
+ """
+
+ # Get the handler (which kicks off a bunch of timers).
+ presence_handler = self.hs.get_presence_handler()
+
+ # Assert the user is online, as restored.
+ state = self.get_success(
+ presence_handler.get_state(UserID.from_string(self.user_id))
+ )
+ self.assertEqual(state.state, PresenceState.ONLINE)
+
+ # Advance slightly and sync.
+ self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000 / 2)
+ self.get_success(
+ presence_handler.user_syncing(
+ self.user_id, sync_state != PresenceState.OFFLINE, sync_state
+ )
+ )
+
+ # Assert the user is in the expected state.
+ state = self.get_success(
+ presence_handler.get_state(UserID.from_string(self.user_id))
+ )
+ self.assertEqual(state.state, expected_state)
+
+ # Advance such that the user's preloaded data times out, but not the new sync.
+ self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000 / 2)
+ self.reactor.pump([5])
+
+ # Check that the user is still in the sync state (as the client is still syncing).
+ state = self.get_success(
+ presence_handler.get_state(UserID.from_string(self.user_id))
+ )
+ self.assertEqual(state.state, sync_state)
+
+
class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
user_id = "@test:server"
user_id_obj = UserID.from_string(user_id)
|