From f2f2c7c1f05de87f43cc2d18d5dc9bd636b3ed0a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 15 Nov 2023 08:02:11 -0500 Subject: Use full GitHub links instead of bare issue numbers. (#16637) --- synapse/util/check_dependencies.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/check_dependencies.py b/synapse/util/check_dependencies.py index f7cead9e12..6f008734a0 100644 --- a/synapse/util/check_dependencies.py +++ b/synapse/util/check_dependencies.py @@ -189,7 +189,8 @@ def check_requirements(extra: Optional[str] = None) -> None: errors.append(_not_installed(requirement, extra)) else: if dist.version is None: - # This shouldn't happen---it suggests a borked virtualenv. (See #12223) + # This shouldn't happen---it suggests a borked virtualenv. (See + # https://github.com/matrix-org/synapse/issues/12223) # Try to give a vaguely helpful error message anyway. # Type-ignore: the annotations don't reflect reality: see # https://github.com/python/typeshed/issues/7513 -- cgit 1.5.1 From 1b238e88371516bfedb62d010e156820ab164b94 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Nov 2023 14:25:35 +0000 Subject: Speed up persisting large number of outliers (#16649) Recalculating the roots tuple every iteration could be very expensive, so instead let's do a topological sort. --- changelog.d/16649.misc | 1 + synapse/handlers/federation_event.py | 18 ++++----- synapse/util/iterutils.py | 51 ++++++++++++++++++++++++ tests/util/test_itertools.py | 76 +++++++++++++++++++++++++++++++++++- 4 files changed, 134 insertions(+), 12 deletions(-) create mode 100644 changelog.d/16649.misc (limited to 'synapse/util') diff --git a/changelog.d/16649.misc b/changelog.d/16649.misc new file mode 100644 index 0000000000..cebd6aaee5 --- /dev/null +++ b/changelog.d/16649.misc @@ -0,0 +1 @@ +Speed up persisting large number of outliers. diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index ba6b94a8b7..f4c17894aa 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -88,7 +88,7 @@ from synapse.types import ( ) from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer, concurrently_execute -from synapse.util.iterutils import batch_iter, partition +from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr @@ -1669,14 +1669,13 @@ class FederationEventHandler: # XXX: it might be possible to kick this process off in parallel with fetching # the events. - while event_map: - # build a list of events whose auth events are not in the queue. - roots = tuple( - ev - for ev in event_map.values() - if not any(aid in event_map for aid in ev.auth_event_ids()) - ) + # We need to persist an event's auth events before the event. + auth_graph = { + ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map] + for ev in event_map.values() + } + for roots in sorted_topologically_batched(event_map.values(), auth_graph): if not roots: # if *none* of the remaining events are ready, that means # we have a loop. This either means a bug in our logic, or that @@ -1698,9 +1697,6 @@ class FederationEventHandler: await self._auth_and_persist_outliers_inner(room_id, roots) - for ev in roots: - del event_map[ev.event_id] - async def _auth_and_persist_outliers_inner( self, room_id: str, fetched_events: Collection[EventBase] ) -> None: diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py index a0efb96d3b..f4c0194af0 100644 --- a/synapse/util/iterutils.py +++ b/synapse/util/iterutils.py @@ -135,3 +135,54 @@ def sorted_topologically( degree_map[edge] -= 1 if degree_map[edge] == 0: heapq.heappush(zero_degree, edge) + + +def sorted_topologically_batched( + nodes: Iterable[T], + graph: Mapping[T, Collection[T]], +) -> Generator[Collection[T], None, None]: + r"""Walk the graph topologically, returning batches of nodes where all nodes + that references it have been previously returned. + + For example, given the following graph: + + A + / \ + B C + \ / + D + + This function will return: `[[A], [B, C], [D]]`. + + This function is useful for e.g. batch persisting events in an auth chain, + where we can only persist an event if all its auth events have already been + persisted. + """ + + degree_map = {node: 0 for node in nodes} + reverse_graph: Dict[T, Set[T]] = {} + + for node, edges in graph.items(): + if node not in degree_map: + continue + + for edge in set(edges): + if edge in degree_map: + degree_map[node] += 1 + + reverse_graph.setdefault(edge, set()).add(node) + reverse_graph.setdefault(node, set()) + + zero_degree = [node for node, degree in degree_map.items() if degree == 0] + + while zero_degree: + new_zero_degree = [] + for node in zero_degree: + for edge in reverse_graph.get(node, []): + if edge in degree_map: + degree_map[edge] -= 1 + if degree_map[edge] == 0: + new_zero_degree.append(edge) + + yield zero_degree + zero_degree = new_zero_degree diff --git a/tests/util/test_itertools.py b/tests/util/test_itertools.py index 406c16cdcf..fabb05c7e4 100644 --- a/tests/util/test_itertools.py +++ b/tests/util/test_itertools.py @@ -13,7 +13,11 @@ # limitations under the License. from typing import Dict, Iterable, List, Sequence -from synapse.util.iterutils import chunk_seq, sorted_topologically +from synapse.util.iterutils import ( + chunk_seq, + sorted_topologically, + sorted_topologically_batched, +) from tests.unittest import TestCase @@ -107,3 +111,73 @@ class SortTopologically(TestCase): graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]} self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4]) + + +class SortTopologicallyBatched(TestCase): + "Test cases for `sorted_topologically_batched`" + + def test_empty(self) -> None: + "Test that an empty graph works correctly" + + graph: Dict[int, List[int]] = {} + self.assertEqual(list(sorted_topologically_batched([], graph)), []) + + def test_handle_empty_graph(self) -> None: + "Test that a graph where a node doesn't have an entry is treated as empty" + + graph: Dict[int, List[int]] = {} + + # For disconnected nodes the output is simply sorted. + self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]]) + + def test_disconnected(self) -> None: + "Test that a graph with no edges work" + + graph: Dict[int, List[int]] = {1: [], 2: []} + + # For disconnected nodes the output is simply sorted. + self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]]) + + def test_linear(self) -> None: + "Test that a simple `4 -> 3 -> 2 -> 1` graph works" + + graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]} + + self.assertEqual( + list(sorted_topologically_batched([4, 3, 2, 1], graph)), + [[1], [2], [3], [4]], + ) + + def test_subset(self) -> None: + "Test that only sorting a subset of the graph works" + graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]} + + self.assertEqual(list(sorted_topologically_batched([4, 3], graph)), [[3], [4]]) + + def test_fork(self) -> None: + "Test that a forked graph works" + graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [1], 4: [2, 3]} + + # Valid orderings are `[1, 3, 2, 4]` or `[1, 2, 3, 4]`, but we should + # always get the same one. + self.assertEqual( + list(sorted_topologically_batched([4, 3, 2, 1], graph)), [[1], [2, 3], [4]] + ) + + def test_duplicates(self) -> None: + "Test that a graph with duplicate edges work" + graph: Dict[int, List[int]] = {1: [], 2: [1, 1], 3: [2, 2], 4: [3]} + + self.assertEqual( + list(sorted_topologically_batched([4, 3, 2, 1], graph)), + [[1], [2], [3], [4]], + ) + + def test_multiple_paths(self) -> None: + "Test that a graph with multiple paths between two nodes work" + graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]} + + self.assertEqual( + list(sorted_topologically_batched([4, 3, 2, 1], graph)), + [[1], [2], [3], [4]], + ) -- cgit 1.5.1 From 3e8531d3baf205733693f9ae8b43aa0b4c82b744 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Nov 2023 15:19:35 +0000 Subject: Speed up deleting device messages (#16643) Keeping track of a lower bound of stream ID where we've deleted everything below makes the queries much faster. Otherwise, every time we scan for rows to delete we'd re-scan across all the rows that have previously deleted (until the next table VACUUM). --- changelog.d/16643.misc | 1 + synapse/handlers/device.py | 8 +- synapse/storage/databases/main/deviceinbox.py | 106 ++++++++++++++++++++------ synapse/util/task_scheduler.py | 2 +- 4 files changed, 88 insertions(+), 29 deletions(-) create mode 100644 changelog.d/16643.misc (limited to 'synapse/util') diff --git a/changelog.d/16643.misc b/changelog.d/16643.misc new file mode 100644 index 0000000000..cc0cf0901f --- /dev/null +++ b/changelog.d/16643.misc @@ -0,0 +1 @@ +Speed up deleting of device messages when deleting a device. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 93472d0117..1af6d77545 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -396,15 +396,17 @@ class DeviceWorkerHandler: up_to_stream_id = task.params["up_to_stream_id"] # Delete the messages in batches to avoid too much DB load. + from_stream_id = None while True: - res = await self.store.delete_messages_for_device( + from_stream_id, _ = await self.store.delete_messages_for_device_between( user_id=user_id, device_id=device_id, - up_to_stream_id=up_to_stream_id, + from_stream_id=from_stream_id, + to_stream_id=up_to_stream_id, limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, ) - if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: + if from_stream_id is None: return TaskStatus.COMPLETE, None, None await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 3e7425d4a6..02dddd1da4 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -450,14 +450,12 @@ class DeviceInboxWorkerStore(SQLBaseStore): user_id: str, device_id: Optional[str], up_to_stream_id: int, - limit: Optional[int] = None, ) -> int: """ Args: user_id: The recipient user_id. device_id: The recipient device_id. up_to_stream_id: Where to delete messages up to. - limit: maximum number of messages to delete Returns: The number of messages deleted. @@ -478,32 +476,22 @@ class DeviceInboxWorkerStore(SQLBaseStore): log_kv({"message": "No changes in cache since last check"}) return 0 - def delete_messages_for_device_txn(txn: LoggingTransaction) -> int: - limit_statement = "" if limit is None else f"LIMIT {limit}" - sql = f""" - DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= ( - SELECT MAX(stream_id) FROM ( - SELECT stream_id FROM device_inbox - WHERE user_id = ? AND device_id = ? AND stream_id <= ? - ORDER BY stream_id - {limit_statement} - ) AS q1 - ) - """ - txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id)) - return txn.rowcount - - count = await self.db_pool.runInteraction( - "delete_messages_for_device", delete_messages_for_device_txn - ) + from_stream_id = None + count = 0 + while True: + from_stream_id, loop_count = await self.delete_messages_for_device_between( + user_id, + device_id, + from_stream_id=from_stream_id, + to_stream_id=up_to_stream_id, + limit=1000, + ) + count += loop_count + if from_stream_id is None: + break log_kv({"message": f"deleted {count} messages for device", "count": count}) - # In this case we don't know if we hit the limit or the delete is complete - # so let's not update the cache. - if count == limit: - return count - # Update the cache, ensuring that we only ever increase the value updated_last_deleted_stream_id = self._last_device_delete_cache.get( (user_id, device_id), 0 @@ -514,6 +502,74 @@ class DeviceInboxWorkerStore(SQLBaseStore): return count + @trace + async def delete_messages_for_device_between( + self, + user_id: str, + device_id: Optional[str], + from_stream_id: Optional[int], + to_stream_id: int, + limit: int, + ) -> Tuple[Optional[int], int]: + """Delete N device messages between the stream IDs, returning the + highest stream ID deleted (or None if all messages in the range have + been deleted) and the number of messages deleted. + + This is more efficient than `delete_messages_for_device` when calling in + a loop to batch delete messages. + """ + + # Keeping track of a lower bound of stream ID where we've deleted + # everything below makes the queries much faster. Otherwise, every time + # we scan for rows to delete we'd re-scan across all the rows that have + # previously deleted (until the next table VACUUM). + + if from_stream_id is None: + # Minimum device stream ID is 1. + from_stream_id = 0 + + def delete_messages_for_device_between_txn( + txn: LoggingTransaction, + ) -> Tuple[Optional[int], int]: + txn.execute( + """ + SELECT MAX(stream_id) FROM ( + SELECT stream_id FROM device_inbox + WHERE user_id = ? AND device_id = ? + AND ? < stream_id AND stream_id <= ? + ORDER BY stream_id + LIMIT ? + ) AS d + """, + (user_id, device_id, from_stream_id, to_stream_id, limit), + ) + row = txn.fetchone() + if row is None or row[0] is None: + return None, 0 + + (max_stream_id,) = row + + txn.execute( + """ + DELETE FROM device_inbox + WHERE user_id = ? AND device_id = ? + AND ? < stream_id AND stream_id <= ? + """, + (user_id, device_id, from_stream_id, max_stream_id), + ) + + num_deleted = txn.rowcount + if num_deleted < limit: + return None, num_deleted + + return max_stream_id, num_deleted + + return await self.db_pool.runInteraction( + "delete_messages_for_device_between", + delete_messages_for_device_between_txn, + db_autocommit=True, # We don't need to run in a transaction + ) + @trace async def get_new_device_msgs_for_remote( self, destination: str, last_stream_id: int, current_stream_id: int, limit: int diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index caf13b3474..29c561e555 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -193,7 +193,7 @@ class TaskScheduler: result: Optional[JsonMapping] = None, error: Optional[str] = None, ) -> bool: - """Update some task associated values. This is exposed publically so it can + """Update some task associated values. This is exposed publicly so it can be used inside task functions, mainly to update the result and be able to resume a task at a specific step after a restart of synapse. -- cgit 1.5.1