summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-08-23 13:46:10 +0100
committerErik Johnston <erik@matrix.org>2023-08-23 13:46:10 +0100
commitb7d0c7d3fd5caaf24efa63dc20efb74ad640c0a6 (patch)
tree214fdb142593f642eb20ea0ba145c687b3638bed
parentMerge remote-tracking branch 'origin/develop' into matrix-org-hotfixes (diff)
parentFix rare deadlock when using read/write locks (#16133) (diff)
downloadsynapse-b7d0c7d3fd5caaf24efa63dc20efb74ad640c0a6.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
-rw-r--r--changelog.d/16133.bugfix1
-rw-r--r--changelog.d/16150.misc2
-rw-r--r--changelog.d/16151.misc1
-rw-r--r--changelog.d/16160.misc1
-rw-r--r--changelog.d/16164.bugfix1
-rw-r--r--changelog.d/16165.misc1
-rw-r--r--synapse/storage/databases/main/task_scheduler.py2
-rw-r--r--synapse/storage/databases/main/transactions.py6
-rw-r--r--synapse/storage/schema/main/delta/80/02_read_write_locks_deadlock.sql.postgres37
-rw-r--r--synapse/storage/schema/main/delta/80/03_read_write_locks_triggers.sql.postgres37
-rw-r--r--synapse/util/task_scheduler.py4
-rw-r--r--tests/handlers/test_presence.py116
12 files changed, 203 insertions, 6 deletions
diff --git a/changelog.d/16133.bugfix b/changelog.d/16133.bugfix
new file mode 100644

index 0000000000..ed8830692f --- /dev/null +++ b/changelog.d/16133.bugfix
@@ -0,0 +1 @@ +Fix a rare race that could block new events from being sent for up to two minutes. Introduced in v1.90.0. diff --git a/changelog.d/16150.misc b/changelog.d/16150.misc
index 97861282fd..41059378c5 100644 --- a/changelog.d/16150.misc +++ b/changelog.d/16150.misc
@@ -1 +1 @@ -Clean-up calling `setup_background_tasks` in unit tests. +Improve presence tests. diff --git a/changelog.d/16151.misc b/changelog.d/16151.misc new file mode 100644
index 0000000000..41059378c5 --- /dev/null +++ b/changelog.d/16151.misc
@@ -0,0 +1 @@ +Improve presence tests. diff --git a/changelog.d/16160.misc b/changelog.d/16160.misc new file mode 100644
index 0000000000..78803b7bcd --- /dev/null +++ b/changelog.d/16160.misc
@@ -0,0 +1 @@ +Reduce DB contention on worker locks. diff --git a/changelog.d/16164.bugfix b/changelog.d/16164.bugfix new file mode 100644
index 0000000000..17284297cf --- /dev/null +++ b/changelog.d/16164.bugfix
@@ -0,0 +1 @@ +Fix a bug introduced in 1.87 where synapse would send an excessive amount of federation requests to servers which have been offline for a long time. Contributed by Nico. diff --git a/changelog.d/16165.misc b/changelog.d/16165.misc new file mode 100644
index 0000000000..b4d514d249 --- /dev/null +++ b/changelog.d/16165.misc
@@ -0,0 +1 @@ +Task scheduler: mark task as active if we are scheduling as soon as possible. diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py
index 1fb3180c3c..9ab120eea9 100644 --- a/synapse/storage/databases/main/task_scheduler.py +++ b/synapse/storage/databases/main/task_scheduler.py
@@ -92,7 +92,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore): if clauses: sql = sql + " WHERE " + " AND ".join(clauses) - sql = sql + "ORDER BY timestamp" + sql = sql + " ORDER BY timestamp" txn.execute(sql, args) return self.db_pool.cursor_to_dict(txn) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 48e4b0ba3c..860bbf7c0f 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py
@@ -242,8 +242,6 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): ) -> None: # Upsert retry time interval if retry_interval is zero (i.e. we're # resetting it) or greater than the existing retry interval. - # We also upsert when the new retry interval is the same as the existing one, - # since it will be the case when `destination_max_retry_interval` is reached. # # WARNING: This is executed in autocommit, so we shouldn't add any more # SQL calls in here (without being very careful). @@ -258,8 +256,10 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): retry_interval = EXCLUDED.retry_interval WHERE EXCLUDED.retry_interval = 0 + OR EXCLUDED.retry_last_ts = 0 OR destinations.retry_interval IS NULL - OR destinations.retry_interval <= EXCLUDED.retry_interval + OR destinations.retry_interval < EXCLUDED.retry_interval + OR destinations.retry_last_ts < EXCLUDED.retry_last_ts """ txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval)) diff --git a/synapse/storage/schema/main/delta/80/02_read_write_locks_deadlock.sql.postgres b/synapse/storage/schema/main/delta/80/02_read_write_locks_deadlock.sql.postgres new file mode 100644
index 0000000000..401c42e18a --- /dev/null +++ b/synapse/storage/schema/main/delta/80/02_read_write_locks_deadlock.sql.postgres
@@ -0,0 +1,37 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- To avoid the possibility of a deadlock, lock the +-- `worker_read_write_locks_mode` table so that we serialize inserts/deletes +-- for a specific lock name/key. + +CREATE OR REPLACE FUNCTION delete_read_write_lock_parent_before() RETURNS trigger AS $$ +BEGIN + -- `PERFORM` is a `SELECT` which discards the rows. + PERFORM * FROM worker_read_write_locks_mode + WHERE + lock_name = OLD.lock_name + AND lock_key = OLD.lock_key + FOR UPDATE; + + RETURN OLD; +END +$$ +LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS delete_read_write_lock_parent_before_trigger ON worker_read_write_locks; +CREATE TRIGGER delete_read_write_lock_parent_before_trigger BEFORE DELETE ON worker_read_write_locks + FOR EACH ROW + EXECUTE PROCEDURE delete_read_write_lock_parent_before(); diff --git a/synapse/storage/schema/main/delta/80/03_read_write_locks_triggers.sql.postgres b/synapse/storage/schema/main/delta/80/03_read_write_locks_triggers.sql.postgres new file mode 100644
index 0000000000..31de5bfa18 --- /dev/null +++ b/synapse/storage/schema/main/delta/80/03_read_write_locks_triggers.sql.postgres
@@ -0,0 +1,37 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Fix up the triggers that were in `78/04_read_write_locks_triggers.sql` + +-- Reduce the number of writes we do on this table. +-- +-- Note: that we still want to lock the row here (i.e. still do a `DO UPDATE +-- SET`) so that we serialize updates. +CREATE OR REPLACE FUNCTION upsert_read_write_lock_parent() RETURNS trigger AS $$ +BEGIN + INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token) + VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token) + ON CONFLICT (lock_name, lock_key) + DO UPDATE SET write_lock = NEW.write_lock + WHERE OLD.write_lock != NEW.write_lock; + RETURN NEW; +END +$$ +LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS upsert_read_write_lock_parent_trigger ON worker_read_write_locks; +CREATE TRIGGER upsert_read_write_lock_parent_trigger BEFORE INSERT ON worker_read_write_locks + FOR EACH ROW + EXECUTE PROCEDURE upsert_read_write_lock_parent(); diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index 773a8327f6..4aea64b338 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py
@@ -154,13 +154,15 @@ class TaskScheduler: f"No function associated with action {action} of the scheduled task" ) + status = TaskStatus.SCHEDULED if timestamp is None or timestamp < self._clock.time_msec(): timestamp = self._clock.time_msec() + status = TaskStatus.ACTIVE task = ScheduledTask( random_string(16), action, - TaskStatus.SCHEDULED, + status, timestamp, resource_id, params, diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 1f483eb75a..1aebcc16ad 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py
@@ -38,6 +38,7 @@ from synapse.handlers.presence import ( from synapse.rest import admin from synapse.rest.client import room from synapse.server import HomeServer +from synapse.storage.database import LoggingDatabaseConnection from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util import Clock @@ -513,6 +514,121 @@ class PresenceTimeoutTestCase(unittest.TestCase): self.assertEqual(state, new_state) +class PresenceHandlerInitTestCase(unittest.HomeserverTestCase): + def default_config(self) -> JsonDict: + config = super().default_config() + # Disable background tasks on this worker so that the PresenceHandler isn't + # loaded until we request it. + config["run_background_tasks_on"] = "other" + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.user_id = f"@test:{self.hs.config.server.server_name}" + + # Move the reactor to the initial time. + self.reactor.advance(1000) + now = self.clock.time_msec() + + main_store = hs.get_datastores().main + self.get_success( + main_store.update_presence( + [ + UserPresenceState( + user_id=self.user_id, + state=PresenceState.ONLINE, + last_active_ts=now, + last_federation_update_ts=now, + last_user_sync_ts=now, + status_msg=None, + currently_active=True, + ) + ] + ) + ) + + # Regenerate the preloaded presence information on PresenceStore. + def refill_presence(db_conn: LoggingDatabaseConnection) -> None: + main_store._presence_on_startup = main_store._get_active_presence(db_conn) + + self.get_success(main_store.db_pool.runWithConnection(refill_presence)) + + def test_restored_presence_idles(self) -> None: + """The presence state restored from the database should not persist forever.""" + + # Get the handler (which kicks off a bunch of timers). + presence_handler = self.hs.get_presence_handler() + + # Assert the user is online. + state = self.get_success( + presence_handler.get_state(UserID.from_string(self.user_id)) + ) + self.assertEqual(state.state, PresenceState.ONLINE) + + # Advance such that the user should timeout. + self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000) + self.reactor.pump([5]) + + # Check that the user is now offline. + state = self.get_success( + presence_handler.get_state(UserID.from_string(self.user_id)) + ) + self.assertEqual(state.state, PresenceState.OFFLINE) + + @parameterized.expand( + [ + (PresenceState.BUSY, PresenceState.BUSY), + (PresenceState.ONLINE, PresenceState.ONLINE), + (PresenceState.UNAVAILABLE, PresenceState.UNAVAILABLE), + # Offline syncs don't update the state. + (PresenceState.OFFLINE, PresenceState.ONLINE), + ] + ) + @unittest.override_config({"experimental_features": {"msc3026_enabled": True}}) + def test_restored_presence_online_after_sync( + self, sync_state: str, expected_state: str + ) -> None: + """ + The presence state restored from the database should be overridden with sync after a timeout. + + Args: + sync_state: The presence state of the new sync. + expected_state: The expected presence right after the sync. + """ + + # Get the handler (which kicks off a bunch of timers). + presence_handler = self.hs.get_presence_handler() + + # Assert the user is online, as restored. + state = self.get_success( + presence_handler.get_state(UserID.from_string(self.user_id)) + ) + self.assertEqual(state.state, PresenceState.ONLINE) + + # Advance slightly and sync. + self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000 / 2) + self.get_success( + presence_handler.user_syncing( + self.user_id, sync_state != PresenceState.OFFLINE, sync_state + ) + ) + + # Assert the user is in the expected state. + state = self.get_success( + presence_handler.get_state(UserID.from_string(self.user_id)) + ) + self.assertEqual(state.state, expected_state) + + # Advance such that the user's preloaded data times out, but not the new sync. + self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000 / 2) + self.reactor.pump([5]) + + # Check that the user is in the sync state (as the client is currently syncing still). + state = self.get_success( + presence_handler.get_state(UserID.from_string(self.user_id)) + ) + self.assertEqual(state.state, sync_state) + + class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): user_id = "@test:server" user_id_obj = UserID.from_string(user_id)