1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
|
import logging
from typing import List, Tuple
from unittest.mock import Mock, patch
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EventContentFields, EventTypes
from synapse.appservice import ApplicationService
from synapse.rest import admin
from synapse.rest.client import login, register, room, room_batch, sync
from synapse.server import HomeServer
from synapse.types import JsonDict, RoomStreamToken
from synapse.util import Clock
from tests import unittest
# Module-level logger, named after this test module.
logger = logging.getLogger(__name__)
def _create_join_state_events_for_batch_send_request(
virtual_user_ids: List[str],
insert_time: int,
) -> List[JsonDict]:
return [
{
"type": EventTypes.Member,
"sender": virtual_user_id,
"origin_server_ts": insert_time,
"content": {
"membership": "join",
"displayname": "display-name-for-%s" % (virtual_user_id,),
},
"state_key": virtual_user_id,
}
for virtual_user_id in virtual_user_ids
]
def _create_message_events_for_batch_send_request(
virtual_user_id: str, insert_time: int, count: int
) -> List[JsonDict]:
return [
{
"type": EventTypes.Message,
"sender": virtual_user_id,
"origin_server_ts": insert_time,
"content": {
"msgtype": "m.text",
"body": "Historical %d" % (i),
EventContentFields.MSC2716_HISTORICAL: True,
},
}
for i in range(count)
]
class RoomBatchTestCase(unittest.HomeserverTestCase):
    """Test importing batches of historical messages."""

    servlets = [
        admin.register_servlets_for_client_rest_resource,
        room_batch.register_servlets,
        room.register_servlets,
        register.register_servlets,
        login.register_servlets,
        sync.register_servlets,
    ]

    def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
        """Create the test homeserver with a single application service.

        The appservice loader is patched while the homeserver is being set up
        so that it returns `self.appservice` instead of reading any config.
        """
        config = self.default_config()

        self.appservice = ApplicationService(
            token="i_am_an_app_service",
            hostname="test",
            id="1234",
            namespaces={"users": [{"regex": r"@as_user.*", "exclusive": True}]},
            # Note: this user does not have to match the regex above
            sender="@as_main:test",
        )

        mock_load_appservices = Mock(return_value=[self.appservice])
        with patch(
            "synapse.storage.databases.main.appservice.load_appservices",
            mock_load_appservices,
        ):
            hs = self.setup_test_homeserver(config=config)
        return hs

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        """Keep handles used by the tests and register the virtual user.

        The virtual user (`as_user_potato`) falls inside the appservice's
        exclusive `@as_user.*` namespace and is used as the sender of the
        historical events.
        """
        self.clock = clock
        self._storage_controllers = hs.get_storage_controllers()

        self.virtual_user_id, _ = self.register_appservice_user(
            "as_user_potato", self.appservice.token
        )

    def _create_test_room(self) -> Tuple[str, str, str, str]:
        """Create a room as the appservice sender and send three messages.

        Returns:
            A tuple of `(room_id, event_id_a, event_id_b, event_id_c)` where
            the event IDs belong to the messages with bodies "A", "B" and "C",
            sent in that order.
        """
        room_id = self.helper.create_room_as(
            self.appservice.sender, tok=self.appservice.token
        )

        # The messages are sent sequentially, so the resulting event IDs are in
        # the same order as the bodies.
        event_id_a, event_id_b, event_id_c = [
            self.helper.send_event(
                room_id=room_id,
                type=EventTypes.Message,
                content={
                    "msgtype": "m.text",
                    "body": body,
                },
                tok=self.appservice.token,
            )["event_id"]
            for body in ("A", "B", "C")
        ]

        return room_id, event_id_a, event_id_b, event_id_c

    @unittest.override_config({"experimental_features": {"msc2716_enabled": True}})
    def test_same_state_groups_for_whole_historical_batch(self) -> None:
        """Make sure that when using the `/batch_send` endpoint to import a
        bunch of historical messages, it re-uses the same `state_group` across
        the whole batch. This is an easy optimization to make sure we're getting
        right because the state for the whole batch is contained in
        `state_events_at_start` and can be shared across everything.
        """

        time_before_room = int(self.clock.time_msec())
        room_id, event_id_a, _, _ = self._create_test_room()

        channel = self.make_request(
            "POST",
            "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s"
            % (room_id, event_id_a),
            content={
                "events": _create_message_events_for_batch_send_request(
                    self.virtual_user_id, time_before_room, 3
                ),
                "state_events_at_start": _create_join_state_events_for_batch_send_request(
                    [self.virtual_user_id], time_before_room
                ),
            },
            access_token=self.appservice.token,
        )
        self.assertEqual(channel.code, 200, channel.result)

        # Get the historical event IDs that we just imported
        historical_event_ids = channel.json_body["event_ids"]
        self.assertEqual(len(historical_event_ids), 3)

        # Fetch the state_groups
        state_group_map = self.get_success(
            self._storage_controllers.state.get_state_groups_ids(
                room_id, historical_event_ids
            )
        )

        # We expect all of the historical events to be using the same state_group
        # so there should only be a single state_group here!
        self.assertEqual(
            len(state_group_map),
            1,
            "Expected a single state_group to be returned but saw state_groups=%s"
            % (state_group_map.keys(),),
        )

    @unittest.override_config({"experimental_features": {"msc2716_enabled": True}})
    def test_sync_while_batch_importing(self) -> None:
        """
        Make sure that /sync correctly returns full room state when a user joins
        during ongoing batch backfilling.
        See: https://github.com/matrix-org/synapse/issues/12281
        """
        # Create user who will be invited & join room
        user_id = self.register_user("beep", "test")
        user_tok = self.login("beep", "test")

        time_before_room = int(self.clock.time_msec())

        # Create a room with some events
        room_id, _, _, _ = self._create_test_room()
        # Invite the user
        self.helper.invite(
            room_id, src=self.appservice.sender, tok=self.appservice.token, targ=user_id
        )

        # Create another room, send a bunch of events to advance the stream token
        other_room_id = self.helper.create_room_as(
            self.appservice.sender, tok=self.appservice.token
        )
        for _ in range(5):
            self.helper.send_event(
                room_id=other_room_id,
                type=EventTypes.Message,
                content={"msgtype": "m.text", "body": "C"},
                tok=self.appservice.token,
            )

        # Join the room as the normal user
        self.helper.join(room_id, user_id, tok=user_tok)

        # Create an event to hang the historical batch from - In order to see
        # the failure case originally reported in #12281, the historical batch
        # must be hung from the most recent event in the room so the base
        # insertion event ends up with the highest `topological_ordering`
        # (`depth`) in the room but will have a negative `stream_ordering`
        # because it's a `historical` event. Previously, when assembling the
        # `state` for the `/sync` response, the bugged logic would sort by
        # `topological_ordering` descending and pick up the base insertion
        # event because it has a negative `stream_ordering` below the given
        # pagination token. Now we properly sort by `stream_ordering`
        # descending which puts `historical` events with a negative
        # `stream_ordering` way at the bottom and aren't selected as expected.
        response = self.helper.send_event(
            room_id=room_id,
            type=EventTypes.Message,
            content={
                "msgtype": "m.text",
                "body": "C",
            },
            tok=self.appservice.token,
        )
        event_to_hang_id = response["event_id"]

        channel = self.make_request(
            "POST",
            "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s"
            % (room_id, event_to_hang_id),
            content={
                "events": _create_message_events_for_batch_send_request(
                    self.virtual_user_id, time_before_room, 3
                ),
                "state_events_at_start": _create_join_state_events_for_batch_send_request(
                    [self.virtual_user_id], time_before_room
                ),
            },
            access_token=self.appservice.token,
        )
        self.assertEqual(channel.code, 200, channel.result)

        # Now we need to find the invite + join events stream tokens so we can sync between
        main_store = self.hs.get_datastores().main
        events, _next_key = self.get_success(
            main_store.get_recent_events_for_room(
                room_id,
                50,
                end_token=main_store.get_room_max_token(),
            ),
        )
        invite_event_position = None
        for event in events:
            if (
                event.type == EventTypes.Member
                and event.content["membership"] == "invite"
            ):
                invite_event_position = self.get_success(
                    main_store.get_topological_token_for_event(event.event_id)
                )
                break

        # A plain `assert` is kept here (rather than `assertIsNotNone`) so that
        # type checkers narrow the Optional before the attribute access below.
        assert invite_event_position is not None, "No invite event found"

        # Remove the topological order from the token by re-creating w/stream only
        invite_event_position = RoomStreamToken(None, invite_event_position.stream)

        # Sync everything after this token
        since_token = self.get_success(invite_event_position.to_string(main_store))
        sync_response = self.make_request(
            "GET",
            f"/sync?since={since_token}",
            access_token=user_tok,
        )
        # Fail with the response body if the sync itself failed, rather than
        # with a `KeyError` when digging into the JSON below.
        self.assertEqual(sync_response.code, 200, sync_response.result)

        # Assert that, for this room, the user was considered to have joined and thus
        # receives the full state history
        state_event_types = [
            event["type"]
            for event in sync_response.json_body["rooms"]["join"][room_id]["state"][
                "events"
            ]
        ]

        self.assertIn(
            "m.room.create",
            state_event_types,
            "Missing room full state in sync response",
        )
|