diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 3217127865..233f8c113d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1868,22 +1868,17 @@ class FederationHandler:
async with self._is_partial_state_room_linearizer.queue(room_id):
logger.info("Clearing partial-state flag for %s", room_id)
- success = await self.store.clear_partial_state_room(room_id)
+ new_stream_id = await self.store.clear_partial_state_room(room_id)
- # Poke the notifier so that other workers see the write to
- # the un-partial-stated rooms stream.
- self._notifier.notify_replication()
-
- if success:
+ if new_stream_id is not None:
logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated(
room_id
)
- # Poke the notifier so that other workers see the write to
- # the un-partial-stated rooms stream.
- self._notifier.notify_replication()
-
+ await self._notifier.on_un_partial_stated_room(
+ room_id, new_stream_id
+ )
return
# we raced against more events arriving with partial state. Go round
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 78d488f2b1..ee11764567 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -290,7 +290,7 @@ class SyncHandler:
expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
)
- self.rooms_to_exclude = hs.config.server.rooms_to_exclude_from_sync
+ self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
async def wait_for_sync_for_user(
self,
@@ -1340,7 +1340,10 @@ class SyncHandler:
membership_change_events = []
if since_token:
membership_change_events = await self.store.get_membership_changes_for_user(
- user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
+ user_id,
+ since_token.room_key,
+ now_token.room_key,
+ self.rooms_to_exclude_globally,
)
mem_last_change_by_room_id: Dict[str, EventBase] = {}
@@ -1375,12 +1378,39 @@ class SyncHandler:
else:
mutable_joined_room_ids.discard(room_id)
+ # Tweak the set of rooms to return to the client for eager (non-lazy) syncs.
+ mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally)
+ if not sync_config.filter_collection.lazy_load_members():
+ # Non-lazy syncs should never include partially stated rooms.
+ # Exclude all partially stated rooms from this sync.
+ for room_id in mutable_joined_room_ids:
+ if await self.store.is_partial_state_room(room_id):
+ mutable_rooms_to_exclude.add(room_id)
+
+ # Incremental eager syncs should additionally include rooms that
+ # - we are joined to
+ # - are full-stated
+ # - became fully-stated at some point during the sync period
+ # (These rooms will have been omitted during a previous eager sync.)
+ forced_newly_joined_room_ids = set()
+ if since_token and not sync_config.filter_collection.lazy_load_members():
+ un_partial_stated_rooms = (
+ await self.store.get_un_partial_stated_rooms_between(
+ since_token.un_partial_stated_rooms_key,
+ now_token.un_partial_stated_rooms_key,
+ mutable_joined_room_ids,
+ )
+ )
+ for room_id in un_partial_stated_rooms:
+ if not await self.store.is_partial_state_room(room_id):
+ forced_newly_joined_room_ids.add(room_id)
+
# Now we have our list of joined room IDs, exclude as configured and freeze
joined_room_ids = frozenset(
(
room_id
for room_id in mutable_joined_room_ids
- if room_id not in self.rooms_to_exclude
+ if room_id not in mutable_rooms_to_exclude
)
)
@@ -1397,6 +1427,8 @@ class SyncHandler:
since_token=since_token,
now_token=now_token,
joined_room_ids=joined_room_ids,
+ excluded_room_ids=frozenset(mutable_rooms_to_exclude),
+ forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids),
membership_change_events=membership_change_events,
)
@@ -1834,14 +1866,16 @@ class SyncHandler:
# 3. Work out which rooms need reporting in the sync response.
ignored_users = await self.store.ignored_users(user_id)
if since_token:
- room_changes = await self._get_rooms_changed(
+ room_changes = await self._get_room_changes_for_incremental_sync(
sync_result_builder, ignored_users
)
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
)
else:
- room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
+ room_changes = await self._get_room_changes_for_initial_sync(
+ sync_result_builder, ignored_users
+ )
tags_by_room = await self.store.get_tags_for_user(user_id)
log_kv({"rooms_changed": len(room_changes.room_entries)})
@@ -1900,7 +1934,7 @@ class SyncHandler:
assert since_token
- if membership_change_events:
+ if membership_change_events or sync_result_builder.forced_newly_joined_room_ids:
return True
stream_id = since_token.room_key.stream
@@ -1909,7 +1943,7 @@ class SyncHandler:
return True
return False
- async def _get_rooms_changed(
+ async def _get_room_changes_for_incremental_sync(
self,
sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str],
@@ -1947,7 +1981,9 @@ class SyncHandler:
for event in membership_change_events:
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
- newly_joined_rooms: List[str] = []
+ newly_joined_rooms: List[str] = list(
+ sync_result_builder.forced_newly_joined_room_ids
+ )
newly_left_rooms: List[str] = []
room_entries: List[RoomSyncResultBuilder] = []
invited: List[InvitedSyncResult] = []
@@ -2153,7 +2189,7 @@ class SyncHandler:
newly_left_rooms,
)
- async def _get_all_rooms(
+ async def _get_room_changes_for_initial_sync(
self,
sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str],
@@ -2178,7 +2214,7 @@ class SyncHandler:
room_list = await self.store.get_rooms_for_local_user_where_membership_is(
user_id=user_id,
membership_list=Membership.LIST,
- excluded_rooms=self.rooms_to_exclude,
+ excluded_rooms=sync_result_builder.excluded_room_ids,
)
room_entries = []
@@ -2549,6 +2585,13 @@ class SyncResultBuilder:
since_token: The token supplied by user, or None.
now_token: The token to sync up to.
joined_room_ids: List of rooms the user is joined to
+ excluded_room_ids: Set of room ids we should omit from the /sync response.
+ forced_newly_joined_room_ids:
+ Rooms that should be presented in the /sync response as if they were
+ newly joined during the sync period, even if that's not the case.
+ (This is useful if the room was previously excluded from a /sync response,
+ and now the client should be made aware of it.)
+ Only used by incremental syncs.
# The following mirror the fields in a sync response
presence
@@ -2565,6 +2608,8 @@ class SyncResultBuilder:
since_token: Optional[StreamToken]
now_token: StreamToken
joined_room_ids: FrozenSet[str]
+ excluded_room_ids: FrozenSet[str]
+ forced_newly_joined_room_ids: FrozenSet[str]
membership_change_events: List[EventBase]
presence: List[UserPresenceState] = attr.Factory(list)
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 28f0d4a25a..2b0e52f23c 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -314,6 +314,32 @@ class Notifier:
event_entries.append((entry, event.event_id))
await self.notify_new_room_events(event_entries, max_room_stream_token)
+ async def on_un_partial_stated_room(
+ self,
+ room_id: str,
+ new_token: int,
+ ) -> None:
+ """Used by the resync background processes to wake up all listeners
+ of this room when it is un-partial-stated.
+
+ It will also notify replication listeners of the change in stream.
+ """
+
+ # Wake up all related user stream notifiers
+ user_streams = self.room_to_user_streams.get(room_id, set())
+ time_now_ms = self.clock.time_msec()
+ for user_stream in user_streams:
+ try:
+ user_stream.notify(
+ StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
+ )
+ except Exception:
+ logger.exception("Failed to notify listener")
+
+ # Poke the replication so that other workers also see the write to
+ # the un-partial-stated rooms stream.
+ self.notify_replication()
+
async def notify_new_room_events(
self,
event_entries: List[Tuple[_PendingRoomEventEntry, str]],
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 2a9cb499a4..cc0528bd8e 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -260,6 +260,7 @@ class ReplicationDataHandler:
self._state_storage_controller.notify_room_un_partial_stated(
row.room_id
)
+ await self.notifier.on_un_partial_stated_room(row.room_id, token)
elif stream_name == UnPartialStatedEventStream.NAME:
for row in rows:
assert isinstance(row, UnPartialStatedEventStreamRow)
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index aea96e9d24..84f844b79e 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -292,6 +292,7 @@ class RelationsWorkerStore(SQLBaseStore):
to_device_key=0,
device_list_key=0,
groups_key=0,
+ un_partial_stated_rooms_key=0,
)
return events[:limit], next_token
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 6a65b2a89b..3aa7b94560 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -26,6 +26,7 @@ from typing import (
Mapping,
Optional,
Sequence,
+ Set,
Tuple,
Union,
cast,
@@ -1294,10 +1295,44 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
instance_name
)
+ async def get_un_partial_stated_rooms_between(
+ self, last_id: int, current_id: int, room_ids: Collection[str]
+ ) -> Set[str]:
+ """Get all rooms that got un partial stated between `last_id` exclusive and
+ `current_id` inclusive.
+
+ Returns:
+ The list of room ids.
+ """
+
+ if last_id == current_id:
+ return set()
+
+ def _get_un_partial_stated_rooms_between_txn(
+ txn: LoggingTransaction,
+ ) -> Set[str]:
+ sql = """
+ SELECT DISTINCT room_id FROM un_partial_stated_room_stream
+ WHERE ? < stream_id AND stream_id <= ? AND
+ """
+
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "room_id", room_ids
+ )
+
+ txn.execute(sql + clause, [last_id, current_id] + args)
+
+ return {r[0] for r in txn}
+
+ return await self.db_pool.runInteraction(
+ "get_un_partial_stated_rooms_between",
+ _get_un_partial_stated_rooms_between_txn,
+ )
+
async def get_un_partial_stated_rooms_from_stream(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
- """Get updates for caches replication stream.
+ """Get updates for un partial stated rooms replication stream.
Args:
instance_name: The writer we want to fetch updates from. Unused
@@ -2304,16 +2339,16 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
(room_id,),
)
- async def clear_partial_state_room(self, room_id: str) -> bool:
+ async def clear_partial_state_room(self, room_id: str) -> Optional[int]:
"""Clears the partial state flag for a room.
Args:
room_id: The room whose partial state flag is to be cleared.
Returns:
- `True` if the partial state flag has been cleared successfully.
+ The corresponding stream id for the un-partial-stated rooms stream.
- `False` if the partial state flag could not be cleared because the room
+ `None` if the partial state flag could not be cleared because the room
still contains events with partial state.
"""
try:
@@ -2324,7 +2359,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
room_id,
un_partial_state_room_stream_id,
)
- return True
+ return un_partial_state_room_stream_id
except self.db_pool.engine.module.IntegrityError as e:
# Assume that any `IntegrityError`s are due to partial state events.
logger.info(
@@ -2332,7 +2367,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
room_id,
e,
)
- return False
+ return None
def _clear_partial_state_room_txn(
self,
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index f02c1d7ea7..8e2ba7b7b4 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -15,6 +15,7 @@
import logging
from typing import (
TYPE_CHECKING,
+ AbstractSet,
Collection,
Dict,
FrozenSet,
@@ -47,7 +48,13 @@ from synapse.storage.roommember import (
ProfileInfo,
RoomsForUser,
)
-from synapse.types import JsonDict, PersistedEventPosition, StateMap, get_domain_from_id
+from synapse.types import (
+ JsonDict,
+ PersistedEventPosition,
+ StateMap,
+ StrCollection,
+ get_domain_from_id,
+)
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
@@ -385,7 +392,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
self,
user_id: str,
membership_list: Collection[str],
- excluded_rooms: Optional[List[str]] = None,
+ excluded_rooms: StrCollection = (),
) -> List[RoomsForUser]:
"""Get all the rooms for this *local* user where the membership for this user
matches one in the membership list.
@@ -412,10 +419,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
)
# Now we filter out forgotten and excluded rooms
- rooms_to_exclude: Set[str] = await self.get_forgotten_rooms_for_user(user_id)
+ rooms_to_exclude = await self.get_forgotten_rooms_for_user(user_id)
if excluded_rooms is not None:
- rooms_to_exclude.update(set(excluded_rooms))
+ # Take a copy to avoid mutating the in-cache set
+ rooms_to_exclude = set(rooms_to_exclude)
+ rooms_to_exclude.update(excluded_rooms)
return [room for room in rooms if room.room_id not in rooms_to_exclude]
@@ -1169,7 +1178,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return count == 0
@cached()
- async def get_forgotten_rooms_for_user(self, user_id: str) -> Set[str]:
+ async def get_forgotten_rooms_for_user(self, user_id: str) -> AbstractSet[str]:
"""Gets all rooms the user has forgotten.
Args:
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 619eb7f601..d7084d2358 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -53,11 +53,15 @@ class EventSources:
*(attribute.type(hs) for attribute in attr.fields(_EventSourcesInner))
)
self.store = hs.get_datastores().main
+ self._instance_name = hs.get_instance_name()
def get_current_token(self) -> StreamToken:
push_rules_key = self.store.get_max_push_rules_stream_id()
to_device_key = self.store.get_to_device_stream_token()
device_list_key = self.store.get_device_stream_token()
+ un_partial_stated_rooms_key = self.store.get_un_partial_stated_rooms_token(
+ self._instance_name
+ )
token = StreamToken(
room_key=self.sources.room.get_current_key(),
@@ -70,6 +74,7 @@ class EventSources:
device_list_key=device_list_key,
# Groups key is unused.
groups_key=0,
+ un_partial_stated_rooms_key=un_partial_stated_rooms_key,
)
return token
@@ -107,5 +112,6 @@ class EventSources:
to_device_key=0,
device_list_key=0,
groups_key=0,
+ un_partial_stated_rooms_key=0,
)
return token
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index c59eca2430..f82d1cfc29 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -17,6 +17,7 @@ import re
import string
from typing import (
TYPE_CHECKING,
+ AbstractSet,
Any,
ClassVar,
Dict,
@@ -79,7 +80,7 @@ JsonSerializable = object
# Collection[str] that does not include str itself; str being a Sequence[str]
# is very misleading and results in bugs.
-StrCollection = Union[Tuple[str, ...], List[str], Set[str]]
+StrCollection = Union[Tuple[str, ...], List[str], AbstractSet[str]]
# Note that this seems to require inheriting *directly* from Interface in order
@@ -633,6 +634,7 @@ class StreamKeyType:
PUSH_RULES: Final = "push_rules_key"
TO_DEVICE: Final = "to_device_key"
DEVICE_LIST: Final = "device_list_key"
+ UN_PARTIAL_STATED_ROOMS = "un_partial_stated_rooms_key"
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -640,7 +642,7 @@ class StreamToken:
"""A collection of keys joined together by underscores in the following
order and which represent the position in their respective streams.
- ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1`
+ ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1_379`
1. `room_key`: `s2633508` which is a `RoomStreamToken`
- `RoomStreamToken`'s can also look like `t426-2633508` or `m56~2.58~3.59`
- See the docstring for `RoomStreamToken` for more details.
@@ -652,12 +654,13 @@ class StreamToken:
7. `to_device_key`: `274711`
8. `device_list_key`: `265584`
9. `groups_key`: `1` (note that this key is now unused)
+ 10. `un_partial_stated_rooms_key`: `379`
You can see how many of these keys correspond to the various
fields in a "/sync" response:
```json
{
- "next_batch": "s12_4_0_1_1_1_1_4_1",
+ "next_batch": "s12_4_0_1_1_1_1_4_1_1",
"presence": {
"events": []
},
@@ -669,7 +672,7 @@ class StreamToken:
"!QrZlfIDQLNLdZHqTnt:hs1": {
"timeline": {
"events": [],
- "prev_batch": "s10_4_0_1_1_1_1_4_1",
+ "prev_batch": "s10_4_0_1_1_1_1_4_1_1",
"limited": false
},
"state": {
@@ -705,6 +708,7 @@ class StreamToken:
device_list_key: int
# Note that the groups key is no longer used and may have bogus values.
groups_key: int
+ un_partial_stated_rooms_key: int
_SEPARATOR = "_"
START: ClassVar["StreamToken"]
@@ -743,6 +747,7 @@ class StreamToken:
# serialized so that there will not be confusion in the future
# if additional tokens are added.
str(self.groups_key),
+ str(self.un_partial_stated_rooms_key),
]
)
@@ -775,7 +780,7 @@ class StreamToken:
return attr.evolve(self, **{key: new_value})
-StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0)
+StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0)
@attr.s(slots=True, frozen=True, auto_attribs=True)
|