diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index bee81fc019..afdb5bf2fa 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -27,11 +27,13 @@ from typing import (
List,
Mapping,
Optional,
+ Sequence,
Tuple,
TypeVar,
Union,
)
+import attr
from prometheus_client import Counter
from twisted.internet import defer
@@ -62,7 +64,7 @@ from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.retryutils import NotRetryingDestination
if TYPE_CHECKING:
- from synapse.app.homeserver import HomeServer
+ from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -455,6 +457,7 @@ class FederationClient(FederationBase):
description: str,
destinations: Iterable[str],
callback: Callable[[str], Awaitable[T]],
+ failover_on_unknown_endpoint: bool = False,
) -> T:
"""Try an operation on a series of servers, until it succeeds
@@ -474,6 +477,10 @@ class FederationClient(FederationBase):
next server tried. Normally the stacktrace is logged but this is
suppressed if the exception is an InvalidResponseError.
+ failover_on_unknown_endpoint: if True, we will try other servers if it looks
+ like a server doesn't support the endpoint. This is typically useful
+ if the endpoint in question is new or experimental.
+
Returns:
The result of callback, if it succeeds
@@ -493,16 +500,31 @@ class FederationClient(FederationBase):
except UnsupportedRoomVersionError:
raise
except HttpResponseException as e:
- if not 500 <= e.code < 600:
- raise e.to_synapse_error()
- else:
- logger.warning(
- "Failed to %s via %s: %i %s",
- description,
- destination,
- e.code,
- e.args[0],
- )
+ synapse_error = e.to_synapse_error()
+ failover = False
+
+ if 500 <= e.code < 600:
+ failover = True
+
+ elif failover_on_unknown_endpoint:
+ # there is no good way to detect an "unknown" endpoint. Dendrite
+ # returns a 404 (with no body); synapse returns a 400
+ # with M_UNRECOGNISED.
+ if e.code == 404 or (
+ e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED
+ ):
+ failover = True
+
+ if not failover:
+ raise synapse_error from e
+
+ logger.warning(
+ "Failed to %s via %s: %i %s",
+ description,
+ destination,
+ e.code,
+ e.args[0],
+ )
except Exception:
logger.warning(
"Failed to %s via %s", description, destination, exc_info=True
@@ -1042,3 +1064,141 @@ class FederationClient(FederationBase):
# If we don't manage to find it, return None. It's not an error if a
# server doesn't give it to us.
return None
+
+ async def get_space_summary(
+ self,
+ destinations: Iterable[str],
+ room_id: str,
+ suggested_only: bool,
+ max_rooms_per_space: Optional[int],
+ exclude_rooms: List[str],
+ ) -> "FederationSpaceSummaryResult":
+ """
+ Call other servers to get a summary of the given space
+
+
+ Args:
+ destinations: The remote servers. We will try them in turn, omitting any
+ that have been blacklisted.
+
+ room_id: ID of the space to be queried
+
+ suggested_only: If true, ask the remote server to only return children
+ with the "suggested" flag set
+
+ max_rooms_per_space: A limit on the number of children to return for each
+ space
+
+ exclude_rooms: A list of room IDs to tell the remote server to skip
+
+ Returns:
+ a parsed FederationSpaceSummaryResult
+
+ Raises:
+ SynapseError if we were unable to get a valid summary from any of the
+ remote servers
+ """
+
+ async def send_request(destination: str) -> FederationSpaceSummaryResult:
+ res = await self.transport_layer.get_space_summary(
+ destination=destination,
+ room_id=room_id,
+ suggested_only=suggested_only,
+ max_rooms_per_space=max_rooms_per_space,
+ exclude_rooms=exclude_rooms,
+ )
+
+ try:
+ return FederationSpaceSummaryResult.from_json_dict(res)
+ except ValueError as e:
+ raise InvalidResponseError(str(e))
+
+ return await self._try_destination_list(
+ "fetch space summary",
+ destinations,
+ send_request,
+ failover_on_unknown_endpoint=True,
+ )
+
+
+@attr.s(frozen=True, slots=True)
+class FederationSpaceSummaryEventResult:
+ """Represents a single event in the result of a successful get_space_summary call.
+
+ It's essentially just a serialised event object, but we do a bit of parsing and
+ validation in `from_json_dict` and store some of the validated properties in
+ object attributes.
+ """
+
+ event_type = attr.ib(type=str)
+ state_key = attr.ib(type=str)
+ via = attr.ib(type=Sequence[str])
+
+ # the raw data, including the above keys
+ data = attr.ib(type=JsonDict)
+
+ @classmethod
+ def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryEventResult":
+ """Parse an event within the result of a /spaces/ request
+
+ Args:
+ d: json object to be parsed
+
+ Raises:
+ ValueError if d is not a valid event
+ """
+
+ event_type = d.get("type")
+ if not isinstance(event_type, str):
+ raise ValueError("Invalid event: 'event_type' must be a str")
+
+ state_key = d.get("state_key")
+ if not isinstance(state_key, str):
+ raise ValueError("Invalid event: 'state_key' must be a str")
+
+ content = d.get("content")
+ if not isinstance(content, dict):
+ raise ValueError("Invalid event: 'content' must be a dict")
+
+ via = content.get("via")
+ if not isinstance(via, Sequence):
+ raise ValueError("Invalid event: 'via' must be a list")
+ if any(not isinstance(v, str) for v in via):
+ raise ValueError("Invalid event: 'via' must be a list of strings")
+
+ return cls(event_type, state_key, via, d)
+
+
+@attr.s(frozen=True, slots=True)
+class FederationSpaceSummaryResult:
+ """Represents the data returned by a successful get_space_summary call."""
+
+ rooms = attr.ib(type=Sequence[JsonDict])
+ events = attr.ib(type=Sequence[FederationSpaceSummaryEventResult])
+
+ @classmethod
+ def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryResult":
+ """Parse the result of a /spaces/ request
+
+ Args:
+ d: json object to be parsed
+
+ Raises:
+ ValueError if d is not a valid /spaces/ response
+ """
+ rooms = d.get("rooms")
+ if not isinstance(rooms, Sequence):
+ raise ValueError("'rooms' must be a list")
+ if any(not isinstance(r, dict) for r in rooms):
+ raise ValueError("Invalid room in 'rooms' list")
+
+ events = d.get("events")
+ if not isinstance(events, Sequence):
+ raise ValueError("'events' must be a list")
+ if any(not isinstance(e, dict) for e in events):
+ raise ValueError("Invalid event in 'events' list")
+ parsed_events = [
+ FederationSpaceSummaryEventResult.from_json_dict(e) for e in events
+ ]
+
+ return cls(rooms, parsed_events)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9839d3d016..d84e362070 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -35,7 +35,7 @@ from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure
-from synapse.api.constants import EduTypes, EventTypes, Membership
+from synapse.api.constants import EduTypes, EventTypes
from synapse.api.errors import (
AuthError,
Codes,
@@ -63,7 +63,7 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
-from synapse.types import JsonDict, get_domain_from_id
+from synapse.types import JsonDict
from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
@@ -727,27 +727,6 @@ class FederationServer(FederationBase):
if the event was unacceptable for any other reason (eg, too large,
too many prev_events, couldn't find the prev_events)
"""
- # check that it's actually being sent from a valid destination to
- # workaround bug #1753 in 0.18.5 and 0.18.6
- if origin != get_domain_from_id(pdu.sender):
- # We continue to accept join events from any server; this is
- # necessary for the federation join dance to work correctly.
- # (When we join over federation, the "helper" server is
- # responsible for sending out the join event, rather than the
- # origin. See bug #1893. This is also true for some third party
- # invites).
- if not (
- pdu.type == "m.room.member"
- and pdu.content
- and pdu.content.get("membership", None)
- in (Membership.JOIN, Membership.INVITE)
- ):
- logger.info(
- "Discarding PDU %s from invalid origin %s", pdu.event_id, origin
- )
- return
- else:
- logger.info("Accepting join PDU %s from %s", pdu.event_id, origin)
# We've already checked that we know the room version by this point
room_version = await self.store.get_room_version(pdu.room_id)
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index cc0d765e5f..af85fe0a1e 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
# limitations under the License.
import datetime
import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple
import attr
from prometheus_client import Counter
@@ -77,6 +77,7 @@ class PerDestinationQueue:
self._transaction_manager = transaction_manager
self._instance_name = hs.get_instance_name()
self._federation_shard_config = hs.config.worker.federation_shard_config
+ self._state = hs.get_state_handler()
self._should_send_on_this_instance = True
if not self._federation_shard_config.should_handle(
@@ -415,22 +416,95 @@ class PerDestinationQueue:
"This should not happen." % event_ids
)
- if logger.isEnabledFor(logging.INFO):
- rooms = [p.room_id for p in catchup_pdus]
- logger.info("Catching up rooms to %s: %r", self._destination, rooms)
+ # We send transactions with events from one room only, as its likely
+ # that the remote will have to do additional processing, which may
+ # take some time. It's better to give it small amounts of work
+ # rather than risk the request timing out and repeatedly being
+ # retried, and not making any progress.
+ #
+ # Note: `catchup_pdus` will have exactly one PDU per room.
+ for pdu in catchup_pdus:
+ # The PDU from the DB will be the last PDU in the room from
+ # *this server* that wasn't sent to the remote. However, other
+ # servers may have sent lots of events since then, and we want
+ # to try and tell the remote only about the *latest* events in
+ # the room. This is so that it doesn't get inundated by events
+ # from various parts of the DAG, which all need to be processed.
+ #
+ # Note: this does mean that in large rooms a server coming back
+ # online will get sent the same events from all the different
+ # servers, but the remote will correctly deduplicate them and
+ # handle it only once.
+
+ # Step 1, fetch the current extremities
+ extrems = await self._store.get_prev_events_for_room(pdu.room_id)
+
+ if pdu.event_id in extrems:
+ # If the event is in the extremities, then great! We can just
+ # use that without having to do further checks.
+ room_catchup_pdus = [pdu]
+ else:
+ # If not, fetch the extremities and figure out which we can
+ # send.
+ extrem_events = await self._store.get_events_as_list(extrems)
+
+ new_pdus = []
+ for p in extrem_events:
+ # We pulled this from the DB, so it'll be non-null
+ assert p.internal_metadata.stream_ordering
+
+ # Filter out events that happened before the remote went
+ # offline
+ if (
+ p.internal_metadata.stream_ordering
+ < self._last_successful_stream_ordering
+ ):
+ continue
- await self._transaction_manager.send_new_transaction(
- self._destination, catchup_pdus, []
- )
+ # Filter out events where the server is not in the room,
+ # e.g. it may have left/been kicked. *Ideally* we'd pull
+ # out the kick and send that, but it's a rare edge case
+ # so we don't bother for now (the server that sent the
+ # kick should send it out if its online).
+ hosts = await self._state.get_hosts_in_room_at_events(
+ p.room_id, [p.event_id]
+ )
+ if self._destination not in hosts:
+ continue
- sent_transactions_counter.inc()
- final_pdu = catchup_pdus[-1]
- self._last_successful_stream_ordering = cast(
- int, final_pdu.internal_metadata.stream_ordering
- )
- await self._store.set_destination_last_successful_stream_ordering(
- self._destination, self._last_successful_stream_ordering
- )
+ new_pdus.append(p)
+
+ # If we've filtered out all the extremities, fall back to
+ # sending the original event. This should ensure that the
+ # server gets at least some of missed events (especially if
+ # the other sending servers are up).
+ if new_pdus:
+ room_catchup_pdus = new_pdus
+
+ logger.info(
+ "Catching up rooms to %s: %r", self._destination, pdu.room_id
+ )
+
+ await self._transaction_manager.send_new_transaction(
+ self._destination, room_catchup_pdus, []
+ )
+
+ sent_transactions_counter.inc()
+
+ # We pulled this from the DB, so it'll be non-null
+ assert pdu.internal_metadata.stream_ordering
+
+ # Note that we mark the last successful stream ordering as that
+ # from the *original* PDU, rather than the PDU(s) we actually
+ # send. This is because we use it to mark our position in the
+ # queue of missed PDUs to process.
+ self._last_successful_stream_ordering = (
+ pdu.internal_metadata.stream_ordering
+ )
+
+ await self._store.set_destination_last_successful_stream_ordering(
+ self._destination, self._last_successful_stream_ordering
+ )
def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
if not self._pending_rrs:
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 10c4747f97..6aee47c431 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -16,7 +16,7 @@
import logging
import urllib
-from typing import Any, Dict, Optional
+from typing import Any, Dict, List, Optional
from synapse.api.constants import Membership
from synapse.api.errors import Codes, HttpResponseException, SynapseError
@@ -26,6 +26,7 @@ from synapse.api.urls import (
FEDERATION_V2_PREFIX,
)
from synapse.logging.utils import log_function
+from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@@ -978,6 +979,38 @@ class TransportLayerClient:
return self.client.get_json(destination=destination, path=path)
+ async def get_space_summary(
+ self,
+ destination: str,
+ room_id: str,
+ suggested_only: bool,
+ max_rooms_per_space: Optional[int],
+ exclude_rooms: List[str],
+ ) -> JsonDict:
+ """
+ Args:
+ destination: The remote server
+ room_id: The room ID to ask about.
+ suggested_only: if True, only suggested rooms will be returned
+ max_rooms_per_space: an optional limit to the number of children to be
+ returned per space
+ exclude_rooms: a list of any rooms we can skip
+ """
+ path = _create_path(
+ FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/spaces/%s", room_id
+ )
+
+ params = {
+ "suggested_only": suggested_only,
+ "exclude_rooms": exclude_rooms,
+ }
+ if max_rooms_per_space is not None:
+ params["max_rooms_per_space"] = max_rooms_per_space
+
+ return await self.client.post_json(
+ destination=destination, path=path, data=params
+ )
+
def _create_path(federation_prefix, path, *args):
"""
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 2cf935f38d..84e39c5a46 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -18,7 +18,7 @@
import functools
import logging
import re
-from typing import Optional, Tuple, Type
+from typing import Container, Mapping, Optional, Sequence, Tuple, Type
import synapse
from synapse.api.constants import MAX_GROUP_CATEGORYID_LENGTH, MAX_GROUP_ROLEID_LENGTH
@@ -29,7 +29,7 @@ from synapse.api.urls import (
FEDERATION_V1_PREFIX,
FEDERATION_V2_PREFIX,
)
-from synapse.http.server import JsonResource
+from synapse.http.server import HttpServer, JsonResource
from synapse.http.servlet import (
parse_boolean_from_args,
parse_integer_from_args,
@@ -44,7 +44,8 @@ from synapse.logging.opentracing import (
whitelisted_homeserver,
)
from synapse.server import HomeServer
-from synapse.types import ThirdPartyInstanceID, get_domain_from_id
+from synapse.types import JsonDict, ThirdPartyInstanceID, get_domain_from_id
+from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.stringutils import parse_and_validate_server_name
from synapse.util.versionstring import get_version_string
@@ -1376,6 +1377,40 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
return 200, new_content
+class FederationSpaceSummaryServlet(BaseFederationServlet):
+ PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc2946"
+ PATH = "/spaces/(?P<room_id>[^/]*)"
+
+ async def on_POST(
+ self,
+ origin: str,
+ content: JsonDict,
+ query: Mapping[bytes, Sequence[bytes]],
+ room_id: str,
+ ) -> Tuple[int, JsonDict]:
+ suggested_only = content.get("suggested_only", False)
+ if not isinstance(suggested_only, bool):
+ raise SynapseError(
+ 400, "'suggested_only' must be a boolean", Codes.BAD_JSON
+ )
+
+ exclude_rooms = content.get("exclude_rooms", [])
+ if not isinstance(exclude_rooms, list) or any(
+ not isinstance(x, str) for x in exclude_rooms
+ ):
+ raise SynapseError(400, "bad value for 'exclude_rooms'", Codes.BAD_JSON)
+
+ max_rooms_per_space = content.get("max_rooms_per_space")
+ if max_rooms_per_space is not None and not isinstance(max_rooms_per_space, int):
+ raise SynapseError(
+ 400, "bad value for 'max_rooms_per_space'", Codes.BAD_JSON
+ )
+
+ return 200, await self.handler.federation_space_summary(
+ room_id, suggested_only, max_rooms_per_space, exclude_rooms
+ )
+
+
class RoomComplexityServlet(BaseFederationServlet):
"""
Indicates to other servers how complex (and therefore likely
@@ -1474,18 +1509,24 @@ DEFAULT_SERVLET_GROUPS = (
)
-def register_servlets(hs, resource, authenticator, ratelimiter, servlet_groups=None):
+def register_servlets(
+ hs: HomeServer,
+ resource: HttpServer,
+ authenticator: Authenticator,
+ ratelimiter: FederationRateLimiter,
+ servlet_groups: Optional[Container[str]] = None,
+):
"""Initialize and register servlet classes.
Will by default register all servlets. For custom behaviour, pass in
a list of servlet_groups to register.
Args:
- hs (synapse.server.HomeServer): homeserver
- resource (JsonResource): resource class to register to
- authenticator (Authenticator): authenticator to use
- ratelimiter (util.ratelimitutils.FederationRateLimiter): ratelimiter to use
- servlet_groups (list[str], optional): List of servlet groups to register.
+ hs: homeserver
+ resource: resource class to register to
+ authenticator: authenticator to use
+ ratelimiter: ratelimiter to use
+ servlet_groups: List of servlet groups to register.
Defaults to ``DEFAULT_SERVLET_GROUPS``.
"""
if not servlet_groups:
@@ -1500,6 +1541,14 @@ def register_servlets(hs, resource, authenticator, ratelimiter, servlet_groups=N
server_name=hs.hostname,
).register(resource)
+ if hs.config.experimental.spaces_enabled:
+ FederationSpaceSummaryServlet(
+ handler=hs.get_space_summary_handler(),
+ authenticator=authenticator,
+ ratelimiter=ratelimiter,
+ server_name=hs.hostname,
+ ).register(resource)
+
if "openid" in servlet_groups:
for servletclass in OPENID_SERVLET_CLASSES:
servletclass(
|