diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py
index 77b936361a..db4fe2c798 100644
--- a/synapse/federation/transport/server/__init__.py
+++ b/synapse/federation/transport/server/__init__.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import Dict, Iterable, List, Optional, Tuple, Type
+from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Type
from typing_extensions import Literal
@@ -36,17 +36,19 @@ from synapse.http.servlet import (
parse_integer_from_args,
parse_string_from_args,
)
-from synapse.server import HomeServer
from synapse.types import JsonDict, ThirdPartyInstanceID
from synapse.util.ratelimitutils import FederationRateLimiter
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
logger = logging.getLogger(__name__)
class TransportLayerServer(JsonResource):
"""Handles incoming federation HTTP requests"""
- def __init__(self, hs: HomeServer, servlet_groups: Optional[List[str]] = None):
+ def __init__(self, hs: "HomeServer", servlet_groups: Optional[List[str]] = None):
"""Initialize the TransportLayerServer
Will by default register all servlets. For custom behaviour, pass in
@@ -113,7 +115,7 @@ class PublicRoomList(BaseFederationServlet):
def __init__(
self,
- hs: HomeServer,
+ hs: "HomeServer",
authenticator: Authenticator,
ratelimiter: FederationRateLimiter,
server_name: str,
@@ -203,7 +205,7 @@ class FederationGroupsRenewAttestaionServlet(BaseFederationServlet):
def __init__(
self,
- hs: HomeServer,
+ hs: "HomeServer",
authenticator: Authenticator,
ratelimiter: FederationRateLimiter,
server_name: str,
@@ -251,7 +253,7 @@ class OpenIdUserInfo(BaseFederationServlet):
def __init__(
self,
- hs: HomeServer,
+ hs: "HomeServer",
authenticator: Authenticator,
ratelimiter: FederationRateLimiter,
server_name: str,
@@ -297,7 +299,7 @@ DEFAULT_SERVLET_GROUPS: Dict[str, Iterable[Type[BaseFederationServlet]]] = {
def register_servlets(
- hs: HomeServer,
+ hs: "HomeServer",
resource: HttpServer,
authenticator: Authenticator,
ratelimiter: FederationRateLimiter,
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index da1fbf8b63..dff2b68359 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -15,7 +15,8 @@
import functools
import logging
import re
-from typing import Any, Awaitable, Callable, Optional, Tuple, cast
+import time
+from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional, Tuple, cast
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
from synapse.api.urls import FEDERATION_V1_PREFIX
@@ -24,16 +25,20 @@ from synapse.http.servlet import parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import (
+ active_span,
set_tag,
span_context_from_request,
+ start_active_span,
start_active_span_follows_from,
whitelisted_homeserver,
)
-from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.stringutils import parse_and_validate_server_name
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
logger = logging.getLogger(__name__)
@@ -46,7 +51,7 @@ class NoAuthenticationError(AuthenticationError):
class Authenticator:
- def __init__(self, hs: HomeServer):
+ def __init__(self, hs: "HomeServer"):
self._clock = hs.get_clock()
self.keyring = hs.get_keyring()
self.server_name = hs.hostname
@@ -114,11 +119,11 @@ class Authenticator:
# alive
retry_timings = await self.store.get_destination_retry_timings(origin)
if retry_timings and retry_timings.retry_last_ts:
- run_in_background(self._reset_retry_timings, origin)
+ run_in_background(self.reset_retry_timings, origin)
return origin
- async def _reset_retry_timings(self, origin: str) -> None:
+ async def reset_retry_timings(self, origin: str) -> None:
try:
logger.info("Marking origin %r as up", origin)
await self.store.set_destination_retry_timings(origin, None, 0, 0)
@@ -227,7 +232,7 @@ class BaseFederationServlet:
def __init__(
self,
- hs: HomeServer,
+ hs: "HomeServer",
authenticator: Authenticator,
ratelimiter: FederationRateLimiter,
server_name: str,
@@ -263,9 +268,10 @@ class BaseFederationServlet:
content = parse_json_object_from_request(request)
try:
- origin: Optional[str] = await authenticator.authenticate_request(
- request, content
- )
+ with start_active_span("authenticate_request"):
+ origin: Optional[str] = await authenticator.authenticate_request(
+ request, content
+ )
except NoAuthenticationError:
origin = None
if self.REQUIRE_AUTH:
@@ -280,32 +286,57 @@ class BaseFederationServlet:
# update the active opentracing span with the authenticated entity
set_tag("authenticated_entity", origin)
- # if the origin is authenticated and whitelisted, link to its span context
+ # if the origin is authenticated and whitelisted, use its span context
+ # as the parent.
context = None
if origin and whitelisted_homeserver(origin):
context = span_context_from_request(request)
- scope = start_active_span_follows_from(
- "incoming-federation-request", contexts=(context,) if context else ()
- )
+ if context:
+ servlet_span = active_span()
+ # a scope which uses the origin's context as a parent
+ processing_start_time = time.time()
+ scope = start_active_span_follows_from(
+ "incoming-federation-request",
+ child_of=context,
+ contexts=(servlet_span,),
+ start_time=processing_start_time,
+ )
- with scope:
- if origin and self.RATELIMIT:
- with ratelimiter.ratelimit(origin) as d:
- await d
- if request._disconnected:
- logger.warning(
- "client disconnected before we started processing "
- "request"
+ else:
+ # just use our context as a parent
+ scope = start_active_span(
+ "incoming-federation-request",
+ )
+
+ try:
+ with scope:
+ if origin and self.RATELIMIT:
+ with ratelimiter.ratelimit(origin) as d:
+ await d
+ if request._disconnected:
+ logger.warning(
+ "client disconnected before we started processing "
+ "request"
+ )
+ return None
+ response = await func(
+ origin, content, request.args, *args, **kwargs
)
- return None
+ else:
response = await func(
origin, content, request.args, *args, **kwargs
)
- else:
- response = await func(
- origin, content, request.args, *args, **kwargs
+ finally:
+ # if we used the origin's context as the parent, add a new span using
+ # the servlet span as a parent, so that we have a link
+ if context:
+ scope2 = start_active_span_follows_from(
+ "process-federation_request",
+ contexts=(scope.span,),
+ start_time=processing_start_time,
)
+ scope2.close()
return response
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index beadfa422b..d86dfede4e 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -12,7 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import Dict, List, Mapping, Optional, Sequence, Tuple, Type, Union
+from typing import (
+ TYPE_CHECKING,
+ Dict,
+ List,
+ Mapping,
+ Optional,
+ Sequence,
+ Tuple,
+ Type,
+ Union,
+)
from typing_extensions import Literal
@@ -30,11 +40,13 @@ from synapse.http.servlet import (
parse_string_from_args,
parse_strings_from_args,
)
-from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
logger = logging.getLogger(__name__)
issue_8631_logger = logging.getLogger("synapse.8631_debug")
@@ -47,7 +59,7 @@ class BaseFederationServerServlet(BaseFederationServlet):
def __init__(
self,
- hs: HomeServer,
+ hs: "HomeServer",
authenticator: Authenticator,
ratelimiter: FederationRateLimiter,
server_name: str,
@@ -97,11 +109,11 @@ class FederationSendServlet(BaseFederationServerServlet):
)
if issue_8631_logger.isEnabledFor(logging.DEBUG):
- DEVICE_UPDATE_EDUS = {"m.device_list_update", "m.signing_key_update"}
+ DEVICE_UPDATE_EDUS = ["m.device_list_update", "m.signing_key_update"]
device_list_updates = [
edu.content
for edu in transaction_data.get("edus", [])
- if edu.edu_type in DEVICE_UPDATE_EDUS
+ if edu.get("edu_type") in DEVICE_UPDATE_EDUS
]
if device_list_updates:
issue_8631_logger.debug(
@@ -596,7 +608,7 @@ class FederationSpaceSummaryServlet(BaseFederationServlet):
def __init__(
self,
- hs: HomeServer,
+ hs: "HomeServer",
authenticator: Authenticator,
ratelimiter: FederationRateLimiter,
server_name: str,
@@ -670,7 +682,7 @@ class FederationRoomHierarchyServlet(BaseFederationServlet):
def __init__(
self,
- hs: HomeServer,
+ hs: "HomeServer",
authenticator: Authenticator,
ratelimiter: FederationRateLimiter,
server_name: str,
@@ -706,7 +718,7 @@ class RoomComplexityServlet(BaseFederationServlet):
def __init__(
self,
- hs: HomeServer,
+ hs: "HomeServer",
authenticator: Authenticator,
ratelimiter: FederationRateLimiter,
server_name: str,
diff --git a/synapse/federation/transport/server/groups_local.py b/synapse/federation/transport/server/groups_local.py
index a12cd18d58..496472e1dc 100644
--- a/synapse/federation/transport/server/groups_local.py
+++ b/synapse/federation/transport/server/groups_local.py
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Dict, List, Tuple, Type
+from typing import TYPE_CHECKING, Dict, List, Tuple, Type
from synapse.api.errors import SynapseError
from synapse.federation.transport.server._base import (
@@ -19,10 +19,12 @@ from synapse.federation.transport.server._base import (
BaseFederationServlet,
)
from synapse.handlers.groups_local import GroupsLocalHandler
-from synapse.server import HomeServer
from synapse.types import JsonDict, get_domain_from_id
from synapse.util.ratelimitutils import FederationRateLimiter
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
class BaseGroupsLocalServlet(BaseFederationServlet):
"""Abstract base class for federation servlet classes which provides a groups local handler.
@@ -32,7 +34,7 @@ class BaseGroupsLocalServlet(BaseFederationServlet):
def __init__(
self,
- hs: HomeServer,
+ hs: "HomeServer",
authenticator: Authenticator,
ratelimiter: FederationRateLimiter,
server_name: str,
diff --git a/synapse/federation/transport/server/groups_server.py b/synapse/federation/transport/server/groups_server.py
index b30e92a5eb..851b50152e 100644
--- a/synapse/federation/transport/server/groups_server.py
+++ b/synapse/federation/transport/server/groups_server.py
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Dict, List, Tuple, Type
+from typing import TYPE_CHECKING, Dict, List, Tuple, Type
from typing_extensions import Literal
@@ -22,10 +22,12 @@ from synapse.federation.transport.server._base import (
BaseFederationServlet,
)
from synapse.http.servlet import parse_string_from_args
-from synapse.server import HomeServer
from synapse.types import JsonDict, get_domain_from_id
from synapse.util.ratelimitutils import FederationRateLimiter
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
class BaseGroupsServerServlet(BaseFederationServlet):
"""Abstract base class for federation servlet classes which provides a groups server handler.
@@ -35,7 +37,7 @@ class BaseGroupsServerServlet(BaseFederationServlet):
def __init__(
self,
- hs: HomeServer,
+ hs: "HomeServer",
authenticator: Authenticator,
ratelimiter: FederationRateLimiter,
server_name: str,
|