summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- changelog.d/17483.bugfix 1
-rw-r--r-- changelog.d/17536.misc 1
-rwxr-xr-x scripts-dev/federation_client.py 25
-rw-r--r-- synapse/app/generic_worker.py 15
-rw-r--r-- synapse/app/homeserver.py 14
-rw-r--r-- synapse/federation/transport/server/__init__.py 4
-rw-r--r-- synapse/federation/transport/server/federation.py 2
-rw-r--r-- synapse/handlers/sliding_sync.py 127
-rw-r--r-- synapse/rest/__init__.py 166
-rw-r--r-- synapse/rest/client/sync.py 16
-rw-r--r-- synapse/storage/databases/main/cache.py 3
-rw-r--r-- synapse/storage/databases/main/events.py 24
-rw-r--r-- synapse/storage/databases/main/stream.py 96
-rw-r--r-- synapse/storage/schema/main/delta/85/07_sliding_sync.sql 24
-rw-r--r-- tests/rest/client/test_sync.py 14
-rw-r--r-- tests/storage/test_event_chain.py 1
16 files changed, 411 insertions, 122 deletions
diff --git a/changelog.d/17483.bugfix b/changelog.d/17483.bugfix
new file mode 100644

index 0000000000..c97a802dbf --- /dev/null +++ b/changelog.d/17483.bugfix
@@ -0,0 +1 @@ +Start handlers for new media endpoints when media resource configured. diff --git a/changelog.d/17536.misc b/changelog.d/17536.misc new file mode 100644
index 0000000000..116ef0c36d --- /dev/null +++ b/changelog.d/17536.misc
@@ -0,0 +1 @@ +Replace override of deprecated method `HTTPAdapter.get_connection` with `get_connection_with_tls_context`. \ No newline at end of file diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py
index 4c758e5424..fb879ef555 100755 --- a/scripts-dev/federation_client.py +++ b/scripts-dev/federation_client.py
@@ -43,7 +43,7 @@ import argparse import base64 import json import sys -from typing import Any, Dict, Optional, Tuple +from typing import Any, Dict, Mapping, Optional, Tuple, Union from urllib import parse as urlparse import requests @@ -75,7 +75,7 @@ def encode_canonical_json(value: object) -> bytes: value, # Encode code-points outside of ASCII as UTF-8 rather than \u escapes ensure_ascii=False, - # Remove unecessary white space. + # Remove unnecessary white space. separators=(",", ":"), # Sort the keys of dictionaries. sort_keys=True, @@ -298,12 +298,23 @@ class MatrixConnectionAdapter(HTTPAdapter): return super().send(request, *args, **kwargs) - def get_connection( - self, url: str, proxies: Optional[Dict[str, str]] = None + def get_connection_with_tls_context( + self, + request: PreparedRequest, + verify: Optional[Union[bool, str]], + proxies: Optional[Mapping[str, str]] = None, + cert: Optional[Union[Tuple[str, str], str]] = None, ) -> HTTPConnectionPool: - # overrides the get_connection() method in the base class - parsed = urlparse.urlsplit(url) - (host, port, ssl_server_name) = self._lookup(parsed.netloc) + # overrides the get_connection_with_tls_context() method in the base class + parsed = urlparse.urlsplit(request.url) + + # Extract the server name from the request URL, and ensure it's a str. + hostname = parsed.netloc + if isinstance(hostname, bytes): + hostname = hostname.decode("utf-8") + assert isinstance(hostname, str) + + (host, port, ssl_server_name) = self._lookup(hostname) print( f"Connecting to {host}:{port} with SNI {ssl_server_name}", file=sys.stderr ) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 248622fa92..53f1859256 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py
@@ -206,6 +206,21 @@ class GenericWorkerServer(HomeServer): "/_synapse/admin": admin_resource, } ) + + if "federation" not in res.names: + # Only load the federation media resource separately if federation + # resource is not specified since federation resource includes media + # resource. + resources[FEDERATION_PREFIX] = TransportLayerServer( + self, servlet_groups=["media"] + ) + if "client" not in res.names: + # Only load the client media resource separately if client + # resource is not specified since client resource includes media + # resource. + resources[CLIENT_API_PREFIX] = ClientRestResource( + self, servlet_groups=["media"] + ) else: logger.warning( "A 'media' listener is configured but the media" diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e114ab7ec4..2a824e8457 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py
@@ -101,6 +101,12 @@ class SynapseHomeServer(HomeServer): # Skip loading openid resource if federation is defined # since federation resource will include openid continue + if name == "media" and ( + "federation" in res.names or "client" in res.names + ): + # Skip loading media resource if federation or client are defined + # since federation & client resources will include media + continue if name == "health": # Skip loading, health resource is always included continue @@ -231,6 +237,14 @@ class SynapseHomeServer(HomeServer): "'media' resource conflicts with enable_media_repo=False" ) + if name == "media": + resources[FEDERATION_PREFIX] = TransportLayerServer( + self, servlet_groups=["media"] + ) + resources[CLIENT_API_PREFIX] = ClientRestResource( + self, servlet_groups=["media"] + ) + if name in ["keys", "federation"]: resources[SERVER_KEY_PREFIX] = KeyResource(self) diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py
index 72599bb204..43102567db 100644 --- a/synapse/federation/transport/server/__init__.py +++ b/synapse/federation/transport/server/__init__.py
@@ -271,6 +271,10 @@ SERVLET_GROUPS: Dict[str, Iterable[Type[BaseFederationServlet]]] = { "federation": FEDERATION_SERVLET_CLASSES, "room_list": (PublicRoomList,), "openid": (OpenIdUserInfo,), + "media": ( + FederationMediaDownloadServlet, + FederationMediaThumbnailServlet, + ), } diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index b075a86f68..20f87c885e 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py
@@ -912,6 +912,4 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = ( FederationV1SendKnockServlet, FederationMakeKnockServlet, FederationAccountStatusServlet, - FederationMediaDownloadServlet, - FederationMediaThumbnailServlet, ) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index ac65b30290..531089c279 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py
@@ -810,6 +810,7 @@ class SlidingSyncHandler: connection_position = await self.connection_store.record_rooms( sync_config=sync_config, + room_configs=relevant_room_map, from_token=from_token, sent_room_ids=relevant_rooms_to_send_map.keys(), unsent_room_ids=unsent_room_ids, @@ -1910,6 +1911,7 @@ class SlidingSyncHandler: # # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917 from_bound = None + ignore_timeline_bound = False initial = True if from_token and not room_membership_for_user_at_to_token.newly_joined: room_status = await self.connection_store.have_sent_room( @@ -1917,6 +1919,7 @@ class SlidingSyncHandler: connection_token=from_token.connection_position, room_id=room_id, ) + if room_status.status == HaveSentRoomFlag.LIVE: from_bound = from_token.stream_token.room_key initial = False @@ -1930,9 +1933,24 @@ class SlidingSyncHandler: else: assert_never(room_status.status) + if room_status.timeline_limit is not None and ( + room_status.timeline_limit < room_sync_config.timeline_limit + ): + # If the timeline limit has been increased since previous + # requests then we treat it as request for more events. We do + # this by sending down a `limited` sync, ignoring the from + # bound. + ignore_timeline_bound = True + log_kv({"sliding_sync.room_status": room_status}) - log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial}) + log_kv( + { + "sliding_sync.from_bound": from_bound, + "sliding_sync.ignore_timeline_bound": ignore_timeline_bound, + "sliding_sync.initial": initial, + } + ) # Assemble the list of timeline events # @@ -2004,7 +2022,7 @@ class SlidingSyncHandler: # (from newer to older events) starting at to_bound. 
# This ensures we fill the `limit` with the newest events first, from_key=to_bound, - to_key=from_bound, + to_key=from_bound if not ignore_timeline_bound else None, direction=Direction.BACKWARDS, # We add one so we can determine if there are enough events to saturate # the limit or not (see `limited`) @@ -2343,24 +2361,37 @@ class SlidingSyncHandler: ) # Figure out the last bump event in the room - last_bump_event_result = ( - await self.store.get_last_event_pos_in_room_before_stream_ordering( - room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES + bump_stamp = None + if timeline_events: + for e in reversed(timeline_events): + assert e.internal_metadata.stream_ordering is not None + if ( + e.type in DEFAULT_BUMP_EVENT_TYPES + and e.internal_metadata.stream_ordering > 0 + ): + bump_stamp = e.internal_metadata.stream_ordering + break + + if bump_stamp is None: + # By default, just choose the membership event position + bump_stamp = room_membership_for_user_at_to_token.event_pos.stream + + last_bump_event_result = ( + await self.store.get_last_event_pos_in_room_before_stream_ordering( + room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES + ) ) - ) - # By default, just choose the membership event position - bump_stamp = room_membership_for_user_at_to_token.event_pos.stream - # But if we found a bump event, use that instead - if last_bump_event_result is not None: - _, new_bump_event_pos = last_bump_event_result + # But if we found a bump event, use that instead + if last_bump_event_result is not None: + _, new_bump_event_pos = last_bump_event_result - # If we've just joined a remote room, then the last bump event may - # have been backfilled (and so have a negative stream ordering). - # These negative stream orderings can't sensibly be compared, so - # instead we use the membership event position. 
- if new_bump_event_pos.stream > 0: - bump_stamp = new_bump_event_pos.stream + # If we've just joined a remote room, then the last bump event may + # have been backfilled (and so have a negative stream ordering). + # These negative stream orderings can't sensibly be compared, so + # instead we use the membership event position. + if new_bump_event_pos.stream > 0: + bump_stamp = new_bump_event_pos.stream set_tag(SynapseTags.RESULT_PREFIX + "initial", initial) @@ -2606,12 +2637,13 @@ class SlidingSyncHandler: up_to_stream_id=since_stream_id, ) - logger.debug( - "Deleted %d to-device messages up to %d for %s", - deleted, - since_stream_id, - user_id, - ) + if deleted: + logger.debug( + "Deleted %d to-device messages up to %d for %s", + deleted, + since_stream_id, + user_id, + ) messages, stream_id = await self.store.get_messages_for_device( user_id=user_id, @@ -2948,19 +2980,26 @@ class HaveSentRoom: contains the last stream token of the last updates we sent down the room, i.e. we still need to send everything since then to the client. + timeline_limit: The timeline limit config for the room, if LIVE or + PREVIOUSLY. This is used to track if the client has increased + the timeline limit to request more events. 
""" status: HaveSentRoomFlag last_token: Optional[RoomStreamToken] + timeline_limit: Optional[int] + + @staticmethod + def live(timeline_limit: int) -> "HaveSentRoom": + return HaveSentRoom(HaveSentRoomFlag.LIVE, None, timeline_limit) @staticmethod - def previously(last_token: RoomStreamToken) -> "HaveSentRoom": + def previously(last_token: RoomStreamToken, timeline_limit: int) -> "HaveSentRoom": """Constructor for `PREVIOUSLY` flag.""" - return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token) + return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token, timeline_limit) -HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None) -HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None) +HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None, None) @attr.s(auto_attribs=True) @@ -3026,6 +3065,7 @@ class SlidingSyncConnectionStore: async def record_rooms( self, sync_config: SlidingSyncConfig, + room_configs: Dict[str, RoomSyncConfig], from_token: Optional[SlidingSyncStreamToken], *, sent_room_ids: StrCollection, @@ -3066,8 +3106,12 @@ class SlidingSyncConnectionStore: # end we can treat this as a noop. have_updated = False for room_id in sent_room_ids: - new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE - have_updated = True + prev_state = new_room_statuses.get(room_id) + new_room_statuses[room_id] = HaveSentRoom.live( + room_configs[room_id].timeline_limit + ) + if prev_state != new_room_statuses[room_id]: + have_updated = True # Whether we add/update the entries for unsent rooms depends on the # existing entry: @@ -3078,18 +3122,23 @@ class SlidingSyncConnectionStore: # given token, so we don't need to update the entry. # - NEVER: We have never previously sent down the room, and we haven't # sent anything down this time either so we leave it as NEVER. + # + # We only need to do this if `from_token` is not None, as if it is then + # we know that there are no existing entires. - # Work out the new state for unsent rooms that were `LIVE`. 
if from_token: - new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key) - else: - new_unsent_state = HAVE_SENT_ROOM_NEVER - - for room_id in unsent_room_ids: - prev_state = new_room_statuses.get(room_id) - if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE: - new_room_statuses[room_id] = new_unsent_state - have_updated = True + for room_id in unsent_room_ids: + prev_state = new_room_statuses.get(room_id) + if ( + prev_state is not None + and prev_state.status == HaveSentRoomFlag.LIVE + ): + assert prev_state.timeline_limit is not None + new_room_statuses[room_id] = HaveSentRoom.previously( + from_token.stream_token.room_key, + prev_state.timeline_limit, + ) + have_updated = True if not have_updated: return prev_connection_token diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 1aa9ea3877..c5cdc36955 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py
@@ -18,7 +18,8 @@ # [This file includes modifications made by New Vector Limited] # # -from typing import TYPE_CHECKING, Callable +import logging +from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple from synapse.http.server import HttpServer, JsonResource from synapse.rest import admin @@ -67,11 +68,64 @@ from synapse.rest.client import ( voip, ) +logger = logging.getLogger(__name__) + if TYPE_CHECKING: from synapse.server import HomeServer RegisterServletsFunc = Callable[["HomeServer", HttpServer], None] +CLIENT_SERVLET_FUNCTIONS: Tuple[RegisterServletsFunc, ...] = ( + versions.register_servlets, + initial_sync.register_servlets, + room.register_deprecated_servlets, + events.register_servlets, + room.register_servlets, + login.register_servlets, + profile.register_servlets, + presence.register_servlets, + directory.register_servlets, + voip.register_servlets, + pusher.register_servlets, + push_rule.register_servlets, + logout.register_servlets, + sync.register_servlets, + filter.register_servlets, + account.register_servlets, + register.register_servlets, + auth.register_servlets, + receipts.register_servlets, + read_marker.register_servlets, + room_keys.register_servlets, + keys.register_servlets, + tokenrefresh.register_servlets, + tags.register_servlets, + account_data.register_servlets, + reporting.register_servlets, + openid.register_servlets, + notifications.register_servlets, + devices.register_servlets, + thirdparty.register_servlets, + sendtodevice.register_servlets, + user_directory.register_servlets, + room_upgrade_rest_servlet.register_servlets, + capabilities.register_servlets, + account_validity.register_servlets, + relations.register_servlets, + password_policy.register_servlets, + knock.register_servlets, + appservice_ping.register_servlets, + admin.register_servlets_for_client_rest_resource, + mutual_rooms.register_servlets, + login_token_request.register_servlets, + rendezvous.register_servlets, + 
auth_issuer.register_servlets, +) + +SERVLET_GROUPS: Dict[str, Iterable[RegisterServletsFunc]] = { + "client": CLIENT_SERVLET_FUNCTIONS, +} + class ClientRestResource(JsonResource): """Matrix Client API REST resource. @@ -83,80 +137,56 @@ class ClientRestResource(JsonResource): * etc """ - def __init__(self, hs: "HomeServer"): + def __init__(self, hs: "HomeServer", servlet_groups: Optional[List[str]] = None): JsonResource.__init__(self, hs, canonical_json=False) - self.register_servlets(self, hs) + if hs.config.media.can_load_media_repo: + # This import is here to prevent a circular import failure + from synapse.rest.client import media + + SERVLET_GROUPS["media"] = (media.register_servlets,) + self.register_servlets(self, hs, servlet_groups) @staticmethod - def register_servlets(client_resource: HttpServer, hs: "HomeServer") -> None: + def register_servlets( + client_resource: HttpServer, + hs: "HomeServer", + servlet_groups: Optional[Iterable[str]] = None, + ) -> None: # Some servlets are only registered on the main process (and not worker # processes). 
is_main_process = hs.config.worker.worker_app is None - versions.register_servlets(hs, client_resource) - - # Deprecated in r0 - initial_sync.register_servlets(hs, client_resource) - room.register_deprecated_servlets(hs, client_resource) - - # Partially deprecated in r0 - events.register_servlets(hs, client_resource) - - room.register_servlets(hs, client_resource) - login.register_servlets(hs, client_resource) - profile.register_servlets(hs, client_resource) - presence.register_servlets(hs, client_resource) - directory.register_servlets(hs, client_resource) - voip.register_servlets(hs, client_resource) - if is_main_process: - pusher.register_servlets(hs, client_resource) - push_rule.register_servlets(hs, client_resource) - if is_main_process: - logout.register_servlets(hs, client_resource) - sync.register_servlets(hs, client_resource) - filter.register_servlets(hs, client_resource) - account.register_servlets(hs, client_resource) - register.register_servlets(hs, client_resource) - if is_main_process: - auth.register_servlets(hs, client_resource) - receipts.register_servlets(hs, client_resource) - read_marker.register_servlets(hs, client_resource) - room_keys.register_servlets(hs, client_resource) - keys.register_servlets(hs, client_resource) - if is_main_process: - tokenrefresh.register_servlets(hs, client_resource) - tags.register_servlets(hs, client_resource) - account_data.register_servlets(hs, client_resource) - if is_main_process: - reporting.register_servlets(hs, client_resource) - openid.register_servlets(hs, client_resource) - notifications.register_servlets(hs, client_resource) - devices.register_servlets(hs, client_resource) - if is_main_process: - thirdparty.register_servlets(hs, client_resource) - sendtodevice.register_servlets(hs, client_resource) - user_directory.register_servlets(hs, client_resource) - if is_main_process: - room_upgrade_rest_servlet.register_servlets(hs, client_resource) - capabilities.register_servlets(hs, client_resource) - if 
is_main_process: - account_validity.register_servlets(hs, client_resource) - relations.register_servlets(hs, client_resource) - password_policy.register_servlets(hs, client_resource) - knock.register_servlets(hs, client_resource) - appservice_ping.register_servlets(hs, client_resource) - if hs.config.media.can_load_media_repo: - from synapse.rest.client import media + if not servlet_groups: + servlet_groups = SERVLET_GROUPS.keys() - media.register_servlets(hs, client_resource) + for servlet_group in servlet_groups: + # Fail on unknown servlet groups. + if servlet_group not in SERVLET_GROUPS: + if servlet_group == "media": + logger.warn( + "media.can_load_media_repo needs to be configured for the media servlet to be available" + ) + raise RuntimeError( + f"Attempting to register unknown client servlet: '{servlet_group}'" + ) - # moving to /_synapse/admin - if is_main_process: - admin.register_servlets_for_client_rest_resource(hs, client_resource) + for servletfunc in SERVLET_GROUPS[servlet_group]: + if not is_main_process and servletfunc in [ + pusher.register_servlets, + logout.register_servlets, + auth.register_servlets, + tokenrefresh.register_servlets, + reporting.register_servlets, + openid.register_servlets, + thirdparty.register_servlets, + room_upgrade_rest_servlet.register_servlets, + account_validity.register_servlets, + admin.register_servlets_for_client_rest_resource, + mutual_rooms.register_servlets, + login_token_request.register_servlets, + rendezvous.register_servlets, + auth_issuer.register_servlets, + ]: + continue - # unstable - if is_main_process: - mutual_rooms.register_servlets(hs, client_resource) - login_token_request.register_servlets(hs, client_resource) - rendezvous.register_servlets(hs, client_resource) - auth_issuer.register_servlets(hs, client_resource) + servletfunc(hs, client_resource) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 8c5db2a513..18142d1c65 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py
@@ -950,6 +950,18 @@ class SlidingSyncRestServlet(RestServlet): logger.info("Client has disconnected; not serializing response.") return 200, {} + if from_token: + for room_id, room in sliding_sync_results.rooms.items(): + logger.info( + "Sliding Sync: Sending room %r, initial: %s, limited: %s, events %d: %s", + room_id, + room.initial, + room.limited, + len(room.timeline_events), + [e.event_id for e in room.timeline_events], + ) + + # logger.info("Sliding sync response: %r", sliding_sync_results) response_content = await self.encode_response(requester, sliding_sync_results) return 200, response_content @@ -990,7 +1002,7 @@ class SlidingSyncRestServlet(RestServlet): for list_key, list_result in lists.items(): serialized_lists[list_key] = { "count": list_result.count, - "ops": [encode_operation(op) for op in list_result.ops], + # "ops": [encode_operation(op) for op in list_result.ops], } return serialized_lists @@ -1010,7 +1022,7 @@ class SlidingSyncRestServlet(RestServlet): serialized_rooms: Dict[str, JsonDict] = {} for room_id, room_result in rooms.items(): serialized_rooms[room_id] = { - "bump_stamp": room_result.bump_stamp, + "bump_stamp": abs(room_result.bump_stamp), "joined_count": room_result.joined_count, "invited_count": room_result.invited_count, "notification_count": room_result.notification_count, diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 63624f3e8f..9fd50951fa 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py
@@ -319,6 +319,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore): if not backfilled: self._events_stream_cache.entity_has_changed(room_id, stream_ordering) # type: ignore[attr-defined] + self._attempt_to_invalidate_cache( + "get_max_stream_ordering_in_room", (room_id,) + ) if redacts: self._invalidate_local_get_event_cache(redacts) # type: ignore[attr-defined] diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f7acdb859..0c7c2f9306 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -551,7 +551,7 @@ class PersistEventsStore: # From this point onwards the events are only events that we haven't # seen before. - self._store_event_txn(txn, events_and_contexts=events_and_contexts) + self._store_event_txn(txn, room_id, events_and_contexts=events_and_contexts) if new_forward_extremities: self._update_forward_extremities_txn( @@ -1555,6 +1555,7 @@ class PersistEventsStore: def _store_event_txn( self, txn: LoggingTransaction, + room_id: str, events_and_contexts: Collection[Tuple[EventBase, EventContext]], ) -> None: """Insert new events into the event, event_json, redaction and @@ -1629,6 +1630,27 @@ class PersistEventsStore: ], ) + # Update the `sliding_sync_room_metadata` with the latest + # (non-backfilled, ie positive) stream ordering. + # + # We know this list is sorted and non-empty, so we just take the last + # one event. + max_stream_ordering: int + for e, _ in events_and_contexts: + assert e.internal_metadata.stream_ordering is not None + max_stream_ordering = e.internal_metadata.stream_ordering + + if max_stream_ordering > 0: + self.db_pool.simple_upsert_txn( + txn, + table="sliding_sync_room_metadata", + keyvalues={"room_id": room_id}, + values={ + "instance_name": self._instance_name, + "last_stream_ordering": max_stream_ordering, + }, + ) + # If we're persisting an unredacted event we go and ensure # that we mark any redactions that reference this event as # requiring censoring. diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 4989c960a6..baa23b1bfc 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py
@@ -79,7 +79,12 @@ from synapse.storage.database import ( from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.util.id_generators import MultiWriterIdGenerator -from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection +from synapse.types import ( + JsonDict, + PersistedEventPosition, + RoomStreamToken, + StrCollection, +) from synapse.util.caches.descriptors import cached from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable @@ -623,6 +628,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): self._stream_order_on_start = self.get_room_max_stream_ordering() self._min_stream_order_on_start = self.get_room_min_stream_ordering() + database.updates.register_background_update_handler( + "sliding_sync_room_metadata", self._sliding_sync_room_metadata_bg_update + ) + def get_room_max_stream_ordering(self) -> int: """Get the stream_ordering of regular events that we have committed up to @@ -2213,6 +2222,91 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return None + async def _sliding_sync_room_metadata_bg_update( + self, progress: JsonDict, batch_size: int + ) -> int: + """Background update to fill out 'sliding_sync_room_metadata' table""" + previous_room = progress.get("previous_room", "") + + def _sliding_sync_room_metadata_bg_update_txn(txn: LoggingTransaction) -> int: + # Both these queries are just getting the most recent + # instance_name/stream ordering for the next N rooms. + if isinstance(self.database_engine, PostgresEngine): + sql = """ + SELECT room_id, instance_name, stream_ordering FROM rooms AS r, + LATERAL ( + SELECT instance_name, stream_ordering + FROM events WHERE events.room_id = r.room_id + ORDER BY stream_ordering DESC + LIMIT 1 + ) e + WHERE r.room_id > ? + ORDER BY r.room_id ASC + LIMIT ? 
+ """ + else: + sql = """ + SELECT + room_id, + ( + SELECT instance_name + FROM events WHERE events.room_id = r.room_id + ORDER BY stream_ordering DESC + LIMIT 1 + ), + ( + SELECT stream_ordering + FROM events WHERE events.room_id = r.room_id + ORDER BY stream_ordering DESC + LIMIT 1 + ) + FROM rooms AS r + WHERE r.room_id > ? + ORDER BY r.room_id ASC + LIMIT ? + """ + + txn.execute(sql, (previous_room, batch_size)) + rows = txn.fetchall() + if not rows: + return 0 + + self.db_pool.simple_upsert_many_txn( + txn, + table="sliding_sync_room_metadata", + key_names=("room_id",), + key_values=[(room_id,) for room_id, _, _ in rows], + value_names=( + "instance_name", + "last_stream_ordering", + ), + value_values=[ + ( + instance_name or "master", + stream, + ) + for _, instance_name, stream in rows + ], + ) + + self.db_pool.updates._background_update_progress_txn( + txn, "sliding_sync_room_metadata", {"previous_room": rows[-1][0]} + ) + + return len(rows) + + rows = await self.db_pool.runInteraction( + "_sliding_sync_room_metadata_bg_update", + _sliding_sync_room_metadata_bg_update_txn, + ) + + if rows == 0: + await self.db_pool.updates._end_background_update( + "sliding_sync_room_metadata" + ) + + return rows + @trace def get_rooms_that_might_have_updates( self, room_ids: StrCollection, from_token: RoomStreamToken diff --git a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql new file mode 100644
index 0000000000..e8bc33ff40 --- /dev/null +++ b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
@@ -0,0 +1,24 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2024 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- <https://www.gnu.org/licenses/agpl-3.0.html>. + +-- A table that maps from room ID to metadata useful for sliding sync. +CREATE TABLE sliding_sync_room_metadata ( + room_id TEXT NOT NULL PRIMARY KEY, + + -- The instance_name / stream ordering of the last event in the room. + instance_name TEXT NOT NULL, + last_stream_ordering BIGINT NOT NULL +); + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8507, 'sliding_sync_room_metadata', '{}'); diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 63df31ec75..eb050066e9 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py
@@ -33,9 +33,19 @@ from synapse.api.constants import ( ReceiptTypes, RelationTypes, ) -from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync +from synapse.rest.client import ( + devices, + knock, + login, + read_marker, + receipts, + room, + sync, +) from synapse.server import HomeServer -from synapse.types import JsonDict +from synapse.types import ( + JsonDict, +) from synapse.util import Clock from tests import unittest diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py
index c4e216c308..037bbca1ba 100644 --- a/tests/storage/test_event_chain.py +++ b/tests/storage/test_event_chain.py
@@ -440,6 +440,7 @@ class EventChainStoreTestCase(HomeserverTestCase): assert persist_events_store is not None persist_events_store._store_event_txn( txn, + events[0].room_id, [ (e, EventContext(self.hs.get_storage_controllers(), {})) for e in events