Diffstat (limited to 'synapse')
 synapse/api/constants.py                           |  2
 synapse/api/filtering.py                           | 12
 synapse/config/experimental.py                     |  5
 synapse/config/server.py                           |  8
 synapse/events/utils.py                            |  6
 synapse/federation/federation_server.py            | 28
 synapse/federation/transport/client.py             | 10
 synapse/federation/transport/server/federation.py  |  2
 synapse/handlers/device.py                         | 38
 synapse/handlers/events.py                         |  6
 synapse/handlers/federation.py                     | 75
 synapse/handlers/federation_event.py               | 39
 synapse/handlers/message.py                        | 19
 synapse/handlers/presence.py                       | 56
 synapse/handlers/sync.py                           | 15
 synapse/rest/client/login.py                       |  9
 synapse/rest/client/sync.py                        |  9
 synapse/rest/client/versions.py                    |  3
 synapse/server.py                                  |  2
 synapse/storage/databases/main/devices.py          | 65
 synapse/storage/databases/main/events.py           | 20
 synapse/storage/databases/main/events_worker.py    | 24
 synapse/storage/databases/main/relations.py        | 78
 synapse/storage/databases/main/room.py             | 31
 synapse/storage/databases/main/state.py            | 50
 synapse/storage/databases/main/stream.py           | 26
 synapse/storage/persist_events.py                  | 56
 synapse/storage/schema/__init__.py                 |  6
 28 files changed, 442 insertions(+), 258 deletions(-)
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 92907415e6..0172eb60b8 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -179,8 +179,6 @@ class RelationTypes:
     REPLACE: Final = "m.replace"
     REFERENCE: Final = "m.reference"
     THREAD: Final = "m.thread"
-    # TODO Remove this in Synapse >= v1.57.0.
-    UNSTABLE_THREAD: Final = "io.element.thread"
 
 
 class LimitBlockingTypes:
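
With the unstable identifier removed, thread relations must use the stable `m.thread` rel_type from MSC3440. A minimal sketch of threaded-event content, with a placeholder thread-root event ID:

    # Content of a message replying within a thread; only the stable
    # relation type is recognised now that UNSTABLE_THREAD is gone.
    threaded_content = {
        "msgtype": "m.text",
        "body": "a reply in the thread",
        "m.relates_to": {
            "rel_type": "m.thread",  # previously also "io.element.thread"
            "event_id": "$thread_root",  # placeholder event ID
        },
    }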
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 27e97d6f37..4a808e33fe 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -89,9 +89,7 @@ ROOM_EVENT_FILTER_SCHEMA = {
         "org.matrix.not_labels": {"type": "array", "items": {"type": "string"}},
         # MSC3440, filtering by event relations.
         "related_by_senders": {"type": "array", "items": {"type": "string"}},
-        "io.element.relation_senders": {"type": "array", "items": {"type": "string"}},
         "related_by_rel_types": {"type": "array", "items": {"type": "string"}},
-        "io.element.relation_types": {"type": "array", "items": {"type": "string"}},
     },
 }
 
@@ -323,16 +321,6 @@ class Filter:
         self.related_by_senders = self.filter_json.get("related_by_senders", None)
         self.related_by_rel_types = self.filter_json.get("related_by_rel_types", None)
 
-        # Fallback to the unstable prefix if the stable version is not given.
-        if hs.config.experimental.msc3440_enabled:
-            self.related_by_senders = self.related_by_senders or self.filter_json.get(
-                "io.element.relation_senders", None
-            )
-            self.related_by_rel_types = (
-                self.related_by_rel_types
-                or self.filter_json.get("io.element.relation_types", None)
-            )
-
     def filters_all_types(self) -> bool:
         return "*" in self.not_types
 
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 447476fbfa..979059e723 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -26,9 +26,6 @@ class ExperimentalConfig(Config):
     def read_config(self, config: JsonDict, **kwargs: Any) -> None:
         experimental = config.get("experimental_features") or {}
 
-        # MSC3440 (thread relation)
-        self.msc3440_enabled: bool = experimental.get("msc3440_enabled", False)
-
         # MSC3026 (busy presence state)
         self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False)
 
@@ -77,7 +74,7 @@ class ExperimentalConfig(Config):
         self.msc3720_enabled: bool = experimental.get("msc3720_enabled", False)
 
         # The deprecated groups feature.
-        self.groups_enabled: bool = experimental.get("groups_enabled", True)
+        self.groups_enabled: bool = experimental.get("groups_enabled", False)
 
         # MSC2654: Unread counts
         self.msc2654_enabled: bool = experimental.get("msc2654_enabled", False)
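
With the default flipped to False, deployments still relying on the deprecated groups feature must opt in explicitly. A sketch of how read_config now resolves the flag, with the dict standing in for a parsed homeserver.yaml:

    # Parsed "experimental_features" section; operators must now set
    # groups_enabled explicitly to keep the deprecated feature on.
    config = {"experimental_features": {"groups_enabled": True}}
    experimental = config.get("experimental_features") or {}
    groups_enabled = experimental.get("groups_enabled", False)  # default is now False
    assert groups_enabled is True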
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 415279d269..d771045b52 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -680,14 +680,6 @@ class ServerConfig(Config):
             config.get("use_account_validity_in_account_status") or False
         )
 
-        # This is a temporary option that enables fully using the new
-        # `device_lists_changes_in_room` without the backwards compat code. This
-        # is primarily for testing. If enabled the server should *not* be
-        # downgraded, as it may lead to missing device list updates.
-        self.use_new_device_lists_changes_in_room = (
-            config.get("use_new_device_lists_changes_in_room") or False
-        )
-
         self.rooms_to_exclude_from_sync: List[str] = (
             config.get("exclude_rooms_from_sync") or []
         )
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 918e87ed9c..43c3241fb0 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -39,7 +39,6 @@ from . import EventBase
 
 if TYPE_CHECKING:
     from synapse.handlers.relations import BundledAggregations
-    from synapse.server import HomeServer
 
 
 # Split strings on "." but not "\." This uses a negative lookbehind assertion for '\'
@@ -396,9 +395,6 @@ class EventClientSerializer:
     clients.
     """
 
-    def __init__(self, hs: "HomeServer"):
-        self._msc3440_enabled = hs.config.experimental.msc3440_enabled
-
     def serialize_event(
         self,
         event: Union[JsonDict, EventBase],
@@ -525,8 +521,6 @@ class EventClientSerializer:
                 "current_user_participated": thread.current_user_participated,
             }
             serialized_aggregations[RelationTypes.THREAD] = thread_summary
-            if self._msc3440_enabled:
-                serialized_aggregations[RelationTypes.UNSTABLE_THREAD] = thread_summary
 
         # Include the bundled aggregations in the event.
         if serialized_aggregations:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 69d833585f..beab1227b8 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -515,7 +515,7 @@ class FederationServer(FederationBase):
         )
 
     async def on_room_state_request(
-        self, origin: str, room_id: str, event_id: Optional[str]
+        self, origin: str, room_id: str, event_id: str
     ) -> Tuple[int, JsonDict]:
         origin_host, _ = parse_server_name(origin)
         await self.check_server_matches_acl(origin_host, room_id)
@@ -530,18 +530,13 @@ class FederationServer(FederationBase):
         # - but that's non-trivial to get right, and anyway somewhat defeats
         # the point of the linearizer.
         async with self._server_linearizer.queue((origin, room_id)):
-            resp: JsonDict = dict(
-                await self._state_resp_cache.wrap(
-                    (room_id, event_id),
-                    self._on_context_state_request_compute,
-                    room_id,
-                    event_id,
-                )
+            resp = await self._state_resp_cache.wrap(
+                (room_id, event_id),
+                self._on_context_state_request_compute,
+                room_id,
+                event_id,
             )
 
-        room_version = await self.store.get_room_version_id(room_id)
-        resp["room_version"] = room_version
-
         return 200, resp
 
     async def on_state_ids_request(
@@ -574,14 +569,11 @@ class FederationServer(FederationBase):
         return {"pdu_ids": state_ids, "auth_chain_ids": list(auth_chain_ids)}
 
     async def _on_context_state_request_compute(
-        self, room_id: str, event_id: Optional[str]
+        self, room_id: str, event_id: str
     ) -> Dict[str, list]:
         pdus: Collection[EventBase]
-        if event_id:
-            event_ids = await self.handler.get_state_ids_for_pdu(room_id, event_id)
-            pdus = await self.store.get_events_as_list(event_ids)
-        else:
-            pdus = (await self.state.get_current_state(room_id)).values()
+        event_ids = await self.handler.get_state_ids_for_pdu(room_id, event_id)
+        pdus = await self.store.get_events_as_list(event_ids)
 
         auth_chain = await self.store.get_auth_chain(
             room_id, [pdu.event_id for pdu in pdus]
@@ -687,8 +679,6 @@ class FederationServer(FederationBase):
         time_now = self._clock.time_msec()
         event_json = event.get_pdu_json(time_now)
         resp = {
-            # TODO Remove the unstable prefix when servers have updated.
-            "org.matrix.msc3083.v2.event": event_json,
             "event": event_json,
             "state": [p.get_pdu_json(time_now) for p in state_events],
             "auth_chain": [p.get_pdu_json(time_now) for p in auth_chain_events],
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 01dc5ca94f..1421050b9a 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -1380,16 +1380,6 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
                 prefix + "auth_chain.item",
                 use_float=True,
             ),
-            # TODO Remove the unstable prefix when servers have updated.
-            #
-            # By re-using the same event dictionary this will cause the parsing of
-            # org.matrix.msc3083.v2.event and event to stomp over each other.
-            # Generally this should be fine.
-            ijson.kvitems_coro(
-                _event_parser(self._response.event_dict),
-                prefix + "org.matrix.msc3083.v2.event",
-                use_float=True,
-            ),
             ijson.kvitems_coro(
                 _event_parser(self._response.event_dict),
                 prefix + "event",
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index aed3d5069c..6fbc7b5f15 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -160,7 +160,7 @@ class FederationStateV1Servlet(BaseFederationServerServlet):
         return await self.handler.on_room_state_request(
             origin,
             room_id,
-            parse_string_from_args(query, "event_id", None, required=False),
+            parse_string_from_args(query, "event_id", None, required=True),
         )
 
 
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index ffa28b2a30..3c0fc756d4 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -291,12 +291,6 @@ class DeviceHandler(DeviceWorkerHandler):
         # On start up check if there are any updates pending.
         hs.get_reactor().callWhenRunning(self._handle_new_device_update_async)
 
-        # Used to decide if we calculate outbound pokes up front or not. By
-        # default we do to allow safely downgrading Synapse.
-        self.use_new_device_lists_changes_in_room = (
-            hs.config.server.use_new_device_lists_changes_in_room
-        )
-
     def _check_device_name_length(self, name: Optional[str]) -> None:
         """
         Checks whether a device name is longer than the maximum allowed length.
@@ -490,23 +484,9 @@ class DeviceHandler(DeviceWorkerHandler):
 
         room_ids = await self.store.get_rooms_for_user(user_id)
 
-        hosts: Optional[Set[str]] = None
-        if not self.use_new_device_lists_changes_in_room:
-            hosts = set()
-
-            if self.hs.is_mine_id(user_id):
-                for room_id in room_ids:
-                    joined_users = await self.store.get_users_in_room(room_id)
-                    hosts.update(get_domain_from_id(u) for u in joined_users)
-
-                set_tag("target_hosts", hosts)
-
-                hosts.discard(self.server_name)
-
         position = await self.store.add_device_change_to_streams(
             user_id,
             device_ids,
-            hosts=hosts,
             room_ids=room_ids,
         )
 
@@ -528,14 +508,6 @@ class DeviceHandler(DeviceWorkerHandler):
         # We may need to do some processing asynchronously.
         self._handle_new_device_update_async()
 
-        if hosts:
-            logger.info(
-                "Sending device list update notif for %r to: %r", user_id, hosts
-            )
-            for host in hosts:
-                self.federation_sender.send_device_messages(host, immediate=False)
-                log_kv({"message": "sent device update to host", "host": host})
-
     async def notify_user_signature_update(
         self, from_user_id: str, user_ids: List[str]
     ) -> None:
@@ -677,9 +649,13 @@ class DeviceHandler(DeviceWorkerHandler):
                         return
 
                 for user_id, device_id, room_id, stream_id, opentracing_context in rows:
-                    joined_user_ids = await self.store.get_users_in_room(room_id)
-                    hosts = {get_domain_from_id(u) for u in joined_user_ids}
-                    hosts.discard(self.server_name)
+                    hosts = set()
+
+                    # Ignore any users that aren't ours
+                    if self.hs.is_mine_id(user_id):
+                        joined_user_ids = await self.store.get_users_in_room(room_id)
+                        hosts = {get_domain_from_id(u) for u in joined_user_ids}
+                        hosts.discard(self.server_name)
 
                     # Check if we've already sent this update to some hosts
                     if current_stream_id == stream_id:
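
With the up-front host calculation removed, this background loop is now the single place where a device change is fanned out to remote servers, and it only does so for local users. A simplified stand-in for the host derivation it performs (get_domain_from_id reduced to a string split):

    def destination_hosts(joined_user_ids, server_name):
        # Remote servers sharing a room with the user, minus ourselves;
        # simplified stand-in for get_domain_from_id over room members.
        hosts = {user_id.split(":", 1)[1] for user_id in joined_user_ids}
        hosts.discard(server_name)
        return hosts

    assert destination_hosts(
        ["@a:example.org", "@b:other.org"], "example.org"
    ) == {"other.org"}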
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index d2ccb5c5d3..e89c4df314 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -16,7 +16,7 @@ import logging
 import random
 from typing import TYPE_CHECKING, Iterable, List, Optional
 
-from synapse.api.constants import EduTypes, EventTypes, Membership
+from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState
 from synapse.api.errors import AuthError, SynapseError
 from synapse.events import EventBase
 from synapse.events.utils import SerializeEventConfig
@@ -67,7 +67,9 @@ class EventStreamHandler:
         presence_handler = self.hs.get_presence_handler()
 
         context = await presence_handler.user_syncing(
-            auth_user_id, affect_presence=affect_presence
+            auth_user_id,
+            affect_presence=affect_presence,
+            presence_state=PresenceState.ONLINE,
         )
         with context:
             if timeout:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 78d149905f..1434e99056 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -466,6 +466,8 @@ class FederationHandler:
             )
 
             if ret.partial_state:
+                # TODO(faster_joins): roll this back if we don't manage to start the
+                #   background resync (eg process_remote_join fails)
                 await self.store.store_partial_state_room(room_id, ret.servers_in_room)
 
             max_stream_id = await self._federation_event_handler.process_remote_join(
@@ -478,6 +480,18 @@ class FederationHandler:
                 partial_state=ret.partial_state,
             )
 
+            if ret.partial_state:
+                # Kick off the process of asynchronously fetching the state for this
+                # room.
+                #
+                # TODO(faster_joins): pick this up again on restart
+                run_as_background_process(
+                    desc="sync_partial_state_room",
+                    func=self._sync_partial_state_room,
+                    destination=origin,
+                    room_id=room_id,
+                )
+
             # We wait here until this instance has seen the events come down
             # replication (if we're using replication) as the below uses caches.
             await self._replication.wait_for_stream_position(
@@ -1370,3 +1384,64 @@ class FederationHandler:
         # We fell off the bottom, couldn't get the complexity from anyone. Oh
         # well.
         return None
+
+    async def _sync_partial_state_room(
+        self,
+        destination: str,
+        room_id: str,
+    ) -> None:
+        """Background process to resync the state of a partial-state room
+
+        Args:
+            destination: homeserver to pull the state from
+            room_id: room to be resynced
+        """
+
+        # TODO(faster_joins): do we need to lock to avoid races? What happens if other
+        #   worker processes kick off a resync in parallel? Perhaps we should just elect
+        #   a single worker to do the resync.
+        #
+        # TODO(faster_joins): what happens if we leave the room during a resync? if we
+        #   really leave, that might mean we have difficulty getting the room state over
+        #   federation.
+        #
+        # TODO(faster_joins): try other destinations if the one we have fails
+
+        logger.info("Syncing state for room %s via %s", room_id, destination)
+
+        # we work through the queue in order of increasing stream ordering.
+        while True:
+            batch = await self.store.get_partial_state_events_batch(room_id)
+            if not batch:
+                # all the events are updated, so we can update current state and
+                # clear the partial-state flag.
+                logger.info("Updating current state for %s", room_id)
+                assert (
+                    self.storage.persistence is not None
+                ), "TODO(faster_joins): support for workers"
+                await self.storage.persistence.update_current_state(room_id)
+
+                logger.info("Clearing partial-state flag for %s", room_id)
+                success = await self.store.clear_partial_state_room(room_id)
+                if success:
+                    logger.info("State resync complete for %s", room_id)
+
+                    # TODO(faster_joins) update room stats and user directory?
+                    return
+
+                # we raced against more events arriving with partial state. Go round
+                # the loop again. We've already logged a warning, so no need for more.
+                # TODO(faster_joins): there is still a race here, whereby incoming events which raced
+                #   with us will fail to be persisted after the call to `clear_partial_state_room` due to
+                #   having partial state.
+                continue
+
+            events = await self.store.get_events_as_list(
+                batch,
+                redact_behaviour=EventRedactBehaviour.AS_IS,
+                allow_rejected=True,
+            )
+            for event in events:
+                await self._federation_event_handler.update_state_for_partial_state_event(
+                    destination, event
+                )
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 03c1197c99..32bf02818c 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -477,6 +477,45 @@ class FederationEventHandler:
 
             return await self.persist_events_and_notify(room_id, [(event, context)])
 
+    async def update_state_for_partial_state_event(
+        self, destination: str, event: EventBase
+    ) -> None:
+        """Recalculate the state at an event as part of a de-partial-stating process
+
+        Args:
+            destination: server to request full state from
+            event: partial-state event to be de-partial-stated
+        """
+        logger.info("Updating state for %s", event.event_id)
+        with nested_logging_context(suffix=event.event_id):
+            # if we have all the event's prev_events, then we can work out the
+            # state based on their states. Otherwise, we request it from the destination
+            # server.
+            #
+            # This is the same operation as we do when we receive a regular event
+            # over federation.
+            state = await self._resolve_state_at_missing_prevs(destination, event)
+
+            # build a new state group for it if need be
+            context = await self._state_handler.compute_event_context(
+                event,
+                old_state=state,
+            )
+            if context.partial_state:
+                # this can happen if some or all of the event's prev_events still have
+                # partial state - ie, an event has an earlier stream_ordering than one
+                # or more of its prev_events, so we de-partial-state it before its
+                # prev_events.
+                #
+                # TODO(faster_joins): we probably need to be more intelligent, and
+                #    exclude partial-state prev_events from consideration
+                logger.warning(
+                    "%s still has partial state: can't de-partial-state it yet",
+                    event.event_id,
+                )
+                return
+            await self._store.update_state_for_partial_state_event(event, context)
+
     async def backfill(
         self, dest: str, room_id: str, limit: int, extremities: Collection[str]
     ) -> None:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7db6905c61..1b092e900e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -175,17 +175,13 @@ class MessageHandler:
         state_filter = state_filter or StateFilter.all()
 
         if at_token:
-            # FIXME this claims to get the state at a stream position, but
-            # get_recent_events_for_room operates by topo ordering. This therefore
-            # does not reliably give you the state at the given stream position.
-            # (https://github.com/matrix-org/synapse/issues/3305)
-            last_events, _ = await self.store.get_recent_events_for_room(
-                room_id, end_token=at_token.room_key, limit=1
+            last_event = await self.store.get_last_event_in_room_before_stream_ordering(
+                room_id,
+                end_token=at_token.room_key,
             )
 
-            if not last_events:
+            if not last_event:
                 raise NotFoundError("Can't find event for token %s" % (at_token,))
-            last_event = last_events[0]
 
             # check whether the user is in the room at that time to determine
             # whether they should be treated as peeking.
@@ -204,7 +200,7 @@ class MessageHandler:
             visible_events = await filter_events_for_client(
                 self.storage,
                 user_id,
-                last_events,
+                [last_event],
                 filter_send_to_client=False,
                 is_peeking=is_peeking,
             )
@@ -1102,10 +1098,7 @@ class EventCreationHandler:
                 raise SynapseError(400, "Can't send same reaction twice")
 
         # Don't attempt to start a thread if the parent event is a relation.
-        elif (
-            relation_type == RelationTypes.THREAD
-            or relation_type == RelationTypes.UNSTABLE_THREAD
-        ):
+        elif relation_type == RelationTypes.THREAD:
             if await self.store.event_includes_relation(relates_to):
                 raise SynapseError(
                     400, "Cannot start threads from an event with a relation"
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 209a4b0e52..d078162c29 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -151,7 +151,7 @@ class BasePresenceHandler(abc.ABC):
 
     @abc.abstractmethod
     async def user_syncing(
-        self, user_id: str, affect_presence: bool
+        self, user_id: str, affect_presence: bool, presence_state: str
     ) -> ContextManager[None]:
         """Returns a context manager that should surround any stream requests
         from the user.
@@ -165,6 +165,7 @@ class BasePresenceHandler(abc.ABC):
             affect_presence: If false this function will be a no-op.
                 Useful for streams that are not associated with an actual
                 client that is being used by a user.
+            presence_state: The presence state indicated in the sync request
         """
 
     @abc.abstractmethod
@@ -228,6 +229,11 @@ class BasePresenceHandler(abc.ABC):
 
         return states
 
+    async def current_state_for_user(self, user_id: str) -> UserPresenceState:
+        """Get the current presence state for a user."""
+        res = await self.current_state_for_users([user_id])
+        return res[user_id]
+
     @abc.abstractmethod
     async def set_state(
         self,
@@ -461,7 +467,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
                 self.send_user_sync(user_id, False, last_sync_ms)
 
     async def user_syncing(
-        self, user_id: str, affect_presence: bool
+        self, user_id: str, affect_presence: bool, presence_state: str
     ) -> ContextManager[None]:
         """Record that a user is syncing.
 
@@ -471,6 +477,17 @@ class WorkerPresenceHandler(BasePresenceHandler):
         if not affect_presence or not self._presence_enabled:
             return _NullContextManager()
 
+        prev_state = await self.current_state_for_user(user_id)
+        if prev_state.state != PresenceState.BUSY:
+            # We set state here but pass ignore_status_msg = True as we don't want to
+            # cause the status message to be cleared.
+            # Note that this causes last_active_ts to be incremented which is not
+            # what the spec wants: see comment in the BasePresenceHandler version
+            # of this function.
+            await self.set_state(
+                UserID.from_string(user_id), {"presence": presence_state}, True
+            )
+
         curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
         self._user_to_num_current_syncs[user_id] = curr_sync + 1
 
@@ -942,7 +959,10 @@ class PresenceHandler(BasePresenceHandler):
         await self._update_states([prev_state.copy_and_replace(**new_fields)])
 
     async def user_syncing(
-        self, user_id: str, affect_presence: bool = True
+        self,
+        user_id: str,
+        affect_presence: bool = True,
+        presence_state: str = PresenceState.ONLINE,
     ) -> ContextManager[None]:
         """Returns a context manager that should surround any stream requests
         from the user.
@@ -956,6 +976,7 @@ class PresenceHandler(BasePresenceHandler):
             affect_presence: If false this function will be a no-op.
                 Useful for streams that are not associated with an actual
                 client that is being used by a user.
+            presence_state: The presence state indicated in the sync request
         """
         # Override if it should affect the user's presence, if presence is
         # disabled.
@@ -967,9 +988,25 @@ class PresenceHandler(BasePresenceHandler):
             self.user_to_num_current_syncs[user_id] = curr_sync + 1
 
             prev_state = await self.current_state_for_user(user_id)
+
+            # If they're busy then they don't stop being busy just by syncing,
+            # so just update the last sync time.
+            if prev_state.state != PresenceState.BUSY:
+                # XXX: We set_state separately here and just update the last_active_ts above
+                # This keeps the logic as similar as possible between the worker and single
+                # process modes. Using set_state will actually cause last_active_ts to be
+                # updated always, which is not what the spec calls for, but synapse has done
+                # this for... forever, I think.
+                await self.set_state(
+                    UserID.from_string(user_id), {"presence": presence_state}, True
+                )
+                # Retrieve the new state for the logic below. This should come from the
+                # in-memory cache.
+                prev_state = await self.current_state_for_user(user_id)
+
+            # To keep the single process behaviour consistent with worker mode, run the
+            # same logic as `update_external_syncs_row`, even though it looks weird.
             if prev_state.state == PresenceState.OFFLINE:
-                # If they're currently offline then bring them online, otherwise
-                # just update the last sync times.
                 await self._update_states(
                     [
                         prev_state.copy_and_replace(
@@ -979,6 +1016,10 @@ class PresenceHandler(BasePresenceHandler):
                         )
                     ]
                 )
+            # otherwise, set the new presence state & update the last sync time,
+            # but don't update last_active_ts as this isn't an indication that
+            # they've been active (even though it's probably been updated by
+            # set_state above)
             else:
                 await self._update_states(
                     [
@@ -1086,11 +1127,6 @@ class PresenceHandler(BasePresenceHandler):
             )
             self.external_process_last_updated_ms.pop(process_id, None)
 
-    async def current_state_for_user(self, user_id: str) -> UserPresenceState:
-        """Get the current presence state for a user."""
-        res = await self.current_state_for_users([user_id])
-        return res[user_id]
-
     async def _persist_and_notify(self, states: List[UserPresenceState]) -> None:
         """Persist states in the database, poke the notifier and send to
         interested remote servers
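
Both handler variants now apply the same guard: a user who is BUSY stays busy across syncs, while anyone else is moved to the presence state the sync request asked for. A condensed sketch of that guard, assuming a handler exposing the methods above:

    from synapse.api.constants import PresenceState
    from synapse.types import UserID

    async def apply_sync_presence(handler, user_id, presence_state):
        # Busy users keep their state; others get the requested one. The
        # trailing True asks set_state not to clear any status message.
        prev_state = await handler.current_state_for_user(user_id)
        if prev_state.state != PresenceState.BUSY:
            await handler.set_state(
                UserID.from_string(user_id), {"presence": presence_state}, True
            )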
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6c8b17c420..5125126a80 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -661,16 +661,15 @@ class SyncHandler:
             stream_position: point at which to get state
             state_filter: The state filter used to fetch state from the database.
         """
-        # FIXME this claims to get the state at a stream position, but
-        # get_recent_events_for_room operates by topo ordering. This therefore
-        # does not reliably give you the state at the given stream position.
-        # (https://github.com/matrix-org/synapse/issues/3305)
-        last_events, _ = await self.store.get_recent_events_for_room(
-            room_id, end_token=stream_position.room_key, limit=1
+        # FIXME: This gets the state at the latest event before the stream ordering,
+        # which might not be the same as the "current state" of the room at the time
+        # of the stream token if there were multiple forward extremities at the time.
+        last_event = await self.store.get_last_event_in_room_before_stream_ordering(
+            room_id,
+            end_token=stream_position.room_key,
         )
 
-        if last_events:
-            last_event = last_events[-1]
+        if last_event:
             state = await self.get_state_after_event(
                 last_event, state_filter=state_filter or StateFilter.all()
             )
diff --git a/synapse/rest/client/login.py b/synapse/rest/client/login.py
index c9d44c5964..4a4dbe75de 100644
--- a/synapse/rest/client/login.py
+++ b/synapse/rest/client/login.py
@@ -342,6 +342,15 @@ class LoginRestServlet(RestServlet):
             user_id = canonical_uid
 
         device_id = login_submission.get("device_id")
+
+        # If present, check that the device_id is no longer than 512 characters.
+        if device_id and len(device_id) > 512:
+            raise LoginError(
+                400,
+                "device_id cannot be longer than 512 characters.",
+                errcode=Codes.INVALID_PARAM,
+            )
+
         initial_display_name = login_submission.get("initial_device_display_name")
         (
             device_id,
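
The new validation simply bounds the length of a client-supplied device_id. A standalone sketch of the check; MAX_DEVICE_ID_LENGTH is a hypothetical name, the servlet inlines the literal 512:

    MAX_DEVICE_ID_LENGTH = 512  # hypothetical constant; the diff uses a bare literal

    def validate_device_id(device_id):
        # Mirrors LoginRestServlet: an absent device_id is fine, an
        # oversized one is rejected (the servlet raises a 400 LoginError).
        if device_id and len(device_id) > MAX_DEVICE_ID_LENGTH:
            raise ValueError("device_id cannot be longer than 512 characters.")

    validate_device_id(None)       # no device_id supplied: accepted
    validate_device_id("a" * 512)  # exactly at the limit: accepted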
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 2e25e8638b..e8772f86e7 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -180,13 +180,10 @@ class SyncRestServlet(RestServlet):
 
         affect_presence = set_presence != PresenceState.OFFLINE
 
-        if affect_presence:
-            await self.presence_handler.set_state(
-                user, {"presence": set_presence}, True
-            )
-
         context = await self.presence_handler.user_syncing(
-            user.to_string(), affect_presence=affect_presence
+            user.to_string(),
+            affect_presence=affect_presence,
+            presence_state=set_presence,
         )
         with context:
             sync_result = await self.sync_handler.wait_for_sync_for_user(
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 9a65aa4843..7b29026381 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -86,7 +86,7 @@ class VersionsRestServlet(RestServlet):
                     # Implements additional endpoints as described in MSC2432
                     "org.matrix.msc2432": True,
                     # Implements additional endpoints as described in MSC2666
-                    "uk.half-shot.msc2666": True,
+                    "uk.half-shot.msc2666.mutual_rooms": True,
                     # Whether new rooms will be set to encrypted or not (based on presets).
                     "io.element.e2ee_forced.public": self.e2ee_forced_public,
                     "io.element.e2ee_forced.private": self.e2ee_forced_private,
@@ -100,7 +100,6 @@ class VersionsRestServlet(RestServlet):
                     # Adds support for jump to date endpoints (/timestamp_to_event) as per MSC3030
                     "org.matrix.msc3030": self.config.experimental.msc3030_enabled,
                     # Adds support for thread relations, per MSC3440.
-                    "org.matrix.msc3440": self.config.experimental.msc3440_enabled,
                     "org.matrix.msc3440.stable": True,  # TODO: remove when "v1.3" is added above
                 },
             },
diff --git a/synapse/server.py b/synapse/server.py
index 380369db92..37c72bd83a 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -758,7 +758,7 @@ class HomeServer(metaclass=abc.ABCMeta):
 
     @cache_in_self
     def get_event_client_serializer(self) -> EventClientSerializer:
-        return EventClientSerializer(self)
+        return EventClientSerializer()
 
     @cache_in_self
     def get_password_policy_handler(self) -> PasswordPolicyHandler:
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index dc8009b23d..318e4df376 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1582,7 +1582,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         self,
         user_id: str,
         device_ids: Collection[str],
-        hosts: Optional[Collection[str]],
         room_ids: Collection[str],
     ) -> Optional[int]:
         """Persist that a user's devices have been updated, and which hosts
@@ -1592,9 +1591,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             user_id: The ID of the user whose device changed.
             device_ids: The IDs of any changed devices. If empty, this function will
                 return None.
-            hosts: The remote destinations that should be notified of the change. If
-                None then the set of hosts have *not* been calculated, and will be
-                calculated later by a background task.
             room_ids: The rooms that the user is in
 
         Returns:
@@ -1606,14 +1602,12 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
 
         context = get_active_span_text_map()
 
-        def add_device_changes_txn(
-            txn, stream_ids_for_device_change, stream_ids_for_outbound_pokes
-        ):
+        def add_device_changes_txn(txn, stream_ids):
             self._add_device_change_to_stream_txn(
                 txn,
                 user_id,
                 device_ids,
-                stream_ids_for_device_change,
+                stream_ids,
             )
 
             self._add_device_outbound_room_poke_txn(
@@ -1621,43 +1615,17 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
                 user_id,
                 device_ids,
                 room_ids,
-                stream_ids_for_device_change,
-                context,
-                hosts_have_been_calculated=hosts is not None,
-            )
-
-            # If the set of hosts to send to has not been calculated yet (and so
-            # `hosts` is None) or there are no `hosts` to send to, then skip
-            # trying to persist them to the DB.
-            if not hosts:
-                return
-
-            self._add_device_outbound_poke_to_stream_txn(
-                txn,
-                user_id,
-                device_ids,
-                hosts,
-                stream_ids_for_outbound_pokes,
+                stream_ids,
                 context,
             )
 
-        # `device_lists_stream` wants a stream ID per device update.
-        num_stream_ids = len(device_ids)
-
-        if hosts:
-            # `device_lists_outbound_pokes` wants a different stream ID for
-            # each row, which is a row per host per device update.
-            num_stream_ids += len(hosts) * len(device_ids)
-
-        async with self._device_list_id_gen.get_next_mult(num_stream_ids) as stream_ids:
-            stream_ids_for_device_change = stream_ids[: len(device_ids)]
-            stream_ids_for_outbound_pokes = stream_ids[len(device_ids) :]
-
+        async with self._device_list_id_gen.get_next_mult(
+            len(device_ids)
+        ) as stream_ids:
             await self.db_pool.runInteraction(
                 "add_device_change_to_stream",
                 add_device_changes_txn,
-                stream_ids_for_device_change,
-                stream_ids_for_outbound_pokes,
+                stream_ids,
             )
 
         return stream_ids[-1]
@@ -1735,7 +1703,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
                     next(stream_id_iterator),
                     user_id,
                     device_id,
-                    False,
+                    not self.hs.is_mine_id(
+                        user_id
+                    ),  # We only need to send out updates for *our* users
                     now,
                     encoded_context if whitelisted_homeserver(destination) else "{}",
                 )
@@ -1752,19 +1722,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         room_ids: Collection[str],
         stream_ids: List[str],
         context: Dict[str, str],
-        hosts_have_been_calculated: bool,
     ) -> None:
-        """Record the user in the room has updated their device.
-
-        Args:
-            hosts_have_been_calculated: True if `device_lists_outbound_pokes`
-                has been updated already with the updates.
-        """
-
-        # We only need to convert to outbound pokes if they are our user.
-        converted_to_destinations = (
-            hosts_have_been_calculated or not self.hs.is_mine_id(user_id)
-        )
+        """Record the user in the room has updated their device."""
 
         encoded_context = json_encoder.encode(context)
 
@@ -1789,7 +1748,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
                     device_id,
                     room_id,
                     stream_id,
-                    converted_to_destinations,
+                    False,
                     encoded_context,
                 )
                 for room_id in room_ids
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 3fcd5f5b99..2a1e567ce0 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -963,6 +963,21 @@ class PersistEventsStore:
                 values=to_insert,
             )
 
+    async def update_current_state(
+        self,
+        room_id: str,
+        state_delta: DeltaState,
+        stream_id: int,
+    ) -> None:
+        """Update the current state stored in the datatabase for the given room"""
+
+        await self.db_pool.runInteraction(
+            "update_current_state",
+            self._update_current_state_txn,
+            state_delta_by_room={room_id: state_delta},
+            stream_id=stream_id,
+        )
+
     def _update_current_state_txn(
         self,
         txn: LoggingTransaction,
@@ -1819,10 +1834,7 @@ class PersistEventsStore:
         if rel_type == RelationTypes.REPLACE:
             txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
 
-        if (
-            rel_type == RelationTypes.THREAD
-            or rel_type == RelationTypes.UNSTABLE_THREAD
-        ):
+        if rel_type == RelationTypes.THREAD:
             txn.call_after(self.store.get_thread_summary.invalidate, (parent_id,))
             # It should be safe to only invalidate the cache if the user has not
             # previously participated in the thread, but that's difficult (and
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index a60e3f4fdd..5288cdba03 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1979,3 +1979,27 @@ class EventsWorkerStore(SQLBaseStore):
             desc="is_partial_state_event",
         )
         return result is not None
+
+    async def get_partial_state_events_batch(self, room_id: str) -> List[str]:
+        """Get a list of events in the given room that have partial state"""
+        return await self.db_pool.runInteraction(
+            "get_partial_state_events_batch",
+            self._get_partial_state_events_batch_txn,
+            room_id,
+        )
+
+    @staticmethod
+    def _get_partial_state_events_batch_txn(
+        txn: LoggingTransaction, room_id: str
+    ) -> List[str]:
+        txn.execute(
+            """
+            SELECT event_id FROM partial_state_events AS pse
+                JOIN events USING (event_id)
+            WHERE pse.room_id = ?
+            ORDER BY events.stream_ordering
+            LIMIT 100
+            """,
+            (room_id,),
+        )
+        return [row[0] for row in txn]
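
The query pages in batches of up to 100 event IDs in stream order. A hedged usage sketch; note the loop only terminates because de-partial-stating an event removes its row from partial_state_events:

    async def iter_partial_state_events(store, room_id, process):
        # Drain partial-state events for a room in stream-ordered batches.
        # `process` must de-partial-state each event (removing its row),
        # otherwise the same batch would be returned forever.
        while True:
            batch = await store.get_partial_state_events_batch(room_id)
            if not batch:
                return
            for event_id in batch:
                await process(event_id)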
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 407158ceee..a5c31f6787 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -14,7 +14,6 @@
 
 import logging
 from typing import (
-    TYPE_CHECKING,
     Collection,
     Dict,
     FrozenSet,
@@ -32,20 +31,12 @@ import attr
 from synapse.api.constants import RelationTypes
 from synapse.events import EventBase
 from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import (
-    DatabasePool,
-    LoggingDatabaseConnection,
-    LoggingTransaction,
-    make_in_list_sql_clause,
-)
+from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause
 from synapse.storage.databases.main.stream import generate_pagination_where_clause
 from synapse.storage.engines import PostgresEngine
 from synapse.types import JsonDict, RoomStreamToken, StreamToken
 from synapse.util.caches.descriptors import cached, cachedList
 
-if TYPE_CHECKING:
-    from synapse.server import HomeServer
-
 logger = logging.getLogger(__name__)
 
 
@@ -63,16 +54,6 @@ class _RelatedEvent:
 
 
 class RelationsWorkerStore(SQLBaseStore):
-    def __init__(
-        self,
-        database: DatabasePool,
-        db_conn: LoggingDatabaseConnection,
-        hs: "HomeServer",
-    ):
-        super().__init__(database, db_conn, hs)
-
-        self._msc3440_enabled = hs.config.experimental.msc3440_enabled
-
     @cached(uncached_args=("event",), tree=True)
     async def get_relations_for_event(
         self,
@@ -497,7 +478,7 @@ class RelationsWorkerStore(SQLBaseStore):
                         AND parent.room_id = child.room_id
                     WHERE
                         %s
-                        AND %s
+                        AND relation_type = ?
                     ORDER BY parent.event_id, child.topological_ordering DESC, child.stream_ordering DESC
                 """
             else:
@@ -512,22 +493,16 @@ class RelationsWorkerStore(SQLBaseStore):
                         AND parent.room_id = child.room_id
                     WHERE
                         %s
-                        AND %s
+                        AND relation_type = ?
                     ORDER BY child.topological_ordering DESC, child.stream_ordering DESC
                 """
 
             clause, args = make_in_list_sql_clause(
                 txn.database_engine, "relates_to_id", event_ids
             )
+            args.append(RelationTypes.THREAD)
 
-            if self._msc3440_enabled:
-                relations_clause = "(relation_type = ? OR relation_type = ?)"
-                args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
-            else:
-                relations_clause = "relation_type = ?"
-                args.append(RelationTypes.THREAD)
-
-            txn.execute(sql % (clause, relations_clause), args)
+            txn.execute(sql % (clause,), args)
             latest_event_ids = {}
             for parent_event_id, child_event_id in txn:
                 # Only consider the latest threaded reply (by topological ordering).
@@ -547,7 +522,7 @@ class RelationsWorkerStore(SQLBaseStore):
                     AND parent.room_id = child.room_id
                 WHERE
                     %s
-                    AND %s
+                    AND relation_type = ?
                 GROUP BY parent.event_id
             """
 
@@ -556,15 +531,9 @@ class RelationsWorkerStore(SQLBaseStore):
             clause, args = make_in_list_sql_clause(
                 txn.database_engine, "relates_to_id", latest_event_ids.keys()
             )
+            args.append(RelationTypes.THREAD)
 
-            if self._msc3440_enabled:
-                relations_clause = "(relation_type = ? OR relation_type = ?)"
-                args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
-            else:
-                relations_clause = "relation_type = ?"
-                args.append(RelationTypes.THREAD)
-
-            txn.execute(sql % (clause, relations_clause), args)
+            txn.execute(sql % (clause,), args)
             counts = dict(cast(List[Tuple[str, int]], txn.fetchall()))
 
             return counts, latest_event_ids
@@ -622,7 +591,7 @@ class RelationsWorkerStore(SQLBaseStore):
                 parent.event_id = relates_to_id
                 AND parent.room_id = child.room_id
             WHERE
-                %s
+                relation_type = ?
                 AND %s
                 AND %s
             GROUP BY parent.event_id, child.sender
@@ -638,16 +607,9 @@ class RelationsWorkerStore(SQLBaseStore):
                 txn.database_engine, "relates_to_id", event_ids
             )
 
-            if self._msc3440_enabled:
-                relations_clause = "(relation_type = ? OR relation_type = ?)"
-                relations_args = [RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD]
-            else:
-                relations_clause = "relation_type = ?"
-                relations_args = [RelationTypes.THREAD]
-
             txn.execute(
-                sql % (users_sql, events_clause, relations_clause),
-                users_args + events_args + relations_args,
+                sql % (users_sql, events_clause),
+                [RelationTypes.THREAD] + users_args + events_args,
             )
             return {(row[0], row[1]): row[2] for row in txn}
 
@@ -677,7 +639,7 @@ class RelationsWorkerStore(SQLBaseStore):
             user participated in that event's thread, otherwise false.
         """
 
-        def _get_thread_summary_txn(txn: LoggingTransaction) -> Set[str]:
+        def _get_threads_participated_txn(txn: LoggingTransaction) -> Set[str]:
             # Fetch whether the requester has participated or not.
             sql = """
                 SELECT DISTINCT relates_to_id
@@ -688,28 +650,20 @@ class RelationsWorkerStore(SQLBaseStore):
                     AND parent.room_id = child.room_id
                 WHERE
                     %s
-                    AND %s
+                    AND relation_type = ?
                     AND child.sender = ?
             """
 
             clause, args = make_in_list_sql_clause(
                 txn.database_engine, "relates_to_id", event_ids
             )
+            args.extend([RelationTypes.THREAD, user_id])
 
-            if self._msc3440_enabled:
-                relations_clause = "(relation_type = ? OR relation_type = ?)"
-                args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
-            else:
-                relations_clause = "relation_type = ?"
-                args.append(RelationTypes.THREAD)
-
-            args.append(user_id)
-
-            txn.execute(sql % (clause, relations_clause), args)
+            txn.execute(sql % (clause,), args)
             return {row[0] for row in txn.fetchall()}
 
         participated_threads = await self.db_pool.runInteraction(
-            "get_thread_summary", _get_thread_summary_txn
+            "get_threads_participated", _get_threads_participated_txn
         )
 
         return {event_id: event_id in participated_threads for event_id in event_ids}
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 18b1acd9e1..87e9482c60 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1077,6 +1077,37 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
             get_rooms_for_retention_period_in_range_txn,
         )
 
+    async def clear_partial_state_room(self, room_id: str) -> bool:
+        # this can race with incoming events, so we watch out for FK errors.
+        # TODO(faster_joins): this still doesn't completely fix the race, since the persist process
+        #   is not atomic. I fear we need an application-level lock.
+        try:
+            await self.db_pool.runInteraction(
+                "clear_partial_state_room", self._clear_partial_state_room_txn, room_id
+            )
+            return True
+        except self.db_pool.engine.module.DatabaseError as e:
+            # TODO(faster_joins): how do we distinguish between FK errors and other errors?
+            logger.warning(
+                "Exception while clearing lazy partial-state-room %s, retrying: %s",
+                room_id,
+                e,
+            )
+            return False
+
+    @staticmethod
+    def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None:
+        DatabasePool.simple_delete_txn(
+            txn,
+            table="partial_state_rooms_servers",
+            keyvalues={"room_id": room_id},
+        )
+        DatabasePool.simple_delete_one_txn(
+            txn,
+            table="partial_state_rooms",
+            keyvalues={"room_id": room_id},
+        )
+
 
 class _BackgroundUpdates:
     REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index ecdc1fdc4c..7a1b013fa3 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -21,6 +21,7 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events import EventBase
+from synapse.events.snapshot import EventContext
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import (
     DatabasePool,
@@ -129,7 +130,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         )
 
         if room_version is None:
-            raise NotFoundError("Could not room_version for %s" % (room_id,))
+            raise NotFoundError("Could not find room_version for %s" % (room_id,))
 
         return room_version
 
@@ -354,6 +355,53 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return {row["state_group"] for row in rows}
 
+    async def update_state_for_partial_state_event(
+        self,
+        event: EventBase,
+        context: EventContext,
+    ) -> None:
+        """Update the state group for a partial state event"""
+        await self.db_pool.runInteraction(
+            "update_state_for_partial_state_event",
+            self._update_state_for_partial_state_event_txn,
+            event,
+            context,
+        )
+
+    def _update_state_for_partial_state_event_txn(
+        self,
+        txn: LoggingTransaction,
+        event: EventBase,
+        context: EventContext,
+    ) -> None:
+        # we shouldn't have any outliers here
+        assert not event.internal_metadata.is_outlier()
+
+        # anything that was rejected should have the same state as its
+        # predecessor.
+        if context.rejected:
+            assert context.state_group == context.state_group_before_event
+
+        self.db_pool.simple_update_txn(
+            txn,
+            table="event_to_state_groups",
+            keyvalues={"event_id": event.event_id},
+            updatevalues={"state_group": context.state_group},
+        )
+
+        self.db_pool.simple_delete_one_txn(
+            txn,
+            table="partial_state_events",
+            keyvalues={"event_id": event.event_id},
+        )
+
+        # TODO(faster_joins): need to do something about workers here
+        txn.call_after(
+            self._get_state_group_for_event.prefill,
+            (event.event_id,),
+            context.state_group,
+        )
+
 
 class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
 
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 6d45a8a9f6..793e906630 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -758,6 +758,32 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             "get_room_event_before_stream_ordering", _f
         )
 
+    async def get_last_event_in_room_before_stream_ordering(
+        self,
+        room_id: str,
+        end_token: RoomStreamToken,
+    ) -> Optional[EventBase]:
+        """Returns the last event in a room at or before a stream ordering
+
+        Args:
+            room_id
+            end_token: The token used to stream from
+
+        Returns:
+            The most recent event, or None if the room has no events before the token.
+        """
+
+        last_row = await self.get_room_event_before_stream_ordering(
+            room_id=room_id,
+            stream_ordering=end_token.stream,
+        )
+        if last_row:
+            _, _, event_id = last_row
+            event = await self.get_event(event_id, get_prev_content=True)
+            return event
+
+        return None
+
     async def get_current_room_stream_token_for_room_id(
         self, room_id: Optional[str] = None
     ) -> RoomStreamToken:
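
The sync and message handlers call the new helper in the same way; a usage sketch assuming a store instance and a StreamToken-style position:

    async def last_event_at_token(store, room_id, stream_position):
        # Resolve the most recent event at or before the token's stream
        # ordering; None means the room has no event before that point.
        return await store.get_last_event_in_room_before_stream_ordering(
            room_id,
            end_token=stream_position.room_key,
        )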
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index b402922817..e496ba7bed 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -376,6 +376,62 @@ class EventsPersistenceStorage:
         pos = PersistedEventPosition(self._instance_name, event_stream_id)
         return event, pos, self.main_store.get_room_max_token()
 
+    async def update_current_state(self, room_id: str) -> None:
+        """Recalculate the current state for a room, and persist it"""
+        state = await self._calculate_current_state(room_id)
+        delta = await self._calculate_state_delta(room_id, state)
+
+        # TODO(faster_joins): get a real stream ordering, to make this work correctly
+        #    across workers.
+        #
+        # TODO(faster_joins): this can race against event persistence, in which case we
+        #    will end up with incorrect state. Perhaps we should make this a job we
+        #    farm out to the event persister, somehow.
+        stream_id = self.main_store.get_room_max_stream_ordering()
+        await self.persist_events_store.update_current_state(room_id, delta, stream_id)
+
+    async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
+        """Calculate the current state of a room, based on the forward extremities
+
+        Args:
+            room_id: room for which to calculate current state
+
+        Returns:
+            map from (type, state_key) to event id for the current state in the room
+        """
+        latest_event_ids = await self.main_store.get_latest_event_ids_in_room(room_id)
+        state_groups = set(
+            (
+                await self.main_store._get_state_group_for_events(latest_event_ids)
+            ).values()
+        )
+
+        state_maps_by_state_group = await self.state_store._get_state_for_groups(
+            state_groups
+        )
+
+        if len(state_groups) == 1:
+            # If there is only one state group, then we know what the current
+            # state is.
+            return state_maps_by_state_group[state_groups.pop()]
+
+        # Ok, we need to defer to the state handler to resolve our state sets.
+        logger.debug("calling resolve_state_groups from preserve_events")
+
+        # Avoid a circular import.
+        from synapse.state import StateResolutionStore
+
+        room_version = await self.main_store.get_room_version_id(room_id)
+        res = await self._state_resolution_handler.resolve_state_groups(
+            room_id,
+            room_version,
+            state_maps_by_state_group,
+            event_map=None,
+            state_res_store=StateResolutionStore(self.main_store),
+        )
+
+        return res.state
+
     async def _persist_event_batch(
         self,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 151f2aa9bb..871d4ace12 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -66,9 +66,9 @@ Changes in SCHEMA_VERSION = 69:
 
 
 SCHEMA_COMPAT_VERSION = (
-    # we now have `state_key` columns in both `events` and `state_events`, so
-    # now incompatible with synapses wth SCHEMA_VERSION < 66.
-    66
+    # We now assume that `device_lists_changes_in_room` has been filled out for
+    # recent device_list_updates.
+    69
 )
 """Limit on how far the synapse codebase can be rolled back without breaking db compat