diff --git a/changelog.d/17483.bugfix b/changelog.d/17483.bugfix
new file mode 100644
index 0000000000..c97a802dbf
--- /dev/null
+++ b/changelog.d/17483.bugfix
@@ -0,0 +1 @@
+Start handlers for new media endpoints when media resource configured.
diff --git a/changelog.d/17536.misc b/changelog.d/17536.misc
new file mode 100644
index 0000000000..116ef0c36d
--- /dev/null
+++ b/changelog.d/17536.misc
@@ -0,0 +1 @@
+Replace override of deprecated method `HTTPAdapter.get_connection` with `get_connection_with_tls_context`.
\ No newline at end of file
diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py
index 4c758e5424..fb879ef555 100755
--- a/scripts-dev/federation_client.py
+++ b/scripts-dev/federation_client.py
@@ -43,7 +43,7 @@ import argparse
import base64
import json
import sys
-from typing import Any, Dict, Optional, Tuple
+from typing import Any, Dict, Mapping, Optional, Tuple, Union
from urllib import parse as urlparse
import requests
@@ -75,7 +75,7 @@ def encode_canonical_json(value: object) -> bytes:
value,
# Encode code-points outside of ASCII as UTF-8 rather than \u escapes
ensure_ascii=False,
- # Remove unecessary white space.
+ # Remove unnecessary white space.
separators=(",", ":"),
# Sort the keys of dictionaries.
sort_keys=True,
@@ -298,12 +298,23 @@ class MatrixConnectionAdapter(HTTPAdapter):
return super().send(request, *args, **kwargs)
- def get_connection(
- self, url: str, proxies: Optional[Dict[str, str]] = None
+ def get_connection_with_tls_context(
+ self,
+ request: PreparedRequest,
+ verify: Optional[Union[bool, str]],
+ proxies: Optional[Mapping[str, str]] = None,
+ cert: Optional[Union[Tuple[str, str], str]] = None,
) -> HTTPConnectionPool:
- # overrides the get_connection() method in the base class
- parsed = urlparse.urlsplit(url)
- (host, port, ssl_server_name) = self._lookup(parsed.netloc)
+ # overrides the get_connection_with_tls_context() method in the base class
+ parsed = urlparse.urlsplit(request.url)
+
+ # Extract the server name from the request URL, and ensure it's a str.
+ hostname = parsed.netloc
+ if isinstance(hostname, bytes):
+ hostname = hostname.decode("utf-8")
+ assert isinstance(hostname, str)
+
+ (host, port, ssl_server_name) = self._lookup(hostname)
print(
f"Connecting to {host}:{port} with SNI {ssl_server_name}", file=sys.stderr
)
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 248622fa92..53f1859256 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -206,6 +206,21 @@ class GenericWorkerServer(HomeServer):
"/_synapse/admin": admin_resource,
}
)
+
+ if "federation" not in res.names:
+ # Only load the federation media resource separately if federation
+ # resource is not specified since federation resource includes media
+ # resource.
+ resources[FEDERATION_PREFIX] = TransportLayerServer(
+ self, servlet_groups=["media"]
+ )
+ if "client" not in res.names:
+ # Only load the client media resource separately if client
+ # resource is not specified since client resource includes media
+ # resource.
+ resources[CLIENT_API_PREFIX] = ClientRestResource(
+ self, servlet_groups=["media"]
+ )
else:
logger.warning(
"A 'media' listener is configured but the media"
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e114ab7ec4..2a824e8457 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -101,6 +101,12 @@ class SynapseHomeServer(HomeServer):
# Skip loading openid resource if federation is defined
# since federation resource will include openid
continue
+ if name == "media" and (
+ "federation" in res.names or "client" in res.names
+ ):
+ # Skip loading media resource if federation or client are defined
+ # since federation & client resources will include media
+ continue
if name == "health":
# Skip loading, health resource is always included
continue
@@ -231,6 +237,14 @@ class SynapseHomeServer(HomeServer):
"'media' resource conflicts with enable_media_repo=False"
)
+ if name == "media":
+ resources[FEDERATION_PREFIX] = TransportLayerServer(
+ self, servlet_groups=["media"]
+ )
+ resources[CLIENT_API_PREFIX] = ClientRestResource(
+ self, servlet_groups=["media"]
+ )
+
if name in ["keys", "federation"]:
resources[SERVER_KEY_PREFIX] = KeyResource(self)
diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py
index 72599bb204..43102567db 100644
--- a/synapse/federation/transport/server/__init__.py
+++ b/synapse/federation/transport/server/__init__.py
@@ -271,6 +271,10 @@ SERVLET_GROUPS: Dict[str, Iterable[Type[BaseFederationServlet]]] = {
"federation": FEDERATION_SERVLET_CLASSES,
"room_list": (PublicRoomList,),
"openid": (OpenIdUserInfo,),
+ "media": (
+ FederationMediaDownloadServlet,
+ FederationMediaThumbnailServlet,
+ ),
}
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index b075a86f68..20f87c885e 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -912,6 +912,4 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
FederationV1SendKnockServlet,
FederationMakeKnockServlet,
FederationAccountStatusServlet,
- FederationMediaDownloadServlet,
- FederationMediaThumbnailServlet,
)
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index ac65b30290..531089c279 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -810,6 +810,7 @@ class SlidingSyncHandler:
connection_position = await self.connection_store.record_rooms(
sync_config=sync_config,
+ room_configs=relevant_room_map,
from_token=from_token,
sent_room_ids=relevant_rooms_to_send_map.keys(),
unsent_room_ids=unsent_room_ids,
@@ -1910,6 +1911,7 @@ class SlidingSyncHandler:
#
# Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
from_bound = None
+ ignore_timeline_bound = False
initial = True
if from_token and not room_membership_for_user_at_to_token.newly_joined:
room_status = await self.connection_store.have_sent_room(
@@ -1917,6 +1919,7 @@ class SlidingSyncHandler:
connection_token=from_token.connection_position,
room_id=room_id,
)
+
if room_status.status == HaveSentRoomFlag.LIVE:
from_bound = from_token.stream_token.room_key
initial = False
@@ -1930,9 +1933,24 @@ class SlidingSyncHandler:
else:
assert_never(room_status.status)
+ if room_status.timeline_limit is not None and (
+ room_status.timeline_limit < room_sync_config.timeline_limit
+ ):
+ # If the timeline limit has been increased since previous
+ # requests then we treat it as request for more events. We do
+ # this by sending down a `limited` sync, ignoring the from
+ # bound.
+ ignore_timeline_bound = True
+
log_kv({"sliding_sync.room_status": room_status})
- log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
+ log_kv(
+ {
+ "sliding_sync.from_bound": from_bound,
+ "sliding_sync.ignore_timeline_bound": ignore_timeline_bound,
+ "sliding_sync.initial": initial,
+ }
+ )
# Assemble the list of timeline events
#
@@ -2004,7 +2022,7 @@ class SlidingSyncHandler:
# (from newer to older events) starting at to_bound.
# This ensures we fill the `limit` with the newest events first,
from_key=to_bound,
- to_key=from_bound,
+ to_key=from_bound if not ignore_timeline_bound else None,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
@@ -2343,24 +2361,37 @@ class SlidingSyncHandler:
)
# Figure out the last bump event in the room
- last_bump_event_result = (
- await self.store.get_last_event_pos_in_room_before_stream_ordering(
- room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
+ bump_stamp = None
+ if timeline_events:
+ for e in reversed(timeline_events):
+ assert e.internal_metadata.stream_ordering is not None
+ if (
+ e.type in DEFAULT_BUMP_EVENT_TYPES
+ and e.internal_metadata.stream_ordering > 0
+ ):
+ bump_stamp = e.internal_metadata.stream_ordering
+ break
+
+ if bump_stamp is None:
+ # By default, just choose the membership event position
+ bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
+
+ last_bump_event_result = (
+ await self.store.get_last_event_pos_in_room_before_stream_ordering(
+ room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
+ )
)
- )
- # By default, just choose the membership event position
- bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
- # But if we found a bump event, use that instead
- if last_bump_event_result is not None:
- _, new_bump_event_pos = last_bump_event_result
+ # But if we found a bump event, use that instead
+ if last_bump_event_result is not None:
+ _, new_bump_event_pos = last_bump_event_result
- # If we've just joined a remote room, then the last bump event may
- # have been backfilled (and so have a negative stream ordering).
- # These negative stream orderings can't sensibly be compared, so
- # instead we use the membership event position.
- if new_bump_event_pos.stream > 0:
- bump_stamp = new_bump_event_pos.stream
+ # If we've just joined a remote room, then the last bump event may
+ # have been backfilled (and so have a negative stream ordering).
+ # These negative stream orderings can't sensibly be compared, so
+ # instead we use the membership event position.
+ if new_bump_event_pos.stream > 0:
+ bump_stamp = new_bump_event_pos.stream
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
@@ -2606,12 +2637,13 @@ class SlidingSyncHandler:
up_to_stream_id=since_stream_id,
)
- logger.debug(
- "Deleted %d to-device messages up to %d for %s",
- deleted,
- since_stream_id,
- user_id,
- )
+ if deleted:
+ logger.debug(
+ "Deleted %d to-device messages up to %d for %s",
+ deleted,
+ since_stream_id,
+ user_id,
+ )
messages, stream_id = await self.store.get_messages_for_device(
user_id=user_id,
@@ -2948,19 +2980,26 @@ class HaveSentRoom:
contains the last stream token of the last updates we sent down
the room, i.e. we still need to send everything since then to the
client.
+ timeline_limit: The timeline limit config for the room, if LIVE or
+ PREVIOUSLY. This is used to track if the client has increased
+ the timeline limit to request more events.
"""
status: HaveSentRoomFlag
last_token: Optional[RoomStreamToken]
+ timeline_limit: Optional[int]
+
+ @staticmethod
+ def live(timeline_limit: int) -> "HaveSentRoom":
+ return HaveSentRoom(HaveSentRoomFlag.LIVE, None, timeline_limit)
@staticmethod
- def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
+ def previously(last_token: RoomStreamToken, timeline_limit: int) -> "HaveSentRoom":
"""Constructor for `PREVIOUSLY` flag."""
- return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
+ return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token, timeline_limit)
-HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
-HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
+HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None, None)
@attr.s(auto_attribs=True)
@@ -3026,6 +3065,7 @@ class SlidingSyncConnectionStore:
async def record_rooms(
self,
sync_config: SlidingSyncConfig,
+ room_configs: Dict[str, RoomSyncConfig],
from_token: Optional[SlidingSyncStreamToken],
*,
sent_room_ids: StrCollection,
@@ -3066,8 +3106,12 @@ class SlidingSyncConnectionStore:
# end we can treat this as a noop.
have_updated = False
for room_id in sent_room_ids:
- new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
- have_updated = True
+ prev_state = new_room_statuses.get(room_id)
+ new_room_statuses[room_id] = HaveSentRoom.live(
+ room_configs[room_id].timeline_limit
+ )
+ if prev_state != new_room_statuses[room_id]:
+ have_updated = True
# Whether we add/update the entries for unsent rooms depends on the
# existing entry:
@@ -3078,18 +3122,23 @@ class SlidingSyncConnectionStore:
# given token, so we don't need to update the entry.
# - NEVER: We have never previously sent down the room, and we haven't
# sent anything down this time either so we leave it as NEVER.
+ #
+ # We only need to do this if `from_token` is not None, as if it is then
+ # we know that there are no existing entires.
- # Work out the new state for unsent rooms that were `LIVE`.
if from_token:
- new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
- else:
- new_unsent_state = HAVE_SENT_ROOM_NEVER
-
- for room_id in unsent_room_ids:
- prev_state = new_room_statuses.get(room_id)
- if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
- new_room_statuses[room_id] = new_unsent_state
- have_updated = True
+ for room_id in unsent_room_ids:
+ prev_state = new_room_statuses.get(room_id)
+ if (
+ prev_state is not None
+ and prev_state.status == HaveSentRoomFlag.LIVE
+ ):
+ assert prev_state.timeline_limit is not None
+ new_room_statuses[room_id] = HaveSentRoom.previously(
+ from_token.stream_token.room_key,
+ prev_state.timeline_limit,
+ )
+ have_updated = True
if not have_updated:
return prev_connection_token
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 1aa9ea3877..c5cdc36955 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -18,7 +18,8 @@
# [This file includes modifications made by New Vector Limited]
#
#
-from typing import TYPE_CHECKING, Callable
+import logging
+from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple
from synapse.http.server import HttpServer, JsonResource
from synapse.rest import admin
@@ -67,11 +68,64 @@ from synapse.rest.client import (
voip,
)
+logger = logging.getLogger(__name__)
+
if TYPE_CHECKING:
from synapse.server import HomeServer
RegisterServletsFunc = Callable[["HomeServer", HttpServer], None]
+CLIENT_SERVLET_FUNCTIONS: Tuple[RegisterServletsFunc, ...] = (
+ versions.register_servlets,
+ initial_sync.register_servlets,
+ room.register_deprecated_servlets,
+ events.register_servlets,
+ room.register_servlets,
+ login.register_servlets,
+ profile.register_servlets,
+ presence.register_servlets,
+ directory.register_servlets,
+ voip.register_servlets,
+ pusher.register_servlets,
+ push_rule.register_servlets,
+ logout.register_servlets,
+ sync.register_servlets,
+ filter.register_servlets,
+ account.register_servlets,
+ register.register_servlets,
+ auth.register_servlets,
+ receipts.register_servlets,
+ read_marker.register_servlets,
+ room_keys.register_servlets,
+ keys.register_servlets,
+ tokenrefresh.register_servlets,
+ tags.register_servlets,
+ account_data.register_servlets,
+ reporting.register_servlets,
+ openid.register_servlets,
+ notifications.register_servlets,
+ devices.register_servlets,
+ thirdparty.register_servlets,
+ sendtodevice.register_servlets,
+ user_directory.register_servlets,
+ room_upgrade_rest_servlet.register_servlets,
+ capabilities.register_servlets,
+ account_validity.register_servlets,
+ relations.register_servlets,
+ password_policy.register_servlets,
+ knock.register_servlets,
+ appservice_ping.register_servlets,
+ admin.register_servlets_for_client_rest_resource,
+ mutual_rooms.register_servlets,
+ login_token_request.register_servlets,
+ rendezvous.register_servlets,
+ auth_issuer.register_servlets,
+)
+
+SERVLET_GROUPS: Dict[str, Iterable[RegisterServletsFunc]] = {
+ "client": CLIENT_SERVLET_FUNCTIONS,
+}
+
class ClientRestResource(JsonResource):
"""Matrix Client API REST resource.
@@ -83,80 +137,56 @@ class ClientRestResource(JsonResource):
* etc
"""
- def __init__(self, hs: "HomeServer"):
+ def __init__(self, hs: "HomeServer", servlet_groups: Optional[List[str]] = None):
JsonResource.__init__(self, hs, canonical_json=False)
- self.register_servlets(self, hs)
+ if hs.config.media.can_load_media_repo:
+ # This import is here to prevent a circular import failure
+ from synapse.rest.client import media
+
+ SERVLET_GROUPS["media"] = (media.register_servlets,)
+ self.register_servlets(self, hs, servlet_groups)
@staticmethod
- def register_servlets(client_resource: HttpServer, hs: "HomeServer") -> None:
+ def register_servlets(
+ client_resource: HttpServer,
+ hs: "HomeServer",
+ servlet_groups: Optional[Iterable[str]] = None,
+ ) -> None:
# Some servlets are only registered on the main process (and not worker
# processes).
is_main_process = hs.config.worker.worker_app is None
- versions.register_servlets(hs, client_resource)
-
- # Deprecated in r0
- initial_sync.register_servlets(hs, client_resource)
- room.register_deprecated_servlets(hs, client_resource)
-
- # Partially deprecated in r0
- events.register_servlets(hs, client_resource)
-
- room.register_servlets(hs, client_resource)
- login.register_servlets(hs, client_resource)
- profile.register_servlets(hs, client_resource)
- presence.register_servlets(hs, client_resource)
- directory.register_servlets(hs, client_resource)
- voip.register_servlets(hs, client_resource)
- if is_main_process:
- pusher.register_servlets(hs, client_resource)
- push_rule.register_servlets(hs, client_resource)
- if is_main_process:
- logout.register_servlets(hs, client_resource)
- sync.register_servlets(hs, client_resource)
- filter.register_servlets(hs, client_resource)
- account.register_servlets(hs, client_resource)
- register.register_servlets(hs, client_resource)
- if is_main_process:
- auth.register_servlets(hs, client_resource)
- receipts.register_servlets(hs, client_resource)
- read_marker.register_servlets(hs, client_resource)
- room_keys.register_servlets(hs, client_resource)
- keys.register_servlets(hs, client_resource)
- if is_main_process:
- tokenrefresh.register_servlets(hs, client_resource)
- tags.register_servlets(hs, client_resource)
- account_data.register_servlets(hs, client_resource)
- if is_main_process:
- reporting.register_servlets(hs, client_resource)
- openid.register_servlets(hs, client_resource)
- notifications.register_servlets(hs, client_resource)
- devices.register_servlets(hs, client_resource)
- if is_main_process:
- thirdparty.register_servlets(hs, client_resource)
- sendtodevice.register_servlets(hs, client_resource)
- user_directory.register_servlets(hs, client_resource)
- if is_main_process:
- room_upgrade_rest_servlet.register_servlets(hs, client_resource)
- capabilities.register_servlets(hs, client_resource)
- if is_main_process:
- account_validity.register_servlets(hs, client_resource)
- relations.register_servlets(hs, client_resource)
- password_policy.register_servlets(hs, client_resource)
- knock.register_servlets(hs, client_resource)
- appservice_ping.register_servlets(hs, client_resource)
- if hs.config.media.can_load_media_repo:
- from synapse.rest.client import media
+ if not servlet_groups:
+ servlet_groups = SERVLET_GROUPS.keys()
- media.register_servlets(hs, client_resource)
+ for servlet_group in servlet_groups:
+ # Fail on unknown servlet groups.
+ if servlet_group not in SERVLET_GROUPS:
+ if servlet_group == "media":
+ logger.warn(
+ "media.can_load_media_repo needs to be configured for the media servlet to be available"
+ )
+ raise RuntimeError(
+ f"Attempting to register unknown client servlet: '{servlet_group}'"
+ )
- # moving to /_synapse/admin
- if is_main_process:
- admin.register_servlets_for_client_rest_resource(hs, client_resource)
+ for servletfunc in SERVLET_GROUPS[servlet_group]:
+ if not is_main_process and servletfunc in [
+ pusher.register_servlets,
+ logout.register_servlets,
+ auth.register_servlets,
+ tokenrefresh.register_servlets,
+ reporting.register_servlets,
+ openid.register_servlets,
+ thirdparty.register_servlets,
+ room_upgrade_rest_servlet.register_servlets,
+ account_validity.register_servlets,
+ admin.register_servlets_for_client_rest_resource,
+ mutual_rooms.register_servlets,
+ login_token_request.register_servlets,
+ rendezvous.register_servlets,
+ auth_issuer.register_servlets,
+ ]:
+ continue
- # unstable
- if is_main_process:
- mutual_rooms.register_servlets(hs, client_resource)
- login_token_request.register_servlets(hs, client_resource)
- rendezvous.register_servlets(hs, client_resource)
- auth_issuer.register_servlets(hs, client_resource)
+ servletfunc(hs, client_resource)
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 8c5db2a513..18142d1c65 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -950,6 +950,18 @@ class SlidingSyncRestServlet(RestServlet):
logger.info("Client has disconnected; not serializing response.")
return 200, {}
+ if from_token:
+ for room_id, room in sliding_sync_results.rooms.items():
+ logger.info(
+ "Sliding Sync: Sending room %r, initial: %s, limited: %s, events %d: %s",
+ room_id,
+ room.initial,
+ room.limited,
+ len(room.timeline_events),
+ [e.event_id for e in room.timeline_events],
+ )
+
+ # logger.info("Sliding sync response: %r", sliding_sync_results)
response_content = await self.encode_response(requester, sliding_sync_results)
return 200, response_content
@@ -990,7 +1002,7 @@ class SlidingSyncRestServlet(RestServlet):
for list_key, list_result in lists.items():
serialized_lists[list_key] = {
"count": list_result.count,
- "ops": [encode_operation(op) for op in list_result.ops],
+ # "ops": [encode_operation(op) for op in list_result.ops],
}
return serialized_lists
@@ -1010,7 +1022,7 @@ class SlidingSyncRestServlet(RestServlet):
serialized_rooms: Dict[str, JsonDict] = {}
for room_id, room_result in rooms.items():
serialized_rooms[room_id] = {
- "bump_stamp": room_result.bump_stamp,
+ "bump_stamp": abs(room_result.bump_stamp),
"joined_count": room_result.joined_count,
"invited_count": room_result.invited_count,
"notification_count": room_result.notification_count,
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 63624f3e8f..9fd50951fa 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -319,6 +319,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
if not backfilled:
self._events_stream_cache.entity_has_changed(room_id, stream_ordering) # type: ignore[attr-defined]
+ self._attempt_to_invalidate_cache(
+ "get_max_stream_ordering_in_room", (room_id,)
+ )
if redacts:
self._invalidate_local_get_event_cache(redacts) # type: ignore[attr-defined]
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f7acdb859..0c7c2f9306 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -551,7 +551,7 @@ class PersistEventsStore:
# From this point onwards the events are only events that we haven't
# seen before.
- self._store_event_txn(txn, events_and_contexts=events_and_contexts)
+ self._store_event_txn(txn, room_id, events_and_contexts=events_and_contexts)
if new_forward_extremities:
self._update_forward_extremities_txn(
@@ -1555,6 +1555,7 @@ class PersistEventsStore:
def _store_event_txn(
self,
txn: LoggingTransaction,
+ room_id: str,
events_and_contexts: Collection[Tuple[EventBase, EventContext]],
) -> None:
"""Insert new events into the event, event_json, redaction and
@@ -1629,6 +1630,27 @@ class PersistEventsStore:
],
)
+ # Update the `sliding_sync_room_metadata` with the latest
+ # (non-backfilled, ie positive) stream ordering.
+ #
+ # We know this list is sorted and non-empty, so we just take the last
+ # one event.
+ max_stream_ordering: int
+ for e, _ in events_and_contexts:
+ assert e.internal_metadata.stream_ordering is not None
+ max_stream_ordering = e.internal_metadata.stream_ordering
+
+ if max_stream_ordering > 0:
+ self.db_pool.simple_upsert_txn(
+ txn,
+ table="sliding_sync_room_metadata",
+ keyvalues={"room_id": room_id},
+ values={
+ "instance_name": self._instance_name,
+ "last_stream_ordering": max_stream_ordering,
+ },
+ )
+
# If we're persisting an unredacted event we go and ensure
# that we mark any redactions that reference this event as
# requiring censoring.
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 4989c960a6..baa23b1bfc 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -79,7 +79,12 @@ from synapse.storage.database import (
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
+from synapse.types import (
+ JsonDict,
+ PersistedEventPosition,
+ RoomStreamToken,
+ StrCollection,
+)
from synapse.util.caches.descriptors import cached
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.cancellation import cancellable
@@ -623,6 +628,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
self._stream_order_on_start = self.get_room_max_stream_ordering()
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
+ database.updates.register_background_update_handler(
+ "sliding_sync_room_metadata", self._sliding_sync_room_metadata_bg_update
+ )
+
def get_room_max_stream_ordering(self) -> int:
"""Get the stream_ordering of regular events that we have committed up to
@@ -2213,6 +2222,91 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return None
+ async def _sliding_sync_room_metadata_bg_update(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ """Background update to fill out 'sliding_sync_room_metadata' table"""
+ previous_room = progress.get("previous_room", "")
+
+ def _sliding_sync_room_metadata_bg_update_txn(txn: LoggingTransaction) -> int:
+ # Both these queries are just getting the most recent
+ # instance_name/stream ordering for the next N rooms.
+ if isinstance(self.database_engine, PostgresEngine):
+ sql = """
+ SELECT room_id, instance_name, stream_ordering FROM rooms AS r,
+ LATERAL (
+ SELECT instance_name, stream_ordering
+ FROM events WHERE events.room_id = r.room_id
+ ORDER BY stream_ordering DESC
+ LIMIT 1
+ ) e
+ WHERE r.room_id > ?
+ ORDER BY r.room_id ASC
+ LIMIT ?
+ """
+ else:
+ sql = """
+ SELECT
+ room_id,
+ (
+ SELECT instance_name
+ FROM events WHERE events.room_id = r.room_id
+ ORDER BY stream_ordering DESC
+ LIMIT 1
+ ),
+ (
+ SELECT stream_ordering
+ FROM events WHERE events.room_id = r.room_id
+ ORDER BY stream_ordering DESC
+ LIMIT 1
+ )
+ FROM rooms AS r
+ WHERE r.room_id > ?
+ ORDER BY r.room_id ASC
+ LIMIT ?
+ """
+
+ txn.execute(sql, (previous_room, batch_size))
+ rows = txn.fetchall()
+ if not rows:
+ return 0
+
+ self.db_pool.simple_upsert_many_txn(
+ txn,
+ table="sliding_sync_room_metadata",
+ key_names=("room_id",),
+ key_values=[(room_id,) for room_id, _, _ in rows],
+ value_names=(
+ "instance_name",
+ "last_stream_ordering",
+ ),
+ value_values=[
+ (
+ instance_name or "master",
+ stream,
+ )
+ for _, instance_name, stream in rows
+ ],
+ )
+
+ self.db_pool.updates._background_update_progress_txn(
+ txn, "sliding_sync_room_metadata", {"previous_room": rows[-1][0]}
+ )
+
+ return len(rows)
+
+ rows = await self.db_pool.runInteraction(
+ "_sliding_sync_room_metadata_bg_update",
+ _sliding_sync_room_metadata_bg_update_txn,
+ )
+
+ if rows == 0:
+ await self.db_pool.updates._end_background_update(
+ "sliding_sync_room_metadata"
+ )
+
+ return rows
+
@trace
def get_rooms_that_might_have_updates(
self, room_ids: StrCollection, from_token: RoomStreamToken
diff --git a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
new file mode 100644
index 0000000000..e8bc33ff40
--- /dev/null
+++ b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
@@ -0,0 +1,24 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+-- A table that maps from room ID to metadata useful for sliding sync.
+CREATE TABLE sliding_sync_room_metadata (
+ room_id TEXT NOT NULL PRIMARY KEY,
+
+ -- The instance_name / stream ordering of the last event in the room.
+ instance_name TEXT NOT NULL,
+ last_stream_ordering BIGINT NOT NULL
+);
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (8507, 'sliding_sync_room_metadata', '{}');
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 63df31ec75..eb050066e9 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -33,9 +33,19 @@ from synapse.api.constants import (
ReceiptTypes,
RelationTypes,
)
-from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync
+from synapse.rest.client import (
+ devices,
+ knock,
+ login,
+ read_marker,
+ receipts,
+ room,
+ sync,
+)
from synapse.server import HomeServer
-from synapse.types import JsonDict
+from synapse.types import (
+ JsonDict,
+)
from synapse.util import Clock
from tests import unittest
diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py
index c4e216c308..037bbca1ba 100644
--- a/tests/storage/test_event_chain.py
+++ b/tests/storage/test_event_chain.py
@@ -440,6 +440,7 @@ class EventChainStoreTestCase(HomeserverTestCase):
assert persist_events_store is not None
persist_events_store._store_event_txn(
txn,
+ events[0].room_id,
[
(e, EventContext(self.hs.get_storage_controllers(), {}))
for e in events
|