diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 7dc696c7ae..af4595498c 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -18,6 +18,7 @@
import functools
import logging
import re
+from typing import Optional, Tuple, Type
from twisted.internet.defer import maybeDeferred
@@ -44,6 +45,7 @@ from synapse.logging.opentracing import (
tags,
whitelisted_homeserver,
)
+from synapse.server import HomeServer
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
@@ -101,12 +103,17 @@ class NoAuthenticationError(AuthenticationError):
class Authenticator(object):
- def __init__(self, hs):
+ def __init__(self, hs: HomeServer):
self._clock = hs.get_clock()
self.keyring = hs.get_keyring()
self.server_name = hs.hostname
self.store = hs.get_datastore()
self.federation_domain_whitelist = hs.config.federation_domain_whitelist
+ self.notifer = hs.get_notifier()
+
+ self.replication_client = None
+ if hs.config.worker.worker_app:
+ self.replication_client = hs.get_tcp_replication()
# A method just so we can pass 'self' as the authenticator to the Servlets
async def authenticate_request(self, request, content):
@@ -151,7 +158,7 @@ class Authenticator(object):
origin, json_request, now, "Incoming request"
)
- logger.info("Request from %s", origin)
+ logger.debug("Request from %s", origin)
request.authenticated_entity = origin
# If we get a valid signed request from the other side, its probably
@@ -165,7 +172,18 @@ class Authenticator(object):
async def _reset_retry_timings(self, origin):
try:
logger.info("Marking origin %r as up", origin)
- await self.store.set_destination_retry_timings(origin, 0, 0)
+ await self.store.set_destination_retry_timings(origin, None, 0, 0)
+
+ # Inform the relevant places that the remote server is back up.
+ self.notifer.notify_remote_server_up(origin)
+ if self.replication_client:
+ # If we're on a worker we try and inform master about this. The
+ # replication client doesn't hook into the notifier to avoid
+ # infinite loops where we send a `REMOTE_SERVER_UP` command to
+ # master, which then echoes it back to us which in turn pokes
+ # the notifier.
+ self.replication_client.send_remote_server_up(origin)
+
except Exception:
logger.exception("Error resetting retry timings on %s", origin)
@@ -202,7 +220,7 @@ def _parse_auth_header(header_bytes):
sig = strip_quotes(param_dict["sig"])
return origin, key, sig
except Exception as e:
- logger.warn(
+ logger.warning(
"Error parsing auth header '%s': %s",
header_bytes.decode("ascii", "replace"),
e,
@@ -250,6 +268,8 @@ class BaseFederationServlet(object):
returned.
"""
+ PATH = "" # Overridden in subclasses, the regex to match against the path.
+
REQUIRE_AUTH = True
PREFIX = FEDERATION_V1_PREFIX # Allows specifying the API version
@@ -287,10 +307,12 @@ class BaseFederationServlet(object):
except NoAuthenticationError:
origin = None
if self.REQUIRE_AUTH:
- logger.warn("authenticate_request failed: missing authentication")
+ logger.warning(
+ "authenticate_request failed: missing authentication"
+ )
raise
except Exception as e:
- logger.warn("authenticate_request failed: %s", e)
+ logger.warning("authenticate_request failed: %s", e)
raise
request_tags = {
@@ -328,9 +350,6 @@ class BaseFederationServlet(object):
return response
- # Extra logic that functools.wraps() doesn't finish
- new_func.__self__ = func.__self__
-
return new_func
def register(self, server):
@@ -419,7 +438,7 @@ class FederationEventServlet(BaseFederationServlet):
return await self.handler.on_pdu_request(origin, event_id)
-class FederationStateServlet(BaseFederationServlet):
+class FederationStateV1Servlet(BaseFederationServlet):
PATH = "/state/(?P<context>[^/]*)/?"
# This is when someone asks for all data for a given context.
@@ -427,7 +446,7 @@ class FederationStateServlet(BaseFederationServlet):
return await self.handler.on_context_state_request(
origin,
context,
- parse_string_from_args(query, "event_id", None, required=True),
+ parse_string_from_args(query, "event_id", None, required=False),
)
@@ -504,11 +523,21 @@ class FederationMakeLeaveServlet(BaseFederationServlet):
return 200, content
-class FederationSendLeaveServlet(BaseFederationServlet):
+class FederationV1SendLeaveServlet(BaseFederationServlet):
PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
async def on_PUT(self, origin, content, query, room_id, event_id):
content = await self.handler.on_send_leave_request(origin, content, room_id)
+ return 200, (200, content)
+
+
+class FederationV2SendLeaveServlet(BaseFederationServlet):
+ PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
+
+ PREFIX = FEDERATION_V2_PREFIX
+
+ async def on_PUT(self, origin, content, query, room_id, event_id):
+ content = await self.handler.on_send_leave_request(origin, content, room_id)
return 200, content
@@ -519,13 +548,25 @@ class FederationEventAuthServlet(BaseFederationServlet):
return await self.handler.on_event_auth(origin, context, event_id)
-class FederationSendJoinServlet(BaseFederationServlet):
+class FederationV1SendJoinServlet(BaseFederationServlet):
PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
async def on_PUT(self, origin, content, query, context, event_id):
# TODO(paul): assert that context/event_id parsed from path actually
# match those given in content
content = await self.handler.on_send_join_request(origin, content, context)
+ return 200, (200, content)
+
+
+class FederationV2SendJoinServlet(BaseFederationServlet):
+ PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
+
+ PREFIX = FEDERATION_V2_PREFIX
+
+ async def on_PUT(self, origin, content, query, context, event_id):
+ # TODO(paul): assert that context/event_id parsed from path actually
+ # match those given in content
+ content = await self.handler.on_send_join_request(origin, content, context)
return 200, content
@@ -538,7 +579,7 @@ class FederationV1InviteServlet(BaseFederationServlet):
# state resolution algorithm, and we don't use that for processing
# invites
content = await self.handler.on_invite_request(
- origin, content, room_version=RoomVersions.V1.identifier
+ origin, content, room_version_id=RoomVersions.V1.identifier
)
# V1 federation API is defined to return a content of `[200, {...}]`
@@ -565,7 +606,7 @@ class FederationV2InviteServlet(BaseFederationServlet):
event.setdefault("unsigned", {})["invite_room_state"] = invite_room_state
content = await self.handler.on_invite_request(
- origin, event, room_version=room_version
+ origin, event, room_version_id=room_version
)
return 200, content
@@ -602,17 +643,6 @@ class FederationClientKeysClaimServlet(BaseFederationServlet):
return 200, response
-class FederationQueryAuthServlet(BaseFederationServlet):
- PATH = "/query_auth/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
-
- async def on_POST(self, origin, content, query, context, event_id):
- new_content = await self.handler.on_query_auth_request(
- origin, content, context, event_id
- )
-
- return 200, new_content
-
-
class FederationGetMissingEventsServlet(BaseFederationServlet):
# TODO(paul): Why does this path alone end with "/?" optional?
PATH = "/get_missing_events/(?P<room_id>[^/]*)/?"
@@ -712,7 +742,7 @@ class PublicRoomList(BaseFederationServlet):
This API returns information in the same format as /publicRooms on the
client API, but will only ever include local public rooms and hence is
- intended for consumption by other home servers.
+ intended for consumption by other homeservers.
GET /publicRooms HTTP/1.1
@@ -765,6 +795,10 @@ class PublicRoomList(BaseFederationServlet):
else:
network_tuple = ThirdPartyInstanceID(None, None)
+ if limit == 0:
+ # zero is a special value which corresponds to no limit.
+ limit = None
+
data = await maybeDeferred(
self.handler.get_local_public_room_list,
limit,
@@ -779,7 +813,7 @@ class PublicRoomList(BaseFederationServlet):
if not self.allow_access:
raise FederationDeniedError(origin)
- limit = int(content.get("limit", 100))
+ limit = int(content.get("limit", 100)) # type: Optional[int]
since_token = content.get("since", None)
search_filter = content.get("filter", None)
@@ -800,6 +834,10 @@ class PublicRoomList(BaseFederationServlet):
if search_filter is None:
logger.warning("Nonefilter")
+ if limit == 0:
+ # zero is a special value which corresponds to no limit.
+ limit = None
+
data = await self.handler.get_local_public_room_list(
limit=limit,
since_token=since_token,
@@ -922,7 +960,7 @@ class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet):
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
- result = await self.groups_handler.update_room_in_group(
+ result = await self.handler.update_room_in_group(
group_id, requester_user_id, room_id, config_key, content
)
@@ -1350,18 +1388,19 @@ class RoomComplexityServlet(BaseFederationServlet):
FEDERATION_SERVLET_CLASSES = (
FederationSendServlet,
FederationEventServlet,
- FederationStateServlet,
+ FederationStateV1Servlet,
FederationStateIdsServlet,
FederationBackfillServlet,
FederationQueryServlet,
FederationMakeJoinServlet,
FederationMakeLeaveServlet,
FederationEventServlet,
- FederationSendJoinServlet,
- FederationSendLeaveServlet,
+ FederationV1SendJoinServlet,
+ FederationV2SendJoinServlet,
+ FederationV1SendLeaveServlet,
+ FederationV2SendLeaveServlet,
FederationV1InviteServlet,
FederationV2InviteServlet,
- FederationQueryAuthServlet,
FederationGetMissingEventsServlet,
FederationEventAuthServlet,
FederationClientKeysQueryServlet,
@@ -1371,11 +1410,13 @@ FEDERATION_SERVLET_CLASSES = (
On3pidBindServlet,
FederationVersionServlet,
RoomComplexityServlet,
-)
+) # type: Tuple[Type[BaseFederationServlet], ...]
-OPENID_SERVLET_CLASSES = (OpenIdUserInfo,)
+OPENID_SERVLET_CLASSES = (
+ OpenIdUserInfo,
+) # type: Tuple[Type[BaseFederationServlet], ...]
-ROOM_LIST_CLASSES = (PublicRoomList,)
+ROOM_LIST_CLASSES = (PublicRoomList,) # type: Tuple[Type[PublicRoomList], ...]
GROUP_SERVER_SERVLET_CLASSES = (
FederationGroupsProfileServlet,
@@ -1396,17 +1437,19 @@ GROUP_SERVER_SERVLET_CLASSES = (
FederationGroupsAddRoomsServlet,
FederationGroupsAddRoomsConfigServlet,
FederationGroupsSettingJoinPolicyServlet,
-)
+) # type: Tuple[Type[BaseFederationServlet], ...]
GROUP_LOCAL_SERVLET_CLASSES = (
FederationGroupsLocalInviteServlet,
FederationGroupsRemoveLocalUserServlet,
FederationGroupsBulkPublicisedServlet,
-)
+) # type: Tuple[Type[BaseFederationServlet], ...]
-GROUP_ATTESTATION_SERVLET_CLASSES = (FederationGroupsRenewAttestaionServlet,)
+GROUP_ATTESTATION_SERVLET_CLASSES = (
+ FederationGroupsRenewAttestaionServlet,
+) # type: Tuple[Type[BaseFederationServlet], ...]
DEFAULT_SERVLET_GROUPS = (
"federation",
|