diff --git a/changelog.d/17398.bugfix b/changelog.d/17398.bugfix
new file mode 100644
index 0000000000..7931c431ef
--- /dev/null
+++ b/changelog.d/17398.bugfix
@@ -0,0 +1 @@
+Fix bug in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint when using an old database.
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index b127289d8d..881888fa93 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -119,9 +119,6 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache(
"get_user_in_room_with_profile", (room_id, user_id)
)
- self._attempt_to_invalidate_cache(
- "get_rooms_for_user_with_stream_ordering", (user_id,)
- )
self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,))
# Purge other caches based on room state.
@@ -148,9 +145,6 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
- self._attempt_to_invalidate_cache(
- "get_rooms_for_user_with_stream_ordering", None
- )
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index c6787faea0..2d6b75e47e 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -268,16 +268,12 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
if data.type == EventTypes.Member:
- self.get_rooms_for_user_with_stream_ordering.invalidate( # type: ignore[attr-defined]
- (data.state_key,)
- )
self.get_rooms_for_user.invalidate((data.state_key,)) # type: ignore[attr-defined]
elif row.type == EventsStreamAllStateRow.TypeId:
assert isinstance(data, EventsStreamAllStateRow)
# Similar to the above, but the entire caches are invalidated. This is
# unfortunate for the membership caches, but should recover quickly.
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
- self.get_rooms_for_user_with_stream_ordering.invalidate_all() # type: ignore[attr-defined]
self.get_rooms_for_user.invalidate_all() # type: ignore[attr-defined]
else:
raise Exception("Unknown events stream row type %s" % (row.type,))
@@ -334,9 +330,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache(
"get_invited_rooms_for_local_user", (state_key,)
)
- self._attempt_to_invalidate_cache(
- "get_rooms_for_user_with_stream_ordering", (state_key,)
- )
self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,))
self._attempt_to_invalidate_cache(
@@ -399,9 +392,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache("get_thread_id", None)
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
- self._attempt_to_invalidate_cache(
- "get_rooms_for_user_with_stream_ordering", None
- )
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index a5acea8c3b..4d4877c4c3 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1457,7 +1457,8 @@ class EventsWorkerStore(SQLBaseStore):
event_dict[event_id] = _EventRow(
event_id=event_id,
stream_ordering=row[1],
- instance_name=row[2],
+ # If instance_name is null we default to "master"
+ instance_name=row[2] or "master",
internal_metadata=row[3],
json=row[4],
format_version=row[5],
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index d8b54dc4e3..5d2fd08495 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -50,12 +50,7 @@ from synapse.storage.database import (
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import Sqlite3Engine
-from synapse.storage.roommember import (
- GetRoomsForUserWithStreamOrdering,
- MemberSummary,
- ProfileInfo,
- RoomsForUser,
-)
+from synapse.storage.roommember import MemberSummary, ProfileInfo, RoomsForUser
from synapse.types import (
JsonDict,
PersistedEventPosition,
@@ -494,7 +489,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
sender=sender,
membership=membership,
event_id=event_id,
- event_pos=PersistedEventPosition(instance_name, stream_ordering),
+ event_pos=PersistedEventPosition(
+ # If instance_name is null we default to "master"
+ instance_name or "master",
+ stream_ordering,
+ ),
room_version_id=room_version,
)
for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn
@@ -606,53 +605,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
return results
- @cached(max_entries=500000, iterable=True)
- async def get_rooms_for_user_with_stream_ordering(
- self, user_id: str
- ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
- """Returns a set of room_ids the user is currently joined to.
-
- If a remote user only returns rooms this server is currently
- participating in.
-
- Args:
- user_id
-
- Returns:
- Returns the rooms the user is in currently, along with the stream
- ordering of the most recent join for that user and room, along with
- the room version of the room.
- """
- return await self.db_pool.runInteraction(
- "get_rooms_for_user_with_stream_ordering",
- self._get_rooms_for_user_with_stream_ordering_txn,
- user_id,
- )
-
- def _get_rooms_for_user_with_stream_ordering_txn(
- self, txn: LoggingTransaction, user_id: str
- ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
- # We use `current_state_events` here and not `local_current_membership`
- # as a) this gets called with remote users and b) this only gets called
- # for rooms the server is participating in.
- sql = """
- SELECT room_id, e.instance_name, e.stream_ordering
- FROM current_state_events AS c
- INNER JOIN events AS e USING (room_id, event_id)
- WHERE
- c.type = 'm.room.member'
- AND c.state_key = ?
- AND c.membership = ?
- """
-
- txn.execute(sql, (user_id, Membership.JOIN))
- return frozenset(
- GetRoomsForUserWithStreamOrdering(
- room_id, PersistedEventPosition(instance, stream_id)
- )
- for room_id, instance, stream_id in txn
- )
-
async def get_users_server_still_shares_room_with(
self, user_ids: Collection[str]
) -> Set[str]:
@@ -701,13 +653,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
If a remote user only returns rooms this server is currently
participating in.
"""
- rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(
- (user_id,),
- None,
- update_metrics=False,
- )
- if rooms:
- return frozenset(r.room_id for r in rooms)
room_ids = await self.db_pool.simple_select_onecol(
table="current_state_events",
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index be81025355..e74e0d2e91 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -371,7 +371,7 @@ def _make_generic_sql_bound(
def _filter_results(
lower_token: Optional[RoomStreamToken],
upper_token: Optional[RoomStreamToken],
- instance_name: str,
+ instance_name: Optional[str],
topological_ordering: int,
stream_ordering: int,
) -> bool:
@@ -384,8 +384,14 @@ def _filter_results(
position maps, which we handle by fetching more than necessary from the DB
and then filtering (rather than attempting to construct a complicated SQL
query).
+
+ The `instance_name` arg is optional to handle historic rows, and is
+ interpreted as if it was "master".
"""
+ if instance_name is None:
+ instance_name = "master"
+
event_historical_tuple = (
topological_ordering,
stream_ordering,
@@ -420,7 +426,7 @@ def _filter_results(
def _filter_results_by_stream(
lower_token: Optional[RoomStreamToken],
upper_token: Optional[RoomStreamToken],
- instance_name: str,
+ instance_name: Optional[str],
stream_ordering: int,
) -> bool:
"""
@@ -436,7 +442,14 @@ def _filter_results_by_stream(
position maps, which we handle by fetching more than necessary from the DB
and then filtering (rather than attempting to construct a complicated SQL
query).
+
+ The `instance_name` arg is optional to handle historic rows, and is
+ interpreted as if it was "master".
"""
+
+ if instance_name is None:
+ instance_name = "master"
+
if lower_token:
assert lower_token.topological is None
@@ -912,7 +925,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
prev_sender,
) in txn:
assert room_id is not None
- assert instance_name is not None
assert stream_ordering is not None
if _filter_results_by_stream(
@@ -936,7 +948,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# Event
event_id=event_id,
event_pos=PersistedEventPosition(
- instance_name=instance_name,
+ # If instance_name is null we default to "master"
+ instance_name=instance_name or "master",
stream=stream_ordering,
),
# When `s.event_id = null`, we won't be able to get respective
@@ -952,13 +965,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
prev_event_id=prev_event_id,
prev_event_pos=(
PersistedEventPosition(
- instance_name=prev_instance_name,
+ # If instance_name is null we default to "master"
+ instance_name=prev_instance_name or "master",
stream=prev_stream_ordering,
)
- if (
- prev_instance_name is not None
- and prev_stream_ordering is not None
- )
+ if (prev_stream_ordering is not None)
else None
),
prev_membership=prev_membership,
@@ -1270,7 +1281,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
stream_ordering=stream_ordering,
):
return event_id, PersistedEventPosition(
- instance_name, stream_ordering
+ # If instance_name is null we default to "master"
+ instance_name or "master",
+ stream_ordering,
)
return None
diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py
index 674dd4fb54..77aafa492e 100644
--- a/tests/handlers/test_sync.py
+++ b/tests/handlers/test_sync.py
@@ -210,7 +210,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
)
# Blow away caches (supported room versions can only change due to a restart).
- self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
self.store.get_rooms_for_user.invalidate_all()
self.store._get_event_cache.clear()
self.store._event_ref.clear()
diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py
index a56f1e2d5d..1afe523d02 100644
--- a/tests/replication/storage/test_events.py
+++ b/tests/replication/storage/test_events.py
@@ -30,19 +30,16 @@ from synapse.api.constants import ReceiptTypes
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, make_event_from_dict
from synapse.events.snapshot import EventContext
-from synapse.handlers.room import RoomEventSource
from synapse.server import HomeServer
from synapse.storage.databases.main.event_push_actions import (
NotifCounts,
RoomNotifCounts,
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
-from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser
+from synapse.storage.roommember import RoomsForUser
from synapse.types import PersistedEventPosition
from synapse.util import Clock
-from tests.server import FakeTransport
-
from ._base import BaseWorkerStoreTestCase
USER_ID = "@feeling:test"
@@ -221,125 +218,6 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
),
)
- def test_get_rooms_for_user_with_stream_ordering(self) -> None:
- """Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated
- by rows in the events stream
- """
- self.persist(type="m.room.create", key="", creator=USER_ID)
- self.persist(type="m.room.member", key=USER_ID, membership="join")
- self.replicate()
- self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
-
- j2 = self.persist(
- type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
- )
- assert j2.internal_metadata.instance_name is not None
- assert j2.internal_metadata.stream_ordering is not None
- self.replicate()
-
- expected_pos = PersistedEventPosition(
- j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering
- )
- self.check(
- "get_rooms_for_user_with_stream_ordering",
- (USER_ID_2,),
- {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
- )
-
- def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(
- self,
- ) -> None:
- """Check that current_state invalidation happens correctly with multiple events
- in the persistence batch.
-
- This test attempts to reproduce a race condition between the event persistence
- loop and a worker-based Sync handler.
-
- The problem occurred when the master persisted several events in one batch. It
- only updates the current_state at the end of each batch, so the obvious thing
- to do is then to issue a current_state_delta stream update corresponding to the
- last stream_id in the batch.
-
- However, that raises the possibility that a worker will see the replication
- notification for a join event before the current_state caches are invalidated.
-
- The test involves:
- * creating a join and a message event for a user, and persisting them in the
- same batch
-
- * controlling the replication stream so that updates are sent gradually
-
- * between each bunch of replication updates, check that we see a consistent
- snapshot of the state.
- """
- self.persist(type="m.room.create", key="", creator=USER_ID)
- self.persist(type="m.room.member", key=USER_ID, membership="join")
- self.replicate()
- self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
-
- # limit the replication rate
- repl_transport = self._server_transport
- assert isinstance(repl_transport, FakeTransport)
- repl_transport.autoflush = False
-
- # build the join and message events and persist them in the same batch.
- logger.info("----- build test events ------")
- j2, j2ctx = self.build_event(
- type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
- )
- msg, msgctx = self.build_event()
- self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)]))
- self.replicate()
- assert j2.internal_metadata.instance_name is not None
- assert j2.internal_metadata.stream_ordering is not None
-
- event_source = RoomEventSource(self.hs)
- event_source.store = self.worker_store
- current_token = event_source.get_current_key()
-
- # gradually stream out the replication
- while repl_transport.buffer:
- logger.info("------ flush ------")
- repl_transport.flush(30)
- self.pump(0)
-
- prev_token = current_token
- current_token = event_source.get_current_key()
-
- # attempt to replicate the behaviour of the sync handler.
- #
- # First, we get a list of the rooms we are joined to
- joined_rooms = self.get_success(
- self.worker_store.get_rooms_for_user_with_stream_ordering(USER_ID_2)
- )
-
- # Then, we get a list of the events since the last sync
- membership_changes = self.get_success(
- self.worker_store.get_membership_changes_for_user(
- USER_ID_2, prev_token, current_token
- )
- )
-
- logger.info(
- "%s->%s: joined_rooms=%r membership_changes=%r",
- prev_token,
- current_token,
- joined_rooms,
- membership_changes,
- )
-
- # the membership change is only any use to us if the room is in the
- # joined_rooms list.
- if membership_changes:
- expected_pos = PersistedEventPosition(
- j2.internal_metadata.instance_name,
- j2.internal_metadata.stream_ordering,
- )
- self.assertEqual(
- joined_rooms,
- {GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
- )
-
event_id = 0
def persist(self, backfill: bool = False, **kwargs: Any) -> EventBase:
|