diff --git a/changelog.d/12319.bugfix b/changelog.d/12319.bugfix
new file mode 100644
index 0000000000..a50191feaa
--- /dev/null
+++ b/changelog.d/12319.bugfix
@@ -0,0 +1 @@
+Fix bug with incremental sync missing events when rejoining/backfilling. Contributed by Nick @ Beeper.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 47a63005a9..1b092e900e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -175,17 +175,13 @@ class MessageHandler:
state_filter = state_filter or StateFilter.all()
if at_token:
- # FIXME this claims to get the state at a stream position, but
- # get_recent_events_for_room operates by topo ordering. This therefore
- # does not reliably give you the state at the given stream position.
- # (https://github.com/matrix-org/synapse/issues/3305)
- last_events, _ = await self.store.get_recent_events_for_room(
- room_id, end_token=at_token.room_key, limit=1
+ last_event = await self.store.get_last_event_in_room_before_stream_ordering(
+ room_id,
+ end_token=at_token.room_key,
)
- if not last_events:
+ if not last_event:
raise NotFoundError("Can't find event for token %s" % (at_token,))
- last_event = last_events[0]
# check whether the user is in the room at that time to determine
# whether they should be treated as peeking.
@@ -204,7 +200,7 @@ class MessageHandler:
visible_events = await filter_events_for_client(
self.storage,
user_id,
- last_events,
+ [last_event],
filter_send_to_client=False,
is_peeking=is_peeking,
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6c8b17c420..5125126a80 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -661,16 +661,15 @@ class SyncHandler:
stream_position: point at which to get state
state_filter: The state filter used to fetch state from the database.
"""
- # FIXME this claims to get the state at a stream position, but
- # get_recent_events_for_room operates by topo ordering. This therefore
- # does not reliably give you the state at the given stream position.
- # (https://github.com/matrix-org/synapse/issues/3305)
- last_events, _ = await self.store.get_recent_events_for_room(
- room_id, end_token=stream_position.room_key, limit=1
+ # FIXME: This gets the state at the latest event before the stream ordering,
+ # which might not be the same as the "current state" of the room at the time
+ # of the stream token if there were multiple forward extremities at the time.
+ last_event = await self.store.get_last_event_in_room_before_stream_ordering(
+ room_id,
+ end_token=stream_position.room_key,
)
- if last_events:
- last_event = last_events[-1]
+ if last_event:
state = await self.get_state_after_event(
last_event, state_filter=state_filter or StateFilter.all()
)
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 6d45a8a9f6..793e906630 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -758,6 +758,32 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"get_room_event_before_stream_ordering", _f
)
+ async def get_last_event_in_room_before_stream_ordering(
+ self,
+ room_id: str,
+ end_token: RoomStreamToken,
+ ) -> Optional[EventBase]:
+ """Returns the last event in a room at or before a stream ordering
+
+ Args:
+ room_id: The ID of the room to look in
+ end_token: The token whose stream ordering is the upper bound of the search
+
+ Returns:
+ The most recent event, or None if no such event was found.
+ """
+
+ last_row = await self.get_room_event_before_stream_ordering(
+ room_id=room_id,
+ stream_ordering=end_token.stream,
+ )
+ if last_row:
+ _, _, event_id = last_row
+ event = await self.get_event(event_id, get_prev_content=True)
+ return event
+
+ return None
+
async def get_current_room_stream_token_for_room_id(
self, room_id: Optional[str] = None
) -> RoomStreamToken:
diff --git a/tests/rest/client/test_room_batch.py b/tests/rest/client/test_room_batch.py
index 44f333a0ee..41a1bf6d89 100644
--- a/tests/rest/client/test_room_batch.py
+++ b/tests/rest/client/test_room_batch.py
@@ -7,9 +7,9 @@ from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EventContentFields, EventTypes
from synapse.appservice import ApplicationService
from synapse.rest import admin
-from synapse.rest.client import login, register, room, room_batch
+from synapse.rest.client import login, register, room, room_batch, sync
from synapse.server import HomeServer
-from synapse.types import JsonDict
+from synapse.types import JsonDict, RoomStreamToken
from synapse.util import Clock
from tests import unittest
@@ -63,6 +63,7 @@ class RoomBatchTestCase(unittest.HomeserverTestCase):
room.register_servlets,
register.register_servlets,
login.register_servlets,
+ sync.register_servlets,
]
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
@@ -178,3 +179,123 @@ class RoomBatchTestCase(unittest.HomeserverTestCase):
"Expected a single state_group to be returned by saw state_groups=%s"
% (state_group_map.keys(),),
)
+
+ @unittest.override_config({"experimental_features": {"msc2716_enabled": True}})
+ def test_sync_while_batch_importing(self) -> None:
+ """
+ Make sure that /sync correctly returns full room state when a user joins
+ during ongoing batch backfilling.
+ See: https://github.com/matrix-org/synapse/issues/12281
+ """
+ # Create user who will be invited & join room
+ user_id = self.register_user("beep", "test")
+ user_tok = self.login("beep", "test")
+
+ time_before_room = int(self.clock.time_msec())
+
+ # Create a room with some events
+ room_id, _, _, _ = self._create_test_room()
+ # Invite the user
+ self.helper.invite(
+ room_id, src=self.appservice.sender, tok=self.appservice.token, targ=user_id
+ )
+
+ # Create another room, send a bunch of events to advance the stream token
+ other_room_id = self.helper.create_room_as(
+ self.appservice.sender, tok=self.appservice.token
+ )
+ for _ in range(5):
+ self.helper.send_event(
+ room_id=other_room_id,
+ type=EventTypes.Message,
+ content={"msgtype": "m.text", "body": "C"},
+ tok=self.appservice.token,
+ )
+
+ # Join the room as the normal user
+ self.helper.join(room_id, user_id, tok=user_tok)
+
+ # Create an event to hang the historical batch from - In order to see
+ # the failure case originally reported in #12281, the historical batch
+ # must be hung from the most recent event in the room so the base
+ # insertion event ends up with the highest `topological_ordering`
+ # (`depth`) in the room but will have a negative `stream_ordering`
+ # because it's a `historical` event. Previously, when assembling the
+ # `state` for the `/sync` response, the bugged logic would sort by
+ # `topological_ordering` descending and pick up the base insertion
+ # event because it has a negative `stream_ordering` below the given
+ # pagination token. Now we properly sort by `stream_ordering`
+ # descending, which puts `historical` events with a negative
+ # `stream_ordering` at the very bottom, so they are not selected.
+ response = self.helper.send_event(
+ room_id=room_id,
+ type=EventTypes.Message,
+ content={
+ "msgtype": "m.text",
+ "body": "C",
+ },
+ tok=self.appservice.token,
+ )
+ event_to_hang_id = response["event_id"]
+
+ channel = self.make_request(
+ "POST",
+ "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s"
+ % (room_id, event_to_hang_id),
+ content={
+ "events": _create_message_events_for_batch_send_request(
+ self.virtual_user_id, time_before_room, 3
+ ),
+ "state_events_at_start": _create_join_state_events_for_batch_send_request(
+ [self.virtual_user_id], time_before_room
+ ),
+ },
+ access_token=self.appservice.token,
+ )
+ self.assertEqual(channel.code, 200, channel.result)
+
+ # Now we need to find the invite + join events' stream tokens so we can sync between them
+ main_store = self.hs.get_datastores().main
+ events, next_key = self.get_success(
+ main_store.get_recent_events_for_room(
+ room_id,
+ 50,
+ end_token=main_store.get_room_max_token(),
+ ),
+ )
+ invite_event_position = None
+ for event in events:
+ if (
+ event.type == "m.room.member"
+ and event.content["membership"] == "invite"
+ ):
+ invite_event_position = self.get_success(
+ main_store.get_topological_token_for_event(event.event_id)
+ )
+ break
+
+ assert invite_event_position is not None, "No invite event found"
+
+ # Remove the topological order from the token by re-creating w/stream only
+ invite_event_position = RoomStreamToken(None, invite_event_position.stream)
+
+ # Sync everything after this token
+ since_token = self.get_success(invite_event_position.to_string(main_store))
+ sync_response = self.make_request(
+ "GET",
+ f"/sync?since={since_token}",
+ access_token=user_tok,
+ )
+
+ # Assert that, for this room, the user was considered to have joined and thus
+ # receives the full state history
+ state_event_types = [
+ event["type"]
+ for event in sync_response.json_body["rooms"]["join"][room_id]["state"][
+ "events"
+ ]
+ ]
+
+ assert (
+ "m.room.create" in state_event_types
+ ), "Missing room full state in sync response"
|