diff --git a/changelog.d/12344.removal b/changelog.d/12344.removal
new file mode 100644
index 0000000000..ecefa76d8e
--- /dev/null
+++ b/changelog.d/12344.removal
@@ -0,0 +1 @@
+The groups/communities feature in Synapse has been disabled by default.
diff --git a/changelog.d/12382.removal b/changelog.d/12382.removal
new file mode 100644
index 0000000000..eb91186340
--- /dev/null
+++ b/changelog.d/12382.removal
@@ -0,0 +1 @@
+Remove unstable identifiers from [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440).
diff --git a/changelog.d/12394.misc b/changelog.d/12394.misc
new file mode 100644
index 0000000000..69109fcc37
--- /dev/null
+++ b/changelog.d/12394.misc
@@ -0,0 +1 @@
+Preparation for faster-room-join work: start a background process to resynchronise the room state after a room join.
diff --git a/docs/upgrade.md b/docs/upgrade.md
index 55a6ff7cae..d5516dfc99 100644
--- a/docs/upgrade.md
+++ b/docs/upgrade.md
@@ -85,6 +85,13 @@ process, for example:
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
```
+# Upgrading to v1.58.0
+
+## Groups/communities feature has been disabled by default
+
+The non-standard groups/communities feature in Synapse has been disabled by default
+and will be removed in Synapse v1.61.0.
+
# Upgrading to v1.57.0
## Changes to database schema for application services
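The deprecated feature is not removed yet: deployments that still depend on it can opt back in by setting `groups_enabled: true` under `experimental_features` in the homeserver configuration (this is the flag whose default flips to `False` in `synapse/config/experimental.py` below). As a minimal sketch — the class and test names here are placeholders, only the `override_config` pattern is taken from this PR — the same opt-in can be expressed in a test the way `tests/rest/admin/test_admin.py` does at the end of this diff:

```python
# Minimal sketch, assuming the standard Synapse test harness; the class and
# test names are illustrative and not part of this change.
from tests import unittest


class GroupsOptInTestCase(unittest.HomeserverTestCase):
    @unittest.override_config({"experimental_features": {"groups_enabled": True}})
    def test_groups_feature_can_be_re_enabled(self) -> None:
        # With the flag set, the groups/communities API behaves as it did
        # before this change.
        ...
```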
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 92907415e6..0172eb60b8 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -179,8 +179,6 @@ class RelationTypes:
REPLACE: Final = "m.replace"
REFERENCE: Final = "m.reference"
THREAD: Final = "m.thread"
- # TODO Remove this in Synapse >= v1.57.0.
- UNSTABLE_THREAD: Final = "io.element.thread"
class LimitBlockingTypes:
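With `UNSTABLE_THREAD` gone, only the stable `m.thread` relation type from MSC3440 is recognised. For reference, a threaded reply's content now looks roughly like this (the event ID is a placeholder, not something from this PR):

```python
# Illustrative sketch of MSC3440's stable thread relation; "$thread_root"
# is a made-up event ID.
from synapse.api.constants import RelationTypes

threaded_reply_content = {
    "msgtype": "m.text",
    "body": "a reply inside the thread",
    "m.relates_to": {
        "rel_type": RelationTypes.THREAD,  # "m.thread"
        "event_id": "$thread_root",
    },
}
```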
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 27e97d6f37..4a808e33fe 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -89,9 +89,7 @@ ROOM_EVENT_FILTER_SCHEMA = {
"org.matrix.not_labels": {"type": "array", "items": {"type": "string"}},
# MSC3440, filtering by event relations.
"related_by_senders": {"type": "array", "items": {"type": "string"}},
- "io.element.relation_senders": {"type": "array", "items": {"type": "string"}},
"related_by_rel_types": {"type": "array", "items": {"type": "string"}},
- "io.element.relation_types": {"type": "array", "items": {"type": "string"}},
},
}
@@ -323,16 +321,6 @@ class Filter:
self.related_by_senders = self.filter_json.get("related_by_senders", None)
self.related_by_rel_types = self.filter_json.get("related_by_rel_types", None)
- # Fallback to the unstable prefix if the stable version is not given.
- if hs.config.experimental.msc3440_enabled:
- self.related_by_senders = self.related_by_senders or self.filter_json.get(
- "io.element.relation_senders", None
- )
- self.related_by_rel_types = (
- self.related_by_rel_types
- or self.filter_json.get("io.element.relation_types", None)
- )
-
def filters_all_types(self) -> bool:
return "*" in self.not_types
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 447476fbfa..979059e723 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -26,9 +26,6 @@ class ExperimentalConfig(Config):
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
experimental = config.get("experimental_features") or {}
- # MSC3440 (thread relation)
- self.msc3440_enabled: bool = experimental.get("msc3440_enabled", False)
-
# MSC3026 (busy presence state)
self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False)
@@ -77,7 +74,7 @@ class ExperimentalConfig(Config):
self.msc3720_enabled: bool = experimental.get("msc3720_enabled", False)
# The deprecated groups feature.
- self.groups_enabled: bool = experimental.get("groups_enabled", True)
+ self.groups_enabled: bool = experimental.get("groups_enabled", False)
# MSC2654: Unread counts
self.msc2654_enabled: bool = experimental.get("msc2654_enabled", False)
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 918e87ed9c..43c3241fb0 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -39,7 +39,6 @@ from . import EventBase
if TYPE_CHECKING:
from synapse.handlers.relations import BundledAggregations
- from synapse.server import HomeServer
# Split strings on "." but not "\." This uses a negative lookbehind assertion for '\'
@@ -396,9 +395,6 @@ class EventClientSerializer:
clients.
"""
- def __init__(self, hs: "HomeServer"):
- self._msc3440_enabled = hs.config.experimental.msc3440_enabled
-
def serialize_event(
self,
event: Union[JsonDict, EventBase],
@@ -525,8 +521,6 @@ class EventClientSerializer:
"current_user_participated": thread.current_user_participated,
}
serialized_aggregations[RelationTypes.THREAD] = thread_summary
- if self._msc3440_enabled:
- serialized_aggregations[RelationTypes.UNSTABLE_THREAD] = thread_summary
# Include the bundled aggregations in the event.
if serialized_aggregations:
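Because the only homeserver state the serializer consulted was the MSC3440 flag, it no longer needs a `HomeServer` at all — hence the `EventClientSerializer()` construction in `synapse/server.py` further down. A trivial sketch:

```python
# Minimal sketch: the serializer is now constructed with no arguments, and
# bundled thread summaries are emitted only under "m.thread".
from synapse.events.utils import EventClientSerializer

serializer = EventClientSerializer()  # previously EventClientSerializer(hs)
```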
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 78d149905f..1434e99056 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -466,6 +466,8 @@ class FederationHandler:
)
if ret.partial_state:
+ # TODO(faster_joins): roll this back if we don't manage to start the
+ # background resync (e.g. if process_remote_join fails)
await self.store.store_partial_state_room(room_id, ret.servers_in_room)
max_stream_id = await self._federation_event_handler.process_remote_join(
@@ -478,6 +480,18 @@ class FederationHandler:
partial_state=ret.partial_state,
)
+ if ret.partial_state:
+ # Kick off the process of asynchronously fetching the state for this
+ # room.
+ #
+ # TODO(faster_joins): pick this up again on restart
+ run_as_background_process(
+ desc="sync_partial_state_room",
+ func=self._sync_partial_state_room,
+ destination=origin,
+ room_id=room_id,
+ )
+
# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches.
await self._replication.wait_for_stream_position(
@@ -1370,3 +1384,64 @@ class FederationHandler:
# We fell off the bottom, couldn't get the complexity from anyone. Oh
# well.
return None
+
+ async def _sync_partial_state_room(
+ self,
+ destination: str,
+ room_id: str,
+ ) -> None:
+ """Background process to resync the state of a partial-state room
+
+ Args:
+ destination: homeserver to pull the state from
+ room_id: room to be resynced
+ """
+
+ # TODO(faster_joins): do we need to lock to avoid races? What happens if other
+ # worker processes kick off a resync in parallel? Perhaps we should just elect
+ # a single worker to do the resync.
+ #
+ # TODO(faster_joins): what happens if we leave the room during a resync? if we
+ # really leave, that might mean we have difficulty getting the room state over
+ # federation.
+ #
+ # TODO(faster_joins): try other destinations if the one we have fails
+
+ logger.info("Syncing state for room %s via %s", room_id, destination)
+
+ # we work through the queue in order of increasing stream ordering.
+ while True:
+ batch = await self.store.get_partial_state_events_batch(room_id)
+ if not batch:
+ # all the events are updated, so we can update current state and
+ # clear the lazy-loading flag.
+ logger.info("Updating current state for %s", room_id)
+ assert (
+ self.storage.persistence is not None
+ ), "TODO(faster_joins): support for workers"
+ await self.storage.persistence.update_current_state(room_id)
+
+ logger.info("Clearing partial-state flag for %s", room_id)
+ success = await self.store.clear_partial_state_room(room_id)
+ if success:
+ logger.info("State resync complete for %s", room_id)
+
+ # TODO(faster_joins): update room stats and user directory?
+ return
+
+ # we raced against more events arriving with partial state. Go round
+ # the loop again. We've already logged a warning, so no need for more.
+ # TODO(faster_joins): there is still a race here, whereby incoming events which raced
+ # with us will fail to be persisted after the call to `clear_partial_state_room` due to
+ # having partial state.
+ continue
+
+ events = await self.store.get_events_as_list(
+ batch,
+ redact_behaviour=EventRedactBehaviour.AS_IS,
+ allow_rejected=True,
+ )
+ for event in events:
+ await self._federation_event_handler.update_state_for_partial_state_event(
+ destination, event
+ )
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 03c1197c99..32bf02818c 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -477,6 +477,45 @@ class FederationEventHandler:
return await self.persist_events_and_notify(room_id, [(event, context)])
+ async def update_state_for_partial_state_event(
+ self, destination: str, event: EventBase
+ ) -> None:
+ """Recalculate the state at an event as part of a de-partial-stating process
+
+ Args:
+ destination: server to request full state from
+ event: partial-state event to be de-partial-stated
+ """
+ logger.info("Updating state for %s", event.event_id)
+ with nested_logging_context(suffix=event.event_id):
+ # if we have all the event's prev_events, then we can work out the
+ # state based on their states. Otherwise, we request it from the destination
+ # server.
+ #
+ # This is the same operation as we do when we receive a regular event
+ # over federation.
+ state = await self._resolve_state_at_missing_prevs(destination, event)
+
+ # build a new state group for it if need be
+ context = await self._state_handler.compute_event_context(
+ event,
+ old_state=state,
+ )
+ if context.partial_state:
+ # this can happen if some or all of the event's prev_events still have
+ # partial state - ie, an event has an earlier stream_ordering than one
+ # or more of its prev_events, so we de-partial-state it before its
+ # prev_events.
+ #
+ # TODO(faster_joins): we probably need to be more intelligent, and
+ # exclude partial-state prev_events from consideration
+ logger.warning(
+ "%s still has partial state: can't de-partial-state it yet",
+ event.event_id,
+ )
+ return
+ await self._store.update_state_for_partial_state_event(event, context)
+
async def backfill(
self, dest: str, room_id: str, limit: int, extremities: Collection[str]
) -> None:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7db6905c61..47a63005a9 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1102,10 +1102,7 @@ class EventCreationHandler:
raise SynapseError(400, "Can't send same reaction twice")
# Don't attempt to start a thread if the parent event is a relation.
- elif (
- relation_type == RelationTypes.THREAD
- or relation_type == RelationTypes.UNSTABLE_THREAD
- ):
+ elif relation_type == RelationTypes.THREAD:
if await self.store.event_includes_relation(relates_to):
raise SynapseError(
400, "Cannot start threads from an event with a relation"
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 9a65aa4843..7ee6b5505b 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -100,7 +100,6 @@ class VersionsRestServlet(RestServlet):
# Adds support for jump to date endpoints (/timestamp_to_event) as per MSC3030
"org.matrix.msc3030": self.config.experimental.msc3030_enabled,
# Adds support for thread relations, per MSC3440.
- "org.matrix.msc3440": self.config.experimental.msc3440_enabled,
"org.matrix.msc3440.stable": True, # TODO: remove when "v1.3" is added above
},
},
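Clients probing `/_matrix/client/versions` will now see only the stable marker for threads; `org.matrix.msc3440` is gone from `unstable_features`. Roughly (all other feature flags elided):

```python
# Hedged sketch of the thread-related slice of the /versions response after
# this change; every other unstable_features entry is omitted for brevity.
unstable_features = {
    # "org.matrix.msc3440" is no longer advertised
    "org.matrix.msc3440.stable": True,  # until the spec version is listed
}
```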
diff --git a/synapse/server.py b/synapse/server.py
index 380369db92..37c72bd83a 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -758,7 +758,7 @@ class HomeServer(metaclass=abc.ABCMeta):
@cache_in_self
def get_event_client_serializer(self) -> EventClientSerializer:
- return EventClientSerializer(self)
+ return EventClientSerializer()
@cache_in_self
def get_password_policy_handler(self) -> PasswordPolicyHandler:
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 3fcd5f5b99..2a1e567ce0 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -963,6 +963,21 @@ class PersistEventsStore:
values=to_insert,
)
+ async def update_current_state(
+ self,
+ room_id: str,
+ state_delta: DeltaState,
+ stream_id: int,
+ ) -> None:
+ """Update the current state stored in the datatabase for the given room"""
+
+ await self.db_pool.runInteraction(
+ "update_current_state",
+ self._update_current_state_txn,
+ state_delta_by_room={room_id: state_delta},
+ stream_id=stream_id,
+ )
+
def _update_current_state_txn(
self,
txn: LoggingTransaction,
@@ -1819,10 +1834,7 @@ class PersistEventsStore:
if rel_type == RelationTypes.REPLACE:
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
- if (
- rel_type == RelationTypes.THREAD
- or rel_type == RelationTypes.UNSTABLE_THREAD
- ):
+ if rel_type == RelationTypes.THREAD:
txn.call_after(self.store.get_thread_summary.invalidate, (parent_id,))
# It should be safe to only invalidate the cache if the user has not
# previously participated in the thread, but that's difficult (and
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index a60e3f4fdd..5288cdba03 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1979,3 +1979,27 @@ class EventsWorkerStore(SQLBaseStore):
desc="is_partial_state_event",
)
return result is not None
+
+ async def get_partial_state_events_batch(self, room_id: str) -> List[str]:
+ """Get a list of events in the given room that have partial state"""
+ return await self.db_pool.runInteraction(
+ "get_partial_state_events_batch",
+ self._get_partial_state_events_batch_txn,
+ room_id,
+ )
+
+ @staticmethod
+ def _get_partial_state_events_batch_txn(
+ txn: LoggingTransaction, room_id: str
+ ) -> List[str]:
+ txn.execute(
+ """
+ SELECT event_id FROM partial_state_events AS pse
+ JOIN events USING (event_id)
+ WHERE pse.room_id = ?
+ ORDER BY events.stream_ordering
+ LIMIT 100
+ """,
+ (room_id,),
+ )
+ return [row[0] for row in txn]
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 407158ceee..a5c31f6787 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -14,7 +14,6 @@
import logging
from typing import (
- TYPE_CHECKING,
Collection,
Dict,
FrozenSet,
@@ -32,20 +31,12 @@ import attr
from synapse.api.constants import RelationTypes
from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import (
- DatabasePool,
- LoggingDatabaseConnection,
- LoggingTransaction,
- make_in_list_sql_clause,
-)
+from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause
from synapse.storage.databases.main.stream import generate_pagination_where_clause
from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict, RoomStreamToken, StreamToken
from synapse.util.caches.descriptors import cached, cachedList
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
logger = logging.getLogger(__name__)
@@ -63,16 +54,6 @@ class _RelatedEvent:
class RelationsWorkerStore(SQLBaseStore):
- def __init__(
- self,
- database: DatabasePool,
- db_conn: LoggingDatabaseConnection,
- hs: "HomeServer",
- ):
- super().__init__(database, db_conn, hs)
-
- self._msc3440_enabled = hs.config.experimental.msc3440_enabled
-
@cached(uncached_args=("event",), tree=True)
async def get_relations_for_event(
self,
@@ -497,7 +478,7 @@ class RelationsWorkerStore(SQLBaseStore):
AND parent.room_id = child.room_id
WHERE
%s
- AND %s
+ AND relation_type = ?
ORDER BY parent.event_id, child.topological_ordering DESC, child.stream_ordering DESC
"""
else:
@@ -512,22 +493,16 @@ class RelationsWorkerStore(SQLBaseStore):
AND parent.room_id = child.room_id
WHERE
%s
- AND %s
+ AND relation_type = ?
ORDER BY child.topological_ordering DESC, child.stream_ordering DESC
"""
clause, args = make_in_list_sql_clause(
txn.database_engine, "relates_to_id", event_ids
)
+ args.append(RelationTypes.THREAD)
- if self._msc3440_enabled:
- relations_clause = "(relation_type = ? OR relation_type = ?)"
- args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
- else:
- relations_clause = "relation_type = ?"
- args.append(RelationTypes.THREAD)
-
- txn.execute(sql % (clause, relations_clause), args)
+ txn.execute(sql % (clause,), args)
latest_event_ids = {}
for parent_event_id, child_event_id in txn:
# Only consider the latest threaded reply (by topological ordering).
@@ -547,7 +522,7 @@ class RelationsWorkerStore(SQLBaseStore):
AND parent.room_id = child.room_id
WHERE
%s
- AND %s
+ AND relation_type = ?
GROUP BY parent.event_id
"""
@@ -556,15 +531,9 @@ class RelationsWorkerStore(SQLBaseStore):
clause, args = make_in_list_sql_clause(
txn.database_engine, "relates_to_id", latest_event_ids.keys()
)
+ args.append(RelationTypes.THREAD)
- if self._msc3440_enabled:
- relations_clause = "(relation_type = ? OR relation_type = ?)"
- args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
- else:
- relations_clause = "relation_type = ?"
- args.append(RelationTypes.THREAD)
-
- txn.execute(sql % (clause, relations_clause), args)
+ txn.execute(sql % (clause,), args)
counts = dict(cast(List[Tuple[str, int]], txn.fetchall()))
return counts, latest_event_ids
@@ -622,7 +591,7 @@ class RelationsWorkerStore(SQLBaseStore):
parent.event_id = relates_to_id
AND parent.room_id = child.room_id
WHERE
- %s
+ relation_type = ?
AND %s
AND %s
GROUP BY parent.event_id, child.sender
@@ -638,16 +607,9 @@ class RelationsWorkerStore(SQLBaseStore):
txn.database_engine, "relates_to_id", event_ids
)
- if self._msc3440_enabled:
- relations_clause = "(relation_type = ? OR relation_type = ?)"
- relations_args = [RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD]
- else:
- relations_clause = "relation_type = ?"
- relations_args = [RelationTypes.THREAD]
-
txn.execute(
- sql % (users_sql, events_clause, relations_clause),
- users_args + events_args + relations_args,
+ sql % (users_sql, events_clause),
+ [RelationTypes.THREAD] + users_args + events_args,
)
return {(row[0], row[1]): row[2] for row in txn}
@@ -677,7 +639,7 @@ class RelationsWorkerStore(SQLBaseStore):
user participated in that event's thread, otherwise false.
"""
- def _get_thread_summary_txn(txn: LoggingTransaction) -> Set[str]:
+ def _get_threads_participated_txn(txn: LoggingTransaction) -> Set[str]:
# Fetch whether the requester has participated or not.
sql = """
SELECT DISTINCT relates_to_id
@@ -688,28 +650,20 @@ class RelationsWorkerStore(SQLBaseStore):
AND parent.room_id = child.room_id
WHERE
%s
- AND %s
+ AND relation_type = ?
AND child.sender = ?
"""
clause, args = make_in_list_sql_clause(
txn.database_engine, "relates_to_id", event_ids
)
+ args.extend([RelationTypes.THREAD, user_id])
- if self._msc3440_enabled:
- relations_clause = "(relation_type = ? OR relation_type = ?)"
- args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
- else:
- relations_clause = "relation_type = ?"
- args.append(RelationTypes.THREAD)
-
- args.append(user_id)
-
- txn.execute(sql % (clause, relations_clause), args)
+ txn.execute(sql % (clause,), args)
return {row[0] for row in txn.fetchall()}
participated_threads = await self.db_pool.runInteraction(
- "get_thread_summary", _get_thread_summary_txn
+ "get_threads_participated", _get_threads_participated_txn
)
return {event_id: event_id in participated_threads for event_id in event_ids}
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 18b1acd9e1..87e9482c60 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1077,6 +1077,37 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
get_rooms_for_retention_period_in_range_txn,
)
+ async def clear_partial_state_room(self, room_id: str) -> bool:
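+ """Clear the partial-state flag for the given room.
+
+ Returns:
+ True if the flag was cleared; False if the deletion raced with
+ newly-arrived partial-state events, in which case the caller should retry.
+ """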
+ # this can race with incoming events, so we watch out for FK errors.
+ # TODO(faster_joins): this still doesn't completely fix the race, since the persist process
+ # is not atomic. I fear we need an application-level lock.
+ try:
+ await self.db_pool.runInteraction(
+ "clear_partial_state_room", self._clear_partial_state_room_txn, room_id
+ )
+ return True
+ except self.db_pool.engine.module.DatabaseError as e:
+ # TODO(faster_joins): how do we distinguish between FK errors and other errors?
+ logger.warning(
+ "Exception while clearing lazy partial-state-room %s, retrying: %s",
+ room_id,
+ e,
+ )
+ return False
+
+ @staticmethod
+ def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None:
+ DatabasePool.simple_delete_txn(
+ txn,
+ table="partial_state_rooms_servers",
+ keyvalues={"room_id": room_id},
+ )
+ DatabasePool.simple_delete_one_txn(
+ txn,
+ table="partial_state_rooms",
+ keyvalues={"room_id": room_id},
+ )
+
class _BackgroundUpdates:
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index ecdc1fdc4c..eba35f3700 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -21,6 +21,7 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events import EventBase
+from synapse.events.snapshot import EventContext
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -354,6 +355,53 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
return {row["state_group"] for row in rows}
+ async def update_state_for_partial_state_event(
+ self,
+ event: EventBase,
+ context: EventContext,
+ ) -> None:
+ """Update the state group for a partial state event"""
+ await self.db_pool.runInteraction(
+ "update_state_for_partial_state_event",
+ self._update_state_for_partial_state_event_txn,
+ event,
+ context,
+ )
+
+ def _update_state_for_partial_state_event_txn(
+ self,
+ txn: LoggingTransaction,
+ event: EventBase,
+ context: EventContext,
+ ) -> None:
+ # we shouldn't have any outliers here
+ assert not event.internal_metadata.is_outlier()
+
+ # anything that was rejected should have the same state as its
+ # predecessor.
+ if context.rejected:
+ assert context.state_group == context.state_group_before_event
+
+ self.db_pool.simple_update_txn(
+ txn,
+ table="event_to_state_groups",
+ keyvalues={"event_id": event.event_id},
+ updatevalues={"state_group": context.state_group},
+ )
+
+ self.db_pool.simple_delete_one_txn(
+ txn,
+ table="partial_state_events",
+ keyvalues={"event_id": event.event_id},
+ )
+
+ # TODO(faster_joins): need to do something about workers here
+ txn.call_after(
+ self._get_state_group_for_event.prefill,
+ (event.event_id,),
+ context.state_group,
+ )
+
class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index b402922817..e496ba7bed 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -376,6 +376,62 @@ class EventsPersistenceStorage:
pos = PersistedEventPosition(self._instance_name, event_stream_id)
return event, pos, self.main_store.get_room_max_token()
+ async def update_current_state(self, room_id: str) -> None:
+ """Recalculate the current state for a room, and persist it"""
+ state = await self._calculate_current_state(room_id)
+ delta = await self._calculate_state_delta(room_id, state)
+
+ # TODO(faster_joins): get a real stream ordering, to make this work correctly
+ # across workers.
+ #
+ # TODO(faster_joins): this can race against event persistence, in which case we
+ # will end up with incorrect state. Perhaps we should make this a job we
+ # farm out to the event persister, somehow.
+ stream_id = self.main_store.get_room_max_stream_ordering()
+ await self.persist_events_store.update_current_state(room_id, delta, stream_id)
+
+ async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
+ """Calculate the current state of a room, based on the forward extremities
+
+ Args:
+ room_id: room for which to calculate current state
+
+ Returns:
+ map from (type, state_key) to event id for the current state in the room
+ """
+ latest_event_ids = await self.main_store.get_latest_event_ids_in_room(room_id)
+ state_groups = set(
+ (
+ await self.main_store._get_state_group_for_events(latest_event_ids)
+ ).values()
+ )
+
+ state_maps_by_state_group = await self.state_store._get_state_for_groups(
+ state_groups
+ )
+
+ if len(state_groups) == 1:
+ # If there is only one state group, then we know what the current
+ # state is.
+ return state_maps_by_state_group[state_groups.pop()]
+
+ # Ok, we need to defer to the state handler to resolve our state sets.
+ logger.debug("calling resolve_state_groups from preserve_events")
+
+ # Avoid a circular import.
+ from synapse.state import StateResolutionStore
+
+ room_version = await self.main_store.get_room_version_id(room_id)
+ res = await self._state_resolution_handler.resolve_state_groups(
+ room_id,
+ room_version,
+ state_maps_by_state_group,
+ event_map=None,
+ state_res_store=StateResolutionStore(self.main_store),
+ )
+
+ return res.state
+
async def _persist_event_batch(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py
index 8c3354ce3c..985d6e397d 100644
--- a/tests/api/test_filtering.py
+++ b/tests/api/test_filtering.py
@@ -481,9 +481,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
# events). This is a bit cheeky, but tests the logic of _check_event_relations.
# Filter for a particular sender.
- definition = {
- "io.element.relation_senders": ["@foo:bar"],
- }
+ definition = {"related_by_senders": ["@foo:bar"]}
async def events_have_relations(*args, **kwargs):
return ["$with_relation"]
diff --git a/tests/rest/admin/test_admin.py b/tests/rest/admin/test_admin.py
index 849d00ab4d..40571b753a 100644
--- a/tests/rest/admin/test_admin.py
+++ b/tests/rest/admin/test_admin.py
@@ -63,6 +63,7 @@ class DeleteGroupTestCase(unittest.HomeserverTestCase):
self.other_user = self.register_user("user", "pass")
self.other_user_token = self.login("user", "pass")
+ @unittest.override_config({"experimental_features": {"groups_enabled": True}})
def test_delete_group(self) -> None:
# Create a new group
channel = self.make_request(
|