summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_base.py8
-rw-r--r--synapse/federation/federation_server.py48
-rw-r--r--synapse/federation/send_queue.py8
-rw-r--r--synapse/federation/sender/__init__.py14
-rw-r--r--synapse/federation/transport/client.py19
-rw-r--r--synapse/federation/transport/server.py53
6 files changed, 110 insertions, 40 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py

index c0012c6872..420df2385f 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py
@@ -17,8 +17,6 @@ import logging from collections import namedtuple from typing import Iterable, List -import six - from twisted.internet import defer from twisted.internet.defer import Deferred, DeferredList from twisted.python.failure import Failure @@ -93,8 +91,8 @@ class FederationBase(object): # *actual* redacted copy to be on the safe side.) redacted_event = prune_event(pdu) if set(redacted_event.keys()) == set(pdu.keys()) and set( - six.iterkeys(redacted_event.content) - ) == set(six.iterkeys(pdu.content)): + redacted_event.content.keys() + ) == set(pdu.content.keys()): logger.info( "Event %s seems to have been redacted; using our redacted " "copy", @@ -294,7 +292,7 @@ def event_from_pdu_json( assert_params_in_dict(pdu_json, ("type", "depth")) depth = pdu_json["depth"] - if not isinstance(depth, six.integer_types): + if not isinstance(depth, int): raise SynapseError(400, "Depth %r not an intger" % (depth,), Codes.BAD_JSON) if depth < 0: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 32a8a2ee46..e704cf2f44 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py
@@ -17,11 +17,8 @@ import logging from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union -import six -from six import iteritems - from canonicaljson import json -from prometheus_client import Counter +from prometheus_client import Counter, Histogram from twisted.internet import defer from twisted.internet.abstract import isIPAddress @@ -73,6 +70,10 @@ received_queries_counter = Counter( "synapse_federation_server_received_queries", "", ["type"] ) +pdu_process_time = Histogram( + "synapse_federation_server_pdu_process_time", "Time taken to process an event", +) + class FederationServer(FederationBase): def __init__(self, hs): @@ -274,21 +275,22 @@ class FederationServer(FederationBase): for pdu in pdus_by_room[room_id]: event_id = pdu.event_id - with nested_logging_context(event_id): - try: - await self._handle_received_pdu(origin, pdu) - pdu_results[event_id] = {} - except FederationError as e: - logger.warning("Error handling PDU %s: %s", event_id, e) - pdu_results[event_id] = {"error": str(e)} - except Exception as e: - f = failure.Failure() - pdu_results[event_id] = {"error": str(e)} - logger.error( - "Failed to handle PDU %s", - event_id, - exc_info=(f.type, f.value, f.getTracebackObject()), - ) + with pdu_process_time.time(): + with nested_logging_context(event_id): + try: + await self._handle_received_pdu(origin, pdu) + pdu_results[event_id] = {} + except FederationError as e: + logger.warning("Error handling PDU %s: %s", event_id, e) + pdu_results[event_id] = {"error": str(e)} + except Exception as e: + f = failure.Failure() + pdu_results[event_id] = {"error": str(e)} + logger.error( + "Failed to handle PDU %s", + event_id, + exc_info=(f.type, f.value, f.getTracebackObject()), + ) await concurrently_execute( process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT @@ -534,9 +536,9 @@ class FederationServer(FederationBase): ",".join( ( "%s for %s:%s" % (key_id, user_id, device_id) - for user_id, user_keys in iteritems(json_result) - for device_id, device_keys in iteritems(user_keys) - for key_id, _ in iteritems(device_keys) + for user_id, user_keys in json_result.items() + for device_id, device_keys in user_keys.items() + for key_id, _ in device_keys.items() ) ), ) @@ -752,7 +754,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool: def _acl_entry_matches(server_name: str, acl_entry: str) -> Match: - if not isinstance(acl_entry, six.string_types): + if not isinstance(acl_entry, str): logger.warning( "Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry) ) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 52f4f54215..6bbd762681 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py
@@ -33,8 +33,6 @@ import logging from collections import namedtuple from typing import Dict, List, Tuple, Type -from six import iteritems - from sortedcontainers import SortedDict from twisted.internet import defer @@ -327,7 +325,7 @@ class FederationRemoteSendQueue(object): # stream position. keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]} - for ((destination, edu_key), pos) in iteritems(keyed_edus): + for ((destination, edu_key), pos) in keyed_edus.items(): rows.append( ( pos, @@ -530,10 +528,10 @@ def process_rows_for_federation(transaction_queue, rows): states=[state], destinations=destinations ) - for destination, edu_map in iteritems(buff.keyed_edus): + for destination, edu_map in buff.keyed_edus.items(): for key, edu in edu_map.items(): transaction_queue.send_edu(edu, key) - for destination, edu_list in iteritems(buff.edus): + for destination, edu_list in buff.edus.items(): for edu in edu_list: transaction_queue.send_edu(edu, None) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index d473576902..23fb515683 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py
@@ -16,8 +16,6 @@ import logging from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple -from six import itervalues - from prometheus_client import Counter from twisted.internet import defer @@ -203,7 +201,15 @@ class FederationSender(object): logger.debug("Sending %s to %r", event, destinations) - self._send_pdu(event, destinations) + if destinations: + self._send_pdu(event, destinations) + + now = self.clock.time_msec() + ts = await self.store.get_received_ts(event.event_id) + + synapse.metrics.event_processing_lag_by_event.labels( + "federation_sender" + ).observe(now - ts) async def handle_room_events(events: Iterable[EventBase]) -> None: with Measure(self.clock, "handle_room_events"): @@ -218,7 +224,7 @@ class FederationSender(object): defer.gatherResults( [ run_in_background(handle_room_events, evs) - for evs in itervalues(events_by_room) + for evs in events_by_room.values() ], consumeErrors=True, ) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 060bf07197..1ac522c8a1 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py
@@ -15,9 +15,8 @@ # limitations under the License. import logging -from typing import Any, Dict, Optional - -from six.moves import urllib +import urllib +from typing import Any, Dict, List, Optional from twisted.internet import defer @@ -1021,6 +1020,20 @@ class TransportLayerClient(object): return self.client.get_json(destination=destination, path=path) + def get_info_of_users(self, destination: str, user_ids: List[str]): + """ + Args: + destination: The remote server + user_ids: A list of user IDs to query info about + + Returns: + Deferred[List]: A dictionary of User ID to information about that user. + """ + path = _create_path(FEDERATION_UNSTABLE_PREFIX, "/users/info") + data = {"user_ids": user_ids} + + return self.client.post_json(destination=destination, path=path, data=data) + def _create_path(federation_prefix, path, *args): """ diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index af4595498c..cb6331d613 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py
@@ -33,6 +33,7 @@ from synapse.api.urls import ( from synapse.http.endpoint import parse_and_validate_server_name from synapse.http.server import JsonResource from synapse.http.servlet import ( + assert_params_in_dict, parse_boolean_from_args, parse_integer_from_args, parse_json_object_from_request, @@ -849,6 +850,57 @@ class PublicRoomList(BaseFederationServlet): return 200, data +class FederationUserInfoServlet(BaseFederationServlet): + """ + Return information about a set of users. + + This API returns expiration and deactivation information about a set of + users. Requested users not local to this homeserver will be ignored. + + Example request: + POST /users/info + + { + "user_ids": [ + "@alice:example.com", + "@bob:example.com" + ] + } + + Example response + { + "@alice:example.com": { + "expired": false, + "deactivated": true + } + } + """ + + PATH = "/users/info" + PREFIX = FEDERATION_UNSTABLE_PREFIX + + def __init__(self, handler, authenticator, ratelimiter, server_name): + super(FederationUserInfoServlet, self).__init__( + handler, authenticator, ratelimiter, server_name + ) + self.handler = handler + + async def on_POST(self, origin, content, query): + assert_params_in_dict(content, required=["user_ids"]) + + user_ids = content.get("user_ids", []) + + if not isinstance(user_ids, list): + raise SynapseError( + 400, + "'user_ids' must be a list of user ID strings", + errcode=Codes.INVALID_PARAM, + ) + + data = await self.handler.store.get_info_for_users(user_ids) + return 200, data + + class FederationVersionServlet(BaseFederationServlet): PATH = "/version" @@ -1410,6 +1462,7 @@ FEDERATION_SERVLET_CLASSES = ( On3pidBindServlet, FederationVersionServlet, RoomComplexityServlet, + FederationUserInfoServlet, ) # type: Tuple[Type[BaseFederationServlet], ...] OPENID_SERVLET_CLASSES = (