diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index c0012c6872..38aa47963f 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -17,8 +17,6 @@ import logging
from collections import namedtuple
from typing import Iterable, List
-import six
-
from twisted.internet import defer
from twisted.internet.defer import Deferred, DeferredList
from twisted.python.failure import Failure
@@ -41,7 +39,7 @@ from synapse.types import JsonDict, get_domain_from_id
logger = logging.getLogger(__name__)
-class FederationBase(object):
+class FederationBase:
def __init__(self, hs):
self.hs = hs
@@ -93,8 +91,8 @@ class FederationBase(object):
# *actual* redacted copy to be on the safe side.)
redacted_event = prune_event(pdu)
if set(redacted_event.keys()) == set(pdu.keys()) and set(
- six.iterkeys(redacted_event.content)
- ) == set(six.iterkeys(pdu.content)):
+ redacted_event.content.keys()
+ ) == set(pdu.content.keys()):
logger.info(
"Event %s seems to have been redacted; using our redacted "
"copy",
@@ -294,7 +292,7 @@ def event_from_pdu_json(
assert_params_in_dict(pdu_json, ("type", "depth"))
depth = pdu_json["depth"]
- if not isinstance(depth, six.integer_types):
+ if not isinstance(depth, int):
raise SynapseError(400, "Depth %r not an integer" % (depth,), Codes.BAD_JSON)
if depth < 0:
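
The hunk above drops the six helpers but keeps the redaction heuristic: a received PDU is treated as already redacted when pruning it changes neither its top-level keys nor its content keys. A minimal sketch of that check with plain dicts (looks_already_redacted is an illustrative name, not Synapse's prune_event/EventBase API):

    def looks_already_redacted(pdu: dict, pruned: dict) -> bool:
        """True when pruning removed nothing, i.e. the event was already redacted."""
        return set(pruned.keys()) == set(pdu.keys()) and set(
            pruned["content"].keys()
        ) == set(pdu["content"].keys())

    event = {"type": "m.room.message", "content": {"body": "hi"}}
    already_redacted = {"type": "m.room.message", "content": {}}
    assert looks_already_redacted(event, dict(event))
    assert not looks_already_redacted(event, already_redacted)
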
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 687cd841ac..38ac7ec699 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -87,7 +87,7 @@ class FederationClient(FederationBase):
self.transport_layer = hs.get_federation_transport_client()
self.hostname = hs.hostname
- self.signing_key = hs.config.signing_key[0]
+ self.signing_key = hs.signing_key
self._get_pdu_cache = ExpiringCache(
cache_name="get_pdu_cache",
@@ -135,7 +135,7 @@ class FederationClient(FederationBase):
and try the request anyway.
Returns:
- a Deferred which will eventually yield a JSON object from the
+ an Awaitable which will eventually yield a JSON object from the
response
"""
sent_queries_counter.labels(query_type).inc()
@@ -157,7 +157,7 @@ class FederationClient(FederationBase):
content (dict): The query content.
Returns:
- a Deferred which will eventually yield a JSON object from the
+ an Awaitable which will eventually yield a JSON object from the
response
"""
sent_queries_counter.labels("client_device_keys").inc()
@@ -180,7 +180,7 @@ class FederationClient(FederationBase):
content (dict): The query content.
Returns:
- a Deferred which will eventually yield a JSON object from the
+ an Awaitable which will eventually yield a JSON object from the
response
"""
sent_queries_counter.labels("client_one_time_keys").inc()
@@ -245,7 +245,7 @@ class FederationClient(FederationBase):
event_id: event to fetch
room_version: version of the room
outlier: Indicates whether the PDU is an `outlier`, i.e. if
- it's from an arbitary point in the context as opposed to part
+ it's from an arbitrary point in the context as opposed to part
of the current block of PDUs. Defaults to `False`
timeout: How long to try (in ms) each destination for before
moving to the next destination. None indicates no timeout.
@@ -351,7 +351,7 @@ class FederationClient(FederationBase):
outlier: bool = False,
include_none: bool = False,
) -> List[EventBase]:
- """Takes a list of PDUs and checks the signatures and hashs of each
+ """Takes a list of PDUs and checks the signatures and hashes of each
one. If a PDU fails its signature check then we check if we have it in
the database and if not then request it from the originating server of
that PDU.
@@ -374,29 +374,26 @@ class FederationClient(FederationBase):
"""
deferreds = self._check_sigs_and_hashes(room_version, pdus)
- @defer.inlineCallbacks
- def handle_check_result(pdu: EventBase, deferred: Deferred):
+ async def handle_check_result(pdu: EventBase, deferred: Deferred):
try:
- res = yield make_deferred_yieldable(deferred)
+ res = await make_deferred_yieldable(deferred)
except SynapseError:
res = None
if not res:
# Check local db.
- res = yield self.store.get_event(
+ res = await self.store.get_event(
pdu.event_id, allow_rejected=True, allow_none=True
)
if not res and pdu.origin != origin:
try:
- res = yield defer.ensureDeferred(
- self.get_pdu(
- destinations=[pdu.origin],
- event_id=pdu.event_id,
- room_version=room_version,
- outlier=outlier,
- timeout=10000,
- )
+ res = await self.get_pdu(
+ destinations=[pdu.origin],
+ event_id=pdu.event_id,
+ room_version=room_version,
+ outlier=outlier,
+ timeout=10000,
)
except SynapseError:
pass
@@ -903,7 +900,7 @@ class FederationClient(FederationBase):
party instance
Returns:
- Deferred[Dict[str, Any]]: The response from the remote server, or None if
+ Awaitable[Dict[str, Any]]: The response from the remote server, or None if
`remote_server` is the same as the local server_name
Raises:
@@ -995,24 +992,25 @@ class FederationClient(FederationBase):
raise RuntimeError("Failed to send to any server.")
- @defer.inlineCallbacks
- def get_room_complexity(self, destination, room_id):
+ async def get_room_complexity(
+ self, destination: str, room_id: str
+ ) -> Optional[dict]:
"""
Fetch the complexity of a remote room from another server.
Args:
- destination (str): The remote server
- room_id (str): The room ID to ask about.
+ destination: The remote server
+ room_id: The room ID to ask about.
Returns:
- Deferred[dict] or Deferred[None]: Dict contains the complexity
- metric versions, while None means we could not fetch the complexity.
+ Dict contains the complexity metric versions, while None means we
+ could not fetch the complexity.
"""
try:
- complexity = yield self.transport_layer.get_room_complexity(
+ complexity = await self.transport_layer.get_room_complexity(
destination=destination, room_id=room_id
)
- defer.returnValue(complexity)
+ return complexity
except CodeMessageException as e:
# We didn't manage to get it -- probably a 404. We are okay if other
# servers don't give it to us.
@@ -1029,4 +1027,4 @@ class FederationClient(FederationBase):
# If we don't manage to find it, return None. It's not an error if a
# server doesn't give it to us.
- defer.returnValue(None)
+ return None
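
Most of the changes in this file follow the same mechanical pattern: @defer.inlineCallbacks generators become native coroutines, yield becomes await, and defer.returnValue becomes a plain return. A hedged before/after sketch, assuming Twisted is installed and api.fetch() is a hypothetical method returning a Deferred:

    from twisted.internet import defer

    # Before: Twisted generator coroutine.
    @defer.inlineCallbacks
    def get_thing_old(api):
        result = yield api.fetch()   # yields a Deferred
        defer.returnValue(result)    # explicit return of the value

    # After: native coroutine; Deferreds can be awaited directly.
    async def get_thing_new(api):
        result = await api.fetch()
        return result                # plain return replaces returnValue
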
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 32a8a2ee46..218df884b0 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -15,13 +15,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union
-
-import six
-from six import iteritems
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Awaitable,
+ Callable,
+ Dict,
+ List,
+ Match,
+ Optional,
+ Tuple,
+ Union,
+)
-from canonicaljson import json
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
@@ -55,10 +62,13 @@ from synapse.replication.http.federation import (
ReplicationGetQueryRestServlet,
)
from synapse.types import JsonDict, get_domain_from_id
-from synapse.util import glob_to_regex, unwrapFirstError
+from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
# when processing incoming transactions, we try to handle multiple rooms in
# parallel, up to this limit.
TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -73,6 +83,10 @@ received_queries_counter = Counter(
"synapse_federation_server_received_queries", "", ["type"]
)
+pdu_process_time = Histogram(
+ "synapse_federation_server_pdu_process_time", "Time taken to process an event",
+)
+
class FederationServer(FederationBase):
def __init__(self, hs):
@@ -94,6 +108,9 @@ class FederationServer(FederationBase):
# We cache responses to state queries, as they take a while and often
# come in waves.
self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
+ self._state_ids_resp_cache = ResponseCache(
+ hs, "state_ids_resp", timeout_ms=30000
+ )
async def on_backfill_request(
self, origin: str, room_id: str, versions: List[str], limit: int
@@ -274,21 +291,22 @@ class FederationServer(FederationBase):
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
- with nested_logging_context(event_id):
- try:
- await self._handle_received_pdu(origin, pdu)
- pdu_results[event_id] = {}
- except FederationError as e:
- logger.warning("Error handling PDU %s: %s", event_id, e)
- pdu_results[event_id] = {"error": str(e)}
- except Exception as e:
- f = failure.Failure()
- pdu_results[event_id] = {"error": str(e)}
- logger.error(
- "Failed to handle PDU %s",
- event_id,
- exc_info=(f.type, f.value, f.getTracebackObject()),
- )
+ with pdu_process_time.time():
+ with nested_logging_context(event_id):
+ try:
+ await self._handle_received_pdu(origin, pdu)
+ pdu_results[event_id] = {}
+ except FederationError as e:
+ logger.warning("Error handling PDU %s: %s", event_id, e)
+ pdu_results[event_id] = {"error": str(e)}
+ except Exception as e:
+ f = failure.Failure()
+ pdu_results[event_id] = {"error": str(e)}
+ logger.error(
+ "Failed to handle PDU %s",
+ event_id,
+ exc_info=(f.type, f.value, f.getTracebackObject()),
+ )
await concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
@@ -360,10 +378,16 @@ class FederationServer(FederationBase):
if not in_room:
raise AuthError(403, "Host not in room.")
+ resp = await self._state_ids_resp_cache.wrap(
+ (room_id, event_id), self._on_state_ids_request_compute, room_id, event_id,
+ )
+
+ return 200, resp
+
+ async def _on_state_ids_request_compute(self, room_id, event_id):
state_ids = await self.handler.get_state_ids_for_pdu(room_id, event_id)
auth_chain_ids = await self.store.get_auth_chain_ids(state_ids)
-
- return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids}
+ return {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids}
async def _on_context_state_request_compute(
self, room_id: str, event_id: str
@@ -524,9 +548,9 @@ class FederationServer(FederationBase):
json_result = {} # type: Dict[str, Dict[str, dict]]
for user_id, device_keys in results.items():
for device_id, keys in device_keys.items():
- for key_id, json_bytes in keys.items():
+ for key_id, json_str in keys.items():
json_result.setdefault(user_id, {})[device_id] = {
- key_id: json.loads(json_bytes)
+ key_id: json_decoder.decode(json_str)
}
logger.info(
@@ -534,9 +558,9 @@ class FederationServer(FederationBase):
",".join(
(
"%s for %s:%s" % (key_id, user_id, device_id)
- for user_id, user_keys in iteritems(json_result)
- for device_id, device_keys in iteritems(user_keys)
- for key_id, _ in iteritems(device_keys)
+ for user_id, user_keys in json_result.items()
+ for device_id, device_keys in user_keys.items()
+ for key_id, _ in device_keys.items()
)
),
)
@@ -715,7 +739,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool:
# server name is a literal IP
allow_ip_literals = acl_event.content.get("allow_ip_literals", True)
if not isinstance(allow_ip_literals, bool):
- logger.warning("Ignorning non-bool allow_ip_literals flag")
+ logger.warning("Ignoring non-bool allow_ip_literals flag")
allow_ip_literals = True
if not allow_ip_literals:
# check for ipv6 literals. These start with '['.
@@ -729,7 +753,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool:
# next, check the deny list
deny = acl_event.content.get("deny", [])
if not isinstance(deny, (list, tuple)):
- logger.warning("Ignorning non-list deny ACL %s", deny)
+ logger.warning("Ignoring non-list deny ACL %s", deny)
deny = []
for e in deny:
if _acl_entry_matches(server_name, e):
@@ -739,7 +763,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool:
# then the allow list.
allow = acl_event.content.get("allow", [])
if not isinstance(allow, (list, tuple)):
- logger.warning("Ignorning non-list allow ACL %s", allow)
+ logger.warning("Ignoring non-list allow ACL %s", allow)
allow = []
for e in allow:
if _acl_entry_matches(server_name, e):
@@ -752,7 +776,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool:
def _acl_entry_matches(server_name: str, acl_entry: str) -> Match:
- if not isinstance(acl_entry, six.string_types):
+ if not isinstance(acl_entry, str):
logger.warning(
"Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry)
)
@@ -761,16 +785,35 @@ def _acl_entry_matches(server_name: str, acl_entry: str) -> Match:
return regex.match(server_name)
-class FederationHandlerRegistry(object):
+class FederationHandlerRegistry:
"""Allows classes to register themselves as handlers for a given EDU or
query type for incoming federation traffic.
"""
- def __init__(self):
- self.edu_handlers = {}
- self.query_handlers = {}
+ def __init__(self, hs: "HomeServer"):
+ self.config = hs.config
+ self.http_client = hs.get_simple_http_client()
+ self.clock = hs.get_clock()
+ self._instance_name = hs.get_instance_name()
- def register_edu_handler(self, edu_type: str, handler: Callable[[str, dict], None]):
+ # These are safe to load in monolith mode, but will explode if we try
+ # and use them. However we have guards before we use them to ensure that
+ # we don't route to ourselves, and in monolith mode that will always be
+ # the case.
+ self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
+ self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
+
+ self.edu_handlers = (
+ {}
+ ) # type: Dict[str, Callable[[str, dict], Awaitable[None]]]
+ self.query_handlers = {} # type: Dict[str, Callable[[dict], Awaitable[JsonDict]]]
+
+ # Map from type to instance name that we should route EDU handling to.
+ self._edu_type_to_instance = {} # type: Dict[str, str]
+
+ def register_edu_handler(
+ self, edu_type: str, handler: Callable[[str, dict], Awaitable[None]]
+ ):
"""Sets the handler callable that will be used to handle an incoming
federation EDU of the given type.
@@ -807,66 +850,56 @@ class FederationHandlerRegistry(object):
self.query_handlers[query_type] = handler
+ def register_instance_for_edu(self, edu_type: str, instance_name: str):
+ """Register that the EDU handler is on a different instance than master.
+ """
+ self._edu_type_to_instance[edu_type] = instance_name
+
async def on_edu(self, edu_type: str, origin: str, content: dict):
+ if not self.config.use_presence and edu_type == "m.presence":
+ return
+
+ # Check if we have a handler on this instance
handler = self.edu_handlers.get(edu_type)
- if not handler:
- logger.warning("No handler registered for EDU type %s", edu_type)
+ if handler:
+ with start_active_span_from_edu(content, "handle_edu"):
+ try:
+ await handler(origin, content)
+ except SynapseError as e:
+ logger.info("Failed to handle edu %r: %r", edu_type, e)
+ except Exception:
+ logger.exception("Failed to handle edu %r", edu_type)
return
- with start_active_span_from_edu(content, "handle_edu"):
+ # Check if we can route it somewhere else that isn't us
+ route_to = self._edu_type_to_instance.get(edu_type, "master")
+ if route_to != self._instance_name:
try:
- await handler(origin, content)
+ await self._send_edu(
+ instance_name=route_to,
+ edu_type=edu_type,
+ origin=origin,
+ content=content,
+ )
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
-
- def on_query(self, query_type: str, args: dict) -> defer.Deferred:
- handler = self.query_handlers.get(query_type)
- if not handler:
- logger.warning("No handler registered for query type %s", query_type)
- raise NotFoundError("No handler for Query type '%s'" % (query_type,))
-
- return handler(args)
-
-
-class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
- """A FederationHandlerRegistry for worker processes.
-
- When receiving EDU or queries it will check if an appropriate handler has
- been registered on the worker, if there isn't one then it calls off to the
- master process.
- """
-
- def __init__(self, hs):
- self.config = hs.config
- self.http_client = hs.get_simple_http_client()
- self.clock = hs.get_clock()
-
- self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
- self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
-
- super(ReplicationFederationHandlerRegistry, self).__init__()
-
- async def on_edu(self, edu_type: str, origin: str, content: dict):
- """Overrides FederationHandlerRegistry
- """
- if not self.config.use_presence and edu_type == "m.presence":
return
- handler = self.edu_handlers.get(edu_type)
- if handler:
- return await super(ReplicationFederationHandlerRegistry, self).on_edu(
- edu_type, origin, content
- )
-
- return await self._send_edu(edu_type=edu_type, origin=origin, content=content)
+ # Oh well, let's just log and move on.
+ logger.warning("No handler registered for EDU type %s", edu_type)
async def on_query(self, query_type: str, args: dict):
- """Overrides FederationHandlerRegistry
- """
handler = self.query_handlers.get(query_type)
if handler:
return await handler(args)
- return await self._get_query_client(query_type=query_type, args=args)
+ # Check if we can route it somewhere else that isn't us
+ if self._instance_name == "master":
+ return await self._get_query_client(query_type=query_type, args=args)
+
+ # Uh oh, no handler! Let's raise an exception so the request returns an
+ # error.
+ logger.warning("No handler registered for query type %s", query_type)
+ raise NotFoundError("No handler for Query type '%s'" % (query_type,))
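
The new pdu_process_time metric uses prometheus_client's Histogram, whose time() helper returns a context manager that records the elapsed seconds when the block exits. A small sketch of that usage (the metric name and handle_pdu below are illustrative, not the real handler):

    from prometheus_client import Histogram

    pdu_process_time = Histogram(
        "example_pdu_process_time", "Time taken to process an event",
    )

    def handle_pdu(pdu: dict) -> None:
        pass  # stand-in for FederationServer._handle_received_pdu

    # time() observes the elapsed wall-clock seconds when the block exits.
    with pdu_process_time.time():
        handle_pdu({"event_id": "$example"})
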
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index d68b4bd670..079e2b2fe0 100644
--- a/synapse/federation/persistence.py
+++ b/synapse/federation/persistence.py
@@ -20,13 +20,16 @@ These actions are mostly only used by the :py:mod:`.replication` module.
"""
import logging
+from typing import Optional, Tuple
+from synapse.federation.units import Transaction
from synapse.logging.utils import log_function
+from synapse.types import JsonDict
logger = logging.getLogger(__name__)
-class TransactionActions(object):
+class TransactionActions:
""" Defines persistence actions that relate to handling Transactions.
"""
@@ -34,30 +37,32 @@ class TransactionActions(object):
self.store = datastore
@log_function
- def have_responded(self, origin, transaction):
- """ Have we already responded to a transaction with the same id and
+ async def have_responded(
+ self, origin: str, transaction: Transaction
+ ) -> Optional[Tuple[int, JsonDict]]:
+ """Have we already responded to a transaction with the same id and
origin?
Returns:
- Deferred: Results in `None` if we have not previously responded to
- this transaction or a 2-tuple of `(int, dict)` representing the
- response code and response body.
+ `None` if we have not previously responded to this transaction or a
+ 2-tuple of `(int, dict)` representing the response code and response body.
"""
- if not transaction.transaction_id:
+ transaction_id = transaction.transaction_id # type: ignore
+ if not transaction_id:
raise RuntimeError("Cannot persist a transaction with no transaction_id")
- return self.store.get_received_txn_response(transaction.transaction_id, origin)
+ return await self.store.get_received_txn_response(transaction_id, origin)
@log_function
- def set_response(self, origin, transaction, code, response):
- """ Persist how we responded to a transaction.
-
- Returns:
- Deferred
+ async def set_response(
+ self, origin: str, transaction: Transaction, code: int, response: JsonDict
+ ) -> None:
+ """Persist how we responded to a transaction.
"""
- if not transaction.transaction_id:
+ transaction_id = transaction.transaction_id # type: ignore
+ if not transaction_id:
raise RuntimeError("Cannot persist a transaction with no transaction_id")
- return self.store.set_received_txn_response(
- transaction.transaction_id, origin, code, response
+ await self.store.set_received_txn_response(
+ transaction_id, origin, code, response
)
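
The persistence changes keep the same idempotency contract, looking up or recording a (code, body) response for an (origin, transaction_id) key, but expose it as plain coroutines with explicit return types. A toy in-memory equivalent, assuming a dict in place of the datastore:

    from typing import Dict, Optional, Tuple

    JsonDict = dict  # stand-in for synapse.types.JsonDict

    class InMemoryTransactionActions:
        """Toy TransactionActions keyed on (origin, transaction_id)."""

        def __init__(self) -> None:
            self._responses = {}  # type: Dict[Tuple[str, str], Tuple[int, JsonDict]]

        async def have_responded(
            self, origin: str, transaction_id: str
        ) -> Optional[Tuple[int, JsonDict]]:
            # None means "not seen before", otherwise the cached (code, body).
            return self._responses.get((origin, transaction_id))

        async def set_response(
            self, origin: str, transaction_id: str, code: int, response: JsonDict
        ) -> None:
            self._responses[(origin, transaction_id)] = (code, response)
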
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 52f4f54215..8e46957d15 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -33,14 +33,12 @@ import logging
from collections import namedtuple
from typing import Dict, List, Tuple, Type
-from six import iteritems
-
from sortedcontainers import SortedDict
from twisted.internet import defer
+from synapse.api.presence import UserPresenceState
from synapse.metrics import LaterGauge
-from synapse.storage.presence import UserPresenceState
from synapse.util.metrics import Measure
from .units import Edu
@@ -48,7 +46,7 @@ from .units import Edu
logger = logging.getLogger(__name__)
-class FederationRemoteSendQueue(object):
+class FederationRemoteSendQueue:
"""A drop in replacement for FederationSender"""
def __init__(self, hs):
@@ -57,6 +55,11 @@ class FederationRemoteSendQueue(object):
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
+ # We may have multiple federation sender instances, so we need to track
+ # their positions separately.
+ self._sender_instances = hs.config.worker.federation_shard_config.instances
+ self._sender_positions = {}
+
# Pending presence map user_id -> UserPresenceState
self.presence_map = {} # type: Dict[str, UserPresenceState]
@@ -263,7 +266,14 @@ class FederationRemoteSendQueue(object):
def get_current_token(self):
return self.pos - 1
- def federation_ack(self, token):
+ def federation_ack(self, instance_name, token):
+ if self._sender_instances:
+ # If we have configured multiple federation sender instances we need
+ # to track their positions separately, and only clear the queue up
+ # to the token all instances have acked.
+ self._sender_positions[instance_name] = token
+ token = min(self._sender_positions.values())
+
self._clear_queue_before_pos(token)
async def get_replication_rows(
@@ -327,7 +337,7 @@ class FederationRemoteSendQueue(object):
# stream position.
keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
- for ((destination, edu_key), pos) in iteritems(keyed_edus):
+ for ((destination, edu_key), pos) in keyed_edus.items():
rows.append(
(
pos,
@@ -355,13 +365,13 @@ class FederationRemoteSendQueue(object):
)
-class BaseFederationRow(object):
+class BaseFederationRow:
"""Base class for rows to be sent in the federation stream.
Specifies how to identify, serialize and deserialize the different types.
"""
- TypeId = "" # Unique string that ids the type. Must be overriden in sub classes.
+ TypeId = "" # Unique string that ids the type. Must be overridden in sub classes.
@staticmethod
def from_data(data):
@@ -530,10 +540,10 @@ def process_rows_for_federation(transaction_queue, rows):
states=[state], destinations=destinations
)
- for destination, edu_map in iteritems(buff.keyed_edus):
+ for destination, edu_map in buff.keyed_edus.items():
for key, edu in edu_map.items():
transaction_queue.send_edu(edu, key)
- for destination, edu_list in iteritems(buff.edus):
+ for destination, edu_list in buff.edus.items():
for edu in edu_list:
transaction_queue.send_edu(edu, None)
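
federation_ack now has to cope with several federation sender instances: each instance reports its own acked token, and the queue can only be cleared up to the minimum across all of them. A small sketch of that bookkeeping (function and variable names are illustrative):

    from typing import Dict

    def effective_ack(positions: Dict[str, int], instance_name: str, token: int) -> int:
        """Record the token acked by one sender instance and return the position
        up to which the queue may be cleared: the minimum across all instances."""
        positions[instance_name] = token
        return min(positions.values())

    acks = {}  # type: Dict[str, int]
    assert effective_ack(acks, "sender1", 10) == 10
    assert effective_ack(acks, "sender2", 7) == 7    # sender2 lags behind
    assert effective_ack(acks, "sender1", 15) == 7   # still limited by sender2
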
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index d473576902..552519e82c 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -16,14 +16,13 @@
import logging
from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
-from six import itervalues
-
from prometheus_client import Counter
from twisted.internet import defer
import synapse
import synapse.metrics
+from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
@@ -41,7 +40,6 @@ from synapse.metrics import (
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.presence import UserPresenceState
from synapse.types import ReadReceipt
from synapse.util.metrics import Measure, measure_func
@@ -58,7 +56,7 @@ sent_pdus_destination_dist_total = Counter(
)
-class FederationSender(object):
+class FederationSender:
def __init__(self, hs: "synapse.server.HomeServer"):
self.hs = hs
self.server_name = hs.hostname
@@ -71,6 +69,9 @@ class FederationSender(object):
self._transaction_manager = TransactionManager(hs)
+ self._instance_name = hs.get_instance_name()
+ self._federation_shard_config = hs.config.worker.federation_shard_config
+
# map from destination to PerDestinationQueue
self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue]
@@ -107,8 +108,6 @@ class FederationSender(object):
),
)
- self._order = 1
-
self._is_processing = False
self._last_poked_id = -1
@@ -193,7 +192,13 @@ class FederationSender(object):
)
return
- destinations = set(destinations)
+ destinations = {
+ d
+ for d in destinations
+ if self._federation_shard_config.should_handle(
+ self._instance_name, d
+ )
+ }
if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server
@@ -203,7 +208,15 @@ class FederationSender(object):
logger.debug("Sending %s to %r", event, destinations)
- self._send_pdu(event, destinations)
+ if destinations:
+ self._send_pdu(event, destinations)
+
+ now = self.clock.time_msec()
+ ts = await self.store.get_received_ts(event.event_id)
+
+ synapse.metrics.event_processing_lag_by_event.labels(
+ "federation_sender"
+ ).observe((now - ts) / 1000)
async def handle_room_events(events: Iterable[EventBase]) -> None:
with Measure(self.clock, "handle_room_events"):
@@ -218,7 +231,7 @@ class FederationSender(object):
defer.gatherResults(
[
run_in_background(handle_room_events, evs)
- for evs in itervalues(events_by_room)
+ for evs in events_by_room.values()
],
consumeErrors=True,
)
@@ -257,9 +270,6 @@ class FederationSender(object):
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
- order = self._order
- self._order += 1
-
destinations = set(destinations)
destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
@@ -271,10 +281,9 @@ class FederationSender(object):
sent_pdus_destination_dist_count.inc()
for destination in destinations:
- self._get_per_destination_queue(destination).send_pdu(pdu, order)
+ self._get_per_destination_queue(destination).send_pdu(pdu)
- @defer.inlineCallbacks
- def send_read_receipt(self, receipt: ReadReceipt):
+ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room
Args:
@@ -315,8 +324,13 @@ class FederationSender(object):
room_id = receipt.room_id
# Work out which remote servers should be poked and poke them.
- domains = yield self.state.get_current_hosts_in_room(room_id)
- domains = [d for d in domains if d != self.server_name]
+ domains_set = await self.state.get_current_hosts_in_room(room_id)
+ domains = [
+ d
+ for d in domains_set
+ if d != self.server_name
+ and self._federation_shard_config.should_handle(self._instance_name, d)
+ ]
if not domains:
return
@@ -365,8 +379,7 @@ class FederationSender(object):
queue.flush_read_receipts_for_room(room_id)
@preserve_fn # the caller should not yield on this
- @defer.inlineCallbacks
- def send_presence(self, states: List[UserPresenceState]):
+ async def send_presence(self, states: List[UserPresenceState]):
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
@@ -401,7 +414,7 @@ class FederationSender(object):
if not states_map:
break
- yield self._process_presence_inner(list(states_map.values()))
+ await self._process_presence_inner(list(states_map.values()))
except Exception:
logger.exception("Error sending presence states to servers")
finally:
@@ -421,20 +434,29 @@ class FederationSender(object):
for destination in destinations:
if destination == self.server_name:
continue
+ if not self._federation_shard_config.should_handle(
+ self._instance_name, destination
+ ):
+ continue
self._get_per_destination_queue(destination).send_presence(states)
@measure_func("txnqueue._process_presence")
- @defer.inlineCallbacks
- def _process_presence_inner(self, states: List[UserPresenceState]):
+ async def _process_presence_inner(self, states: List[UserPresenceState]):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
"""
- hosts_and_states = yield get_interested_remotes(self.store, states, self.state)
+ hosts_and_states = await get_interested_remotes(self.store, states, self.state)
for destinations, states in hosts_and_states:
for destination in destinations:
if destination == self.server_name:
continue
+
+ if not self._federation_shard_config.should_handle(
+ self._instance_name, destination
+ ):
+ continue
+
self._get_per_destination_queue(destination).send_presence(states)
def build_and_send_edu(
@@ -456,6 +478,11 @@ class FederationSender(object):
logger.info("Not sending EDU to ourselves")
return
+ if not self._federation_shard_config.should_handle(
+ self._instance_name, destination
+ ):
+ return
+
edu = Edu(
origin=self.server_name,
destination=destination,
@@ -472,6 +499,11 @@ class FederationSender(object):
edu: edu to send
key: clobbering key for this edu
"""
+ if not self._federation_shard_config.should_handle(
+ self._instance_name, edu.destination
+ ):
+ return
+
queue = self._get_per_destination_queue(edu.destination)
if key:
queue.send_keyed_edu(edu, key)
@@ -483,6 +515,11 @@ class FederationSender(object):
logger.warning("Not sending device update to ourselves")
return
+ if not self._federation_shard_config.should_handle(
+ self._instance_name, destination
+ ):
+ return
+
self._get_per_destination_queue(destination).attempt_new_transaction()
def wake_destination(self, destination: str):
@@ -496,6 +533,11 @@ class FederationSender(object):
logger.warning("Not waking up ourselves")
return
+ if not self._federation_shard_config.should_handle(
+ self._instance_name, destination
+ ):
+ return
+
self._get_per_destination_queue(destination).attempt_new_transaction()
@staticmethod
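
The sender now consults federation_shard_config.should_handle(instance_name, destination) before queuing anything, so each destination is owned by exactly one sender instance. The sketch below illustrates the idea with a hypothetical CRC32-based rule; it is not Synapse's actual sharding algorithm, which lives in the worker config:

    import binascii
    from typing import List

    def should_handle(instances: List[str], this_instance: str, destination: str) -> bool:
        """Hypothetical sharding rule: hash each destination onto one of the
        configured sender instances; with no instances configured, handle it here."""
        if not instances:
            return True
        index = binascii.crc32(destination.encode()) % len(instances)
        return instances[index] == this_instance

    senders = ["sender1", "sender2"]
    # Exactly one instance claims any given destination.
    assert sum(should_handle(senders, s, "matrix.org") for s in senders) == 1
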
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 4e698981a4..defc228c23 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -24,12 +24,12 @@ from synapse.api.errors import (
HttpResponseException,
RequestSendFailed,
)
+from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.presence import UserPresenceState
from synapse.types import ReadReceipt
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
@@ -53,7 +53,7 @@ sent_edus_by_type = Counter(
)
-class PerDestinationQueue(object):
+class PerDestinationQueue:
"""
Manages the per-destination transmission queues.
@@ -74,12 +74,26 @@ class PerDestinationQueue(object):
self._clock = hs.get_clock()
self._store = hs.get_datastore()
self._transaction_manager = transaction_manager
+ self._instance_name = hs.get_instance_name()
+ self._federation_shard_config = hs.config.worker.federation_shard_config
+
+ self._should_send_on_this_instance = True
+ if not self._federation_shard_config.should_handle(
+ self._instance_name, destination
+ ):
+ # We don't raise an exception here to avoid taking out any other
+ # processing. We have a guard in `attempt_new_transaction` that
+ # ensures we don't start sending stuff.
+ logger.error(
+ "Create a per destination queue for %s on wrong worker", destination,
+ )
+ self._should_send_on_this_instance = False
self._destination = destination
self.transmission_loop_running = False
- # a list of tuples of (pending pdu, order)
- self._pending_pdus = [] # type: List[Tuple[EventBase, int]]
+ # a list of pending PDUs
+ self._pending_pdus = [] # type: List[EventBase]
# XXX this is never actually used: see
# https://github.com/matrix-org/synapse/issues/7549
@@ -118,18 +132,17 @@ class PerDestinationQueue(object):
+ len(self._pending_edus_keyed)
)
- def send_pdu(self, pdu: EventBase, order: int) -> None:
- """Add a PDU to the queue, and start the transmission loop if neccessary
+ def send_pdu(self, pdu: EventBase) -> None:
+ """Add a PDU to the queue, and start the transmission loop if necessary
Args:
pdu: pdu to send
- order
"""
- self._pending_pdus.append((pdu, order))
+ self._pending_pdus.append(pdu)
self.attempt_new_transaction()
def send_presence(self, states: Iterable[UserPresenceState]) -> None:
- """Add presence updates to the queue. Start the transmission loop if neccessary.
+ """Add presence updates to the queue. Start the transmission loop if necessary.
Args:
states: presence to send
@@ -171,7 +184,7 @@ class PerDestinationQueue(object):
returns immediately. Otherwise kicks off the process of sending a
transaction in the background.
"""
- # list of (pending_pdu, deferred, order)
+
if self.transmission_loop_running:
# XXX: this can get stuck on by a never-ending
# request at which point pending_pdus just keeps growing.
@@ -180,6 +193,14 @@ class PerDestinationQueue(object):
logger.debug("TX [%s] Transaction already in progress", self._destination)
return
+ if not self._should_send_on_this_instance:
+ # We don't raise an exception here to avoid taking out any other
+ # processing.
+ logger.error(
+ "Trying to start a transaction to %s on wrong worker", self._destination
+ )
+ return
+
logger.debug("TX [%s] Starting transaction loop", self._destination)
run_as_background_process(
@@ -188,7 +209,7 @@ class PerDestinationQueue(object):
)
async def _transaction_transmission_loop(self) -> None:
- pending_pdus = [] # type: List[Tuple[EventBase, int]]
+ pending_pdus = [] # type: List[EventBase]
try:
self.transmission_loop_running = True
@@ -315,6 +336,28 @@ class PerDestinationQueue(object):
(e.retry_last_ts + e.retry_interval) / 1000.0
),
)
+
+ if e.retry_interval > 60 * 60 * 1000:
+ # we won't retry for another hour!
+ # (this suggests a significant outage)
+ # We drop pending PDUs and EDUs because otherwise they will
+ # rack up indefinitely.
+ # Note that:
+ # - the EDUs that are being dropped here are those that we can
+ # afford to drop (specifically, only typing notifications,
+ # read receipts and presence updates are being dropped here)
+ # - Other EDUs such as to_device messages are queued with a
+ # different mechanism
+ # - this is all volatile state that would be lost if the
+ # federation sender restarted anyway
+
+ # dropping read receipts is a bit sad but should be solved
+ # through another mechanism, because this is all volatile!
+ self._pending_pdus = []
+ self._pending_edus = []
+ self._pending_edus_keyed = {}
+ self._pending_presence = {}
+ self._pending_rrs = {}
except FederationDeniedError as e:
logger.info(e)
except HttpResponseException as e:
@@ -329,13 +372,13 @@ class PerDestinationQueue(object):
"TX [%s] Failed to send transaction: %s", self._destination, e
)
- for p, _ in pending_pdus:
+ for p in pending_pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
except Exception:
logger.exception("TX [%s] Failed to send transaction", self._destination)
- for p, _ in pending_pdus:
+ for p in pending_pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
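
When a destination's retry interval grows beyond an hour, the queue now drops the volatile per-destination state (pending PDUs, droppable EDUs, presence and read receipts) rather than letting it grow without bound. A sketch of that decision, using a SimpleNamespace in place of a PerDestinationQueue:

    from types import SimpleNamespace

    ONE_HOUR_MS = 60 * 60 * 1000

    def drop_volatile_state_if_long_outage(queue, retry_interval_ms: int) -> None:
        """Mirror of the new behaviour: if the destination will not be retried for
        over an hour, drop the volatile per-destination state."""
        if retry_interval_ms > ONE_HOUR_MS:
            queue._pending_pdus = []
            queue._pending_edus = []
            queue._pending_edus_keyed = {}
            queue._pending_presence = {}
            queue._pending_rrs = {}

    q = SimpleNamespace(
        _pending_pdus=["event"], _pending_edus=["edu"], _pending_edus_keyed={},
        _pending_presence={}, _pending_rrs={},
    )
    drop_volatile_state_if_long_outage(q, retry_interval_ms=2 * ONE_HOUR_MS)
    assert q._pending_pdus == [] and q._pending_edus == []
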
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index a2752a54a5..c84072ab73 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -15,8 +15,6 @@
import logging
from typing import TYPE_CHECKING, List
-from canonicaljson import json
-
from synapse.api.errors import HttpResponseException
from synapse.events import EventBase
from synapse.federation.persistence import TransactionActions
@@ -28,6 +26,7 @@ from synapse.logging.opentracing import (
tags,
whitelisted_homeserver,
)
+from synapse.util import json_decoder
from synapse.util.metrics import measure_func
if TYPE_CHECKING:
@@ -36,7 +35,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
-class TransactionManager(object):
+class TransactionManager:
"""Helper class which handles building and sending transactions
shared between PerDestinationQueue objects
@@ -54,33 +53,34 @@ class TransactionManager(object):
@measure_func("_send_new_transaction")
async def send_new_transaction(
- self, destination: str, pending_pdus: List[EventBase], pending_edus: List[Edu]
- ):
+ self, destination: str, pdus: List[EventBase], edus: List[Edu],
+ ) -> bool:
+ """
+ Args:
+ destination: The destination to send to (e.g. 'example.org')
+ pdus: In-order list of PDUs to send
+ edus: List of EDUs to send
+
+ Returns:
+ True iff the transaction was successful
+ """
# Make a transaction-sending opentracing span. This span follows on from
# all the edus in that transaction. This needs to be done since there is
# no active span here, so if the edus were not received by the remote the
# span would have no causality and it would be forgotten.
- # The span_contexts is a generator so that it won't be evaluated if
- # opentracing is disabled. (Yay speed!)
span_contexts = []
keep_destination = whitelisted_homeserver(destination)
- for edu in pending_edus:
+ for edu in edus:
context = edu.get_context()
if context:
- span_contexts.append(extract_text_map(json.loads(context)))
+ span_contexts.append(extract_text_map(json_decoder.decode(context)))
if keep_destination:
edu.strip_context()
with start_active_span_follows_from("send_transaction", span_contexts):
-
- # Sort based on the order field
- pending_pdus.sort(key=lambda t: t[1])
- pdus = [x[0] for x in pending_pdus]
- edus = pending_edus
-
success = True
logger.debug("TX [%s] _attempt_new_transaction", destination)
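
send_new_transaction still decodes the opentracing context carried by each EDU, now via the shared json_decoder instead of canonicaljson's json module. A sketch of that extraction step, assuming json_decoder is simply a json.JSONDecoder instance and using FakeEdu as a stand-in for the real Edu class (whose get_context() is shown above):

    import json

    json_decoder = json.JSONDecoder()  # assumed equivalent of synapse.util.json_decoder

    class FakeEdu:
        """Stand-in exposing the get_context() method used in the hunk above."""

        def __init__(self, context: str) -> None:
            self._context = context

        def get_context(self) -> str:
            return self._context

    def collect_span_contexts(edus) -> list:
        """Decode the serialized opentracing context carried by each EDU, skipping
        EDUs without one."""
        contexts = []
        for edu in edus:
            context = edu.get_context()
            if context:
                contexts.append(json_decoder.decode(context))
        return contexts

    assert collect_span_contexts([FakeEdu(""), FakeEdu('{"trace_id": "abc"}')]) == [
        {"trace_id": "abc"}
    ]
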
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 060bf07197..17a10f622e 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -15,12 +15,9 @@
# limitations under the License.
import logging
+import urllib
from typing import Any, Dict, Optional
-from six.moves import urllib
-
-from twisted.internet import defer
-
from synapse.api.constants import Membership
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.api.urls import (
@@ -33,7 +30,7 @@ from synapse.logging.utils import log_function
logger = logging.getLogger(__name__)
-class TransportLayerClient(object):
+class TransportLayerClient:
"""Sends federation HTTP requests to other servers"""
def __init__(self, hs):
@@ -52,7 +49,7 @@ class TransportLayerClient(object):
event_id (str): The event we want the context at.
Returns:
- Deferred: Results in a dict received from the remote homeserver.
+ Awaitable: Results in a dict received from the remote homeserver.
"""
logger.debug("get_room_state_ids dest=%s, room=%s", destination, room_id)
@@ -76,7 +73,7 @@ class TransportLayerClient(object):
giving up. None indicates no timeout.
Returns:
- Deferred: Results in a dict received from the remote homeserver.
+ Awaitable: Results in a dict received from the remote homeserver.
"""
logger.debug("get_pdu dest=%s, event_id=%s", destination, event_id)
@@ -97,7 +94,7 @@ class TransportLayerClient(object):
limit (int)
Returns:
- Deferred: Results in a dict received from the remote homeserver.
+ Awaitable: Results in a dict received from the remote homeserver.
"""
logger.debug(
"backfill dest=%s, room_id=%s, event_tuples=%r, limit=%s",
@@ -119,16 +116,15 @@ class TransportLayerClient(object):
destination, path=path, args=args, try_trailing_slash_on_400=True
)
- @defer.inlineCallbacks
@log_function
- def send_transaction(self, transaction, json_data_callback=None):
+ async def send_transaction(self, transaction, json_data_callback=None):
""" Sends the given Transaction to its destination
Args:
transaction (Transaction)
Returns:
- Deferred: Succeeds when we get a 2xx HTTP response. The result
+ Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
Fails with ``HTTPRequestException`` if we get an HTTP response
@@ -155,7 +151,7 @@ class TransportLayerClient(object):
path = _create_v1_path("/send/%s", transaction.transaction_id)
- response = yield self.client.put_json(
+ response = await self.client.put_json(
transaction.destination,
path=path,
data=json_data,
@@ -167,14 +163,13 @@ class TransportLayerClient(object):
return response
- @defer.inlineCallbacks
@log_function
- def make_query(
+ async def make_query(
self, destination, query_type, args, retry_on_dns_fail, ignore_backoff=False
):
path = _create_v1_path("/query/%s", query_type)
- content = yield self.client.get_json(
+ content = await self.client.get_json(
destination=destination,
path=path,
args=args,
@@ -185,9 +180,10 @@ class TransportLayerClient(object):
return content
- @defer.inlineCallbacks
@log_function
- def make_membership_event(self, destination, room_id, user_id, membership, params):
+ async def make_membership_event(
+ self, destination, room_id, user_id, membership, params
+ ):
"""Asks a remote server to build and sign us a membership event
Note that this does not append any events to any graphs.
@@ -201,7 +197,7 @@ class TransportLayerClient(object):
request.
Returns:
- Deferred: Succeeds when we get a 2xx HTTP response. The result
+ Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body (ie, the new event).
Fails with ``HTTPRequestException`` if we get an HTTP response
@@ -232,7 +228,7 @@ class TransportLayerClient(object):
ignore_backoff = True
retry_on_dns_fail = True
- content = yield self.client.get_json(
+ content = await self.client.get_json(
destination=destination,
path=path,
args=params,
@@ -243,34 +239,31 @@ class TransportLayerClient(object):
return content
- @defer.inlineCallbacks
@log_function
- def send_join_v1(self, destination, room_id, event_id, content):
+ async def send_join_v1(self, destination, room_id, event_id, content):
path = _create_v1_path("/send_join/%s/%s", room_id, event_id)
- response = yield self.client.put_json(
+ response = await self.client.put_json(
destination=destination, path=path, data=content
)
return response
- @defer.inlineCallbacks
@log_function
- def send_join_v2(self, destination, room_id, event_id, content):
+ async def send_join_v2(self, destination, room_id, event_id, content):
path = _create_v2_path("/send_join/%s/%s", room_id, event_id)
- response = yield self.client.put_json(
+ response = await self.client.put_json(
destination=destination, path=path, data=content
)
return response
- @defer.inlineCallbacks
@log_function
- def send_leave_v1(self, destination, room_id, event_id, content):
+ async def send_leave_v1(self, destination, room_id, event_id, content):
path = _create_v1_path("/send_leave/%s/%s", room_id, event_id)
- response = yield self.client.put_json(
+ response = await self.client.put_json(
destination=destination,
path=path,
data=content,
@@ -283,12 +276,11 @@ class TransportLayerClient(object):
return response
- @defer.inlineCallbacks
@log_function
- def send_leave_v2(self, destination, room_id, event_id, content):
+ async def send_leave_v2(self, destination, room_id, event_id, content):
path = _create_v2_path("/send_leave/%s/%s", room_id, event_id)
- response = yield self.client.put_json(
+ response = await self.client.put_json(
destination=destination,
path=path,
data=content,
@@ -301,31 +293,28 @@ class TransportLayerClient(object):
return response
- @defer.inlineCallbacks
@log_function
- def send_invite_v1(self, destination, room_id, event_id, content):
+ async def send_invite_v1(self, destination, room_id, event_id, content):
path = _create_v1_path("/invite/%s/%s", room_id, event_id)
- response = yield self.client.put_json(
+ response = await self.client.put_json(
destination=destination, path=path, data=content, ignore_backoff=True
)
return response
- @defer.inlineCallbacks
@log_function
- def send_invite_v2(self, destination, room_id, event_id, content):
+ async def send_invite_v2(self, destination, room_id, event_id, content):
path = _create_v2_path("/invite/%s/%s", room_id, event_id)
- response = yield self.client.put_json(
+ response = await self.client.put_json(
destination=destination, path=path, data=content, ignore_backoff=True
)
return response
- @defer.inlineCallbacks
@log_function
- def get_public_rooms(
+ async def get_public_rooms(
self,
remote_server: str,
limit: Optional[int] = None,
@@ -356,7 +345,7 @@ class TransportLayerClient(object):
data["filter"] = search_filter
try:
- response = yield self.client.post_json(
+ response = await self.client.post_json(
destination=remote_server, path=path, data=data, ignore_backoff=True
)
except HttpResponseException as e:
@@ -382,7 +371,7 @@ class TransportLayerClient(object):
args["since"] = [since_token]
try:
- response = yield self.client.get_json(
+ response = await self.client.get_json(
destination=remote_server, path=path, args=args, ignore_backoff=True
)
except HttpResponseException as e:
@@ -397,29 +386,26 @@ class TransportLayerClient(object):
return response
- @defer.inlineCallbacks
@log_function
- def exchange_third_party_invite(self, destination, room_id, event_dict):
+ async def exchange_third_party_invite(self, destination, room_id, event_dict):
path = _create_v1_path("/exchange_third_party_invite/%s", room_id)
- response = yield self.client.put_json(
+ response = await self.client.put_json(
destination=destination, path=path, data=event_dict
)
return response
- @defer.inlineCallbacks
@log_function
- def get_event_auth(self, destination, room_id, event_id):
+ async def get_event_auth(self, destination, room_id, event_id):
path = _create_v1_path("/event_auth/%s/%s", room_id, event_id)
- content = yield self.client.get_json(destination=destination, path=path)
+ content = await self.client.get_json(destination=destination, path=path)
return content
- @defer.inlineCallbacks
@log_function
- def query_client_keys(self, destination, query_content, timeout):
+ async def query_client_keys(self, destination, query_content, timeout):
"""Query the device keys for a list of user ids hosted on a remote
server.
@@ -454,14 +440,13 @@ class TransportLayerClient(object):
"""
path = _create_v1_path("/user/keys/query")
- content = yield self.client.post_json(
+ content = await self.client.post_json(
destination=destination, path=path, data=query_content, timeout=timeout
)
return content
- @defer.inlineCallbacks
@log_function
- def query_user_devices(self, destination, user_id, timeout):
+ async def query_user_devices(self, destination, user_id, timeout):
"""Query the devices for a user id hosted on a remote server.
Response:
@@ -494,14 +479,13 @@ class TransportLayerClient(object):
"""
path = _create_v1_path("/user/devices/%s", user_id)
- content = yield self.client.get_json(
+ content = await self.client.get_json(
destination=destination, path=path, timeout=timeout
)
return content
- @defer.inlineCallbacks
@log_function
- def claim_client_keys(self, destination, query_content, timeout):
+ async def claim_client_keys(self, destination, query_content, timeout):
"""Claim one-time keys for a list of devices hosted on a remote server.
Request:
@@ -533,14 +517,13 @@ class TransportLayerClient(object):
path = _create_v1_path("/user/keys/claim")
- content = yield self.client.post_json(
+ content = await self.client.post_json(
destination=destination, path=path, data=query_content, timeout=timeout
)
return content
- @defer.inlineCallbacks
@log_function
- def get_missing_events(
+ async def get_missing_events(
self,
destination,
room_id,
@@ -552,7 +535,7 @@ class TransportLayerClient(object):
):
path = _create_v1_path("/get_missing_events/%s", room_id)
- content = yield self.client.post_json(
+ content = await self.client.post_json(
destination=destination,
path=path,
data={
@@ -747,7 +730,7 @@ class TransportLayerClient(object):
def remove_user_from_group(
self, destination, group_id, requester_user_id, user_id, content
):
- """Remove a user fron a group
+ """Remove a user from a group
"""
path = _create_v1_path("/groups/%s/users/%s/remove", group_id, user_id)
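
With six gone, the transport client builds federation paths with the standard library's urllib.parse directly, percent-encoding each path argument. A sketch of what a helper like _create_v1_path does (the prefix constant and exact helper signature here are assumptions based on the paths shown above):

    import urllib.parse

    FEDERATION_V1_PREFIX = "/_matrix/federation/v1"  # assumed value of the prefix

    def create_v1_path(path_template: str, *args: str) -> str:
        """Build a v1 federation path, percent-encoding every path argument."""
        return FEDERATION_V1_PREFIX + path_template % tuple(
            urllib.parse.quote(arg, safe="") for arg in args
        )

    assert create_v1_path("/send/%s", "txn 1") == "/_matrix/federation/v1/send/txn%201"
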
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index af4595498c..9325e0f857 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -20,8 +20,6 @@ import logging
import re
from typing import Optional, Tuple, Type
-from twisted.internet.defer import maybeDeferred
-
import synapse
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
from synapse.api.room_versions import RoomVersions
@@ -102,14 +100,14 @@ class NoAuthenticationError(AuthenticationError):
pass
-class Authenticator(object):
+class Authenticator:
def __init__(self, hs: HomeServer):
self._clock = hs.get_clock()
self.keyring = hs.get_keyring()
self.server_name = hs.hostname
self.store = hs.get_datastore()
self.federation_domain_whitelist = hs.config.federation_domain_whitelist
- self.notifer = hs.get_notifier()
+ self.notifier = hs.get_notifier()
self.replication_client = None
if hs.config.worker.worker_app:
@@ -175,7 +173,7 @@ class Authenticator(object):
await self.store.set_destination_retry_timings(origin, None, 0, 0)
# Inform the relevant places that the remote server is back up.
- self.notifer.notify_remote_server_up(origin)
+ self.notifier.notify_remote_server_up(origin)
if self.replication_client:
# If we're on a worker we try and inform master about this. The
# replication client doesn't hook into the notifier to avoid
@@ -230,7 +228,7 @@ def _parse_auth_header(header_bytes):
)
-class BaseFederationServlet(object):
+class BaseFederationServlet:
"""Abstract base class for federation servlet classes.
The servlet object should have a PATH attribute which takes the form of a regexp to
@@ -340,6 +338,12 @@ class BaseFederationServlet(object):
if origin:
with ratelimiter.ratelimit(origin) as d:
await d
+ if request._disconnected:
+ logger.warning(
+ "client disconnected before we started processing "
+ "request"
+ )
+ return -1, None
response = await func(
origin, content, request.args, *args, **kwargs
)
@@ -361,11 +365,7 @@ class BaseFederationServlet(object):
continue
server.register_paths(
- method,
- (pattern,),
- self._wrap(code),
- self.__class__.__name__,
- trace=False,
+ method, (pattern,), self._wrap(code), self.__class__.__name__,
)
@@ -799,12 +799,8 @@ class PublicRoomList(BaseFederationServlet):
# zero is a special value which corresponds to no limit.
limit = None
- data = await maybeDeferred(
- self.handler.get_local_public_room_list,
- limit,
- since_token,
- network_tuple=network_tuple,
- from_federation=True,
+ data = await self.handler.get_local_public_room_list(
+ limit, since_token, network_tuple=network_tuple, from_federation=True
)
return 200, data
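
The servlet wrapper now checks whether the client went away while the request was waiting on the rate limiter, and bails out with the (-1, None) sentinel instead of doing the work. A sketch of that guard, where everything except request._disconnected and the ratelimit context manager is a stand-in:

    async def handle_ratelimited(request, ratelimiter, func, origin, *args):
        """Wait for the rate limiter, then give up early if the client is gone."""
        with ratelimiter.ratelimit(origin) as d:
            await d
            if request._disconnected:
                return -1, None   # sentinel meaning "write no response"
            return await func(origin, *args)
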
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 6b32e0dcbf..64d98fc8f6 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -107,9 +107,7 @@ class Transaction(JsonEncodedObject):
if "edus" in kwargs and not kwargs["edus"]:
del kwargs["edus"]
- super(Transaction, self).__init__(
- transaction_id=transaction_id, pdus=pdus, **kwargs
- )
+ super().__init__(transaction_id=transaction_id, pdus=pdus, **kwargs)
@staticmethod
def create_new(pdus, **kwargs):
|