From 15c1ae45e52c31d03f162c5b201f0ccafa032885 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Fri, 3 Aug 2018 16:04:29 +0100
Subject: Docstrings for BaseFederationServlet

... to save me reverse-engineering this stuff again.
---
 synapse/federation/transport/server.py | 47 ++++++++++++++++++++++++++++++++++
 1 file changed, 47 insertions(+)

(limited to 'synapse/federation')

diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index eae5f2b427..93bd899b86 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -190,6 +190,41 @@ def _parse_auth_header(header_bytes):

 class BaseFederationServlet(object):
+    """Abstract base class for federation servlet classes.
+
+    The servlet object should have a PATH attribute which takes the form of a regexp to
+    match against the request path (excluding the /federation/v1 prefix).
+
+    The servlet should also implement one or more of on_GET, on_POST, on_PUT, to match
+    the appropriate HTTP method. These methods have the signature:
+
+        on_<METHOD>(self, origin, content, query, **kwargs)
+
+    With arguments:
+
+        origin (unicode|None): The authenticated server_name of the calling server,
+            unless REQUIRE_AUTH is set to False and authentication failed.
+
+        content (unicode|None): decoded json body of the request. None if the
+            request was a GET.
+
+        query (dict[bytes, list[bytes]]): Query params from the request. url-decoded
+            (ie, '+' and '%xx' are decoded) but note that it is *not* utf8-decoded
+            yet.
+
+        **kwargs (dict[unicode, unicode]): the dict mapping keys to path
+            components as specified in the path match regexp.
+
+    Returns:
+        Deferred[(int, object)|None]: either (response code, response object) to
+            return a JSON response, or None if the request has already been handled.
+
+    Raises:
+        SynapseError: to return an error code
+
+        Exception: other exceptions will be caught, logged, and a 500 will be
+            returned.
+    """
     REQUIRE_AUTH = True

     def __init__(self, handler, authenticator, ratelimiter, server_name):
@@ -204,6 +239,18 @@ class BaseFederationServlet(object):
         @defer.inlineCallbacks
         @functools.wraps(func)
         def new_func(request, *args, **kwargs):
+            """ A callback which can be passed to HttpServer.RegisterPaths
+
+            Args:
+                request (twisted.web.http.Request):
+                *args: unused?
+                **kwargs (dict[unicode, unicode]): the dict mapping keys to path
+                    components as specified in the path match regexp.
+
+            Returns:
+                Deferred[(int, object)|None]: (response code, response object) as returned
+                    by the callback method. None if the request has already been handled.
+            """
             content = None
             if request.method in ["PUT", "POST"]:
                 # TODO: Handle other method types? other content types?
-- 
cgit 1.5.1


From 0d63d93ca834771324f0c5b9340346ad2113a4b1 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Wed, 25 Jul 2018 22:25:41 +0100
Subject: Enforce compatibility when processing make_join requests

Reject make_join requests from servers which do not support the room
version.

Also include the room version in the response.
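
(Illustration, not part of the patches above or below.) A minimal servlet following the contract documented in the BaseFederationServlet docstring in the first patch might look like the sketch below. The endpoint, class name and handler method are invented for illustration and assume the surrounding transport/server.py module; only the PATH/on_GET shape and the use of self.handler follow the documented contract.

    from twisted.internet import defer

    class FederationPingServlet(BaseFederationServlet):
        # Named groups in PATH (matched after the /federation/v1 prefix) are
        # passed to the on_<METHOD> handler as keyword arguments.
        PATH = "/ping/(?P<room_id>[^/]*)"

        @defer.inlineCallbacks
        def on_GET(self, origin, content, query, room_id):
            # origin is the authenticated server_name of the caller (or None if
            # REQUIRE_AUTH is False and authentication failed); content is None
            # for GETs; query maps bytes keys to lists of bytes values.
            result = yield self.handler.on_ping_request(origin, room_id)  # hypothetical handler method
            # Return (HTTP status, JSON-serialisable body), or None if the
            # request has already been handled.
            defer.returnValue((200, result))
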
--- synapse/api/errors.py | 22 ++++++++++++++++++++++ synapse/federation/federation_server.py | 21 ++++++++++++++++++--- synapse/federation/transport/server.py | 24 +++++++++++++++++++++++- 3 files changed, 63 insertions(+), 4 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 477ca07a24..70400347bc 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -58,6 +58,7 @@ class Codes(object): CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM" MAU_LIMIT_EXCEEDED = "M_MAU_LIMIT_EXCEEDED" UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION" + INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION" class CodeMessageException(RuntimeError): @@ -287,6 +288,27 @@ class LimitExceededError(SynapseError): ) +class IncompatibleRoomVersionError(SynapseError): + """A server is trying to join a room whose version it does not support.""" + + def __init__(self, room_version): + super(IncompatibleRoomVersionError, self).__init__( + code=400, + msg="Your homeserver does not support the features required to " + "join this room", + errcode=Codes.INCOMPATIBLE_ROOM_VERSION, + ) + + self._room_version = room_version + + def error_dict(self): + return cs_error( + self.msg, + self.errcode, + room_version=self._room_version, + ) + + def cs_error(msg, code=Codes.UNKNOWN, **kwargs): """ Utility method for constructing an error response for client-server interactions. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index bf89d568af..2b62f687b6 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -27,7 +27,13 @@ from twisted.internet.abstract import isIPAddress from twisted.python import failure from synapse.api.constants import EventTypes -from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError +from synapse.api.errors import ( + AuthError, + FederationError, + IncompatibleRoomVersionError, + NotFoundError, + SynapseError, +) from synapse.crypto.event_signing import compute_event_signature from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.federation.persistence import TransactionActions @@ -323,12 +329,21 @@ class FederationServer(FederationBase): defer.returnValue((200, resp)) @defer.inlineCallbacks - def on_make_join_request(self, origin, room_id, user_id): + def on_make_join_request(self, origin, room_id, user_id, supported_versions): origin_host, _ = parse_server_name(origin) yield self.check_server_matches_acl(origin_host, room_id) + + room_version = yield self.store.get_room_version(room_id) + if room_version not in supported_versions: + logger.warn("Room version %s not in %s", room_version, supported_versions) + raise IncompatibleRoomVersionError(room_version=room_version) + pdu = yield self.handler.on_make_join_request(room_id, user_id) time_now = self._clock.time_msec() - defer.returnValue({"event": pdu.get_pdu_json(time_now)}) + defer.returnValue({ + "event": pdu.get_pdu_json(time_now), + "room_version": room_version, + }) @defer.inlineCallbacks def on_invite_request(self, origin, content): diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 93bd899b86..77969a4f38 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -431,9 +431,31 @@ class FederationMakeJoinServlet(BaseFederationServlet): PATH = "/make_join/(?P[^/]*)/(?P[^/]*)" @defer.inlineCallbacks - def 
on_GET(self, origin, content, query, context, user_id): + def on_GET(self, origin, _content, query, context, user_id): + """ + Args: + origin (unicode): The authenticated server_name of the calling server + + _content (None): (GETs don't have bodies) + + query (dict[bytes, list[bytes]]): Query params from the request. + + **kwargs (dict[unicode, unicode]): the dict mapping keys to path + components as specified in the path match regexp. + + Returns: + Deferred[(int, object)|None]: either (response code, response object) to + return a JSON response, or None if the request has already been handled. + """ + versions = query.get(b'ver') + if versions is not None: + supported_versions = [v.decode("utf-8") for v in versions] + else: + supported_versions = ["1"] + content = yield self.handler.on_make_join_request( origin, context, user_id, + supported_versions=supported_versions, ) defer.returnValue((200, content)) -- cgit 1.5.1 From 3777fa26aaf36245168980054c4eebf96169dd13 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 1 Aug 2018 15:35:29 +0100 Subject: sanity check response from make_join --- synapse/federation/federation_client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 7550e11b6e..de4b813a15 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -561,7 +561,9 @@ class FederationClient(FederationBase): destination, room_id, user_id, membership ) - pdu_dict = ret["event"] + pdu_dict = ret.get("event", None) + if not isinstance(pdu_dict, dict): + raise InvalidResponseError("Bad 'event' field in response") logger.debug("Got response to make_%s: %s", membership, pdu_dict) -- cgit 1.5.1 From f900d508244b4277065d34dd9a05224fd60d5221 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 6 Aug 2018 13:45:37 +0100 Subject: include known room versions in outgoing make_joins --- synapse/federation/federation_client.py | 8 +++++--- synapse/federation/transport/client.py | 5 ++++- synapse/handlers/federation.py | 13 +++++++++++-- synapse/http/matrixfederationclient.py | 7 +++++-- 4 files changed, 25 insertions(+), 8 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index de4b813a15..7ec1d7a889 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -521,7 +521,7 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to %s via any server", description) def make_membership_event(self, destinations, room_id, user_id, membership, - content={},): + content, params): """ Creates an m.room.member event, with context, without participating in the room. @@ -537,8 +537,10 @@ class FederationClient(FederationBase): user_id (str): The user whose membership is being evented. membership (str): The "membership" property of the event. Must be one of "join" or "leave". - content (object): Any additional data to put into the content field + content (dict): Any additional data to put into the content field of the event. + params (dict[str, str|Iterable[str]]): Query parameters to include in the + request. Return: Deferred: resolves to a tuple of (origin (str), event (object)) where origin is the remote homeserver which generated the event. 
@@ -558,7 +560,7 @@ class FederationClient(FederationBase): @defer.inlineCallbacks def send_request(destination): ret = yield self.transport_layer.make_membership_event( - destination, room_id, user_id, membership + destination, room_id, user_id, membership, params, ) pdu_dict = ret.get("event", None) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 4529d454af..b4fbe2c9d5 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -195,7 +195,7 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def make_membership_event(self, destination, room_id, user_id, membership): + def make_membership_event(self, destination, room_id, user_id, membership, params): """Asks a remote server to build and sign us a membership event Note that this does not append any events to any graphs. @@ -205,6 +205,8 @@ class TransportLayerClient(object): room_id (str): room to join/leave user_id (str): user to be joined/left membership (str): one of join/leave + params (dict[str, str|Iterable[str]]): Query parameters to include in the + request. Returns: Deferred: Succeeds when we get a 2xx HTTP response. The result @@ -241,6 +243,7 @@ class TransportLayerClient(object): content = yield self.client.get_json( destination=destination, path=path, + args=params, retry_on_dns_fail=retry_on_dns_fail, timeout=20000, ignore_backoff=ignore_backoff, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 533b82c783..0dffd44e22 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -30,7 +30,12 @@ from unpaddedbase64 import decode_base64 from twisted.internet import defer -from synapse.api.constants import EventTypes, Membership, RejectedReason +from synapse.api.constants import ( + KNOWN_ROOM_VERSIONS, + EventTypes, + Membership, + RejectedReason, +) from synapse.api.errors import ( AuthError, CodeMessageException, @@ -922,6 +927,9 @@ class FederationHandler(BaseHandler): joinee, "join", content, + params={ + "ver": KNOWN_ROOM_VERSIONS, + }, ) # This shouldn't happen, because the RoomMemberHandler has a @@ -1187,13 +1195,14 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, - content={},): + content={}, params=None): origin, pdu = yield self.federation_client.make_membership_event( target_hosts, room_id, user_id, membership, content, + params=params, ) logger.debug("Got response to make_%s: %s", membership, pdu) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index bf1aa29502..b3f5415aa6 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -439,7 +439,7 @@ class MatrixFederationHttpClient(object): defer.returnValue(json.loads(body)) @defer.inlineCallbacks - def get_json(self, destination, path, args={}, retry_on_dns_fail=True, + def get_json(self, destination, path, args=None, retry_on_dns_fail=True, timeout=None, ignore_backoff=False): """ GETs some json from the given host homeserver and path @@ -447,7 +447,7 @@ class MatrixFederationHttpClient(object): destination (str): The remote server to send the HTTP request to. path (str): The HTTP path. - args (dict): A dictionary used to create query strings, defaults to + args (dict|None): A dictionary used to create query strings, defaults to None. timeout (int): How long to try (in ms) the destination for before giving up. 
None indicates no timeout and that the request will @@ -702,6 +702,9 @@ def check_content_type_is_json(headers): def encode_query_args(args): + if args is None: + return b"" + encoded_args = {} for k, vs in args.items(): if isinstance(vs, string_types): -- cgit 1.5.1 From a3f5bf79a0fc0ea6d59069945f53717a3e9c6581 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Jul 2018 11:44:22 +0100 Subject: Add EDU/query handling over replication --- synapse/federation/federation_server.py | 43 +++++++++++++++++++++++++++++++++ synapse/handlers/federation.py | 24 +++++++++--------- synapse/replication/http/federation.py | 2 +- synapse/server.py | 6 ++++- 4 files changed, 62 insertions(+), 13 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index bf89d568af..941e30a596 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -33,6 +33,10 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction from synapse.http.endpoint import parse_server_name +from synapse.replication.http.federation import ( + ReplicationFederationSendEduRestServlet, + ReplicationGetQueryRestServlet, +) from synapse.types import get_domain_from_id from synapse.util import async from synapse.util.caches.response_cache import ResponseCache @@ -745,6 +749,8 @@ class FederationHandlerRegistry(object): if edu_type in self.edu_handlers: raise KeyError("Already have an EDU handler for %s" % (edu_type,)) + logger.info("Registering federation EDU handler for %r", edu_type) + self.edu_handlers[edu_type] = handler def register_query_handler(self, query_type, handler): @@ -763,6 +769,8 @@ class FederationHandlerRegistry(object): "Already have a Query handler for %s" % (query_type,) ) + logger.info("Registering federation query handler for %r", query_type) + self.query_handlers[query_type] = handler @defer.inlineCallbacks @@ -785,3 +793,38 @@ class FederationHandlerRegistry(object): raise NotFoundError("No handler for Query type '%s'" % (query_type,)) return handler(args) + + +class ReplicationFederationHandlerRegistry(FederationHandlerRegistry): + def __init__(self, hs): + self.config = hs.config + self.http_client = hs.get_simple_http_client() + self.clock = hs.get_clock() + + self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs) + self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs) + + super(ReplicationFederationHandlerRegistry, self).__init__() + + def on_edu(self, edu_type, origin, content): + handler = self.edu_handlers.get(edu_type) + if handler: + return super(ReplicationFederationHandlerRegistry, self).on_edu( + edu_type, origin, content, + ) + + return self._send_edu( + edu_type=edu_type, + origin=origin, + content=content, + ) + + def on_query(self, query_type, args): + handler = self.query_handlers.get(query_type) + if handler: + return handler(args) + + return self._get_query_client( + query_type=query_type, + args=args, + ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0524dec942..d2cbb12df3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -44,8 +44,10 @@ from synapse.crypto.event_signing import ( compute_event_signature, ) from synapse.events.validator import EventValidator -from synapse.replication.http.federation import 
send_federation_events_to_master -from synapse.replication.http.membership import notify_user_membership_change +from synapse.replication.http.federation import ( + ReplicationFederationSendEventsRestServlet, +) +from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet from synapse.state import resolve_events_with_factory from synapse.types import UserID, get_domain_from_id from synapse.util import logcontext, unwrapFirstError @@ -91,6 +93,13 @@ class FederationHandler(BaseHandler): self.config = hs.config self.http_client = hs.get_simple_http_client() + self._send_events_to_master = ( + ReplicationFederationSendEventsRestServlet.make_client(hs) + ) + self._notify_user_membership_change = ( + ReplicationUserJoinedLeftRoomRestServlet.make_client(hs) + ) + # When joining a room we need to queue any events for that room up self.room_queues = {} self._room_pdu_linearizer = Linearizer("fed_room_pdu") @@ -2318,12 +2327,8 @@ class FederationHandler(BaseHandler): Deferred """ if self.config.worker_app: - yield send_federation_events_to_master( - clock=self.hs.get_clock(), + yield self._send_events_to_master( store=self.store, - client=self.http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, event_and_contexts=event_and_contexts, backfilled=backfilled ) @@ -2381,10 +2386,7 @@ class FederationHandler(BaseHandler): """Called when a new user has joined the room """ if self.config.worker_app: - return notify_user_membership_change( - client=self.http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + return self._notify_user_membership_change( room_id=room_id, user_id=user.to_string(), change="joined", diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index f39aaa89be..3fa7bd64c7 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -59,8 +59,8 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): self.notifier = hs.get_notifier() self.pusher_pool = hs.get_pusherpool() - @defer.inlineCallbacks @staticmethod + @defer.inlineCallbacks def _serialize_payload(store, event_and_contexts, backfilled): """ Args: diff --git a/synapse/server.py b/synapse/server.py index 140be9ebe8..26228d8c72 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -36,6 +36,7 @@ from synapse.federation.federation_client import FederationClient from synapse.federation.federation_server import ( FederationHandlerRegistry, FederationServer, + ReplicationFederationHandlerRegistry, ) from synapse.federation.send_queue import FederationRemoteSendQueue from synapse.federation.transaction_queue import TransactionQueue @@ -423,7 +424,10 @@ class HomeServer(object): return RoomMemberMasterHandler(self) def build_federation_registry(self): - return FederationHandlerRegistry() + if self.config.worker_app: + return ReplicationFederationHandlerRegistry(self) + else: + return FederationHandlerRegistry() def build_server_notices_manager(self): if self.config.worker_app: -- cgit 1.5.1 From 19a17068f1bc98a1556ff618b544b5fbf57eeba0 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 6 Aug 2018 15:15:19 +0100 Subject: Check m.room.create for sane room_versions --- synapse/event_auth.py | 10 +++++++++- synapse/federation/federation_client.py | 26 +++++++++++++++++++++++--- 2 files changed, 32 insertions(+), 4 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/event_auth.py 
b/synapse/event_auth.py index b32f64e729..6baeccca38 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -20,7 +20,7 @@ from signedjson.key import decode_verify_key_bytes from signedjson.sign import SignatureVerifyException, verify_signed_json from unpaddedbase64 import decode_base64 -from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, JoinRules, Membership from synapse.api.errors import AuthError, EventSizeError, SynapseError from synapse.types import UserID, get_domain_from_id @@ -83,6 +83,14 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True): 403, "Creation event's room_id domain does not match sender's" ) + + room_version = event.content.get("room_version", "1") + if room_version not in KNOWN_ROOM_VERSIONS: + raise AuthError( + 403, + "room appears to have unsupported version %s" % ( + room_version, + )) # FIXME logger.debug("Allowing! %s", event) return diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 7ec1d7a889..c9f3c2d352 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -25,7 +25,7 @@ from prometheus_client import Counter from twisted.internet import defer -from synapse.api.constants import Membership +from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership from synapse.api.errors import ( CodeMessageException, FederationDeniedError, @@ -518,7 +518,7 @@ class FederationClient(FederationBase): description, destination, exc_info=1, ) - raise RuntimeError("Failed to %s via any server", description) + raise RuntimeError("Failed to %s via any server" % (description, )) def make_membership_event(self, destinations, room_id, user_id, membership, content, params): @@ -609,6 +609,26 @@ class FederationClient(FederationBase): Fails with a ``RuntimeError`` if no servers were reachable. """ + def check_authchain_validity(signed_auth_chain): + for e in signed_auth_chain: + if e.type == EventTypes.Create: + create_event = e + break + else: + raise InvalidResponseError( + "no %s in auth chain" % (EventTypes.Create,), + ) + + # the room version should be sane. + room_version = create_event.content.get("room_version", "1") + if room_version not in KNOWN_ROOM_VERSIONS: + # This shouldn't be possible, because the remote server should have + # rejected the join attempt during make_join. 
+ raise InvalidResponseError( + "room appears to have unsupported version %s" % ( + room_version, + )) + @defer.inlineCallbacks def send_request(destination): time_now = self._clock.time_msec() @@ -665,7 +685,7 @@ class FederationClient(FederationBase): for s in signed_state: s.internal_metadata = copy.deepcopy(s.internal_metadata) - auth_chain.sort(key=lambda e: e.depth) + check_authchain_validity(signed_auth) defer.returnValue({ "state": signed_state, -- cgit 1.5.1 From 53bca4690b5c94fb1506dbc628ce2ef0f770745f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 7 Aug 2018 19:09:48 +0100 Subject: more metrics for the federation and appservice senders --- synapse/federation/transaction_queue.py | 10 +++++++++- synapse/handlers/appservice.py | 10 ++++++++++ synapse/metrics/__init__.py | 13 +++++++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 78f9d40a3a..f603c8a368 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -26,6 +26,8 @@ from synapse.api.errors import FederationDeniedError, HttpResponseException from synapse.handlers.presence import format_user_presence_state, get_interested_remotes from synapse.metrics import ( LaterGauge, + event_processing_loop_counter, + event_processing_loop_room_count, events_processed_counter, sent_edus_counter, sent_transactions_counter, @@ -253,7 +255,13 @@ class TransactionQueue(object): synapse.metrics.event_processing_last_ts.labels( "federation_sender").set(ts) - events_processed_counter.inc(len(events)) + events_processed_counter.inc(len(events)) + + event_processing_loop_room_count.labels( + "federation_sender" + ).inc(len(events_by_room)) + + event_processing_loop_counter.labels("federation_sender").inc() synapse.metrics.event_processing_positions.labels( "federation_sender").set(next_token) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index ee41aed69e..f0f89af7dc 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -23,6 +23,10 @@ from twisted.internet import defer import synapse from synapse.api.constants import EventTypes +from synapse.metrics import ( + event_processing_loop_counter, + event_processing_loop_room_count, +) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.metrics import Measure @@ -136,6 +140,12 @@ class ApplicationServicesHandler(object): events_processed_counter.inc(len(events)) + event_processing_loop_room_count.labels( + "appservice_sender" + ).inc(len(events_by_room)) + + event_processing_loop_counter.labels("appservice_sender").inc() + synapse.metrics.event_processing_lag.labels( "appservice_sender").set(now - ts) synapse.metrics.event_processing_last_ts.labels( diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index a9158fc066..b7cb3a730a 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -174,6 +174,19 @@ sent_transactions_counter = Counter("synapse_federation_client_sent_transactions events_processed_counter = Counter("synapse_federation_client_events_processed", "") +event_processing_loop_counter = Counter( + "synapse_event_processing_loop", + "Event processing loop iterations", + ["name"], +) + +event_processing_loop_room_count = Counter( + 
"synapse_event_processing_loop_room_count", + "Rooms seen per event processing loop iteration", + ["name"], +) + + # Used to track where various components have processed in the event stream, # e.g. federation sending, appservice sending, etc. event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"]) -- cgit 1.5.1 From 72d1902bbeaa029e7003aecaeaf9bc5a41d17646 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Aug 2018 10:23:49 +0100 Subject: Fixup doc comments --- synapse/federation/federation_server.py | 11 +++++++++++ synapse/replication/http/federation.py | 17 +++++++++++++++++ 2 files changed, 28 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index d23c1cf13b..3bc26c4ab4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -811,6 +811,13 @@ class FederationHandlerRegistry(object): class ReplicationFederationHandlerRegistry(FederationHandlerRegistry): + """A FederationHandlerRegistry for worker processes. + + When receiving EDU or queries it will check if an appropriate handler has + been registered on the worker, if there isn't one then it calls off to the + master process. + """ + def __init__(self, hs): self.config = hs.config self.http_client = hs.get_simple_http_client() @@ -822,6 +829,8 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry): super(ReplicationFederationHandlerRegistry, self).__init__() def on_edu(self, edu_type, origin, content): + """Overrides FederationHandlerRegistry + """ handler = self.edu_handlers.get(edu_type) if handler: return super(ReplicationFederationHandlerRegistry, self).on_edu( @@ -835,6 +844,8 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry): ) def on_query(self, query_type, args): + """Overrides FederationHandlerRegistry + """ handler = self.query_handlers.get(query_type) if handler: return handler(args) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 3fa7bd64c7..3e6cbbf5a1 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -157,6 +157,15 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): """Handles EDUs newly received from federation, including persisting and notifying. + + Request format: + + POST /_synapse/replication/fed_send_edu/:edu_type/:txn_id + + { + "origin": ..., + "content: { ... } + } """ NAME = "fed_send_edu" @@ -196,6 +205,14 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): class ReplicationGetQueryRestServlet(ReplicationEndpoint): """Handle responding to queries from federation. + + Request format: + + POST /_synapse/replication/fed_query/:query_type + + { + "args": { ... 
} + } """ NAME = "fed_query" -- cgit 1.5.1 From b37c4724191995eec4f5bc9d64f176b2a315f777 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 10 Aug 2018 23:50:21 +1000 Subject: Rename async to async_helpers because `async` is a keyword on Python 3.7 (#3678) --- changelog.d/3678.misc | 1 + synapse/app/federation_sender.py | 2 +- synapse/federation/federation_server.py | 8 +- synapse/handlers/device.py | 2 +- synapse/handlers/federation.py | 2 +- synapse/handlers/initial_sync.py | 2 +- synapse/handlers/message.py | 2 +- synapse/handlers/pagination.py | 2 +- synapse/handlers/presence.py | 2 +- synapse/handlers/read_marker.py | 2 +- synapse/handlers/register.py | 2 +- synapse/handlers/room_list.py | 2 +- synapse/handlers/room_member.py | 2 +- synapse/handlers/sync.py | 2 +- synapse/http/client.py | 2 +- synapse/http/matrixfederationclient.py | 2 +- synapse/notifier.py | 2 +- synapse/push/bulk_push_rule_evaluator.py | 2 +- synapse/push/mailer.py | 2 +- synapse/rest/client/transactions.py | 2 +- synapse/rest/media/v1/media_repository.py | 2 +- synapse/rest/media/v1/preview_url_resource.py | 2 +- synapse/state.py | 2 +- synapse/storage/events.py | 2 +- synapse/storage/roommember.py | 2 +- synapse/util/async.py | 415 -------------------------- synapse/util/async_helpers.py | 415 ++++++++++++++++++++++++++ synapse/util/caches/descriptors.py | 2 +- synapse/util/caches/response_cache.py | 2 +- synapse/util/caches/snapshot_cache.py | 2 +- synapse/util/logcontext.py | 2 +- tests/storage/test__base.py | 2 +- tests/util/test_linearizer.py | 2 +- tests/util/test_rwlock.py | 2 +- 34 files changed, 450 insertions(+), 449 deletions(-) create mode 100644 changelog.d/3678.misc delete mode 100644 synapse/util/async.py create mode 100644 synapse/util/async_helpers.py (limited to 'synapse/federation') diff --git a/changelog.d/3678.misc b/changelog.d/3678.misc new file mode 100644 index 0000000000..0d7c8da64a --- /dev/null +++ b/changelog.d/3678.misc @@ -0,0 +1 @@ +Rename synapse.util.async to synapse.util.async_helpers to mitigate async becoming a keyword on Python 3.7. 
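
(Aside, not part of the patch.) The motivation for the rename can be reproduced directly: async became a reserved keyword in Python 3.7, so a module with that name can no longer be imported with ordinary import syntax, while the renamed module behaves exactly as before. A minimal sketch:

    # On Python 3.7 the old import fails at compile time, because "async" is
    # now a reserved keyword:
    #
    #     from synapse.util import async    # SyntaxError: invalid syntax
    #
    # The renamed module imports as usual and keeps the same API:
    from synapse.util.async_helpers import Linearizer

    limiter = Linearizer(name="example")  # used exactly as before the rename
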
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index e082d45c94..7bbf0ad082 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -40,7 +40,7 @@ from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine -from synapse.util.async import Linearizer +from synapse.util.async_helpers import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 2b62f687b6..a23136784a 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -40,7 +40,7 @@ from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction from synapse.http.endpoint import parse_server_name from synapse.types import get_domain_from_id -from synapse.util import async +from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache from synapse.util.logutils import log_function @@ -67,8 +67,8 @@ class FederationServer(FederationBase): self.auth = hs.get_auth() self.handler = hs.get_handlers().federation_handler - self._server_linearizer = async.Linearizer("fed_server") - self._transaction_linearizer = async.Linearizer("fed_txn_handler") + self._server_linearizer = Linearizer("fed_server") + self._transaction_linearizer = Linearizer("fed_txn_handler") self.transaction_actions = TransactionActions(self.store) @@ -200,7 +200,7 @@ class FederationServer(FederationBase): event_id, f.getTraceback().rstrip(), ) - yield async.concurrently_execute( + yield concurrently_execute( process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT, ) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 2d44f15da3..9e017116a9 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -23,7 +23,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import FederationDeniedError from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util import stringutils -from synapse.util.async import Linearizer +from synapse.util.async_helpers import Linearizer from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.util.retryutils import NotRetryingDestination diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0dffd44e22..2380d17f4e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -52,7 +52,7 @@ from synapse.events.validator import EventValidator from synapse.state import resolve_events_with_factory from synapse.types import UserID, get_domain_from_id from synapse.util import logcontext, unwrapFirstError -from synapse.util.async import Linearizer +from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room from synapse.util.frozenutils import unfreeze from synapse.util.logutils import log_function diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 40e7580a61..1fb17fd9a5 100644 --- a/synapse/handlers/initial_sync.py +++ 
b/synapse/handlers/initial_sync.py @@ -25,7 +25,7 @@ from synapse.handlers.presence import format_user_presence_state from synapse.streams.config import PaginationConfig from synapse.types import StreamToken, UserID from synapse.util import unwrapFirstError -from synapse.util.async import concurrently_execute +from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.visibility import filter_events_for_client diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index bcb093ba3e..01a362360e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -32,7 +32,7 @@ from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.types import RoomAlias, UserID -from synapse.util.async import Linearizer +from synapse.util.async_helpers import Linearizer from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index b2849783ed..a97d43550f 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -22,7 +22,7 @@ from synapse.api.constants import Membership from synapse.api.errors import SynapseError from synapse.events.utils import serialize_event from synapse.types import RoomStreamToken -from synapse.util.async import ReadWriteLock +from synapse.util.async_helpers import ReadWriteLock from synapse.util.logcontext import run_in_background from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 3732830194..20fc3b0323 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -36,7 +36,7 @@ from synapse.api.errors import SynapseError from synapse.metrics import LaterGauge from synapse.storage.presence import UserPresenceState from synapse.types import UserID, get_domain_from_id -from synapse.util.async import Linearizer +from synapse.util.async_helpers import Linearizer from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.logcontext import run_in_background from synapse.util.logutils import log_function diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 995460f82a..32108568c6 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -17,7 +17,7 @@ import logging from twisted.internet import defer -from synapse.util.async import Linearizer +from synapse.util.async_helpers import Linearizer from ._base import BaseHandler diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 0e16bbe0ee..3526b20d5a 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -28,7 +28,7 @@ from synapse.api.errors import ( ) from synapse.http.client import CaptchaServerHttpClient from synapse.types import RoomAlias, RoomID, UserID, create_requester -from synapse.util.async import Linearizer +from synapse.util.async_helpers import Linearizer from synapse.util.threepids import check_3pid_allowed from ._base import BaseHandler diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 
828229f5c3..37e41afd61 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -26,7 +26,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules from synapse.types import ThirdPartyInstanceID -from synapse.util.async import concurrently_execute +from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.response_cache import ResponseCache diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 0d4a3f4677..fb94b5d7d4 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -30,7 +30,7 @@ import synapse.types from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError from synapse.types import RoomID, UserID -from synapse.util.async import Linearizer +from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room, user_left_room logger = logging.getLogger(__name__) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index dff1f67dcb..6393a9674b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -25,7 +25,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.push.clientformat import format_push_rules_for_user from synapse.types import RoomStreamToken -from synapse.util.async import concurrently_execute +from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.lrucache import LruCache from synapse.util.caches.response_cache import ResponseCache diff --git a/synapse/http/client.py b/synapse/http/client.py index 3771e0b3f6..ab4fbf59b2 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -42,7 +42,7 @@ from twisted.web.http_headers import Headers from synapse.api.errors import Codes, HttpResponseException, SynapseError from synapse.http import cancelled_to_request_timed_out_error, redact_uri from synapse.http.endpoint import SpiderEndpoint -from synapse.util.async import add_timeout_to_deferred +from synapse.util.async_helpers import add_timeout_to_deferred from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.logcontext import make_deferred_yieldable diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 762273f59b..44b61e70a4 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -43,7 +43,7 @@ from synapse.api.errors import ( from synapse.http import cancelled_to_request_timed_out_error from synapse.http.endpoint import matrix_federation_endpoint from synapse.util import logcontext -from synapse.util.async import add_timeout_to_deferred +from synapse.util.async_helpers import add_timeout_to_deferred from synapse.util.logcontext import make_deferred_yieldable logger = logging.getLogger(__name__) diff --git a/synapse/notifier.py b/synapse/notifier.py index e650c3e494..82f391481c 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -25,7 +25,7 @@ from synapse.api.errors import AuthError from synapse.handlers.presence import format_user_presence_state from synapse.metrics import LaterGauge from synapse.types import StreamToken -from synapse.util.async import ( +from synapse.util.async_helpers import ( DeferredTimeoutError, ObservableDeferred, add_timeout_to_deferred, diff --git 
a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 1d14d3639c..8f9a76147f 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -26,7 +26,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.event_auth import get_user_power_level from synapse.state import POWER_KEY -from synapse.util.async import Linearizer +from synapse.util.async_helpers import Linearizer from synapse.util.caches import register_cache from synapse.util.caches.descriptors import cached diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 9d601208fd..bfa6df7b68 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -35,7 +35,7 @@ from synapse.push.presentable_names import ( name_from_member_event, ) from synapse.types import UserID -from synapse.util.async import concurrently_execute +from synapse.util.async_helpers import concurrently_execute from synapse.visibility import filter_events_for_client logger = logging.getLogger(__name__) diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index 00b1b3066e..511e96ab00 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -17,7 +17,7 @@ to ensure idempotency when performing PUTs using the REST API.""" import logging -from synapse.util.async import ObservableDeferred +from synapse.util.async_helpers import ObservableDeferred from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 8fb413d825..4c589e05e0 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -36,7 +36,7 @@ from synapse.api.errors import ( ) from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.util.async import Linearizer +from synapse.util.async_helpers import Linearizer from synapse.util.logcontext import make_deferred_yieldable from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import is_ascii, random_string diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 27aa0def2f..778ef97337 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -42,7 +42,7 @@ from synapse.http.server import ( ) from synapse.http.servlet import parse_integer, parse_string from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.util.async import ObservableDeferred +from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.stringutils import is_ascii, random_string diff --git a/synapse/state.py b/synapse/state.py index e1092b97a9..8b92d4057a 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -28,7 +28,7 @@ from synapse import event_auth from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.events.snapshot import EventContext -from synapse.util.async import Linearizer +from synapse.util.async_helpers import Linearizer from synapse.util.caches import CACHE_SIZE_FACTOR from 
synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ce32e8fefd..d4aa192a0a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -38,7 +38,7 @@ from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.event_federation import EventFederationStore from synapse.storage.events_worker import EventsWorkerStore from synapse.types import RoomStreamToken, get_domain_from_id -from synapse.util.async import ObservableDeferred +from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 10dce21cea..9b4e6d6aa8 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -26,7 +26,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.storage.events_worker import EventsWorkerStore from synapse.types import get_domain_from_id -from synapse.util.async import Linearizer +from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.stringutils import to_ascii diff --git a/synapse/util/async.py b/synapse/util/async.py deleted file mode 100644 index a7094e2fb4..0000000000 --- a/synapse/util/async.py +++ /dev/null @@ -1,415 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014-2016 OpenMarket Ltd -# Copyright 2018 New Vector Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import collections -import logging -from contextlib import contextmanager - -from six.moves import range - -from twisted.internet import defer -from twisted.internet.defer import CancelledError -from twisted.python import failure - -from synapse.util import Clock, logcontext, unwrapFirstError - -from .logcontext import ( - PreserveLoggingContext, - make_deferred_yieldable, - run_in_background, -) - -logger = logging.getLogger(__name__) - - -class ObservableDeferred(object): - """Wraps a deferred object so that we can add observer deferreds. These - observer deferreds do not affect the callback chain of the original - deferred. - - If consumeErrors is true errors will be captured from the origin deferred. - - Cancelling or otherwise resolving an observer will not affect the original - ObservableDeferred. - - NB that it does not attempt to do anything with logcontexts; in general - you should probably make_deferred_yieldable the deferreds - returned by `observe`, and ensure that the original deferred runs its - callbacks in the sentinel logcontext. 
- """ - - __slots__ = ["_deferred", "_observers", "_result"] - - def __init__(self, deferred, consumeErrors=False): - object.__setattr__(self, "_deferred", deferred) - object.__setattr__(self, "_result", None) - object.__setattr__(self, "_observers", set()) - - def callback(r): - object.__setattr__(self, "_result", (True, r)) - while self._observers: - try: - # TODO: Handle errors here. - self._observers.pop().callback(r) - except Exception: - pass - return r - - def errback(f): - object.__setattr__(self, "_result", (False, f)) - while self._observers: - try: - # TODO: Handle errors here. - self._observers.pop().errback(f) - except Exception: - pass - - if consumeErrors: - return None - else: - return f - - deferred.addCallbacks(callback, errback) - - def observe(self): - """Observe the underlying deferred. - - Can return either a deferred if the underlying deferred is still pending - (or has failed), or the actual value. Callers may need to use maybeDeferred. - """ - if not self._result: - d = defer.Deferred() - - def remove(r): - self._observers.discard(d) - return r - d.addBoth(remove) - - self._observers.add(d) - return d - else: - success, res = self._result - return res if success else defer.fail(res) - - def observers(self): - return self._observers - - def has_called(self): - return self._result is not None - - def has_succeeded(self): - return self._result is not None and self._result[0] is True - - def get_result(self): - return self._result[1] - - def __getattr__(self, name): - return getattr(self._deferred, name) - - def __setattr__(self, name, value): - setattr(self._deferred, name, value) - - def __repr__(self): - return "" % ( - id(self), self._result, self._deferred, - ) - - -def concurrently_execute(func, args, limit): - """Executes the function with each argument conncurrently while limiting - the number of concurrent executions. - - Args: - func (func): Function to execute, should return a deferred. - args (list): List of arguments to pass to func, each invocation of func - gets a signle argument. - limit (int): Maximum number of conccurent executions. - - Returns: - deferred: Resolved when all function invocations have finished. - """ - it = iter(args) - - @defer.inlineCallbacks - def _concurrently_execute_inner(): - try: - while True: - yield func(next(it)) - except StopIteration: - pass - - return logcontext.make_deferred_yieldable(defer.gatherResults([ - run_in_background(_concurrently_execute_inner) - for _ in range(limit) - ], consumeErrors=True)).addErrback(unwrapFirstError) - - -class Linearizer(object): - """Limits concurrent access to resources based on a key. Useful to ensure - only a few things happen at a time on a given resource. - - Example: - - with (yield limiter.queue("test_key")): - # do some work. - - """ - def __init__(self, name=None, max_count=1, clock=None): - """ - Args: - max_count(int): The maximum number of concurrent accesses - """ - if name is None: - self.name = id(self) - else: - self.name = name - - if not clock: - from twisted.internet import reactor - clock = Clock(reactor) - self._clock = clock - self.max_count = max_count - - # key_to_defer is a map from the key to a 2 element list where - # the first element is the number of things executing, and - # the second element is an OrderedDict, where the keys are deferreds for the - # things blocked from executing. 
- self.key_to_defer = {} - - @defer.inlineCallbacks - def queue(self, key): - entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()]) - - # If the number of things executing is greater than the maximum - # then add a deferred to the list of blocked items - # When on of the things currently executing finishes it will callback - # this item so that it can continue executing. - if entry[0] >= self.max_count: - new_defer = defer.Deferred() - entry[1][new_defer] = 1 - - logger.info( - "Waiting to acquire linearizer lock %r for key %r", self.name, key, - ) - try: - yield make_deferred_yieldable(new_defer) - except Exception as e: - if isinstance(e, CancelledError): - logger.info( - "Cancelling wait for linearizer lock %r for key %r", - self.name, key, - ) - else: - logger.warn( - "Unexpected exception waiting for linearizer lock %r for key %r", - self.name, key, - ) - - # we just have to take ourselves back out of the queue. - del entry[1][new_defer] - raise - - logger.info("Acquired linearizer lock %r for key %r", self.name, key) - entry[0] += 1 - - # if the code holding the lock completes synchronously, then it - # will recursively run the next claimant on the list. That can - # relatively rapidly lead to stack exhaustion. This is essentially - # the same problem as http://twistedmatrix.com/trac/ticket/9304. - # - # In order to break the cycle, we add a cheeky sleep(0) here to - # ensure that we fall back to the reactor between each iteration. - # - # (This needs to happen while we hold the lock, and the context manager's exit - # code must be synchronous, so this is the only sensible place.) - yield self._clock.sleep(0) - - else: - logger.info( - "Acquired uncontended linearizer lock %r for key %r", self.name, key, - ) - entry[0] += 1 - - @contextmanager - def _ctx_manager(): - try: - yield - finally: - logger.info("Releasing linearizer lock %r for key %r", self.name, key) - - # We've finished executing so check if there are any things - # blocked waiting to execute and start one of them - entry[0] -= 1 - - if entry[1]: - (next_def, _) = entry[1].popitem(last=False) - - # we need to run the next thing in the sentinel context. - with PreserveLoggingContext(): - next_def.callback(None) - elif entry[0] == 0: - # We were the last thing for this key: remove it from the - # map. - del self.key_to_defer[key] - - defer.returnValue(_ctx_manager()) - - -class ReadWriteLock(object): - """A deferred style read write lock. - - Example: - - with (yield read_write_lock.read("test_key")): - # do some work - """ - - # IMPLEMENTATION NOTES - # - # We track the most recent queued reader and writer deferreds (which get - # resolved when they release the lock). - # - # Read: We know its safe to acquire a read lock when the latest writer has - # been resolved. The new reader is appeneded to the list of latest readers. - # - # Write: We know its safe to acquire the write lock when both the latest - # writers and readers have been resolved. The new writer replaces the latest - # writer. - - def __init__(self): - # Latest readers queued - self.key_to_current_readers = {} - - # Latest writer queued - self.key_to_current_writer = {} - - @defer.inlineCallbacks - def read(self, key): - new_defer = defer.Deferred() - - curr_readers = self.key_to_current_readers.setdefault(key, set()) - curr_writer = self.key_to_current_writer.get(key, None) - - curr_readers.add(new_defer) - - # We wait for the latest writer to finish writing. We can safely ignore - # any existing readers... as they're readers. 
- yield make_deferred_yieldable(curr_writer) - - @contextmanager - def _ctx_manager(): - try: - yield - finally: - new_defer.callback(None) - self.key_to_current_readers.get(key, set()).discard(new_defer) - - defer.returnValue(_ctx_manager()) - - @defer.inlineCallbacks - def write(self, key): - new_defer = defer.Deferred() - - curr_readers = self.key_to_current_readers.get(key, set()) - curr_writer = self.key_to_current_writer.get(key, None) - - # We wait on all latest readers and writer. - to_wait_on = list(curr_readers) - if curr_writer: - to_wait_on.append(curr_writer) - - # We can clear the list of current readers since the new writer waits - # for them to finish. - curr_readers.clear() - self.key_to_current_writer[key] = new_defer - - yield make_deferred_yieldable(defer.gatherResults(to_wait_on)) - - @contextmanager - def _ctx_manager(): - try: - yield - finally: - new_defer.callback(None) - if self.key_to_current_writer[key] == new_defer: - self.key_to_current_writer.pop(key) - - defer.returnValue(_ctx_manager()) - - -class DeferredTimeoutError(Exception): - """ - This error is raised by default when a L{Deferred} times out. - """ - - -def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None): - """ - Add a timeout to a deferred by scheduling it to be cancelled after - timeout seconds. - - This is essentially a backport of deferred.addTimeout, which was introduced - in twisted 16.5. - - If the deferred gets timed out, it errbacks with a DeferredTimeoutError, - unless a cancelable function was passed to its initialization or unless - a different on_timeout_cancel callable is provided. - - Args: - deferred (defer.Deferred): deferred to be timed out - timeout (Number): seconds to time out after - reactor (twisted.internet.reactor): the Twisted reactor to use - - on_timeout_cancel (callable): A callable which is called immediately - after the deferred times out, and not if this deferred is - otherwise cancelled before the timeout. - - It takes an arbitrary value, which is the value of the deferred at - that exact point in time (probably a CancelledError Failure), and - the timeout. - - The default callable (if none is provided) will translate a - CancelledError Failure into a DeferredTimeoutError. - """ - timed_out = [False] - - def time_it_out(): - timed_out[0] = True - deferred.cancel() - - delayed_call = reactor.callLater(timeout, time_it_out) - - def convert_cancelled(value): - if timed_out[0]: - to_call = on_timeout_cancel or _cancelled_to_timed_out_error - return to_call(value, timeout) - return value - - deferred.addBoth(convert_cancelled) - - def cancel_timeout(result): - # stop the pending call to cancel the deferred if it's been fired - if delayed_call.active(): - delayed_call.cancel() - return result - - deferred.addBoth(cancel_timeout) - - -def _cancelled_to_timed_out_error(value, timeout): - if isinstance(value, failure.Failure): - value.trap(CancelledError) - raise DeferredTimeoutError(timeout, "Deferred") - return value diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py new file mode 100644 index 0000000000..a7094e2fb4 --- /dev/null +++ b/synapse/util/async_helpers.py @@ -0,0 +1,415 @@ +# -*- coding: utf-8 -*- +# Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import collections +import logging +from contextlib import contextmanager + +from six.moves import range + +from twisted.internet import defer +from twisted.internet.defer import CancelledError +from twisted.python import failure + +from synapse.util import Clock, logcontext, unwrapFirstError + +from .logcontext import ( + PreserveLoggingContext, + make_deferred_yieldable, + run_in_background, +) + +logger = logging.getLogger(__name__) + + +class ObservableDeferred(object): + """Wraps a deferred object so that we can add observer deferreds. These + observer deferreds do not affect the callback chain of the original + deferred. + + If consumeErrors is true errors will be captured from the origin deferred. + + Cancelling or otherwise resolving an observer will not affect the original + ObservableDeferred. + + NB that it does not attempt to do anything with logcontexts; in general + you should probably make_deferred_yieldable the deferreds + returned by `observe`, and ensure that the original deferred runs its + callbacks in the sentinel logcontext. + """ + + __slots__ = ["_deferred", "_observers", "_result"] + + def __init__(self, deferred, consumeErrors=False): + object.__setattr__(self, "_deferred", deferred) + object.__setattr__(self, "_result", None) + object.__setattr__(self, "_observers", set()) + + def callback(r): + object.__setattr__(self, "_result", (True, r)) + while self._observers: + try: + # TODO: Handle errors here. + self._observers.pop().callback(r) + except Exception: + pass + return r + + def errback(f): + object.__setattr__(self, "_result", (False, f)) + while self._observers: + try: + # TODO: Handle errors here. + self._observers.pop().errback(f) + except Exception: + pass + + if consumeErrors: + return None + else: + return f + + deferred.addCallbacks(callback, errback) + + def observe(self): + """Observe the underlying deferred. + + Can return either a deferred if the underlying deferred is still pending + (or has failed), or the actual value. Callers may need to use maybeDeferred. + """ + if not self._result: + d = defer.Deferred() + + def remove(r): + self._observers.discard(d) + return r + d.addBoth(remove) + + self._observers.add(d) + return d + else: + success, res = self._result + return res if success else defer.fail(res) + + def observers(self): + return self._observers + + def has_called(self): + return self._result is not None + + def has_succeeded(self): + return self._result is not None and self._result[0] is True + + def get_result(self): + return self._result[1] + + def __getattr__(self, name): + return getattr(self._deferred, name) + + def __setattr__(self, name, value): + setattr(self._deferred, name, value) + + def __repr__(self): + return "" % ( + id(self), self._result, self._deferred, + ) + + +def concurrently_execute(func, args, limit): + """Executes the function with each argument conncurrently while limiting + the number of concurrent executions. + + Args: + func (func): Function to execute, should return a deferred. + args (list): List of arguments to pass to func, each invocation of func + gets a signle argument. 
+ limit (int): Maximum number of conccurent executions. + + Returns: + deferred: Resolved when all function invocations have finished. + """ + it = iter(args) + + @defer.inlineCallbacks + def _concurrently_execute_inner(): + try: + while True: + yield func(next(it)) + except StopIteration: + pass + + return logcontext.make_deferred_yieldable(defer.gatherResults([ + run_in_background(_concurrently_execute_inner) + for _ in range(limit) + ], consumeErrors=True)).addErrback(unwrapFirstError) + + +class Linearizer(object): + """Limits concurrent access to resources based on a key. Useful to ensure + only a few things happen at a time on a given resource. + + Example: + + with (yield limiter.queue("test_key")): + # do some work. + + """ + def __init__(self, name=None, max_count=1, clock=None): + """ + Args: + max_count(int): The maximum number of concurrent accesses + """ + if name is None: + self.name = id(self) + else: + self.name = name + + if not clock: + from twisted.internet import reactor + clock = Clock(reactor) + self._clock = clock + self.max_count = max_count + + # key_to_defer is a map from the key to a 2 element list where + # the first element is the number of things executing, and + # the second element is an OrderedDict, where the keys are deferreds for the + # things blocked from executing. + self.key_to_defer = {} + + @defer.inlineCallbacks + def queue(self, key): + entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()]) + + # If the number of things executing is greater than the maximum + # then add a deferred to the list of blocked items + # When on of the things currently executing finishes it will callback + # this item so that it can continue executing. + if entry[0] >= self.max_count: + new_defer = defer.Deferred() + entry[1][new_defer] = 1 + + logger.info( + "Waiting to acquire linearizer lock %r for key %r", self.name, key, + ) + try: + yield make_deferred_yieldable(new_defer) + except Exception as e: + if isinstance(e, CancelledError): + logger.info( + "Cancelling wait for linearizer lock %r for key %r", + self.name, key, + ) + else: + logger.warn( + "Unexpected exception waiting for linearizer lock %r for key %r", + self.name, key, + ) + + # we just have to take ourselves back out of the queue. + del entry[1][new_defer] + raise + + logger.info("Acquired linearizer lock %r for key %r", self.name, key) + entry[0] += 1 + + # if the code holding the lock completes synchronously, then it + # will recursively run the next claimant on the list. That can + # relatively rapidly lead to stack exhaustion. This is essentially + # the same problem as http://twistedmatrix.com/trac/ticket/9304. + # + # In order to break the cycle, we add a cheeky sleep(0) here to + # ensure that we fall back to the reactor between each iteration. + # + # (This needs to happen while we hold the lock, and the context manager's exit + # code must be synchronous, so this is the only sensible place.) + yield self._clock.sleep(0) + + else: + logger.info( + "Acquired uncontended linearizer lock %r for key %r", self.name, key, + ) + entry[0] += 1 + + @contextmanager + def _ctx_manager(): + try: + yield + finally: + logger.info("Releasing linearizer lock %r for key %r", self.name, key) + + # We've finished executing so check if there are any things + # blocked waiting to execute and start one of them + entry[0] -= 1 + + if entry[1]: + (next_def, _) = entry[1].popitem(last=False) + + # we need to run the next thing in the sentinel context. 
+ with PreserveLoggingContext(): + next_def.callback(None) + elif entry[0] == 0: + # We were the last thing for this key: remove it from the + # map. + del self.key_to_defer[key] + + defer.returnValue(_ctx_manager()) + + +class ReadWriteLock(object): + """A deferred style read write lock. + + Example: + + with (yield read_write_lock.read("test_key")): + # do some work + """ + + # IMPLEMENTATION NOTES + # + # We track the most recent queued reader and writer deferreds (which get + # resolved when they release the lock). + # + # Read: We know its safe to acquire a read lock when the latest writer has + # been resolved. The new reader is appeneded to the list of latest readers. + # + # Write: We know its safe to acquire the write lock when both the latest + # writers and readers have been resolved. The new writer replaces the latest + # writer. + + def __init__(self): + # Latest readers queued + self.key_to_current_readers = {} + + # Latest writer queued + self.key_to_current_writer = {} + + @defer.inlineCallbacks + def read(self, key): + new_defer = defer.Deferred() + + curr_readers = self.key_to_current_readers.setdefault(key, set()) + curr_writer = self.key_to_current_writer.get(key, None) + + curr_readers.add(new_defer) + + # We wait for the latest writer to finish writing. We can safely ignore + # any existing readers... as they're readers. + yield make_deferred_yieldable(curr_writer) + + @contextmanager + def _ctx_manager(): + try: + yield + finally: + new_defer.callback(None) + self.key_to_current_readers.get(key, set()).discard(new_defer) + + defer.returnValue(_ctx_manager()) + + @defer.inlineCallbacks + def write(self, key): + new_defer = defer.Deferred() + + curr_readers = self.key_to_current_readers.get(key, set()) + curr_writer = self.key_to_current_writer.get(key, None) + + # We wait on all latest readers and writer. + to_wait_on = list(curr_readers) + if curr_writer: + to_wait_on.append(curr_writer) + + # We can clear the list of current readers since the new writer waits + # for them to finish. + curr_readers.clear() + self.key_to_current_writer[key] = new_defer + + yield make_deferred_yieldable(defer.gatherResults(to_wait_on)) + + @contextmanager + def _ctx_manager(): + try: + yield + finally: + new_defer.callback(None) + if self.key_to_current_writer[key] == new_defer: + self.key_to_current_writer.pop(key) + + defer.returnValue(_ctx_manager()) + + +class DeferredTimeoutError(Exception): + """ + This error is raised by default when a L{Deferred} times out. + """ + + +def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None): + """ + Add a timeout to a deferred by scheduling it to be cancelled after + timeout seconds. + + This is essentially a backport of deferred.addTimeout, which was introduced + in twisted 16.5. + + If the deferred gets timed out, it errbacks with a DeferredTimeoutError, + unless a cancelable function was passed to its initialization or unless + a different on_timeout_cancel callable is provided. + + Args: + deferred (defer.Deferred): deferred to be timed out + timeout (Number): seconds to time out after + reactor (twisted.internet.reactor): the Twisted reactor to use + + on_timeout_cancel (callable): A callable which is called immediately + after the deferred times out, and not if this deferred is + otherwise cancelled before the timeout. + + It takes an arbitrary value, which is the value of the deferred at + that exact point in time (probably a CancelledError Failure), and + the timeout. 
+ + The default callable (if none is provided) will translate a + CancelledError Failure into a DeferredTimeoutError. + """ + timed_out = [False] + + def time_it_out(): + timed_out[0] = True + deferred.cancel() + + delayed_call = reactor.callLater(timeout, time_it_out) + + def convert_cancelled(value): + if timed_out[0]: + to_call = on_timeout_cancel or _cancelled_to_timed_out_error + return to_call(value, timeout) + return value + + deferred.addBoth(convert_cancelled) + + def cancel_timeout(result): + # stop the pending call to cancel the deferred if it's been fired + if delayed_call.active(): + delayed_call.cancel() + return result + + deferred.addBoth(cancel_timeout) + + +def _cancelled_to_timed_out_error(value, timeout): + if isinstance(value, failure.Failure): + value.trap(CancelledError) + raise DeferredTimeoutError(timeout, "Deferred") + return value diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 861c24809c..187510576a 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -25,7 +25,7 @@ from six import itervalues, string_types from twisted.internet import defer from synapse.util import logcontext, unwrapFirstError -from synapse.util.async import ObservableDeferred +from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches import get_cache_factor_for from synapse.util.caches.lrucache import LruCache from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index a8491b42d5..afb03b2e1b 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -16,7 +16,7 @@ import logging from twisted.internet import defer -from synapse.util.async import ObservableDeferred +from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches import register_cache from synapse.util.logcontext import make_deferred_yieldable, run_in_background diff --git a/synapse/util/caches/snapshot_cache.py b/synapse/util/caches/snapshot_cache.py index d03678b8c8..8318db8d2c 100644 --- a/synapse/util/caches/snapshot_cache.py +++ b/synapse/util/caches/snapshot_cache.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from synapse.util.async import ObservableDeferred +from synapse.util.async_helpers import ObservableDeferred class SnapshotCache(object): diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 8dcae50b39..07e83fadda 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -526,7 +526,7 @@ _to_ignore = [ "synapse.util.logcontext", "synapse.http.server", "synapse.storage._base", - "synapse.util.async", + "synapse.util.async_helpers", ] diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index 6d6f00c5c5..52eff2a104 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -18,7 +18,7 @@ from mock import Mock from twisted.internet import defer -from synapse.util.async import ObservableDeferred +from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.descriptors import Cache, cached from tests import unittest diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index 4729bd5a0a..7f48a72de8 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -20,7 +20,7 @@ from twisted.internet import defer, reactor from twisted.internet.defer import CancelledError from synapse.util import Clock, logcontext -from synapse.util.async import Linearizer +from synapse.util.async_helpers import Linearizer from tests import unittest diff --git a/tests/util/test_rwlock.py b/tests/util/test_rwlock.py index 24194e3b25..7cd470be67 100644 --- a/tests/util/test_rwlock.py +++ b/tests/util/test_rwlock.py @@ -14,7 +14,7 @@ # limitations under the License. -from synapse.util.async import ReadWriteLock +from synapse.util.async_helpers import ReadWriteLock from tests import unittest -- cgit 1.5.1 From c334ca67bb89039b3a00b7c9a1ce610e99859653 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Sat, 18 Aug 2018 01:08:45 +1000 Subject: Integrate presence from hotfixes (#3694) --- changelog.d/3694.feature | 1 + docs/workers.rst | 8 +++ synapse/app/_base.py | 6 ++- synapse/app/frontend_proxy.py | 39 ++++++++++++++- synapse/app/synchrotron.py | 16 ++++-- synapse/config/server.py | 6 +++ synapse/federation/transaction_queue.py | 4 ++ synapse/handlers/initial_sync.py | 4 ++ synapse/handlers/presence.py | 26 +++++++--- synapse/handlers/sync.py | 3 +- synapse/rest/client/v1/presence.py | 3 +- tests/app/__init__.py | 0 tests/app/test_frontend_proxy.py | 88 +++++++++++++++++++++++++++++++++ tests/rest/client/v1/test_presence.py | 72 +++++++++++++++++++++++++++ tests/rest/client/v2_alpha/test_sync.py | 79 +++++++++++++---------------- tests/unittest.py | 8 ++- tests/utils.py | 7 +-- 17 files changed, 303 insertions(+), 67 deletions(-) create mode 100644 changelog.d/3694.feature create mode 100644 tests/app/__init__.py create mode 100644 tests/app/test_frontend_proxy.py create mode 100644 tests/rest/client/v1/test_presence.py (limited to 'synapse/federation') diff --git a/changelog.d/3694.feature b/changelog.d/3694.feature new file mode 100644 index 0000000000..916a342ff4 --- /dev/null +++ b/changelog.d/3694.feature @@ -0,0 +1 @@ +Synapse's presence functionality can now be disabled with the "use_presence" configuration option. 
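The remaining hunks in this commit all apply the same gating pattern: read a single "use_presence" flag from the homeserver config, defaulting to true, and turn presence-related entry points into early-returning no-ops when it is false. A minimal sketch of that pattern follows. It is illustrative only; the class and method names are invented for the example, and only the config key and its default are taken from the patch.

# Illustrative sketch, not part of the patch. Only the "use_presence" key and
# its default mirror the real change; the classes below are invented.

class ServerConfigSketch(object):
    def __init__(self, config_dict):
        # Presence stays enabled unless the admin sets "use_presence: false".
        self.use_presence = config_dict.get("use_presence", True)


class PresenceHandlerSketch(object):
    def __init__(self, config):
        self.config = config
        self.state_updates = 0

    def bump_active_time(self, user_id):
        # No-op when presence is disabled, mirroring the early returns the
        # commit adds to the real handlers and the federation sender.
        if not self.config.use_presence:
            return
        self.state_updates += 1


if __name__ == "__main__":
    handler = PresenceHandlerSketch(ServerConfigSketch({"use_presence": False}))
    handler.bump_active_time("@alice:example.com")
    print(handler.state_updates)  # prints 0: the update was skipped
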
diff --git a/docs/workers.rst b/docs/workers.rst index ac9efb621f..aec319dd84 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -241,6 +241,14 @@ regular expressions:: ^/_matrix/client/(api/v1|r0|unstable)/keys/upload +If ``use_presence`` is False in the homeserver config, it can also handle REST +endpoints matching the following regular expressions:: + + ^/_matrix/client/(api/v1|r0|unstable)/presence/[^/]+/status + +This "stub" presence handler will pass through ``GET`` request but make the +``PUT`` effectively a no-op. + It will proxy any requests it cannot handle to the main synapse instance. It must therefore be configured with the location of the main instance, via the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 391bd14c5c..7c866e246a 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -140,7 +140,7 @@ def listen_metrics(bind_addresses, port): logger.info("Metrics now reporting on %s:%d", host, port) -def listen_tcp(bind_addresses, port, factory, backlog=50): +def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50): """ Create a TCP socket for a port and several addresses """ @@ -156,7 +156,9 @@ def listen_tcp(bind_addresses, port, factory, backlog=50): check_bind_error(e, address, bind_addresses) -def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50): +def listen_ssl( + bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50 +): """ Create an SSL socket for a port and several addresses """ diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index 671fbbcb2a..8d484c1cd4 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -38,6 +38,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns from synapse.rest.client.v2_alpha._base import client_v2_patterns from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -49,6 +50,35 @@ from synapse.util.versionstring import get_version_string logger = logging.getLogger("synapse.app.frontend_proxy") +class PresenceStatusStubServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns("/presence/(?P[^/]*)/status") + + def __init__(self, hs): + super(PresenceStatusStubServlet, self).__init__(hs) + self.http_client = hs.get_simple_http_client() + self.auth = hs.get_auth() + self.main_uri = hs.config.worker_main_http_uri + + @defer.inlineCallbacks + def on_GET(self, request, user_id): + # Pass through the auth headers, if any, in case the access token + # is there. 
+ auth_headers = request.requestHeaders.getRawHeaders("Authorization", []) + headers = { + "Authorization": auth_headers, + } + result = yield self.http_client.get_json( + self.main_uri + request.uri, + headers=headers, + ) + defer.returnValue((200, result)) + + @defer.inlineCallbacks + def on_PUT(self, request, user_id): + yield self.auth.get_user_by_req(request) + defer.returnValue((200, {})) + + class KeyUploadServlet(RestServlet): PATTERNS = client_v2_patterns("/keys/upload(/(?P[^/]+))?$") @@ -135,6 +165,12 @@ class FrontendProxyServer(HomeServer): elif name == "client": resource = JsonResource(self, canonical_json=False) KeyUploadServlet(self).register(resource) + + # If presence is disabled, use the stub servlet that does + # not allow sending presence + if not self.config.use_presence: + PresenceStatusStubServlet(self).register(resource) + resources.update({ "/_matrix/client/r0": resource, "/_matrix/client/unstable": resource, @@ -153,7 +189,8 @@ class FrontendProxyServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), + reactor=self.get_reactor() ) logger.info("Synapse client reader now listening on port %d", port) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index e201f18efd..cade09d60e 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -114,7 +114,10 @@ class SynchrotronPresence(object): logger.info("Presence process_id is %r", self.process_id) def send_user_sync(self, user_id, is_syncing, last_sync_ms): - self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms) + if self.hs.config.use_presence: + self.hs.get_tcp_replication().send_user_sync( + user_id, is_syncing, last_sync_ms + ) def mark_as_coming_online(self, user_id): """A user has started syncing. Send a UserSync to the master, unless they @@ -211,10 +214,13 @@ class SynchrotronPresence(object): yield self.notify_from_replication(states, stream_id) def get_currently_syncing_users(self): - return [ - user_id for user_id, count in iteritems(self.user_to_num_current_syncs) - if count > 0 - ] + if self.hs.config.use_presence: + return [ + user_id for user_id, count in iteritems(self.user_to_num_current_syncs) + if count > 0 + ] + else: + return set() class SynchrotronTyping(object): diff --git a/synapse/config/server.py b/synapse/config/server.py index a41c48e69c..68a612e594 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -49,6 +49,9 @@ class ServerConfig(Config): # "disable" federation self.send_federation = config.get("send_federation", True) + # Whether to enable user presence. + self.use_presence = config.get("use_presence", True) + # Whether to update the user directory or not. This should be set to # false only if we are updating the user directory in a worker self.update_user_directory = config.get("update_user_directory", True) @@ -250,6 +253,9 @@ class ServerConfig(Config): # hard limit. soft_file_limit: 0 + # Set to false to disable presence tracking on this homeserver. 
+ use_presence: true + # The GC threshold parameters to pass to `gc.set_threshold`, if defined # gc_thresholds: [700, 10, 10] diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index f603c8a368..94d7423d01 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -58,6 +58,7 @@ class TransactionQueue(object): """ def __init__(self, hs): + self.hs = hs self.server_name = hs.hostname self.store = hs.get_datastore() @@ -308,6 +309,9 @@ class TransactionQueue(object): Args: states (list(UserPresenceState)) """ + if not self.hs.config.use_presence: + # No-op if presence is disabled. + return # First we queue up the new presence by user ID, so multiple presence # updates in quick successtion are correctly handled diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 1fb17fd9a5..e009395207 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -372,6 +372,10 @@ class InitialSyncHandler(BaseHandler): @defer.inlineCallbacks def get_presence(): + # If presence is disabled, return an empty list + if not self.hs.config.use_presence: + defer.returnValue([]) + states = yield presence_handler.get_states( [m.user_id for m in room_members], as_event=True, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 3671d24f60..ba3856674d 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -395,6 +395,10 @@ class PresenceHandler(object): """We've seen the user do something that indicates they're interacting with the app. """ + # If presence is disabled, no-op + if not self.hs.config.use_presence: + return + user_id = user.to_string() bump_active_time_counter.inc() @@ -424,6 +428,11 @@ class PresenceHandler(object): Useful for streams that are not associated with an actual client that is being used by a user. """ + # Override if it should affect the user's presence, if presence is + # disabled. + if not self.hs.config.use_presence: + affect_presence = False + if affect_presence: curr_sync = self.user_to_num_current_syncs.get(user_id, 0) self.user_to_num_current_syncs[user_id] = curr_sync + 1 @@ -469,13 +478,16 @@ class PresenceHandler(object): Returns: set(str): A set of user_id strings. 
""" - syncing_user_ids = { - user_id for user_id, count in self.user_to_num_current_syncs.items() - if count - } - for user_ids in self.external_process_to_current_syncs.values(): - syncing_user_ids.update(user_ids) - return syncing_user_ids + if self.hs.config.use_presence: + syncing_user_ids = { + user_id for user_id, count in self.user_to_num_current_syncs.items() + if count + } + for user_ids in self.external_process_to_current_syncs.values(): + syncing_user_ids.update(user_ids) + return syncing_user_ids + else: + return set() @defer.inlineCallbacks def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ac3edf0cc9..648debc8aa 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -185,6 +185,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ class SyncHandler(object): def __init__(self, hs): + self.hs_config = hs.config self.store = hs.get_datastore() self.notifier = hs.get_notifier() self.presence_handler = hs.get_presence_handler() @@ -860,7 +861,7 @@ class SyncHandler(object): since_token is None and sync_config.filter_collection.blocks_all_presence() ) - if not block_all_presence_data: + if self.hs_config.use_presence and not block_all_presence_data: yield self._generate_sync_entry_for_presence( sync_result_builder, newly_joined_rooms, newly_joined_users ) diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index a14f0c807e..b5a6d6aebf 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -84,7 +84,8 @@ class PresenceStatusRestServlet(ClientV1RestServlet): except Exception: raise SynapseError(400, "Unable to parse state") - yield self.presence_handler.set_state(user, state) + if self.hs.config.use_presence: + yield self.presence_handler.set_state(user, state) defer.returnValue((200, {})) diff --git a/tests/app/__init__.py b/tests/app/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/app/test_frontend_proxy.py b/tests/app/test_frontend_proxy.py new file mode 100644 index 0000000000..76b5090fff --- /dev/null +++ b/tests/app/test_frontend_proxy.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.app.frontend_proxy import FrontendProxyServer + +from tests.unittest import HomeserverTestCase + + +class FrontendProxyTests(HomeserverTestCase): + def make_homeserver(self, reactor, clock): + + hs = self.setup_test_homeserver( + http_client=None, homeserverToUse=FrontendProxyServer + ) + + return hs + + def test_listen_http_with_presence_enabled(self): + """ + When presence is on, the stub servlet will not register. 
+ """ + # Presence is on + self.hs.config.use_presence = True + + config = { + "port": 8080, + "bind_addresses": ["0.0.0.0"], + "resources": [{"names": ["client"]}], + } + + # Listen with the config + self.hs._listen_http(config) + + # Grab the resource from the site that was told to listen + self.assertEqual(len(self.reactor.tcpServers), 1) + site = self.reactor.tcpServers[0][1] + self.resource = ( + site.resource.children["_matrix"].children["client"].children["r0"] + ) + + request, channel = self.make_request("PUT", "presence/a/status") + self.render(request) + + # 400 + unrecognised, because nothing is registered + self.assertEqual(channel.code, 400) + self.assertEqual(channel.json_body["errcode"], "M_UNRECOGNIZED") + + def test_listen_http_with_presence_disabled(self): + """ + When presence is on, the stub servlet will register. + """ + # Presence is off + self.hs.config.use_presence = False + + config = { + "port": 8080, + "bind_addresses": ["0.0.0.0"], + "resources": [{"names": ["client"]}], + } + + # Listen with the config + self.hs._listen_http(config) + + # Grab the resource from the site that was told to listen + self.assertEqual(len(self.reactor.tcpServers), 1) + site = self.reactor.tcpServers[0][1] + self.resource = ( + site.resource.children["_matrix"].children["client"].children["r0"] + ) + + request, channel = self.make_request("PUT", "presence/a/status") + self.render(request) + + # 401, because the stub servlet still checks authentication + self.assertEqual(channel.code, 401) + self.assertEqual(channel.json_body["errcode"], "M_MISSING_TOKEN") diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py new file mode 100644 index 0000000000..66c2b68707 --- /dev/null +++ b/tests/rest/client/v1/test_presence.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mock import Mock + +from synapse.rest.client.v1 import presence +from synapse.types import UserID + +from tests import unittest + + +class PresenceTestCase(unittest.HomeserverTestCase): + """ Tests presence REST API. """ + + user_id = "@sid:red" + + user = UserID.from_string(user_id) + servlets = [presence.register_servlets] + + def make_homeserver(self, reactor, clock): + + hs = self.setup_test_homeserver( + "red", http_client=None, federation_client=Mock() + ) + + hs.presence_handler = Mock() + + return hs + + def test_put_presence(self): + """ + PUT to the status endpoint with use_presence enabled will call + set_state on the presence handler. 
+ """ + self.hs.config.use_presence = True + + body = {"presence": "here", "status_msg": "beep boop"} + request, channel = self.make_request( + "PUT", "/presence/%s/status" % (self.user_id,), body + ) + self.render(request) + + self.assertEqual(channel.code, 200) + self.assertEqual(self.hs.presence_handler.set_state.call_count, 1) + + def test_put_presence_disabled(self): + """ + PUT to the status endpoint with use_presence disbled will NOT call + set_state on the presence handler. + """ + self.hs.config.use_presence = False + + body = {"presence": "here", "status_msg": "beep boop"} + request, channel = self.make_request( + "PUT", "/presence/%s/status" % (self.user_id,), body + ) + self.render(request) + + self.assertEqual(channel.code, 200) + self.assertEqual(self.hs.presence_handler.set_state.call_count, 0) diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index 9f3d8bd1db..560b1fba96 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -13,71 +13,58 @@ # See the License for the specific language governing permissions and # limitations under the License. -import synapse.types -from synapse.http.server import JsonResource +from mock import Mock + from synapse.rest.client.v2_alpha import sync -from synapse.types import UserID -from synapse.util import Clock from tests import unittest -from tests.server import ( - ThreadedMemoryReactorClock as MemoryReactorClock, - make_request, - render, - setup_test_homeserver, -) - -PATH_PREFIX = "/_matrix/client/v2_alpha" -class FilterTestCase(unittest.TestCase): +class FilterTestCase(unittest.HomeserverTestCase): - USER_ID = "@apple:test" - TO_REGISTER = [sync] + user_id = "@apple:test" + servlets = [sync.register_servlets] - def setUp(self): - self.clock = MemoryReactorClock() - self.hs_clock = Clock(self.clock) + def make_homeserver(self, reactor, clock): - self.hs = setup_test_homeserver( - self.addCleanup, http_client=None, clock=self.hs_clock, reactor=self.clock + hs = self.setup_test_homeserver( + "red", http_client=None, federation_client=Mock() ) + return hs - self.auth = self.hs.get_auth() - - def get_user_by_access_token(token=None, allow_guest=False): - return { - "user": UserID.from_string(self.USER_ID), - "token_id": 1, - "is_guest": False, - } - - def get_user_by_req(request, allow_guest=False, rights="access"): - return synapse.types.create_requester( - UserID.from_string(self.USER_ID), 1, False, None - ) - - self.auth.get_user_by_access_token = get_user_by_access_token - self.auth.get_user_by_req = get_user_by_req + def test_sync_argless(self): + request, channel = self.make_request("GET", "/sync") + self.render(request) - self.store = self.hs.get_datastore() - self.filtering = self.hs.get_filtering() - self.resource = JsonResource(self.hs) + self.assertEqual(channel.code, 200) + self.assertTrue( + set( + [ + "next_batch", + "rooms", + "presence", + "account_data", + "to_device", + "device_lists", + ] + ).issubset(set(channel.json_body.keys())) + ) - for r in self.TO_REGISTER: - r.register_servlets(self.hs, self.resource) + def test_sync_presence_disabled(self): + """ + When presence is disabled, the key does not appear in /sync. 
+ """ + self.hs.config.use_presence = False - def test_sync_argless(self): - request, channel = make_request("GET", "/_matrix/client/r0/sync") - render(request, self.resource, self.clock) + request, channel = self.make_request("GET", "/sync") + self.render(request) - self.assertEqual(channel.result["code"], b"200") + self.assertEqual(channel.code, 200) self.assertTrue( set( [ "next_batch", "rooms", - "presence", "account_data", "to_device", "device_lists", diff --git a/tests/unittest.py b/tests/unittest.py index e6afe3b96d..d852e2465a 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -18,6 +18,8 @@ import logging from mock import Mock +from canonicaljson import json + import twisted import twisted.logger from twisted.trial import unittest @@ -241,11 +243,15 @@ class HomeserverTestCase(TestCase): method (bytes/unicode): The HTTP request method ("verb"). path (bytes/unicode): The HTTP path, suitably URL encoded (e.g. escaped UTF-8 & spaces and such). - content (bytes): The body of the request. + content (bytes or dict): The body of the request. JSON-encoded, if + a dict. Returns: A synapse.http.site.SynapseRequest. """ + if isinstance(content, dict): + content = json.dumps(content).encode('utf8') + return make_request(method, path, content) def render(self, request): diff --git a/tests/utils.py b/tests/utils.py index 6f8b1de3e7..bb0fc74054 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -93,7 +93,8 @@ def setupdb(): @defer.inlineCallbacks def setup_test_homeserver( - cleanup_func, name="test", datastore=None, config=None, reactor=None, **kargs + cleanup_func, name="test", datastore=None, config=None, reactor=None, + homeserverToUse=HomeServer, **kargs ): """ Setup a homeserver suitable for running tests against. Keyword arguments @@ -192,7 +193,7 @@ def setup_test_homeserver( config.database_config["args"]["cp_openfun"] = db_engine.on_new_connection if datastore is None: - hs = HomeServer( + hs = homeserverToUse( name, config=config, db_config=config.database_config, @@ -235,7 +236,7 @@ def setup_test_homeserver( hs.setup() else: - hs = HomeServer( + hs = homeserverToUse( name, db_pool=None, datastore=datastore, -- cgit 1.5.1
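As a usage reference for the synapse.util.async to synapse.util.async_helpers rename earlier in this series, the sketch below shows the calling convention for the two moved locking utilities, Linearizer and ReadWriteLock, as documented in their docstrings above. It assumes Twisted and a Synapse checkout at this revision are importable; update_room_state, the room ID and the linearizer name are invented for the example.

# Usage sketch only, not part of either patch above.  Assumes Twisted and
# synapse (at this revision) are importable.
from twisted.internet import defer, task

from synapse.util import Clock
from synapse.util.async_helpers import Linearizer, ReadWriteLock


@defer.inlineCallbacks
def update_room_state(room_id, linearizer, rwlock):
    # Linearizer: at most max_count callers may hold the lock for a given key.
    with (yield linearizer.queue(room_id)):
        # ReadWriteLock: a writer waits for all queued readers and for the
        # previous writer before its context manager is entered.
        with (yield rwlock.write(room_id)):
            pass  # ... mutate per-room state here ...


def main(reactor):
    linearizer = Linearizer(name="room_updates", clock=Clock(reactor))
    rwlock = ReadWriteLock()
    return update_room_state("!room:example.com", linearizer, rwlock)


if __name__ == "__main__":
    task.react(main)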