diff options
author | Brendan Abolivier <babolivier@matrix.org> | 2021-03-19 16:12:40 +0100 |
---|---|---|
committer | Brendan Abolivier <babolivier@matrix.org> | 2021-03-19 16:12:40 +0100 |
commit | 592d6305fda97ceda92a8ce6079a9fdbe9928567 (patch) | |
tree | 7a0edf2be5f22754bf8da194a1412d740ef0a004 /synapse | |
parent | Fix lint (diff) | |
parent | fix mypy (diff) | |
download | synapse-592d6305fda97ceda92a8ce6079a9fdbe9928567.tar.xz |
Merge branch 'develop' into babolivier/msc3026
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/api/constants.py | 6 | ||||
-rw-r--r-- | synapse/config/experimental.py | 2 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 25 | ||||
-rw-r--r-- | synapse/federation/sender/per_destination_queue.py | 104 | ||||
-rw-r--r-- | synapse/handlers/set_password.py | 2 | ||||
-rw-r--r-- | synapse/handlers/space_summary.py | 199 | ||||
-rw-r--r-- | synapse/rest/admin/users.py | 2 | ||||
-rw-r--r-- | synapse/rest/client/v1/room.py | 69 | ||||
-rw-r--r-- | synapse/server.py | 5 | ||||
-rw-r--r-- | synapse/storage/databases/main/registration.py | 1 |
10 files changed, 372 insertions, 43 deletions
diff --git a/synapse/api/constants.py b/synapse/api/constants.py index cc8541bc16..8f37d2cf3b 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -101,6 +101,9 @@ class EventTypes: Dummy = "org.matrix.dummy_event" + MSC1772_SPACE_CHILD = "org.matrix.msc1772.space.child" + MSC1772_SPACE_PARENT = "org.matrix.msc1772.space.parent" + class EduTypes: Presence = "m.presence" @@ -161,6 +164,9 @@ class EventContentFields: # cf https://github.com/matrix-org/matrix-doc/pull/2228 SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after" + # cf https://github.com/matrix-org/matrix-doc/pull/1772 + MSC1772_ROOM_TYPE = "org.matrix.msc1772.type" + class RoomEncryptionAlgorithms: MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2" diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 2f0cd0cfdf..86f4d9af9d 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -27,5 +27,7 @@ class ExperimentalConfig(Config): # MSC2858 (multiple SSO identity providers) self.msc2858_enabled = experimental.get("msc2858_enabled", False) # type: bool + # Spaces (MSC1772, MSC2946, etc) + self.spaces_enabled = experimental.get("spaces_enabled", False) # type: bool # MSC3026 (busy presence state) self.msc3026_enabled = experimental.get("msc3026_enabled", False) # type: bool diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9839d3d016..d84e362070 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -35,7 +35,7 @@ from twisted.internet import defer from twisted.internet.abstract import isIPAddress from twisted.python import failure -from synapse.api.constants import EduTypes, EventTypes, Membership +from synapse.api.constants import EduTypes, EventTypes from synapse.api.errors import ( AuthError, Codes, @@ -63,7 +63,7 @@ from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, ReplicationGetQueryRestServlet, ) -from synapse.types import JsonDict, get_domain_from_id +from synapse.types import JsonDict from synapse.util import glob_to_regex, json_decoder, unwrapFirstError from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache @@ -727,27 +727,6 @@ class FederationServer(FederationBase): if the event was unacceptable for any other reason (eg, too large, too many prev_events, couldn't find the prev_events) """ - # check that it's actually being sent from a valid destination to - # workaround bug #1753 in 0.18.5 and 0.18.6 - if origin != get_domain_from_id(pdu.sender): - # We continue to accept join events from any server; this is - # necessary for the federation join dance to work correctly. - # (When we join over federation, the "helper" server is - # responsible for sending out the join event, rather than the - # origin. See bug #1893. This is also true for some third party - # invites). - if not ( - pdu.type == "m.room.member" - and pdu.content - and pdu.content.get("membership", None) - in (Membership.JOIN, Membership.INVITE) - ): - logger.info( - "Discarding PDU %s from invalid origin %s", pdu.event_id, origin - ) - return - else: - logger.info("Accepting join PDU %s from %s", pdu.event_id, origin) # We've already checked that we know the room version by this point room_version = await self.store.get_room_version(pdu.room_id) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index cc0d765e5f..af85fe0a1e 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -15,7 +15,7 @@ # limitations under the License. import datetime import logging -from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast +from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple import attr from prometheus_client import Counter @@ -77,6 +77,7 @@ class PerDestinationQueue: self._transaction_manager = transaction_manager self._instance_name = hs.get_instance_name() self._federation_shard_config = hs.config.worker.federation_shard_config + self._state = hs.get_state_handler() self._should_send_on_this_instance = True if not self._federation_shard_config.should_handle( @@ -415,22 +416,95 @@ class PerDestinationQueue: "This should not happen." % event_ids ) - if logger.isEnabledFor(logging.INFO): - rooms = [p.room_id for p in catchup_pdus] - logger.info("Catching up rooms to %s: %r", self._destination, rooms) + # We send transactions with events from one room only, as its likely + # that the remote will have to do additional processing, which may + # take some time. It's better to give it small amounts of work + # rather than risk the request timing out and repeatedly being + # retried, and not making any progress. + # + # Note: `catchup_pdus` will have exactly one PDU per room. + for pdu in catchup_pdus: + # The PDU from the DB will be the last PDU in the room from + # *this server* that wasn't sent to the remote. However, other + # servers may have sent lots of events since then, and we want + # to try and tell the remote only about the *latest* events in + # the room. This is so that it doesn't get inundated by events + # from various parts of the DAG, which all need to be processed. + # + # Note: this does mean that in large rooms a server coming back + # online will get sent the same events from all the different + # servers, but the remote will correctly deduplicate them and + # handle it only once. + + # Step 1, fetch the current extremities + extrems = await self._store.get_prev_events_for_room(pdu.room_id) + + if pdu.event_id in extrems: + # If the event is in the extremities, then great! We can just + # use that without having to do further checks. + room_catchup_pdus = [pdu] + else: + # If not, fetch the extremities and figure out which we can + # send. + extrem_events = await self._store.get_events_as_list(extrems) + + new_pdus = [] + for p in extrem_events: + # We pulled this from the DB, so it'll be non-null + assert p.internal_metadata.stream_ordering + + # Filter out events that happened before the remote went + # offline + if ( + p.internal_metadata.stream_ordering + < self._last_successful_stream_ordering + ): + continue - await self._transaction_manager.send_new_transaction( - self._destination, catchup_pdus, [] - ) + # Filter out events where the server is not in the room, + # e.g. it may have left/been kicked. *Ideally* we'd pull + # out the kick and send that, but it's a rare edge case + # so we don't bother for now (the server that sent the + # kick should send it out if its online). + hosts = await self._state.get_hosts_in_room_at_events( + p.room_id, [p.event_id] + ) + if self._destination not in hosts: + continue - sent_transactions_counter.inc() - final_pdu = catchup_pdus[-1] - self._last_successful_stream_ordering = cast( - int, final_pdu.internal_metadata.stream_ordering - ) - await self._store.set_destination_last_successful_stream_ordering( - self._destination, self._last_successful_stream_ordering - ) + new_pdus.append(p) + + # If we've filtered out all the extremities, fall back to + # sending the original event. This should ensure that the + # server gets at least some of missed events (especially if + # the other sending servers are up). + if new_pdus: + room_catchup_pdus = new_pdus + + logger.info( + "Catching up rooms to %s: %r", self._destination, pdu.room_id + ) + + await self._transaction_manager.send_new_transaction( + self._destination, room_catchup_pdus, [] + ) + + sent_transactions_counter.inc() + + # We pulled this from the DB, so it'll be non-null + assert pdu.internal_metadata.stream_ordering + + # Note that we mark the last successful stream ordering as that + # from the *original* PDU, rather than the PDU(s) we actually + # send. This is because we use it to mark our position in the + # queue of missed PDUs to process. + self._last_successful_stream_ordering = ( + pdu.internal_metadata.stream_ordering + ) + + await self._store.set_destination_last_successful_stream_ordering( + self._destination, self._last_successful_stream_ordering + ) def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]: if not self._pending_rrs: diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py index 84af2dde7e..04e7c64c94 100644 --- a/synapse/handlers/set_password.py +++ b/synapse/handlers/set_password.py @@ -41,7 +41,7 @@ class SetPasswordHandler(BaseHandler): logout_devices: bool, requester: Optional[Requester] = None, ) -> None: - if not self.hs.config.password_localdb_enabled: + if not self._auth_handler.can_change_password(): raise SynapseError(403, "Password change disabled", errcode=Codes.FORBIDDEN) try: diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py new file mode 100644 index 0000000000..513dc0c71a --- /dev/null +++ b/synapse/handlers/space_summary.py @@ -0,0 +1,199 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import itertools +import logging +from collections import deque +from typing import TYPE_CHECKING, Iterable, List, Optional, Set + +from synapse.api.constants import EventContentFields, EventTypes, HistoryVisibility +from synapse.api.errors import AuthError +from synapse.events import EventBase +from synapse.events.utils import format_event_for_client_v2 +from synapse.types import JsonDict + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + +# number of rooms to return. We'll stop once we hit this limit. +# TODO: allow clients to reduce this with a request param. +MAX_ROOMS = 50 + +# max number of events to return per room. +MAX_ROOMS_PER_SPACE = 50 + + +class SpaceSummaryHandler: + def __init__(self, hs: "HomeServer"): + self._clock = hs.get_clock() + self._auth = hs.get_auth() + self._room_list_handler = hs.get_room_list_handler() + self._state_handler = hs.get_state_handler() + self._store = hs.get_datastore() + self._event_serializer = hs.get_event_client_serializer() + + async def get_space_summary( + self, + requester: str, + room_id: str, + suggested_only: bool = False, + max_rooms_per_space: Optional[int] = None, + ) -> JsonDict: + """ + Implementation of the space summary API + + Args: + requester: user id of the user making this request + + room_id: room id to start the summary at + + suggested_only: whether we should only return children with the "suggested" + flag set. + + max_rooms_per_space: an optional limit on the number of child rooms we will + return. This does not apply to the root room (ie, room_id), and + is overridden by ROOMS_PER_SPACE_LIMIT. + + Returns: + summary dict to return + """ + # first of all, check that the user is in the room in question (or it's + # world-readable) + await self._auth.check_user_in_room_or_world_readable(room_id, requester) + + # the queue of rooms to process + room_queue = deque((room_id,)) + + processed_rooms = set() # type: Set[str] + + rooms_result = [] # type: List[JsonDict] + events_result = [] # type: List[JsonDict] + + now = self._clock.time_msec() + + while room_queue and len(rooms_result) < MAX_ROOMS: + room_id = room_queue.popleft() + logger.debug("Processing room %s", room_id) + processed_rooms.add(room_id) + + try: + await self._auth.check_user_in_room_or_world_readable( + room_id, requester + ) + except AuthError: + logger.info( + "user %s cannot view room %s, omitting from summary", + requester, + room_id, + ) + continue + + room_entry = await self._build_room_entry(room_id) + rooms_result.append(room_entry) + + # look for child rooms/spaces. + child_events = await self._get_child_events(room_id) + + if suggested_only: + # we only care about suggested children + child_events = filter(_is_suggested_child_event, child_events) + + # The client-specified max_rooms_per_space limit doesn't apply to the + # room_id specified in the request, so we ignore it if this is the + # first room we are processing. Otherwise, apply any client-specified + # limit, capping to our built-in limit. + if max_rooms_per_space is not None and len(processed_rooms) > 1: + max_rooms = min(MAX_ROOMS_PER_SPACE, max_rooms_per_space) + else: + max_rooms = MAX_ROOMS_PER_SPACE + + for edge_event in itertools.islice(child_events, max_rooms): + edge_room_id = edge_event.state_key + + events_result.append( + await self._event_serializer.serialize_event( + edge_event, + time_now=now, + event_format=format_event_for_client_v2, + ) + ) + + # if we haven't yet visited the target of this link, add it to the queue + if edge_room_id not in processed_rooms: + room_queue.append(edge_room_id) + + return {"rooms": rooms_result, "events": events_result} + + async def _build_room_entry(self, room_id: str) -> JsonDict: + """Generate en entry suitable for the 'rooms' list in the summary response""" + stats = await self._store.get_room_with_stats(room_id) + + # currently this should be impossible because we call + # check_user_in_room_or_world_readable on the room before we get here, so + # there should always be an entry + assert stats is not None, "unable to retrieve stats for %s" % (room_id,) + + current_state_ids = await self._store.get_current_state_ids(room_id) + create_event = await self._store.get_event( + current_state_ids[(EventTypes.Create, "")] + ) + + # TODO: update once MSC1772 lands + room_type = create_event.content.get(EventContentFields.MSC1772_ROOM_TYPE) + + entry = { + "room_id": stats["room_id"], + "name": stats["name"], + "topic": stats["topic"], + "canonical_alias": stats["canonical_alias"], + "num_joined_members": stats["joined_members"], + "avatar_url": stats["avatar"], + "world_readable": ( + stats["history_visibility"] == HistoryVisibility.WORLD_READABLE + ), + "guest_can_join": stats["guest_access"] == "can_join", + "room_type": room_type, + } + + # Filter out Nones – rather omit the field altogether + room_entry = {k: v for k, v in entry.items() if v is not None} + + return room_entry + + async def _get_child_events(self, room_id: str) -> Iterable[EventBase]: + # look for child rooms/spaces. + current_state_ids = await self._store.get_current_state_ids(room_id) + + events = await self._store.get_events_as_list( + [ + event_id + for key, event_id in current_state_ids.items() + # TODO: update once MSC1772 lands + if key[0] == EventTypes.MSC1772_SPACE_CHILD + ] + ) + + # filter out any events without a "via" (which implies it has been redacted) + return (e for e in events if e.content.get("via")) + + +def _is_suggested_child_event(edge_event: EventBase) -> bool: + suggested = edge_event.content.get("suggested") + if isinstance(suggested, bool) and suggested: + return True + logger.debug("Ignorning not-suggested child %s", edge_event.state_key) + return False diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index 2c89b62e25..aaa56a7024 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -271,7 +271,7 @@ class UserRestServletV2(RestServlet): elif not deactivate and user["deactivated"]: if ( "password" not in body - and self.hs.config.password_localdb_enabled + and self.auth_handler.can_change_password() ): raise SynapseError( 400, "Must provide a password to re-activate an account." diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index e7a8207eb1..6c722d634d 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -18,7 +18,7 @@ import logging import re -from typing import TYPE_CHECKING, List, Optional +from typing import TYPE_CHECKING, List, Optional, Tuple from urllib import parse as urlparse from synapse.api.constants import EventTypes, Membership @@ -35,16 +35,25 @@ from synapse.events.utils import format_event_for_client_v2 from synapse.http.servlet import ( RestServlet, assert_params_in_dict, + parse_boolean, parse_integer, parse_json_object_from_request, parse_string, ) +from synapse.http.site import SynapseRequest from synapse.logging.opentracing import set_tag from synapse.rest.client.transactions import HttpTransactionCache from synapse.rest.client.v2_alpha._base import client_patterns from synapse.storage.state import StateFilter from synapse.streams.config import PaginationConfig -from synapse.types import RoomAlias, RoomID, StreamToken, ThirdPartyInstanceID, UserID +from synapse.types import ( + JsonDict, + RoomAlias, + RoomID, + StreamToken, + ThirdPartyInstanceID, + UserID, +) from synapse.util import json_decoder from synapse.util.stringutils import parse_and_validate_server_name, random_string @@ -987,7 +996,58 @@ def register_txn_path(servlet, regex_string, http_server, with_get=False): ) -def register_servlets(hs, http_server, is_worker=False): +class RoomSpaceSummaryRestServlet(RestServlet): + PATTERNS = ( + re.compile( + "^/_matrix/client/unstable/org.matrix.msc2946" + "/rooms/(?P<room_id>[^/]*)/spaces$" + ), + ) + + def __init__(self, hs: "HomeServer"): + super().__init__() + self._auth = hs.get_auth() + self._space_summary_handler = hs.get_space_summary_handler() + + async def on_GET( + self, request: SynapseRequest, room_id: str + ) -> Tuple[int, JsonDict]: + requester = await self._auth.get_user_by_req(request, allow_guest=True) + + return 200, await self._space_summary_handler.get_space_summary( + requester.user.to_string(), + room_id, + suggested_only=parse_boolean(request, "suggested_only", default=False), + max_rooms_per_space=parse_integer(request, "max_rooms_per_space"), + ) + + async def on_POST( + self, request: SynapseRequest, room_id: str + ) -> Tuple[int, JsonDict]: + requester = await self._auth.get_user_by_req(request, allow_guest=True) + content = parse_json_object_from_request(request) + + suggested_only = content.get("suggested_only", False) + if not isinstance(suggested_only, bool): + raise SynapseError( + 400, "'suggested_only' must be a boolean", Codes.BAD_JSON + ) + + max_rooms_per_space = content.get("max_rooms_per_space") + if max_rooms_per_space is not None and not isinstance(max_rooms_per_space, int): + raise SynapseError( + 400, "'max_rooms_per_space' must be an integer", Codes.BAD_JSON + ) + + return 200, await self._space_summary_handler.get_space_summary( + requester.user.to_string(), + room_id, + suggested_only=suggested_only, + max_rooms_per_space=max_rooms_per_space, + ) + + +def register_servlets(hs: "HomeServer", http_server, is_worker=False): RoomStateEventRestServlet(hs).register(http_server) RoomMemberListRestServlet(hs).register(http_server) JoinedRoomMemberListRestServlet(hs).register(http_server) @@ -1001,6 +1061,9 @@ def register_servlets(hs, http_server, is_worker=False): RoomTypingRestServlet(hs).register(http_server) RoomEventContextServlet(hs).register(http_server) + if hs.config.experimental.spaces_enabled: + RoomSpaceSummaryRestServlet(hs).register(http_server) + # Some servlets only get registered for the main process. if not is_worker: RoomCreateRestServlet(hs).register(http_server) diff --git a/synapse/server.py b/synapse/server.py index d11d08c573..98822d8e2f 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -100,6 +100,7 @@ from synapse.handlers.room_member import RoomMemberHandler, RoomMemberMasterHand from synapse.handlers.room_member_worker import RoomMemberWorkerHandler from synapse.handlers.search import SearchHandler from synapse.handlers.set_password import SetPasswordHandler +from synapse.handlers.space_summary import SpaceSummaryHandler from synapse.handlers.sso import SsoHandler from synapse.handlers.stats import StatsHandler from synapse.handlers.sync import SyncHandler @@ -733,6 +734,10 @@ class HomeServer(metaclass=abc.ABCMeta): return AccountDataHandler(self) @cache_in_self + def get_space_summary_handler(self) -> SpaceSummaryHandler: + return SpaceSummaryHandler(self) + + @cache_in_self def get_external_cache(self) -> ExternalCache: return ExternalCache(self) diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index eba66ff352..90a8f664ef 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -1210,6 +1210,7 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): self._invalidate_cache_and_stream( txn, self.get_user_deactivated_status, (user_id,) ) + self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) txn.call_after(self.is_guest.invalidate, (user_id,)) @cached() |