diff options
-rw-r--r-- | changelog.d/12344.removal | 1 | ||||
-rw-r--r-- | changelog.d/12382.removal | 1 | ||||
-rw-r--r-- | changelog.d/12394.misc | 1 | ||||
-rw-r--r-- | docs/upgrade.md | 7 | ||||
-rw-r--r-- | synapse/api/constants.py | 2 | ||||
-rw-r--r-- | synapse/api/filtering.py | 12 | ||||
-rw-r--r-- | synapse/config/experimental.py | 5 | ||||
-rw-r--r-- | synapse/events/utils.py | 6 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 75 | ||||
-rw-r--r-- | synapse/handlers/federation_event.py | 39 | ||||
-rw-r--r-- | synapse/handlers/message.py | 5 | ||||
-rw-r--r-- | synapse/rest/client/versions.py | 1 | ||||
-rw-r--r-- | synapse/server.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 20 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 24 | ||||
-rw-r--r-- | synapse/storage/databases/main/relations.py | 78 | ||||
-rw-r--r-- | synapse/storage/databases/main/room.py | 31 | ||||
-rw-r--r-- | synapse/storage/databases/main/state.py | 48 | ||||
-rw-r--r-- | synapse/storage/persist_events.py | 56 | ||||
-rw-r--r-- | tests/api/test_filtering.py | 4 | ||||
-rw-r--r-- | tests/rest/admin/test_admin.py | 1 |
21 files changed, 320 insertions, 99 deletions
diff --git a/changelog.d/12344.removal b/changelog.d/12344.removal new file mode 100644 index 0000000000..ecefa76d8e --- /dev/null +++ b/changelog.d/12344.removal @@ -0,0 +1 @@ +The groups/communities feature in Synapse has been disabled by default. diff --git a/changelog.d/12382.removal b/changelog.d/12382.removal new file mode 100644 index 0000000000..eb91186340 --- /dev/null +++ b/changelog.d/12382.removal @@ -0,0 +1 @@ +Remove unstable identifiers from [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440). diff --git a/changelog.d/12394.misc b/changelog.d/12394.misc new file mode 100644 index 0000000000..69109fcc37 --- /dev/null +++ b/changelog.d/12394.misc @@ -0,0 +1 @@ +Preparation for faster-room-join work: start a background process to resynchronise the room state after a room join. diff --git a/docs/upgrade.md b/docs/upgrade.md index 55a6ff7cae..d5516dfc99 100644 --- a/docs/upgrade.md +++ b/docs/upgrade.md @@ -85,6 +85,13 @@ process, for example: dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb ``` +# Upgrading to v1.58.0 + +## Groups/communities feature has been disabled by default + +The non-standard groups/communities feature in Synapse has been disabled by default +and will be removed in Synapse v1.61.0. + # Upgrading to v1.57.0 ## Changes to database schema for application services diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 92907415e6..0172eb60b8 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -179,8 +179,6 @@ class RelationTypes: REPLACE: Final = "m.replace" REFERENCE: Final = "m.reference" THREAD: Final = "m.thread" - # TODO Remove this in Synapse >= v1.57.0. - UNSTABLE_THREAD: Final = "io.element.thread" class LimitBlockingTypes: diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 27e97d6f37..4a808e33fe 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -89,9 +89,7 @@ ROOM_EVENT_FILTER_SCHEMA = { "org.matrix.not_labels": {"type": "array", "items": {"type": "string"}}, # MSC3440, filtering by event relations. "related_by_senders": {"type": "array", "items": {"type": "string"}}, - "io.element.relation_senders": {"type": "array", "items": {"type": "string"}}, "related_by_rel_types": {"type": "array", "items": {"type": "string"}}, - "io.element.relation_types": {"type": "array", "items": {"type": "string"}}, }, } @@ -323,16 +321,6 @@ class Filter: self.related_by_senders = self.filter_json.get("related_by_senders", None) self.related_by_rel_types = self.filter_json.get("related_by_rel_types", None) - # Fallback to the unstable prefix if the stable version is not given. - if hs.config.experimental.msc3440_enabled: - self.related_by_senders = self.related_by_senders or self.filter_json.get( - "io.element.relation_senders", None - ) - self.related_by_rel_types = ( - self.related_by_rel_types - or self.filter_json.get("io.element.relation_types", None) - ) - def filters_all_types(self) -> bool: return "*" in self.not_types diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 447476fbfa..979059e723 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -26,9 +26,6 @@ class ExperimentalConfig(Config): def read_config(self, config: JsonDict, **kwargs: Any) -> None: experimental = config.get("experimental_features") or {} - # MSC3440 (thread relation) - self.msc3440_enabled: bool = experimental.get("msc3440_enabled", False) - # MSC3026 (busy presence state) self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False) @@ -77,7 +74,7 @@ class ExperimentalConfig(Config): self.msc3720_enabled: bool = experimental.get("msc3720_enabled", False) # The deprecated groups feature. - self.groups_enabled: bool = experimental.get("groups_enabled", True) + self.groups_enabled: bool = experimental.get("groups_enabled", False) # MSC2654: Unread counts self.msc2654_enabled: bool = experimental.get("msc2654_enabled", False) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 918e87ed9c..43c3241fb0 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -39,7 +39,6 @@ from . import EventBase if TYPE_CHECKING: from synapse.handlers.relations import BundledAggregations - from synapse.server import HomeServer # Split strings on "." but not "\." This uses a negative lookbehind assertion for '\' @@ -396,9 +395,6 @@ class EventClientSerializer: clients. """ - def __init__(self, hs: "HomeServer"): - self._msc3440_enabled = hs.config.experimental.msc3440_enabled - def serialize_event( self, event: Union[JsonDict, EventBase], @@ -525,8 +521,6 @@ class EventClientSerializer: "current_user_participated": thread.current_user_participated, } serialized_aggregations[RelationTypes.THREAD] = thread_summary - if self._msc3440_enabled: - serialized_aggregations[RelationTypes.UNSTABLE_THREAD] = thread_summary # Include the bundled aggregations in the event. if serialized_aggregations: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 78d149905f..1434e99056 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -466,6 +466,8 @@ class FederationHandler: ) if ret.partial_state: + # TODO(faster_joins): roll this back if we don't manage to start the + # background resync (eg process_remote_join fails) await self.store.store_partial_state_room(room_id, ret.servers_in_room) max_stream_id = await self._federation_event_handler.process_remote_join( @@ -478,6 +480,18 @@ class FederationHandler: partial_state=ret.partial_state, ) + if ret.partial_state: + # Kick off the process of asynchronously fetching the state for this + # room. + # + # TODO(faster_joins): pick this up again on restart + run_as_background_process( + desc="sync_partial_state_room", + func=self._sync_partial_state_room, + destination=origin, + room_id=room_id, + ) + # We wait here until this instance has seen the events come down # replication (if we're using replication) as the below uses caches. await self._replication.wait_for_stream_position( @@ -1370,3 +1384,64 @@ class FederationHandler: # We fell off the bottom, couldn't get the complexity from anyone. Oh # well. return None + + async def _sync_partial_state_room( + self, + destination: str, + room_id: str, + ) -> None: + """Background process to resync the state of a partial-state room + + Args: + destination: homeserver to pull the state from + room_id: room to be resynced + """ + + # TODO(faster_joins): do we need to lock to avoid races? What happens if other + # worker processes kick off a resync in parallel? Perhaps we should just elect + # a single worker to do the resync. + # + # TODO(faster_joins): what happens if we leave the room during a resync? if we + # really leave, that might mean we have difficulty getting the room state over + # federation. + # + # TODO(faster_joins): try other destinations if the one we have fails + + logger.info("Syncing state for room %s via %s", room_id, destination) + + # we work through the queue in order of increasing stream ordering. + while True: + batch = await self.store.get_partial_state_events_batch(room_id) + if not batch: + # all the events are updated, so we can update current state and + # clear the lazy-loading flag. + logger.info("Updating current state for %s", room_id) + assert ( + self.storage.persistence is not None + ), "TODO(faster_joins): support for workers" + await self.storage.persistence.update_current_state(room_id) + + logger.info("Clearing partial-state flag for %s", room_id) + success = await self.store.clear_partial_state_room(room_id) + if success: + logger.info("State resync complete for %s", room_id) + + # TODO(faster_joins) update room stats and user directory? + return + + # we raced against more events arriving with partial state. Go round + # the loop again. We've already logged a warning, so no need for more. + # TODO(faster_joins): there is still a race here, whereby incoming events which raced + # with us will fail to be persisted after the call to `clear_partial_state_room` due to + # having partial state. + continue + + events = await self.store.get_events_as_list( + batch, + redact_behaviour=EventRedactBehaviour.AS_IS, + allow_rejected=True, + ) + for event in events: + await self._federation_event_handler.update_state_for_partial_state_event( + destination, event + ) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 03c1197c99..32bf02818c 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -477,6 +477,45 @@ class FederationEventHandler: return await self.persist_events_and_notify(room_id, [(event, context)]) + async def update_state_for_partial_state_event( + self, destination: str, event: EventBase + ) -> None: + """Recalculate the state at an event as part of a de-partial-stating process + + Args: + destination: server to request full state from + event: partial-state event to be de-partial-stated + """ + logger.info("Updating state for %s", event.event_id) + with nested_logging_context(suffix=event.event_id): + # if we have all the event's prev_events, then we can work out the + # state based on their states. Otherwise, we request it from the destination + # server. + # + # This is the same operation as we do when we receive a regular event + # over federation. + state = await self._resolve_state_at_missing_prevs(destination, event) + + # build a new state group for it if need be + context = await self._state_handler.compute_event_context( + event, + old_state=state, + ) + if context.partial_state: + # this can happen if some or all of the event's prev_events still have + # partial state - ie, an event has an earlier stream_ordering than one + # or more of its prev_events, so we de-partial-state it before its + # prev_events. + # + # TODO(faster_joins): we probably need to be more intelligent, and + # exclude partial-state prev_events from consideration + logger.warning( + "%s still has partial state: can't de-partial-state it yet", + event.event_id, + ) + return + await self._store.update_state_for_partial_state_event(event, context) + async def backfill( self, dest: str, room_id: str, limit: int, extremities: Collection[str] ) -> None: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 7db6905c61..47a63005a9 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1102,10 +1102,7 @@ class EventCreationHandler: raise SynapseError(400, "Can't send same reaction twice") # Don't attempt to start a thread if the parent event is a relation. - elif ( - relation_type == RelationTypes.THREAD - or relation_type == RelationTypes.UNSTABLE_THREAD - ): + elif relation_type == RelationTypes.THREAD: if await self.store.event_includes_relation(relates_to): raise SynapseError( 400, "Cannot start threads from an event with a relation" diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index 9a65aa4843..7ee6b5505b 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -100,7 +100,6 @@ class VersionsRestServlet(RestServlet): # Adds support for jump to date endpoints (/timestamp_to_event) as per MSC3030 "org.matrix.msc3030": self.config.experimental.msc3030_enabled, # Adds support for thread relations, per MSC3440. - "org.matrix.msc3440": self.config.experimental.msc3440_enabled, "org.matrix.msc3440.stable": True, # TODO: remove when "v1.3" is added above }, }, diff --git a/synapse/server.py b/synapse/server.py index 380369db92..37c72bd83a 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -758,7 +758,7 @@ class HomeServer(metaclass=abc.ABCMeta): @cache_in_self def get_event_client_serializer(self) -> EventClientSerializer: - return EventClientSerializer(self) + return EventClientSerializer() @cache_in_self def get_password_policy_handler(self) -> PasswordPolicyHandler: diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 3fcd5f5b99..2a1e567ce0 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -963,6 +963,21 @@ class PersistEventsStore: values=to_insert, ) + async def update_current_state( + self, + room_id: str, + state_delta: DeltaState, + stream_id: int, + ) -> None: + """Update the current state stored in the datatabase for the given room""" + + await self.db_pool.runInteraction( + "update_current_state", + self._update_current_state_txn, + state_delta_by_room={room_id: state_delta}, + stream_id=stream_id, + ) + def _update_current_state_txn( self, txn: LoggingTransaction, @@ -1819,10 +1834,7 @@ class PersistEventsStore: if rel_type == RelationTypes.REPLACE: txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,)) - if ( - rel_type == RelationTypes.THREAD - or rel_type == RelationTypes.UNSTABLE_THREAD - ): + if rel_type == RelationTypes.THREAD: txn.call_after(self.store.get_thread_summary.invalidate, (parent_id,)) # It should be safe to only invalidate the cache if the user has not # previously participated in the thread, but that's difficult (and diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index a60e3f4fdd..5288cdba03 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1979,3 +1979,27 @@ class EventsWorkerStore(SQLBaseStore): desc="is_partial_state_event", ) return result is not None + + async def get_partial_state_events_batch(self, room_id: str) -> List[str]: + """Get a list of events in the given room that have partial state""" + return await self.db_pool.runInteraction( + "get_partial_state_events_batch", + self._get_partial_state_events_batch_txn, + room_id, + ) + + @staticmethod + def _get_partial_state_events_batch_txn( + txn: LoggingTransaction, room_id: str + ) -> List[str]: + txn.execute( + """ + SELECT event_id FROM partial_state_events AS pse + JOIN events USING (event_id) + WHERE pse.room_id = ? + ORDER BY events.stream_ordering + LIMIT 100 + """, + (room_id,), + ) + return [row[0] for row in txn] diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 407158ceee..a5c31f6787 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -14,7 +14,6 @@ import logging from typing import ( - TYPE_CHECKING, Collection, Dict, FrozenSet, @@ -32,20 +31,12 @@ import attr from synapse.api.constants import RelationTypes from synapse.events import EventBase from synapse.storage._base import SQLBaseStore -from synapse.storage.database import ( - DatabasePool, - LoggingDatabaseConnection, - LoggingTransaction, - make_in_list_sql_clause, -) +from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause from synapse.storage.databases.main.stream import generate_pagination_where_clause from synapse.storage.engines import PostgresEngine from synapse.types import JsonDict, RoomStreamToken, StreamToken from synapse.util.caches.descriptors import cached, cachedList -if TYPE_CHECKING: - from synapse.server import HomeServer - logger = logging.getLogger(__name__) @@ -63,16 +54,6 @@ class _RelatedEvent: class RelationsWorkerStore(SQLBaseStore): - def __init__( - self, - database: DatabasePool, - db_conn: LoggingDatabaseConnection, - hs: "HomeServer", - ): - super().__init__(database, db_conn, hs) - - self._msc3440_enabled = hs.config.experimental.msc3440_enabled - @cached(uncached_args=("event",), tree=True) async def get_relations_for_event( self, @@ -497,7 +478,7 @@ class RelationsWorkerStore(SQLBaseStore): AND parent.room_id = child.room_id WHERE %s - AND %s + AND relation_type = ? ORDER BY parent.event_id, child.topological_ordering DESC, child.stream_ordering DESC """ else: @@ -512,22 +493,16 @@ class RelationsWorkerStore(SQLBaseStore): AND parent.room_id = child.room_id WHERE %s - AND %s + AND relation_type = ? ORDER BY child.topological_ordering DESC, child.stream_ordering DESC """ clause, args = make_in_list_sql_clause( txn.database_engine, "relates_to_id", event_ids ) + args.append(RelationTypes.THREAD) - if self._msc3440_enabled: - relations_clause = "(relation_type = ? OR relation_type = ?)" - args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD)) - else: - relations_clause = "relation_type = ?" - args.append(RelationTypes.THREAD) - - txn.execute(sql % (clause, relations_clause), args) + txn.execute(sql % (clause,), args) latest_event_ids = {} for parent_event_id, child_event_id in txn: # Only consider the latest threaded reply (by topological ordering). @@ -547,7 +522,7 @@ class RelationsWorkerStore(SQLBaseStore): AND parent.room_id = child.room_id WHERE %s - AND %s + AND relation_type = ? GROUP BY parent.event_id """ @@ -556,15 +531,9 @@ class RelationsWorkerStore(SQLBaseStore): clause, args = make_in_list_sql_clause( txn.database_engine, "relates_to_id", latest_event_ids.keys() ) + args.append(RelationTypes.THREAD) - if self._msc3440_enabled: - relations_clause = "(relation_type = ? OR relation_type = ?)" - args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD)) - else: - relations_clause = "relation_type = ?" - args.append(RelationTypes.THREAD) - - txn.execute(sql % (clause, relations_clause), args) + txn.execute(sql % (clause,), args) counts = dict(cast(List[Tuple[str, int]], txn.fetchall())) return counts, latest_event_ids @@ -622,7 +591,7 @@ class RelationsWorkerStore(SQLBaseStore): parent.event_id = relates_to_id AND parent.room_id = child.room_id WHERE - %s + relation_type = ? AND %s AND %s GROUP BY parent.event_id, child.sender @@ -638,16 +607,9 @@ class RelationsWorkerStore(SQLBaseStore): txn.database_engine, "relates_to_id", event_ids ) - if self._msc3440_enabled: - relations_clause = "(relation_type = ? OR relation_type = ?)" - relations_args = [RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD] - else: - relations_clause = "relation_type = ?" - relations_args = [RelationTypes.THREAD] - txn.execute( - sql % (users_sql, events_clause, relations_clause), - users_args + events_args + relations_args, + sql % (users_sql, events_clause), + [RelationTypes.THREAD] + users_args + events_args, ) return {(row[0], row[1]): row[2] for row in txn} @@ -677,7 +639,7 @@ class RelationsWorkerStore(SQLBaseStore): user participated in that event's thread, otherwise false. """ - def _get_thread_summary_txn(txn: LoggingTransaction) -> Set[str]: + def _get_threads_participated_txn(txn: LoggingTransaction) -> Set[str]: # Fetch whether the requester has participated or not. sql = """ SELECT DISTINCT relates_to_id @@ -688,28 +650,20 @@ class RelationsWorkerStore(SQLBaseStore): AND parent.room_id = child.room_id WHERE %s - AND %s + AND relation_type = ? AND child.sender = ? """ clause, args = make_in_list_sql_clause( txn.database_engine, "relates_to_id", event_ids ) + args.extend([RelationTypes.THREAD, user_id]) - if self._msc3440_enabled: - relations_clause = "(relation_type = ? OR relation_type = ?)" - args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD)) - else: - relations_clause = "relation_type = ?" - args.append(RelationTypes.THREAD) - - args.append(user_id) - - txn.execute(sql % (clause, relations_clause), args) + txn.execute(sql % (clause,), args) return {row[0] for row in txn.fetchall()} participated_threads = await self.db_pool.runInteraction( - "get_thread_summary", _get_thread_summary_txn + "get_threads_participated", _get_threads_participated_txn ) return {event_id: event_id in participated_threads for event_id in event_ids} diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 18b1acd9e1..87e9482c60 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1077,6 +1077,37 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): get_rooms_for_retention_period_in_range_txn, ) + async def clear_partial_state_room(self, room_id: str) -> bool: + # this can race with incoming events, so we watch out for FK errors. + # TODO(faster_joins): this still doesn't completely fix the race, since the persist process + # is not atomic. I fear we need an application-level lock. + try: + await self.db_pool.runInteraction( + "clear_partial_state_room", self._clear_partial_state_room_txn, room_id + ) + return True + except self.db_pool.engine.module.DatabaseError as e: + # TODO(faster_joins): how do we distinguish between FK errors and other errors? + logger.warning( + "Exception while clearing lazy partial-state-room %s, retrying: %s", + room_id, + e, + ) + return False + + @staticmethod + def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None: + DatabasePool.simple_delete_txn( + txn, + table="partial_state_rooms_servers", + keyvalues={"room_id": room_id}, + ) + DatabasePool.simple_delete_one_txn( + txn, + table="partial_state_rooms", + keyvalues={"room_id": room_id}, + ) + class _BackgroundUpdates: REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory" diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index ecdc1fdc4c..eba35f3700 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -21,6 +21,7 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion from synapse.events import EventBase +from synapse.events.snapshot import EventContext from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -354,6 +355,53 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): return {row["state_group"] for row in rows} + async def update_state_for_partial_state_event( + self, + event: EventBase, + context: EventContext, + ) -> None: + """Update the state group for a partial state event""" + await self.db_pool.runInteraction( + "update_state_for_partial_state_event", + self._update_state_for_partial_state_event_txn, + event, + context, + ) + + def _update_state_for_partial_state_event_txn( + self, + txn, + event: EventBase, + context: EventContext, + ): + # we shouldn't have any outliers here + assert not event.internal_metadata.is_outlier() + + # anything that was rejected should have the same state as its + # predecessor. + if context.rejected: + assert context.state_group == context.state_group_before_event + + self.db_pool.simple_update_txn( + txn, + table="event_to_state_groups", + keyvalues={"event_id": event.event_id}, + updatevalues={"state_group": context.state_group}, + ) + + self.db_pool.simple_delete_one_txn( + txn, + table="partial_state_events", + keyvalues={"event_id": event.event_id}, + ) + + # TODO(faster_joins): need to do something about workers here + txn.call_after( + self._get_state_group_for_event.prefill, + (event.event_id,), + context.state_group, + ) + class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index b402922817..e496ba7bed 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -376,6 +376,62 @@ class EventsPersistenceStorage: pos = PersistedEventPosition(self._instance_name, event_stream_id) return event, pos, self.main_store.get_room_max_token() + async def update_current_state(self, room_id: str) -> None: + """Recalculate the current state for a room, and persist it""" + state = await self._calculate_current_state(room_id) + delta = await self._calculate_state_delta(room_id, state) + + # TODO(faster_joins): get a real stream ordering, to make this work correctly + # across workers. + # + # TODO(faster_joins): this can race against event persistence, in which case we + # will end up with incorrect state. Perhaps we should make this a job we + # farm out to the event persister, somehow. + stream_id = self.main_store.get_room_max_stream_ordering() + await self.persist_events_store.update_current_state(room_id, delta, stream_id) + + async def _calculate_current_state(self, room_id: str) -> StateMap[str]: + """Calculate the current state of a room, based on the forward extremities + + Args: + room_id: room for which to calculate current state + + Returns: + map from (type, state_key) to event id for the current state in the room + """ + latest_event_ids = await self.main_store.get_latest_event_ids_in_room(room_id) + state_groups = set( + ( + await self.main_store._get_state_group_for_events(latest_event_ids) + ).values() + ) + + state_maps_by_state_group = await self.state_store._get_state_for_groups( + state_groups + ) + + if len(state_groups) == 1: + # If there is only one state group, then we know what the current + # state is. + return state_maps_by_state_group[state_groups.pop()] + + # Ok, we need to defer to the state handler to resolve our state sets. + logger.debug("calling resolve_state_groups from preserve_events") + + # Avoid a circular import. + from synapse.state import StateResolutionStore + + room_version = await self.main_store.get_room_version_id(room_id) + res = await self._state_resolution_handler.resolve_state_groups( + room_id, + room_version, + state_maps_by_state_group, + event_map=None, + state_res_store=StateResolutionStore(self.main_store), + ) + + return res.state + async def _persist_event_batch( self, events_and_contexts: List[Tuple[EventBase, EventContext]], diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py index 8c3354ce3c..985d6e397d 100644 --- a/tests/api/test_filtering.py +++ b/tests/api/test_filtering.py @@ -481,9 +481,7 @@ class FilteringTestCase(unittest.HomeserverTestCase): # events). This is a bit cheeky, but tests the logic of _check_event_relations. # Filter for a particular sender. - definition = { - "io.element.relation_senders": ["@foo:bar"], - } + definition = {"related_by_senders": ["@foo:bar"]} async def events_have_relations(*args, **kwargs): return ["$with_relation"] diff --git a/tests/rest/admin/test_admin.py b/tests/rest/admin/test_admin.py index 849d00ab4d..40571b753a 100644 --- a/tests/rest/admin/test_admin.py +++ b/tests/rest/admin/test_admin.py @@ -63,6 +63,7 @@ class DeleteGroupTestCase(unittest.HomeserverTestCase): self.other_user = self.register_user("user", "pass") self.other_user_token = self.login("user", "pass") + @unittest.override_config({"experimental_features": {"groups_enabled": True}}) def test_delete_group(self) -> None: # Create a new group channel = self.make_request( |