summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/17483.bugfix1
-rw-r--r--changelog.d/17536.misc1
-rwxr-xr-xscripts-dev/federation_client.py25
-rw-r--r--synapse/app/generic_worker.py15
-rw-r--r--synapse/app/homeserver.py14
-rw-r--r--synapse/federation/transport/server/__init__.py4
-rw-r--r--synapse/federation/transport/server/federation.py2
-rw-r--r--synapse/handlers/sliding_sync.py127
-rw-r--r--synapse/rest/__init__.py166
-rw-r--r--synapse/rest/client/sync.py16
-rw-r--r--synapse/storage/databases/main/cache.py3
-rw-r--r--synapse/storage/databases/main/events.py24
-rw-r--r--synapse/storage/databases/main/stream.py96
-rw-r--r--synapse/storage/schema/main/delta/85/07_sliding_sync.sql24
-rw-r--r--tests/rest/client/test_sync.py14
-rw-r--r--tests/storage/test_event_chain.py1
16 files changed, 411 insertions, 122 deletions
diff --git a/changelog.d/17483.bugfix b/changelog.d/17483.bugfix
new file mode 100644
index 0000000000..c97a802dbf
--- /dev/null
+++ b/changelog.d/17483.bugfix
@@ -0,0 +1 @@
+Start handlers for new media endpoints when the media resource is configured.
diff --git a/changelog.d/17536.misc b/changelog.d/17536.misc
new file mode 100644
index 0000000000..116ef0c36d
--- /dev/null
+++ b/changelog.d/17536.misc
@@ -0,0 +1 @@
+Replace override of deprecated method `HTTPAdapter.get_connection` with `get_connection_with_tls_context`.
\ No newline at end of file
diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py
index 4c758e5424..fb879ef555 100755
--- a/scripts-dev/federation_client.py
+++ b/scripts-dev/federation_client.py
@@ -43,7 +43,7 @@ import argparse
 import base64
 import json
 import sys
-from typing import Any, Dict, Optional, Tuple
+from typing import Any, Dict, Mapping, Optional, Tuple, Union
 from urllib import parse as urlparse
 
 import requests
@@ -75,7 +75,7 @@ def encode_canonical_json(value: object) -> bytes:
         value,
         # Encode code-points outside of ASCII as UTF-8 rather than \u escapes
         ensure_ascii=False,
-        # Remove unecessary white space.
+        # Remove unnecessary white space.
         separators=(",", ":"),
         # Sort the keys of dictionaries.
         sort_keys=True,
@@ -298,12 +298,23 @@ class MatrixConnectionAdapter(HTTPAdapter):
 
         return super().send(request, *args, **kwargs)
 
-    def get_connection(
-        self, url: str, proxies: Optional[Dict[str, str]] = None
+    def get_connection_with_tls_context(
+        self,
+        request: PreparedRequest,
+        verify: Optional[Union[bool, str]],
+        proxies: Optional[Mapping[str, str]] = None,
+        cert: Optional[Union[Tuple[str, str], str]] = None,
     ) -> HTTPConnectionPool:
-        # overrides the get_connection() method in the base class
-        parsed = urlparse.urlsplit(url)
-        (host, port, ssl_server_name) = self._lookup(parsed.netloc)
+        # overrides the get_connection_with_tls_context() method in the base class
+        parsed = urlparse.urlsplit(request.url)
+
+        # Extract the server name from the request URL, and ensure it's a str.
+        hostname = parsed.netloc
+        if isinstance(hostname, bytes):
+            hostname = hostname.decode("utf-8")
+        assert isinstance(hostname, str)
+
+        (host, port, ssl_server_name) = self._lookup(hostname)
         print(
             f"Connecting to {host}:{port} with SNI {ssl_server_name}", file=sys.stderr
         )
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 248622fa92..53f1859256 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -206,6 +206,21 @@ class GenericWorkerServer(HomeServer):
                                 "/_synapse/admin": admin_resource,
                             }
                         )
+
+                        if "federation" not in res.names:
+                            # Only load the federation media resource separately if federation
+                            # resource is not specified since federation resource includes media
+                            # resource.
+                            resources[FEDERATION_PREFIX] = TransportLayerServer(
+                                self, servlet_groups=["media"]
+                            )
+                        if "client" not in res.names:
+                            # Only load the client media resource separately if client
+                            # resource is not specified since client resource includes media
+                            # resource.
+                            resources[CLIENT_API_PREFIX] = ClientRestResource(
+                                self, servlet_groups=["media"]
+                            )
                     else:
                         logger.warning(
                             "A 'media' listener is configured but the media"
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e114ab7ec4..2a824e8457 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -101,6 +101,12 @@ class SynapseHomeServer(HomeServer):
                     # Skip loading openid resource if federation is defined
                     # since federation resource will include openid
                     continue
+                if name == "media" and (
+                    "federation" in res.names or "client" in res.names
+                ):
+                    # Skip loading media resource if federation or client are defined
+                    # since federation & client resources will include media
+                    continue
                 if name == "health":
                     # Skip loading, health resource is always included
                     continue
@@ -231,6 +237,14 @@ class SynapseHomeServer(HomeServer):
                     "'media' resource conflicts with enable_media_repo=False"
                 )
 
+        if name == "media":
+            resources[FEDERATION_PREFIX] = TransportLayerServer(
+                self, servlet_groups=["media"]
+            )
+            resources[CLIENT_API_PREFIX] = ClientRestResource(
+                self, servlet_groups=["media"]
+            )
+
         if name in ["keys", "federation"]:
             resources[SERVER_KEY_PREFIX] = KeyResource(self)
 
diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py
index 72599bb204..43102567db 100644
--- a/synapse/federation/transport/server/__init__.py
+++ b/synapse/federation/transport/server/__init__.py
@@ -271,6 +271,10 @@ SERVLET_GROUPS: Dict[str, Iterable[Type[BaseFederationServlet]]] = {
     "federation": FEDERATION_SERVLET_CLASSES,
     "room_list": (PublicRoomList,),
     "openid": (OpenIdUserInfo,),
+    "media": (
+        FederationMediaDownloadServlet,
+        FederationMediaThumbnailServlet,
+    ),
 }
 
 
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index b075a86f68..20f87c885e 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -912,6 +912,4 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
     FederationV1SendKnockServlet,
     FederationMakeKnockServlet,
     FederationAccountStatusServlet,
-    FederationMediaDownloadServlet,
-    FederationMediaThumbnailServlet,
 )
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index ac65b30290..531089c279 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -810,6 +810,7 @@ class SlidingSyncHandler:
 
             connection_position = await self.connection_store.record_rooms(
                 sync_config=sync_config,
+                room_configs=relevant_room_map,
                 from_token=from_token,
                 sent_room_ids=relevant_rooms_to_send_map.keys(),
                 unsent_room_ids=unsent_room_ids,
@@ -1910,6 +1911,7 @@ class SlidingSyncHandler:
         #
         # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
         from_bound = None
+        ignore_timeline_bound = False
         initial = True
         if from_token and not room_membership_for_user_at_to_token.newly_joined:
             room_status = await self.connection_store.have_sent_room(
@@ -1917,6 +1919,7 @@ class SlidingSyncHandler:
                 connection_token=from_token.connection_position,
                 room_id=room_id,
             )
+
             if room_status.status == HaveSentRoomFlag.LIVE:
                 from_bound = from_token.stream_token.room_key
                 initial = False
@@ -1930,9 +1933,24 @@ class SlidingSyncHandler:
             else:
                 assert_never(room_status.status)
 
+            if room_status.timeline_limit is not None and (
+                room_status.timeline_limit < room_sync_config.timeline_limit
+            ):
+                # If the timeline limit has been increased since previous
+                # requests then we treat it as a request for more events. We do
+                # this by sending down a `limited` sync, ignoring the from
+                # bound.
+                ignore_timeline_bound = True
+
             log_kv({"sliding_sync.room_status": room_status})
 
-        log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
+        log_kv(
+            {
+                "sliding_sync.from_bound": from_bound,
+                "sliding_sync.ignore_timeline_bound": ignore_timeline_bound,
+                "sliding_sync.initial": initial,
+            }
+        )
 
         # Assemble the list of timeline events
         #
@@ -2004,7 +2022,7 @@ class SlidingSyncHandler:
                 # (from newer to older events) starting at to_bound.
                 # This ensures we fill the `limit` with the newest events first,
                 from_key=to_bound,
-                to_key=from_bound,
+                to_key=from_bound if not ignore_timeline_bound else None,
                 direction=Direction.BACKWARDS,
                 # We add one so we can determine if there are enough events to saturate
                 # the limit or not (see `limited`)
@@ -2343,24 +2361,37 @@ class SlidingSyncHandler:
                 )
 
         # Figure out the last bump event in the room
-        last_bump_event_result = (
-            await self.store.get_last_event_pos_in_room_before_stream_ordering(
-                room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
+        bump_stamp = None
+        if timeline_events:
+            for e in reversed(timeline_events):
+                assert e.internal_metadata.stream_ordering is not None
+                if (
+                    e.type in DEFAULT_BUMP_EVENT_TYPES
+                    and e.internal_metadata.stream_ordering > 0
+                ):
+                    bump_stamp = e.internal_metadata.stream_ordering
+                    break
+
+        if bump_stamp is None:
+            # By default, just choose the membership event position
+            bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
+
+            last_bump_event_result = (
+                await self.store.get_last_event_pos_in_room_before_stream_ordering(
+                    room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
+                )
             )
-        )
 
-        # By default, just choose the membership event position
-        bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
-        # But if we found a bump event, use that instead
-        if last_bump_event_result is not None:
-            _, new_bump_event_pos = last_bump_event_result
+            # But if we found a bump event, use that instead
+            if last_bump_event_result is not None:
+                _, new_bump_event_pos = last_bump_event_result
 
-            # If we've just joined a remote room, then the last bump event may
-            # have been backfilled (and so have a negative stream ordering).
-            # These negative stream orderings can't sensibly be compared, so
-            # instead we use the membership event position.
-            if new_bump_event_pos.stream > 0:
-                bump_stamp = new_bump_event_pos.stream
+                # If we've just joined a remote room, then the last bump event may
+                # have been backfilled (and so have a negative stream ordering).
+                # These negative stream orderings can't sensibly be compared, so
+                # instead we use the membership event position.
+                if new_bump_event_pos.stream > 0:
+                    bump_stamp = new_bump_event_pos.stream
 
         set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
 
@@ -2606,12 +2637,13 @@ class SlidingSyncHandler:
                 up_to_stream_id=since_stream_id,
             )
 
-            logger.debug(
-                "Deleted %d to-device messages up to %d for %s",
-                deleted,
-                since_stream_id,
-                user_id,
-            )
+            if deleted:
+                logger.debug(
+                    "Deleted %d to-device messages up to %d for %s",
+                    deleted,
+                    since_stream_id,
+                    user_id,
+                )
 
         messages, stream_id = await self.store.get_messages_for_device(
             user_id=user_id,
@@ -2948,19 +2980,26 @@ class HaveSentRoom:
             contains the last stream token of the last updates we sent down
             the room, i.e. we still need to send everything since then to the
             client.
+        timeline_limit: The timeline limit config for the room, if LIVE or
+            PREVIOUSLY. This is used to track if the client has increased
+            the timeline limit to request more events.
     """
 
     status: HaveSentRoomFlag
     last_token: Optional[RoomStreamToken]
+    timeline_limit: Optional[int]
+
+    @staticmethod
+    def live(timeline_limit: int) -> "HaveSentRoom":
+        return HaveSentRoom(HaveSentRoomFlag.LIVE, None, timeline_limit)
 
     @staticmethod
-    def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
+    def previously(last_token: RoomStreamToken, timeline_limit: int) -> "HaveSentRoom":
         """Constructor for `PREVIOUSLY` flag."""
-        return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
+        return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token, timeline_limit)
 
 
-HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
-HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
+HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None, None)
 
 
 @attr.s(auto_attribs=True)
@@ -3026,6 +3065,7 @@ class SlidingSyncConnectionStore:
     async def record_rooms(
         self,
         sync_config: SlidingSyncConfig,
+        room_configs: Dict[str, RoomSyncConfig],
         from_token: Optional[SlidingSyncStreamToken],
         *,
         sent_room_ids: StrCollection,
@@ -3066,8 +3106,12 @@ class SlidingSyncConnectionStore:
         # end we can treat this as a noop.
         have_updated = False
         for room_id in sent_room_ids:
-            new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
-            have_updated = True
+            prev_state = new_room_statuses.get(room_id)
+            new_room_statuses[room_id] = HaveSentRoom.live(
+                room_configs[room_id].timeline_limit
+            )
+            if prev_state != new_room_statuses[room_id]:
+                have_updated = True
 
         # Whether we add/update the entries for unsent rooms depends on the
         # existing entry:
@@ -3078,18 +3122,23 @@ class SlidingSyncConnectionStore:
         #     given token, so we don't need to update the entry.
         #   - NEVER: We have never previously sent down the room, and we haven't
         #     sent anything down this time either so we leave it as NEVER.
+        #
+        # We only need to do this if `from_token` is not None, as if it is then
+        # we know that there are no existing entries.
 
-        # Work out the new state for unsent rooms that were `LIVE`.
         if from_token:
-            new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
-        else:
-            new_unsent_state = HAVE_SENT_ROOM_NEVER
-
-        for room_id in unsent_room_ids:
-            prev_state = new_room_statuses.get(room_id)
-            if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
-                new_room_statuses[room_id] = new_unsent_state
-                have_updated = True
+            for room_id in unsent_room_ids:
+                prev_state = new_room_statuses.get(room_id)
+                if (
+                    prev_state is not None
+                    and prev_state.status == HaveSentRoomFlag.LIVE
+                ):
+                    assert prev_state.timeline_limit is not None
+                    new_room_statuses[room_id] = HaveSentRoom.previously(
+                        from_token.stream_token.room_key,
+                        prev_state.timeline_limit,
+                    )
+                    have_updated = True
 
         if not have_updated:
             return prev_connection_token
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 1aa9ea3877..c5cdc36955 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -18,7 +18,8 @@
 # [This file includes modifications made by New Vector Limited]
 #
 #
-from typing import TYPE_CHECKING, Callable
+import logging
+from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple
 
 from synapse.http.server import HttpServer, JsonResource
 from synapse.rest import admin
@@ -67,11 +68,64 @@ from synapse.rest.client import (
     voip,
 )
 
+logger = logging.getLogger(__name__)
+
 if TYPE_CHECKING:
     from synapse.server import HomeServer
 
 RegisterServletsFunc = Callable[["HomeServer", HttpServer], None]
 
+CLIENT_SERVLET_FUNCTIONS: Tuple[RegisterServletsFunc, ...] = (
+    versions.register_servlets,
+    initial_sync.register_servlets,
+    room.register_deprecated_servlets,
+    events.register_servlets,
+    room.register_servlets,
+    login.register_servlets,
+    profile.register_servlets,
+    presence.register_servlets,
+    directory.register_servlets,
+    voip.register_servlets,
+    pusher.register_servlets,
+    push_rule.register_servlets,
+    logout.register_servlets,
+    sync.register_servlets,
+    filter.register_servlets,
+    account.register_servlets,
+    register.register_servlets,
+    auth.register_servlets,
+    receipts.register_servlets,
+    read_marker.register_servlets,
+    room_keys.register_servlets,
+    keys.register_servlets,
+    tokenrefresh.register_servlets,
+    tags.register_servlets,
+    account_data.register_servlets,
+    reporting.register_servlets,
+    openid.register_servlets,
+    notifications.register_servlets,
+    devices.register_servlets,
+    thirdparty.register_servlets,
+    sendtodevice.register_servlets,
+    user_directory.register_servlets,
+    room_upgrade_rest_servlet.register_servlets,
+    capabilities.register_servlets,
+    account_validity.register_servlets,
+    relations.register_servlets,
+    password_policy.register_servlets,
+    knock.register_servlets,
+    appservice_ping.register_servlets,
+    admin.register_servlets_for_client_rest_resource,
+    mutual_rooms.register_servlets,
+    login_token_request.register_servlets,
+    rendezvous.register_servlets,
+    auth_issuer.register_servlets,
+)
+
+SERVLET_GROUPS: Dict[str, Iterable[RegisterServletsFunc]] = {
+    "client": CLIENT_SERVLET_FUNCTIONS,
+}
+
 
 class ClientRestResource(JsonResource):
     """Matrix Client API REST resource.
@@ -83,80 +137,56 @@ class ClientRestResource(JsonResource):
        * etc
     """
 
-    def __init__(self, hs: "HomeServer"):
+    def __init__(self, hs: "HomeServer", servlet_groups: Optional[List[str]] = None):
         JsonResource.__init__(self, hs, canonical_json=False)
-        self.register_servlets(self, hs)
+        if hs.config.media.can_load_media_repo:
+            # This import is here to prevent a circular import failure
+            from synapse.rest.client import media
+
+            SERVLET_GROUPS["media"] = (media.register_servlets,)
+        self.register_servlets(self, hs, servlet_groups)
 
     @staticmethod
-    def register_servlets(client_resource: HttpServer, hs: "HomeServer") -> None:
+    def register_servlets(
+        client_resource: HttpServer,
+        hs: "HomeServer",
+        servlet_groups: Optional[Iterable[str]] = None,
+    ) -> None:
         # Some servlets are only registered on the main process (and not worker
         # processes).
         is_main_process = hs.config.worker.worker_app is None
 
-        versions.register_servlets(hs, client_resource)
-
-        # Deprecated in r0
-        initial_sync.register_servlets(hs, client_resource)
-        room.register_deprecated_servlets(hs, client_resource)
-
-        # Partially deprecated in r0
-        events.register_servlets(hs, client_resource)
-
-        room.register_servlets(hs, client_resource)
-        login.register_servlets(hs, client_resource)
-        profile.register_servlets(hs, client_resource)
-        presence.register_servlets(hs, client_resource)
-        directory.register_servlets(hs, client_resource)
-        voip.register_servlets(hs, client_resource)
-        if is_main_process:
-            pusher.register_servlets(hs, client_resource)
-        push_rule.register_servlets(hs, client_resource)
-        if is_main_process:
-            logout.register_servlets(hs, client_resource)
-        sync.register_servlets(hs, client_resource)
-        filter.register_servlets(hs, client_resource)
-        account.register_servlets(hs, client_resource)
-        register.register_servlets(hs, client_resource)
-        if is_main_process:
-            auth.register_servlets(hs, client_resource)
-        receipts.register_servlets(hs, client_resource)
-        read_marker.register_servlets(hs, client_resource)
-        room_keys.register_servlets(hs, client_resource)
-        keys.register_servlets(hs, client_resource)
-        if is_main_process:
-            tokenrefresh.register_servlets(hs, client_resource)
-        tags.register_servlets(hs, client_resource)
-        account_data.register_servlets(hs, client_resource)
-        if is_main_process:
-            reporting.register_servlets(hs, client_resource)
-            openid.register_servlets(hs, client_resource)
-        notifications.register_servlets(hs, client_resource)
-        devices.register_servlets(hs, client_resource)
-        if is_main_process:
-            thirdparty.register_servlets(hs, client_resource)
-        sendtodevice.register_servlets(hs, client_resource)
-        user_directory.register_servlets(hs, client_resource)
-        if is_main_process:
-            room_upgrade_rest_servlet.register_servlets(hs, client_resource)
-        capabilities.register_servlets(hs, client_resource)
-        if is_main_process:
-            account_validity.register_servlets(hs, client_resource)
-        relations.register_servlets(hs, client_resource)
-        password_policy.register_servlets(hs, client_resource)
-        knock.register_servlets(hs, client_resource)
-        appservice_ping.register_servlets(hs, client_resource)
-        if hs.config.media.can_load_media_repo:
-            from synapse.rest.client import media
+        if not servlet_groups:
+            servlet_groups = SERVLET_GROUPS.keys()
 
-            media.register_servlets(hs, client_resource)
+        for servlet_group in servlet_groups:
+            # Fail on unknown servlet groups.
+            if servlet_group not in SERVLET_GROUPS:
+                if servlet_group == "media":
+                    logger.warn(
+                        "media.can_load_media_repo needs to be configured for the media servlet to be available"
+                    )
+                raise RuntimeError(
+                    f"Attempting to register unknown client servlet: '{servlet_group}'"
+                )
 
-        # moving to /_synapse/admin
-        if is_main_process:
-            admin.register_servlets_for_client_rest_resource(hs, client_resource)
+            for servletfunc in SERVLET_GROUPS[servlet_group]:
+                if not is_main_process and servletfunc in [
+                    pusher.register_servlets,
+                    logout.register_servlets,
+                    auth.register_servlets,
+                    tokenrefresh.register_servlets,
+                    reporting.register_servlets,
+                    openid.register_servlets,
+                    thirdparty.register_servlets,
+                    room_upgrade_rest_servlet.register_servlets,
+                    account_validity.register_servlets,
+                    admin.register_servlets_for_client_rest_resource,
+                    mutual_rooms.register_servlets,
+                    login_token_request.register_servlets,
+                    rendezvous.register_servlets,
+                    auth_issuer.register_servlets,
+                ]:
+                    continue
 
-        # unstable
-        if is_main_process:
-            mutual_rooms.register_servlets(hs, client_resource)
-            login_token_request.register_servlets(hs, client_resource)
-            rendezvous.register_servlets(hs, client_resource)
-            auth_issuer.register_servlets(hs, client_resource)
+                servletfunc(hs, client_resource)
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 8c5db2a513..18142d1c65 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -950,6 +950,18 @@ class SlidingSyncRestServlet(RestServlet):
             logger.info("Client has disconnected; not serializing response.")
             return 200, {}
 
+        if from_token:
+            for room_id, room in sliding_sync_results.rooms.items():
+                logger.info(
+                    "Sliding Sync: Sending room %r, initial: %s, limited: %s, events %d: %s",
+                    room_id,
+                    room.initial,
+                    room.limited,
+                    len(room.timeline_events),
+                    [e.event_id for e in room.timeline_events],
+                )
+
+        # logger.info("Sliding sync response: %r", sliding_sync_results)
         response_content = await self.encode_response(requester, sliding_sync_results)
 
         return 200, response_content
@@ -990,7 +1002,7 @@ class SlidingSyncRestServlet(RestServlet):
         for list_key, list_result in lists.items():
             serialized_lists[list_key] = {
                 "count": list_result.count,
-                "ops": [encode_operation(op) for op in list_result.ops],
+                # "ops": [encode_operation(op) for op in list_result.ops],
             }
 
         return serialized_lists
@@ -1010,7 +1022,7 @@ class SlidingSyncRestServlet(RestServlet):
         serialized_rooms: Dict[str, JsonDict] = {}
         for room_id, room_result in rooms.items():
             serialized_rooms[room_id] = {
-                "bump_stamp": room_result.bump_stamp,
+                "bump_stamp": abs(room_result.bump_stamp),
                 "joined_count": room_result.joined_count,
                 "invited_count": room_result.invited_count,
                 "notification_count": room_result.notification_count,
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 63624f3e8f..9fd50951fa 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -319,6 +319,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
 
         if not backfilled:
             self._events_stream_cache.entity_has_changed(room_id, stream_ordering)  # type: ignore[attr-defined]
+            self._attempt_to_invalidate_cache(
+                "get_max_stream_ordering_in_room", (room_id,)
+            )
 
         if redacts:
             self._invalidate_local_get_event_cache(redacts)  # type: ignore[attr-defined]
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f7acdb859..0c7c2f9306 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -551,7 +551,7 @@ class PersistEventsStore:
         # From this point onwards the events are only events that we haven't
         # seen before.
 
-        self._store_event_txn(txn, events_and_contexts=events_and_contexts)
+        self._store_event_txn(txn, room_id, events_and_contexts=events_and_contexts)
 
         if new_forward_extremities:
             self._update_forward_extremities_txn(
@@ -1555,6 +1555,7 @@ class PersistEventsStore:
     def _store_event_txn(
         self,
         txn: LoggingTransaction,
+        room_id: str,
         events_and_contexts: Collection[Tuple[EventBase, EventContext]],
     ) -> None:
         """Insert new events into the event, event_json, redaction and
@@ -1629,6 +1630,27 @@ class PersistEventsStore:
             ],
         )
 
+        # Update the `sliding_sync_room_metadata` with the latest
+        # (non-backfilled, ie positive) stream ordering.
+        #
+        # We know this list is sorted and non-empty, so we just take the
+        # last event.
+        max_stream_ordering: int
+        for e, _ in events_and_contexts:
+            assert e.internal_metadata.stream_ordering is not None
+            max_stream_ordering = e.internal_metadata.stream_ordering
+
+        if max_stream_ordering > 0:
+            self.db_pool.simple_upsert_txn(
+                txn,
+                table="sliding_sync_room_metadata",
+                keyvalues={"room_id": room_id},
+                values={
+                    "instance_name": self._instance_name,
+                    "last_stream_ordering": max_stream_ordering,
+                },
+            )
+
         # If we're persisting an unredacted event we go and ensure
         # that we mark any redactions that reference this event as
         # requiring censoring.
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 4989c960a6..baa23b1bfc 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -79,7 +79,12 @@ from synapse.storage.database import (
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
+from synapse.types import (
+    JsonDict,
+    PersistedEventPosition,
+    RoomStreamToken,
+    StrCollection,
+)
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.cancellation import cancellable
@@ -623,6 +628,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         self._stream_order_on_start = self.get_room_max_stream_ordering()
         self._min_stream_order_on_start = self.get_room_min_stream_ordering()
 
+        database.updates.register_background_update_handler(
+            "sliding_sync_room_metadata", self._sliding_sync_room_metadata_bg_update
+        )
+
     def get_room_max_stream_ordering(self) -> int:
         """Get the stream_ordering of regular events that we have committed up to
 
@@ -2213,6 +2222,105 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return None
 
+    async def _sliding_sync_room_metadata_bg_update(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Background update to fill out 'sliding_sync_room_metadata' table.
+
+        Walks the `rooms` table in room ID order, recording the
+        instance_name/stream ordering of the most recent event in each room.
+        """
+        previous_room = progress.get("previous_room", "")
+
+        def _sliding_sync_room_metadata_bg_update_txn(txn: LoggingTransaction) -> int:
+            # Both these queries are just getting the most recent
+            # instance_name/stream ordering for the next N rooms.
+            if isinstance(self.database_engine, PostgresEngine):
+                sql = """
+                    SELECT room_id, instance_name, stream_ordering FROM rooms AS r,
+                    LATERAL (
+                        SELECT instance_name, stream_ordering
+                        FROM events WHERE events.room_id = r.room_id
+                        ORDER BY stream_ordering DESC
+                        LIMIT 1
+                    ) e
+                    WHERE r.room_id > ?
+                    ORDER BY r.room_id ASC
+                    LIMIT ?
+                """
+            else:
+                sql = """
+                    SELECT
+                        room_id,
+                        (
+                            SELECT instance_name
+                            FROM events WHERE events.room_id = r.room_id
+                            ORDER BY stream_ordering DESC
+                            LIMIT 1
+                        ),
+                        (
+                            SELECT stream_ordering
+                            FROM events WHERE events.room_id = r.room_id
+                            ORDER BY stream_ordering DESC
+                            LIMIT 1
+                        )
+                    FROM rooms AS r
+                    WHERE r.room_id > ?
+                    ORDER BY r.room_id ASC
+                    LIMIT ?
+                """
+
+            txn.execute(sql, (previous_room, batch_size))
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            # On SQLite the correlated subqueries return NULL for a room with
+            # no events (whereas the Postgres LATERAL inner join skips such
+            # rooms entirely), which would violate the NOT NULL constraints on
+            # 'sliding_sync_room_metadata'. Filter those rooms out here; the
+            # progress marker below still advances past them.
+            updates = [
+                (room_id, instance_name or "master", stream)
+                for room_id, instance_name, stream in rows
+                if stream is not None
+            ]
+
+            if updates:
+                self.db_pool.simple_upsert_many_txn(
+                    txn,
+                    table="sliding_sync_room_metadata",
+                    key_names=("room_id",),
+                    key_values=[(room_id,) for room_id, _, _ in updates],
+                    value_names=(
+                        "instance_name",
+                        "last_stream_ordering",
+                    ),
+                    value_values=[
+                        (instance_name, stream)
+                        for _, instance_name, stream in updates
+                    ],
+                )
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn, "sliding_sync_room_metadata", {"previous_room": rows[-1][0]}
+            )
+
+            return len(rows)
+
+        rows = await self.db_pool.runInteraction(
+            "_sliding_sync_room_metadata_bg_update",
+            _sliding_sync_room_metadata_bg_update_txn,
+        )
+
+        if rows == 0:
+            # Processed every room: mark the background update as finished.
+            await self.db_pool.updates._end_background_update(
+                "sliding_sync_room_metadata"
+            )
+
+        return rows
 
     @trace
     def get_rooms_that_might_have_updates(
         self, room_ids: StrCollection, from_token: RoomStreamToken
diff --git a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
new file mode 100644
index 0000000000..e8bc33ff40
--- /dev/null
+++ b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
@@ -0,0 +1,24 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+-- A table that maps from room ID to metadata useful for sliding sync.
+CREATE TABLE sliding_sync_room_metadata (
+    room_id TEXT NOT NULL PRIMARY KEY,
+
+    -- The instance_name and stream ordering of the room's most recent event (by stream ordering).
+    instance_name TEXT NOT NULL,
+    last_stream_ordering BIGINT NOT NULL
+);
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+    (8507, 'sliding_sync_room_metadata', '{}');
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 63df31ec75..eb050066e9 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -33,9 +33,19 @@ from synapse.api.constants import (
     ReceiptTypes,
     RelationTypes,
 )
-from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync
+from synapse.rest.client import (
+    devices,
+    knock,
+    login,
+    read_marker,
+    receipts,
+    room,
+    sync,
+)
 from synapse.server import HomeServer
-from synapse.types import JsonDict
+from synapse.types import (
+    JsonDict,
+)
 from synapse.util import Clock
 
 from tests import unittest
diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py
index c4e216c308..037bbca1ba 100644
--- a/tests/storage/test_event_chain.py
+++ b/tests/storage/test_event_chain.py
@@ -440,6 +440,7 @@ class EventChainStoreTestCase(HomeserverTestCase):
             assert persist_events_store is not None
             persist_events_store._store_event_txn(
                 txn,
+                events[0].room_id,
                 [
                     (e, EventContext(self.hs.get_storage_controllers(), {}))
                     for e in events