diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index cc8541bc16..8f37d2cf3b 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -101,6 +101,9 @@ class EventTypes:
Dummy = "org.matrix.dummy_event"
+ MSC1772_SPACE_CHILD = "org.matrix.msc1772.space.child"
+ MSC1772_SPACE_PARENT = "org.matrix.msc1772.space.parent"
+
class EduTypes:
Presence = "m.presence"
@@ -161,6 +164,9 @@ class EventContentFields:
# cf https://github.com/matrix-org/matrix-doc/pull/2228
SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after"
+ # cf https://github.com/matrix-org/matrix-doc/pull/1772
+ MSC1772_ROOM_TYPE = "org.matrix.msc1772.type"
+
class RoomEncryptionAlgorithms:
MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2"
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 2f0cd0cfdf..86f4d9af9d 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -27,5 +27,7 @@ class ExperimentalConfig(Config):
# MSC2858 (multiple SSO identity providers)
self.msc2858_enabled = experimental.get("msc2858_enabled", False) # type: bool
+ # Spaces (MSC1772, MSC2946, etc)
+ self.spaces_enabled = experimental.get("spaces_enabled", False) # type: bool
# MSC3026 (busy presence state)
self.msc3026_enabled = experimental.get("msc3026_enabled", False) # type: bool
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9839d3d016..d84e362070 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -35,7 +35,7 @@ from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure
-from synapse.api.constants import EduTypes, EventTypes, Membership
+from synapse.api.constants import EduTypes, EventTypes
from synapse.api.errors import (
AuthError,
Codes,
@@ -63,7 +63,7 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
-from synapse.types import JsonDict, get_domain_from_id
+from synapse.types import JsonDict
from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
@@ -727,27 +727,6 @@ class FederationServer(FederationBase):
if the event was unacceptable for any other reason (eg, too large,
too many prev_events, couldn't find the prev_events)
"""
- # check that it's actually being sent from a valid destination to
- # workaround bug #1753 in 0.18.5 and 0.18.6
- if origin != get_domain_from_id(pdu.sender):
- # We continue to accept join events from any server; this is
- # necessary for the federation join dance to work correctly.
- # (When we join over federation, the "helper" server is
- # responsible for sending out the join event, rather than the
- # origin. See bug #1893. This is also true for some third party
- # invites).
- if not (
- pdu.type == "m.room.member"
- and pdu.content
- and pdu.content.get("membership", None)
- in (Membership.JOIN, Membership.INVITE)
- ):
- logger.info(
- "Discarding PDU %s from invalid origin %s", pdu.event_id, origin
- )
- return
- else:
- logger.info("Accepting join PDU %s from %s", pdu.event_id, origin)
# We've already checked that we know the room version by this point
room_version = await self.store.get_room_version(pdu.room_id)
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index cc0d765e5f..af85fe0a1e 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
# limitations under the License.
import datetime
import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple
import attr
from prometheus_client import Counter
@@ -77,6 +77,7 @@ class PerDestinationQueue:
self._transaction_manager = transaction_manager
self._instance_name = hs.get_instance_name()
self._federation_shard_config = hs.config.worker.federation_shard_config
+ self._state = hs.get_state_handler()
self._should_send_on_this_instance = True
if not self._federation_shard_config.should_handle(
@@ -415,22 +416,95 @@ class PerDestinationQueue:
"This should not happen." % event_ids
)
- if logger.isEnabledFor(logging.INFO):
- rooms = [p.room_id for p in catchup_pdus]
- logger.info("Catching up rooms to %s: %r", self._destination, rooms)
+ # We send transactions with events from one room only, as its likely
+ # that the remote will have to do additional processing, which may
+ # take some time. It's better to give it small amounts of work
+ # rather than risk the request timing out and repeatedly being
+ # retried, and not making any progress.
+ #
+ # Note: `catchup_pdus` will have exactly one PDU per room.
+ for pdu in catchup_pdus:
+ # The PDU from the DB will be the last PDU in the room from
+ # *this server* that wasn't sent to the remote. However, other
+ # servers may have sent lots of events since then, and we want
+ # to try and tell the remote only about the *latest* events in
+ # the room. This is so that it doesn't get inundated by events
+ # from various parts of the DAG, which all need to be processed.
+ #
+ # Note: this does mean that in large rooms a server coming back
+ # online will get sent the same events from all the different
+ # servers, but the remote will correctly deduplicate them and
+ # handle it only once.
+
+ # Step 1, fetch the current extremities
+ extrems = await self._store.get_prev_events_for_room(pdu.room_id)
+
+ if pdu.event_id in extrems:
+ # If the event is in the extremities, then great! We can just
+ # use that without having to do further checks.
+ room_catchup_pdus = [pdu]
+ else:
+ # If not, fetch the extremities and figure out which we can
+ # send.
+ extrem_events = await self._store.get_events_as_list(extrems)
+
+ new_pdus = []
+ for p in extrem_events:
+ # We pulled this from the DB, so it'll be non-null
+ assert p.internal_metadata.stream_ordering
+
+ # Filter out events that happened before the remote went
+ # offline
+ if (
+ p.internal_metadata.stream_ordering
+ < self._last_successful_stream_ordering
+ ):
+ continue
- await self._transaction_manager.send_new_transaction(
- self._destination, catchup_pdus, []
- )
+ # Filter out events where the server is not in the room,
+ # e.g. it may have left/been kicked. *Ideally* we'd pull
+ # out the kick and send that, but it's a rare edge case
+ # so we don't bother for now (the server that sent the
+ # kick should send it out if its online).
+ hosts = await self._state.get_hosts_in_room_at_events(
+ p.room_id, [p.event_id]
+ )
+ if self._destination not in hosts:
+ continue
- sent_transactions_counter.inc()
- final_pdu = catchup_pdus[-1]
- self._last_successful_stream_ordering = cast(
- int, final_pdu.internal_metadata.stream_ordering
- )
- await self._store.set_destination_last_successful_stream_ordering(
- self._destination, self._last_successful_stream_ordering
- )
+ new_pdus.append(p)
+
+ # If we've filtered out all the extremities, fall back to
+ # sending the original event. This should ensure that the
+ # server gets at least some of missed events (especially if
+ # the other sending servers are up).
+ if new_pdus:
+ room_catchup_pdus = new_pdus
+
+ logger.info(
+ "Catching up rooms to %s: %r", self._destination, pdu.room_id
+ )
+
+ await self._transaction_manager.send_new_transaction(
+ self._destination, room_catchup_pdus, []
+ )
+
+ sent_transactions_counter.inc()
+
+ # We pulled this from the DB, so it'll be non-null
+ assert pdu.internal_metadata.stream_ordering
+
+ # Note that we mark the last successful stream ordering as that
+ # from the *original* PDU, rather than the PDU(s) we actually
+ # send. This is because we use it to mark our position in the
+ # queue of missed PDUs to process.
+ self._last_successful_stream_ordering = (
+ pdu.internal_metadata.stream_ordering
+ )
+
+ await self._store.set_destination_last_successful_stream_ordering(
+ self._destination, self._last_successful_stream_ordering
+ )
def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
if not self._pending_rrs:
diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py
index 84af2dde7e..04e7c64c94 100644
--- a/synapse/handlers/set_password.py
+++ b/synapse/handlers/set_password.py
@@ -41,7 +41,7 @@ class SetPasswordHandler(BaseHandler):
logout_devices: bool,
requester: Optional[Requester] = None,
) -> None:
- if not self.hs.config.password_localdb_enabled:
+ if not self._auth_handler.can_change_password():
raise SynapseError(403, "Password change disabled", errcode=Codes.FORBIDDEN)
try:
diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py
new file mode 100644
index 0000000000..513dc0c71a
--- /dev/null
+++ b/synapse/handlers/space_summary.py
@@ -0,0 +1,199 @@
+# -*- coding: utf-8 -*-
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import logging
+from collections import deque
+from typing import TYPE_CHECKING, Iterable, List, Optional, Set
+
+from synapse.api.constants import EventContentFields, EventTypes, HistoryVisibility
+from synapse.api.errors import AuthError
+from synapse.events import EventBase
+from synapse.events.utils import format_event_for_client_v2
+from synapse.types import JsonDict
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+# number of rooms to return. We'll stop once we hit this limit.
+# TODO: allow clients to reduce this with a request param.
+MAX_ROOMS = 50
+
+# max number of events to return per room.
+MAX_ROOMS_PER_SPACE = 50
+
+
+class SpaceSummaryHandler:
+ def __init__(self, hs: "HomeServer"):
+ self._clock = hs.get_clock()
+ self._auth = hs.get_auth()
+ self._room_list_handler = hs.get_room_list_handler()
+ self._state_handler = hs.get_state_handler()
+ self._store = hs.get_datastore()
+ self._event_serializer = hs.get_event_client_serializer()
+
+ async def get_space_summary(
+ self,
+ requester: str,
+ room_id: str,
+ suggested_only: bool = False,
+ max_rooms_per_space: Optional[int] = None,
+ ) -> JsonDict:
+ """
+ Implementation of the space summary API
+
+ Args:
+ requester: user id of the user making this request
+
+ room_id: room id to start the summary at
+
+ suggested_only: whether we should only return children with the "suggested"
+ flag set.
+
+ max_rooms_per_space: an optional limit on the number of child rooms we will
+ return. This does not apply to the root room (ie, room_id), and
+ is overridden by ROOMS_PER_SPACE_LIMIT.
+
+ Returns:
+ summary dict to return
+ """
+ # first of all, check that the user is in the room in question (or it's
+ # world-readable)
+ await self._auth.check_user_in_room_or_world_readable(room_id, requester)
+
+ # the queue of rooms to process
+ room_queue = deque((room_id,))
+
+ processed_rooms = set() # type: Set[str]
+
+ rooms_result = [] # type: List[JsonDict]
+ events_result = [] # type: List[JsonDict]
+
+ now = self._clock.time_msec()
+
+ while room_queue and len(rooms_result) < MAX_ROOMS:
+ room_id = room_queue.popleft()
+ logger.debug("Processing room %s", room_id)
+ processed_rooms.add(room_id)
+
+ try:
+ await self._auth.check_user_in_room_or_world_readable(
+ room_id, requester
+ )
+ except AuthError:
+ logger.info(
+ "user %s cannot view room %s, omitting from summary",
+ requester,
+ room_id,
+ )
+ continue
+
+ room_entry = await self._build_room_entry(room_id)
+ rooms_result.append(room_entry)
+
+ # look for child rooms/spaces.
+ child_events = await self._get_child_events(room_id)
+
+ if suggested_only:
+ # we only care about suggested children
+ child_events = filter(_is_suggested_child_event, child_events)
+
+ # The client-specified max_rooms_per_space limit doesn't apply to the
+ # room_id specified in the request, so we ignore it if this is the
+ # first room we are processing. Otherwise, apply any client-specified
+ # limit, capping to our built-in limit.
+ if max_rooms_per_space is not None and len(processed_rooms) > 1:
+ max_rooms = min(MAX_ROOMS_PER_SPACE, max_rooms_per_space)
+ else:
+ max_rooms = MAX_ROOMS_PER_SPACE
+
+ for edge_event in itertools.islice(child_events, max_rooms):
+ edge_room_id = edge_event.state_key
+
+ events_result.append(
+ await self._event_serializer.serialize_event(
+ edge_event,
+ time_now=now,
+ event_format=format_event_for_client_v2,
+ )
+ )
+
+ # if we haven't yet visited the target of this link, add it to the queue
+ if edge_room_id not in processed_rooms:
+ room_queue.append(edge_room_id)
+
+ return {"rooms": rooms_result, "events": events_result}
+
+ async def _build_room_entry(self, room_id: str) -> JsonDict:
+ """Generate en entry suitable for the 'rooms' list in the summary response"""
+ stats = await self._store.get_room_with_stats(room_id)
+
+ # currently this should be impossible because we call
+ # check_user_in_room_or_world_readable on the room before we get here, so
+ # there should always be an entry
+ assert stats is not None, "unable to retrieve stats for %s" % (room_id,)
+
+ current_state_ids = await self._store.get_current_state_ids(room_id)
+ create_event = await self._store.get_event(
+ current_state_ids[(EventTypes.Create, "")]
+ )
+
+ # TODO: update once MSC1772 lands
+ room_type = create_event.content.get(EventContentFields.MSC1772_ROOM_TYPE)
+
+ entry = {
+ "room_id": stats["room_id"],
+ "name": stats["name"],
+ "topic": stats["topic"],
+ "canonical_alias": stats["canonical_alias"],
+ "num_joined_members": stats["joined_members"],
+ "avatar_url": stats["avatar"],
+ "world_readable": (
+ stats["history_visibility"] == HistoryVisibility.WORLD_READABLE
+ ),
+ "guest_can_join": stats["guest_access"] == "can_join",
+ "room_type": room_type,
+ }
+
+ # Filter out Nones – rather omit the field altogether
+ room_entry = {k: v for k, v in entry.items() if v is not None}
+
+ return room_entry
+
+ async def _get_child_events(self, room_id: str) -> Iterable[EventBase]:
+ # look for child rooms/spaces.
+ current_state_ids = await self._store.get_current_state_ids(room_id)
+
+ events = await self._store.get_events_as_list(
+ [
+ event_id
+ for key, event_id in current_state_ids.items()
+ # TODO: update once MSC1772 lands
+ if key[0] == EventTypes.MSC1772_SPACE_CHILD
+ ]
+ )
+
+ # filter out any events without a "via" (which implies it has been redacted)
+ return (e for e in events if e.content.get("via"))
+
+
+def _is_suggested_child_event(edge_event: EventBase) -> bool:
+ suggested = edge_event.content.get("suggested")
+ if isinstance(suggested, bool) and suggested:
+ return True
+ logger.debug("Ignorning not-suggested child %s", edge_event.state_key)
+ return False
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 2c89b62e25..aaa56a7024 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -271,7 +271,7 @@ class UserRestServletV2(RestServlet):
elif not deactivate and user["deactivated"]:
if (
"password" not in body
- and self.hs.config.password_localdb_enabled
+ and self.auth_handler.can_change_password()
):
raise SynapseError(
400, "Must provide a password to re-activate an account."
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index e7a8207eb1..6c722d634d 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -18,7 +18,7 @@
import logging
import re
-from typing import TYPE_CHECKING, List, Optional
+from typing import TYPE_CHECKING, List, Optional, Tuple
from urllib import parse as urlparse
from synapse.api.constants import EventTypes, Membership
@@ -35,16 +35,25 @@ from synapse.events.utils import format_event_for_client_v2
from synapse.http.servlet import (
RestServlet,
assert_params_in_dict,
+ parse_boolean,
parse_integer,
parse_json_object_from_request,
parse_string,
)
+from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import set_tag
from synapse.rest.client.transactions import HttpTransactionCache
from synapse.rest.client.v2_alpha._base import client_patterns
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
-from synapse.types import RoomAlias, RoomID, StreamToken, ThirdPartyInstanceID, UserID
+from synapse.types import (
+ JsonDict,
+ RoomAlias,
+ RoomID,
+ StreamToken,
+ ThirdPartyInstanceID,
+ UserID,
+)
from synapse.util import json_decoder
from synapse.util.stringutils import parse_and_validate_server_name, random_string
@@ -987,7 +996,58 @@ def register_txn_path(servlet, regex_string, http_server, with_get=False):
)
-def register_servlets(hs, http_server, is_worker=False):
+class RoomSpaceSummaryRestServlet(RestServlet):
+ PATTERNS = (
+ re.compile(
+ "^/_matrix/client/unstable/org.matrix.msc2946"
+ "/rooms/(?P<room_id>[^/]*)/spaces$"
+ ),
+ )
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__()
+ self._auth = hs.get_auth()
+ self._space_summary_handler = hs.get_space_summary_handler()
+
+ async def on_GET(
+ self, request: SynapseRequest, room_id: str
+ ) -> Tuple[int, JsonDict]:
+ requester = await self._auth.get_user_by_req(request, allow_guest=True)
+
+ return 200, await self._space_summary_handler.get_space_summary(
+ requester.user.to_string(),
+ room_id,
+ suggested_only=parse_boolean(request, "suggested_only", default=False),
+ max_rooms_per_space=parse_integer(request, "max_rooms_per_space"),
+ )
+
+ async def on_POST(
+ self, request: SynapseRequest, room_id: str
+ ) -> Tuple[int, JsonDict]:
+ requester = await self._auth.get_user_by_req(request, allow_guest=True)
+ content = parse_json_object_from_request(request)
+
+ suggested_only = content.get("suggested_only", False)
+ if not isinstance(suggested_only, bool):
+ raise SynapseError(
+ 400, "'suggested_only' must be a boolean", Codes.BAD_JSON
+ )
+
+ max_rooms_per_space = content.get("max_rooms_per_space")
+ if max_rooms_per_space is not None and not isinstance(max_rooms_per_space, int):
+ raise SynapseError(
+ 400, "'max_rooms_per_space' must be an integer", Codes.BAD_JSON
+ )
+
+ return 200, await self._space_summary_handler.get_space_summary(
+ requester.user.to_string(),
+ room_id,
+ suggested_only=suggested_only,
+ max_rooms_per_space=max_rooms_per_space,
+ )
+
+
+def register_servlets(hs: "HomeServer", http_server, is_worker=False):
RoomStateEventRestServlet(hs).register(http_server)
RoomMemberListRestServlet(hs).register(http_server)
JoinedRoomMemberListRestServlet(hs).register(http_server)
@@ -1001,6 +1061,9 @@ def register_servlets(hs, http_server, is_worker=False):
RoomTypingRestServlet(hs).register(http_server)
RoomEventContextServlet(hs).register(http_server)
+ if hs.config.experimental.spaces_enabled:
+ RoomSpaceSummaryRestServlet(hs).register(http_server)
+
# Some servlets only get registered for the main process.
if not is_worker:
RoomCreateRestServlet(hs).register(http_server)
diff --git a/synapse/server.py b/synapse/server.py
index d11d08c573..98822d8e2f 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -100,6 +100,7 @@ from synapse.handlers.room_member import RoomMemberHandler, RoomMemberMasterHand
from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
from synapse.handlers.search import SearchHandler
from synapse.handlers.set_password import SetPasswordHandler
+from synapse.handlers.space_summary import SpaceSummaryHandler
from synapse.handlers.sso import SsoHandler
from synapse.handlers.stats import StatsHandler
from synapse.handlers.sync import SyncHandler
@@ -733,6 +734,10 @@ class HomeServer(metaclass=abc.ABCMeta):
return AccountDataHandler(self)
@cache_in_self
+ def get_space_summary_handler(self) -> SpaceSummaryHandler:
+ return SpaceSummaryHandler(self)
+
+ @cache_in_self
def get_external_cache(self) -> ExternalCache:
return ExternalCache(self)
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index eba66ff352..90a8f664ef 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -1210,6 +1210,7 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
self._invalidate_cache_and_stream(
txn, self.get_user_deactivated_status, (user_id,)
)
+ self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
txn.call_after(self.is_guest.invalidate, (user_id,))
@cached()
|