diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index f56344a3b9..addc0bf000 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from collections import namedtuple
from typing import TYPE_CHECKING
from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership
@@ -104,10 +103,6 @@ class FederationBase:
return pdu
-class PduToCheckSig(namedtuple("PduToCheckSig", ["pdu", "sender_domain", "deferreds"])):
- pass
-
-
async def _check_sigs_on_pdu(
keyring: Keyring, room_version: RoomVersion, pdu: EventBase
) -> None:
@@ -220,15 +215,12 @@ def _is_invite_via_3pid(event: EventBase) -> bool:
)
-def event_from_pdu_json(
- pdu_json: JsonDict, room_version: RoomVersion, outlier: bool = False
-) -> EventBase:
+def event_from_pdu_json(pdu_json: JsonDict, room_version: RoomVersion) -> EventBase:
"""Construct an EventBase from an event json received over federation
Args:
pdu_json: pdu as received over federation
room_version: The version of the room this event belongs to
- outlier: True to mark this event as an outlier
Raises:
SynapseError: if the pdu is missing required fields or is otherwise
@@ -252,6 +244,4 @@ def event_from_pdu_json(
validate_canonicaljson(pdu_json)
event = make_event_from_dict(pdu_json, room_version)
- event.internal_metadata.outlier = outlier
-
return event
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index fee1477ab6..6ea4edfc71 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -265,14 +265,11 @@ class FederationClient(FederationBase):
room_version = await self.store.get_room_version(room_id)
- pdus = [
- event_from_pdu_json(p, room_version, outlier=False)
- for p in transaction_data_pdus
- ]
+ pdus = [event_from_pdu_json(p, room_version) for p in transaction_data_pdus]
# Check signatures and hash of pdus, removing any from the list that fail checks
pdus[:] = await self._check_sigs_and_hash_and_fetch(
- dest, pdus, outlier=True, room_version=room_version
+ dest, pdus, room_version=room_version
)
return pdus
@@ -282,7 +279,6 @@ class FederationClient(FederationBase):
destination: str,
event_id: str,
room_version: RoomVersion,
- outlier: bool = False,
timeout: Optional[int] = None,
) -> Optional[EventBase]:
"""Requests the PDU with given origin and ID from the remote home
@@ -292,9 +288,6 @@ class FederationClient(FederationBase):
destination: Which homeserver to query
event_id: event to fetch
room_version: version of the room
- outlier: Indicates whether the PDU is an `outlier`, i.e. if
- it's from an arbitrary point in the context as opposed to part
- of the current block of PDUs. Defaults to `False`
timeout: How long to try (in ms) each destination for before
moving to the next destination. None indicates no timeout.
@@ -316,8 +309,7 @@ class FederationClient(FederationBase):
)
pdu_list: List[EventBase] = [
- event_from_pdu_json(p, room_version, outlier=outlier)
- for p in transaction_data["pdus"]
+ event_from_pdu_json(p, room_version) for p in transaction_data["pdus"]
]
if pdu_list and pdu_list[0]:
@@ -334,7 +326,6 @@ class FederationClient(FederationBase):
destinations: Iterable[str],
event_id: str,
room_version: RoomVersion,
- outlier: bool = False,
timeout: Optional[int] = None,
) -> Optional[EventBase]:
"""Requests the PDU with given origin and ID from the remote home
@@ -347,9 +338,6 @@ class FederationClient(FederationBase):
destinations: Which homeservers to query
event_id: event to fetch
room_version: version of the room
- outlier: Indicates whether the PDU is an `outlier`, i.e. if
- it's from an arbitrary point in the context as opposed to part
- of the current block of PDUs. Defaults to `False`
timeout: How long to try (in ms) each destination for before
moving to the next destination. None indicates no timeout.
@@ -377,7 +365,6 @@ class FederationClient(FederationBase):
destination=destination,
event_id=event_id,
room_version=room_version,
- outlier=outlier,
timeout=timeout,
)
@@ -435,7 +422,6 @@ class FederationClient(FederationBase):
origin: str,
pdus: Collection[EventBase],
room_version: RoomVersion,
- outlier: bool = False,
) -> List[EventBase]:
"""Takes a list of PDUs and checks the signatures and hashes of each
one. If a PDU fails its signature check then we check if we have it in
@@ -451,7 +437,6 @@ class FederationClient(FederationBase):
origin
pdu
room_version
- outlier: Whether the events are outliers or not
Returns:
A list of PDUs that have valid signatures and hashes.
@@ -466,7 +451,6 @@ class FederationClient(FederationBase):
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
pdu=pdu,
origin=origin,
- outlier=outlier,
room_version=room_version,
)
@@ -482,7 +466,6 @@ class FederationClient(FederationBase):
pdu: EventBase,
origin: str,
room_version: RoomVersion,
- outlier: bool = False,
) -> Optional[EventBase]:
"""Takes a PDU and checks its signatures and hashes. If the PDU fails
its signature check then we check if we have it in the database and if
@@ -494,9 +477,6 @@ class FederationClient(FederationBase):
origin
pdu
room_version
- outlier: Whether the events are outliers or not
- include_none: Whether to include None in the returned list
- for events that have failed their checks
Returns:
The PDU (possibly redacted) if it has valid signatures and hashes.
@@ -521,7 +501,6 @@ class FederationClient(FederationBase):
destinations=[pdu_origin],
event_id=pdu.event_id,
room_version=room_version,
- outlier=outlier,
timeout=10000,
)
except SynapseError:
@@ -541,13 +520,10 @@ class FederationClient(FederationBase):
room_version = await self.store.get_room_version(room_id)
- auth_chain = [
- event_from_pdu_json(p, room_version, outlier=True)
- for p in res["auth_chain"]
- ]
+ auth_chain = [event_from_pdu_json(p, room_version) for p in res["auth_chain"]]
signed_auth = await self._check_sigs_and_hash_and_fetch(
- destination, auth_chain, outlier=True, room_version=room_version
+ destination, auth_chain, room_version=room_version
)
return signed_auth
@@ -816,7 +792,6 @@ class FederationClient(FederationBase):
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
pdu=event,
origin=destination,
- outlier=True,
room_version=room_version,
)
@@ -864,7 +839,6 @@ class FederationClient(FederationBase):
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
pdu=pdu,
origin=destination,
- outlier=True,
room_version=room_version,
)
@@ -1235,7 +1209,7 @@ class FederationClient(FederationBase):
]
signed_events = await self._check_sigs_and_hash_and_fetch(
- destination, events, outlier=False, room_version=room_version
+ destination, events, room_version=room_version
)
except HttpResponseException as e:
if not e.code == 400:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 8e37e76206..ee71f289c8 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -28,9 +28,9 @@ from typing import (
Union,
)
+from matrix_common.regex import glob_to_regex
from prometheus_client import Counter, Gauge, Histogram
-from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure
@@ -66,8 +66,8 @@ from synapse.replication.http.federation import (
)
from synapse.storage.databases.main.lock import Lock
from synapse.types import JsonDict, get_domain_from_id
-from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
-from synapse.util.async_helpers import Linearizer, concurrently_execute
+from synapse.util import json_decoder, unwrapFirstError
+from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import parse_server_name
@@ -360,13 +360,13 @@ class FederationServer(FederationBase):
# want to block things like to device messages from reaching clients
# behind the potentially expensive handling of PDUs.
pdu_results, _ = await make_deferred_yieldable(
- defer.gatherResults(
- [
+ gather_results(
+ (
run_in_background(
self._handle_pdus_in_txn, origin, transaction, request_time
),
run_in_background(self._handle_edus_in_txn, origin, transaction),
- ],
+ ),
consumeErrors=True,
).addErrback(unwrapFirstError)
)
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 63289a5a33..0d7c4f5067 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -30,7 +30,6 @@ Events are replicated via a separate events stream.
"""
import logging
-from collections import namedtuple
from typing import (
TYPE_CHECKING,
Dict,
@@ -43,6 +42,7 @@ from typing import (
Type,
)
+import attr
from sortedcontainers import SortedDict
from synapse.api.presence import UserPresenceState
@@ -382,13 +382,11 @@ class BaseFederationRow:
raise NotImplementedError()
-class PresenceDestinationsRow(
- BaseFederationRow,
- namedtuple(
- "PresenceDestinationsRow",
- ("state", "destinations"), # UserPresenceState # list[str]
- ),
-):
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class PresenceDestinationsRow(BaseFederationRow):
+ state: UserPresenceState
+ destinations: List[str]
+
TypeId = "pd"
@staticmethod
@@ -404,17 +402,15 @@ class PresenceDestinationsRow(
buff.presence_destinations.append((self.state, self.destinations))
-class KeyedEduRow(
- BaseFederationRow,
- namedtuple(
- "KeyedEduRow",
- ("key", "edu"), # tuple(str) - the edu key passed to send_edu # Edu
- ),
-):
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class KeyedEduRow(BaseFederationRow):
"""Streams EDUs that have an associated key that is ued to clobber. For example,
typing EDUs clobber based on room_id.
"""
+ key: Tuple[str, ...] # the edu key passed to send_edu
+ edu: Edu
+
TypeId = "k"
@staticmethod
@@ -428,9 +424,12 @@ class KeyedEduRow(
buff.keyed_edus.setdefault(self.edu.destination, {})[self.key] = self.edu
-class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class EduRow(BaseFederationRow):
"""Streams EDUs that don't have keys. See KeyedEduRow"""
+ edu: Edu
+
TypeId = "e"
@staticmethod
@@ -453,14 +452,14 @@ _rowtypes: Tuple[Type[BaseFederationRow], ...] = (
TypeToRow = {Row.TypeId: Row for Row in _rowtypes}
-ParsedFederationStreamData = namedtuple(
- "ParsedFederationStreamData",
- (
- "presence_destinations", # list of tuples of UserPresenceState and destinations
- "keyed_edus", # dict of destination -> { key -> Edu }
- "edus", # dict of destination -> [Edu]
- ),
-)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class ParsedFederationStreamData:
+ # list of tuples of UserPresenceState and destinations
+ presence_destinations: List[Tuple[UserPresenceState, List[str]]]
+ # dict of destination -> { key -> Edu }
+ keyed_edus: Dict[str, Dict[Tuple[str, ...], Edu]]
+ # dict of destination -> [Edu]
+ edus: Dict[str, List[Edu]]
def process_rows_for_federation(
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index dc39e3537b..da1fbf8b63 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -22,13 +22,11 @@ from synapse.api.urls import FEDERATION_V1_PREFIX
from synapse.http.server import HttpServer, ServletCallback
from synapse.http.servlet import parse_json_object_from_request
from synapse.http.site import SynapseRequest
-from synapse.logging import opentracing
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import (
- SynapseTags,
- start_active_span,
- start_active_span_from_request,
- tags,
+ set_tag,
+ span_context_from_request,
+ start_active_span_follows_from,
whitelisted_homeserver,
)
from synapse.server import HomeServer
@@ -279,30 +277,19 @@ class BaseFederationServlet:
logger.warning("authenticate_request failed: %s", e)
raise
- request_tags = {
- SynapseTags.REQUEST_ID: request.get_request_id(),
- tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
- tags.HTTP_METHOD: request.get_method(),
- tags.HTTP_URL: request.get_redacted_uri(),
- tags.PEER_HOST_IPV6: request.getClientIP(),
- "authenticated_entity": origin,
- "servlet_name": request.request_metrics.name,
- }
-
- # Only accept the span context if the origin is authenticated
- # and whitelisted
+ # update the active opentracing span with the authenticated entity
+ set_tag("authenticated_entity", origin)
+
+ # if the origin is authenticated and whitelisted, link to its span context
+ context = None
if origin and whitelisted_homeserver(origin):
- scope = start_active_span_from_request(
- request, "incoming-federation-request", tags=request_tags
- )
- else:
- scope = start_active_span(
- "incoming-federation-request", tags=request_tags
- )
+ context = span_context_from_request(request)
- with scope:
- opentracing.inject_response_headers(request.responseHeaders)
+ scope = start_active_span_follows_from(
+ "incoming-federation-request", contexts=(context,) if context else ()
+ )
+ with scope:
if origin and self.RATELIMIT:
with ratelimiter.ratelimit(origin) as d:
await d
|