1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
#
from typing import Dict
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import MAX_DEPTH
from synapse.api.room_versions import RoomVersion, RoomVersions
from synapse.server import HomeServer
from synapse.util import Clock
from tests.unittest import HomeserverTestCase
class TestFixupMaxDepthCapBgUpdate(HomeserverTestCase):
    """Test the background update that caps topological_ordering at MAX_DEPTH."""

    def prepare(
        self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
    ) -> None:
        """Keep handles on the main store / DB pool and re-queue the update."""
        self.store = self.hs.get_datastores().main
        self.db_pool = self.store.db_pool

        self.room_id = "!testroom:example.com"

        # Reinsert the background update as it was already run at the start of
        # the test.
        self.get_success(
            self.db_pool.simple_insert(
                table="background_updates",
                values={
                    "update_name": "fixup_max_depth_cap",
                    "progress_json": "{}",
                },
            )
        )

    def create_room(self, room_version: RoomVersion) -> Dict[str, int]:
        """Create a room with a known room version and insert events.

        Ten events are inserted, with depths running from `MAX_DEPTH - 5` up to
        `MAX_DEPTH + 4`, so that some sit below, one sits exactly at, and some
        sit above MAX_DEPTH. Each event's `topological_ordering` is initially
        set to its depth.

        Args:
            room_version: The room version to record for the room.

        Returns:
            A map from the event ID of *every* inserted event (not only those
            exceeding MAX_DEPTH) to its depth.
        """
        # Create a room with a specific room version
        self.get_success(
            self.db_pool.simple_insert(
                table="rooms",
                values={
                    "room_id": self.room_id,
                    "room_version": room_version.identifier,
                },
            )
        )

        # Insert events with some depths exceeding MAX_DEPTH
        event_id_to_depth: Dict[str, int] = {}
        for depth in range(MAX_DEPTH - 5, MAX_DEPTH + 5):
            event_id = f"$event{depth}:example.com"
            event_id_to_depth[event_id] = depth

            self.get_success(
                self.db_pool.simple_insert(
                    table="events",
                    values={
                        "event_id": event_id,
                        "room_id": self.room_id,
                        "topological_ordering": depth,
                        "depth": depth,
                        "type": "m.test",
                        "sender": "@user:test",
                        "processed": True,
                        "outlier": False,
                    },
                )
            )

        return event_id_to_depth

    def _run_fixup_max_depth_cap(self) -> int:
        """Run one batch of the background update and return its result.

        The update is started from an empty `room_id` progress marker with a
        batch size of 10; the tests treat the returned value as the number of
        rooms processed.
        """
        progress = {"room_id": ""}
        batch_size = 10
        return self.get_success(
            self.store.fixup_max_depth_cap_bg_update(progress, batch_size)
        )

    def _get_topological_orderings(self) -> Dict[str, int]:
        """Return event ID -> topological_ordering for every event in the test room."""
        rows = self.get_success(
            self.db_pool.simple_select_list(
                table="events",
                keyvalues={"room_id": self.room_id},
                retcols=["event_id", "topological_ordering"],
            )
        )
        return {
            event_id: topological_ordering for event_id, topological_ordering in rows
        }

    def test_fixup_max_depth_cap_bg_update(self) -> None:
        """Test that the background update correctly caps topological_ordering
        at MAX_DEPTH."""
        event_id_to_depth = self.create_room(RoomVersions.V6)

        # Run the background update and verify the number of rooms processed
        num_rooms = self._run_fixup_max_depth_cap()
        self.assertEqual(num_rooms, 1)

        # Events with a depth greater than or equal to MAX_DEPTH should be
        # capped at MAX_DEPTH; events with a smaller depth should remain
        # unchanged.
        expected_orderings = {
            event_id: min(depth, MAX_DEPTH)
            for event_id, depth in event_id_to_depth.items()
        }

        # Compare the whole mapping at once, rather than asserting row by row,
        # so that the test also fails if any of the inserted events is missing
        # from the result (a per-row loop would pass vacuously on no rows).
        self.assertDictEqual(expected_orderings, self._get_topological_orderings())

    def test_fixup_max_depth_cap_bg_update_old_room_version(self) -> None:
        """Test that the background update does not cap topological_ordering for
        rooms with old room versions."""
        event_id_to_depth = self.create_room(RoomVersions.V5)

        # Run the background update and verify that no rooms were processed
        num_rooms = self._run_fixup_max_depth_cap()
        self.assertEqual(num_rooms, 0)

        # Assert that the topological_ordering of events has not been changed
        # from their depth, i.e. nothing was capped.
        self.assertDictEqual(event_id_to_depth, self._get_topological_orderings())
|