diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index b127289d8d..881888fa93 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -119,9 +119,6 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache(
"get_user_in_room_with_profile", (room_id, user_id)
)
- self._attempt_to_invalidate_cache(
- "get_rooms_for_user_with_stream_ordering", (user_id,)
- )
self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,))
# Purge other caches based on room state.
@@ -148,9 +145,6 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
- self._attempt_to_invalidate_cache(
- "get_rooms_for_user_with_stream_ordering", None
- )
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index c6787faea0..2d6b75e47e 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -268,16 +268,12 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
if data.type == EventTypes.Member:
- self.get_rooms_for_user_with_stream_ordering.invalidate( # type: ignore[attr-defined]
- (data.state_key,)
- )
self.get_rooms_for_user.invalidate((data.state_key,)) # type: ignore[attr-defined]
elif row.type == EventsStreamAllStateRow.TypeId:
assert isinstance(data, EventsStreamAllStateRow)
# Similar to the above, but the entire caches are invalidated. This is
# unfortunate for the membership caches, but should recover quickly.
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
- self.get_rooms_for_user_with_stream_ordering.invalidate_all() # type: ignore[attr-defined]
self.get_rooms_for_user.invalidate_all() # type: ignore[attr-defined]
else:
raise Exception("Unknown events stream row type %s" % (row.type,))
@@ -334,9 +330,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache(
"get_invited_rooms_for_local_user", (state_key,)
)
- self._attempt_to_invalidate_cache(
- "get_rooms_for_user_with_stream_ordering", (state_key,)
- )
self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,))
self._attempt_to_invalidate_cache(
@@ -399,9 +392,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache("get_thread_id", None)
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
- self._attempt_to_invalidate_cache(
- "get_rooms_for_user_with_stream_ordering", None
- )
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index a5acea8c3b..4d4877c4c3 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1457,7 +1457,8 @@ class EventsWorkerStore(SQLBaseStore):
event_dict[event_id] = _EventRow(
event_id=event_id,
stream_ordering=row[1],
- instance_name=row[2],
+ # If instance_name is null we default to "master"
+ instance_name=row[2] or "master",
internal_metadata=row[3],
json=row[4],
format_version=row[5],
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index d8b54dc4e3..5d2fd08495 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -50,12 +50,7 @@ from synapse.storage.database import (
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import Sqlite3Engine
-from synapse.storage.roommember import (
- GetRoomsForUserWithStreamOrdering,
- MemberSummary,
- ProfileInfo,
- RoomsForUser,
-)
+from synapse.storage.roommember import MemberSummary, ProfileInfo, RoomsForUser
from synapse.types import (
JsonDict,
PersistedEventPosition,
@@ -494,7 +489,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
sender=sender,
membership=membership,
event_id=event_id,
- event_pos=PersistedEventPosition(instance_name, stream_ordering),
+ event_pos=PersistedEventPosition(
+ # If instance_name is null we default to "master"
+ instance_name or "master",
+ stream_ordering,
+ ),
room_version_id=room_version,
)
for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn
@@ -606,53 +605,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
return results
- @cached(max_entries=500000, iterable=True)
- async def get_rooms_for_user_with_stream_ordering(
- self, user_id: str
- ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
- """Returns a set of room_ids the user is currently joined to.
-
- If a remote user only returns rooms this server is currently
- participating in.
-
- Args:
- user_id
-
- Returns:
- Returns the rooms the user is in currently, along with the stream
- ordering of the most recent join for that user and room, along with
- the room version of the room.
- """
- return await self.db_pool.runInteraction(
- "get_rooms_for_user_with_stream_ordering",
- self._get_rooms_for_user_with_stream_ordering_txn,
- user_id,
- )
-
- def _get_rooms_for_user_with_stream_ordering_txn(
- self, txn: LoggingTransaction, user_id: str
- ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
- # We use `current_state_events` here and not `local_current_membership`
- # as a) this gets called with remote users and b) this only gets called
- # for rooms the server is participating in.
- sql = """
- SELECT room_id, e.instance_name, e.stream_ordering
- FROM current_state_events AS c
- INNER JOIN events AS e USING (room_id, event_id)
- WHERE
- c.type = 'm.room.member'
- AND c.state_key = ?
- AND c.membership = ?
- """
-
- txn.execute(sql, (user_id, Membership.JOIN))
- return frozenset(
- GetRoomsForUserWithStreamOrdering(
- room_id, PersistedEventPosition(instance, stream_id)
- )
- for room_id, instance, stream_id in txn
- )
-
async def get_users_server_still_shares_room_with(
self, user_ids: Collection[str]
) -> Set[str]:
@@ -701,13 +653,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
If a remote user only returns rooms this server is currently
participating in.
"""
- rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(
- (user_id,),
- None,
- update_metrics=False,
- )
- if rooms:
- return frozenset(r.room_id for r in rooms)
room_ids = await self.db_pool.simple_select_onecol(
table="current_state_events",
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index be81025355..e74e0d2e91 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -371,7 +371,7 @@ def _make_generic_sql_bound(
def _filter_results(
lower_token: Optional[RoomStreamToken],
upper_token: Optional[RoomStreamToken],
- instance_name: str,
+ instance_name: Optional[str],
topological_ordering: int,
stream_ordering: int,
) -> bool:
@@ -384,8 +384,14 @@ def _filter_results(
position maps, which we handle by fetching more than necessary from the DB
and then filtering (rather than attempting to construct a complicated SQL
query).
+
+ The `instance_name` arg is optional to handle historic rows, and is
+ interpreted as if it was "master".
"""
+ if instance_name is None:
+ instance_name = "master"
+
event_historical_tuple = (
topological_ordering,
stream_ordering,
@@ -420,7 +426,7 @@ def _filter_results(
def _filter_results_by_stream(
lower_token: Optional[RoomStreamToken],
upper_token: Optional[RoomStreamToken],
- instance_name: str,
+ instance_name: Optional[str],
stream_ordering: int,
) -> bool:
"""
@@ -436,7 +442,14 @@ def _filter_results_by_stream(
position maps, which we handle by fetching more than necessary from the DB
and then filtering (rather than attempting to construct a complicated SQL
query).
+
+ The `instance_name` arg is optional to handle historic rows, and is
+ interpreted as if it was "master".
"""
+
+ if instance_name is None:
+ instance_name = "master"
+
if lower_token:
assert lower_token.topological is None
@@ -912,7 +925,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
prev_sender,
) in txn:
assert room_id is not None
- assert instance_name is not None
assert stream_ordering is not None
if _filter_results_by_stream(
@@ -936,7 +948,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# Event
event_id=event_id,
event_pos=PersistedEventPosition(
- instance_name=instance_name,
+ # If instance_name is null we default to "master"
+ instance_name=instance_name or "master",
stream=stream_ordering,
),
# When `s.event_id = null`, we won't be able to get respective
@@ -952,13 +965,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
prev_event_id=prev_event_id,
prev_event_pos=(
PersistedEventPosition(
- instance_name=prev_instance_name,
+ # If instance_name is null we default to "master"
+ instance_name=prev_instance_name or "master",
stream=prev_stream_ordering,
)
- if (
- prev_instance_name is not None
- and prev_stream_ordering is not None
- )
+ if (prev_stream_ordering is not None)
else None
),
prev_membership=prev_membership,
@@ -1270,7 +1281,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
stream_ordering=stream_ordering,
):
return event_id, PersistedEventPosition(
- instance_name, stream_ordering
+ # If instance_name is null we default to "master"
+ instance_name or "master",
+ stream_ordering,
)
return None
|