summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/federation_server.py217
-rw-r--r--synapse/util/async_helpers.py7
2 files changed, 96 insertions, 128 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 5fc7c1d67b..7c331753ad 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -21,7 +21,6 @@ from six import iteritems
 from canonicaljson import json
 from prometheus_client import Counter
 
-from twisted.internet import defer
 from twisted.internet.abstract import isIPAddress
 from twisted.python import failure
 
@@ -86,14 +85,12 @@ class FederationServer(FederationBase):
         # come in waves.
         self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
 
-    @defer.inlineCallbacks
-    @log_function
-    def on_backfill_request(self, origin, room_id, versions, limit):
-        with (yield self._server_linearizer.queue((origin, room_id))):
+    async def on_backfill_request(self, origin, room_id, versions, limit):
+        with (await self._server_linearizer.queue((origin, room_id))):
             origin_host, _ = parse_server_name(origin)
-            yield self.check_server_matches_acl(origin_host, room_id)
+            await self.check_server_matches_acl(origin_host, room_id)
 
-            pdus = yield self.handler.on_backfill_request(
+            pdus = await self.handler.on_backfill_request(
                 origin, room_id, versions, limit
             )
 
@@ -101,9 +98,7 @@ class FederationServer(FederationBase):
 
         return 200, res
 
-    @defer.inlineCallbacks
-    @log_function
-    def on_incoming_transaction(self, origin, transaction_data):
+    async def on_incoming_transaction(self, origin, transaction_data):
         # keep this as early as possible to make the calculated origin ts as
         # accurate as possible.
         request_time = self._clock.time_msec()
@@ -118,18 +113,17 @@ class FederationServer(FederationBase):
         # use a linearizer to ensure that we don't process the same transaction
         # multiple times in parallel.
         with (
-            yield self._transaction_linearizer.queue(
+            await self._transaction_linearizer.queue(
                 (origin, transaction.transaction_id)
             )
         ):
-            result = yield self._handle_incoming_transaction(
+            result = await self._handle_incoming_transaction(
                 origin, transaction, request_time
             )
 
         return result
 
-    @defer.inlineCallbacks
-    def _handle_incoming_transaction(self, origin, transaction, request_time):
+    async def _handle_incoming_transaction(self, origin, transaction, request_time):
         """ Process an incoming transaction and return the HTTP response
 
         Args:
@@ -140,7 +134,7 @@ class FederationServer(FederationBase):
         Returns:
             Deferred[(int, object)]: http response code and body
         """
-        response = yield self.transaction_actions.have_responded(origin, transaction)
+        response = await self.transaction_actions.have_responded(origin, transaction)
 
         if response:
             logger.debug(
@@ -159,7 +153,7 @@ class FederationServer(FederationBase):
             logger.info("Transaction PDU or EDU count too large. Returning 400")
 
             response = {}
-            yield self.transaction_actions.set_response(
+            await self.transaction_actions.set_response(
                 origin, transaction, 400, response
             )
             return 400, response
@@ -195,7 +189,7 @@ class FederationServer(FederationBase):
                 continue
 
             try:
-                room_version = yield self.store.get_room_version(room_id)
+                room_version = await self.store.get_room_version(room_id)
             except NotFoundError:
                 logger.info("Ignoring PDU for unknown room_id: %s", room_id)
                 continue
@@ -221,11 +215,10 @@ class FederationServer(FederationBase):
         # require callouts to other servers to fetch missing events), but
         # impose a limit to avoid going too crazy with ram/cpu.
 
-        @defer.inlineCallbacks
-        def process_pdus_for_room(room_id):
+        async def process_pdus_for_room(room_id):
             logger.debug("Processing PDUs for %s", room_id)
             try:
-                yield self.check_server_matches_acl(origin_host, room_id)
+                await self.check_server_matches_acl(origin_host, room_id)
             except AuthError as e:
                 logger.warn("Ignoring PDUs for room %s from banned server", room_id)
                 for pdu in pdus_by_room[room_id]:
@@ -237,7 +230,7 @@ class FederationServer(FederationBase):
                 event_id = pdu.event_id
                 with nested_logging_context(event_id):
                     try:
-                        yield self._handle_received_pdu(origin, pdu)
+                        await self._handle_received_pdu(origin, pdu)
                         pdu_results[event_id] = {}
                     except FederationError as e:
                         logger.warn("Error handling PDU %s: %s", event_id, e)
@@ -251,36 +244,33 @@ class FederationServer(FederationBase):
                             exc_info=(f.type, f.value, f.getTracebackObject()),
                         )
 
-        yield concurrently_execute(
+        await concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
         )
 
         if hasattr(transaction, "edus"):
             for edu in (Edu(**x) for x in transaction.edus):
-                yield self.received_edu(origin, edu.edu_type, edu.content)
+                await self.received_edu(origin, edu.edu_type, edu.content)
 
         response = {"pdus": pdu_results}
 
         logger.debug("Returning: %s", str(response))
 
-        yield self.transaction_actions.set_response(origin, transaction, 200, response)
+        await self.transaction_actions.set_response(origin, transaction, 200, response)
         return 200, response
 
-    @defer.inlineCallbacks
-    def received_edu(self, origin, edu_type, content):
+    async def received_edu(self, origin, edu_type, content):
         received_edus_counter.inc()
-        yield self.registry.on_edu(edu_type, origin, content)
+        await self.registry.on_edu(edu_type, origin, content)
 
-    @defer.inlineCallbacks
-    @log_function
-    def on_context_state_request(self, origin, room_id, event_id):
+    async def on_context_state_request(self, origin, room_id, event_id):
         if not event_id:
             raise NotImplementedError("Specify an event")
 
         origin_host, _ = parse_server_name(origin)
-        yield self.check_server_matches_acl(origin_host, room_id)
+        await self.check_server_matches_acl(origin_host, room_id)
 
-        in_room = yield self.auth.check_host_in_room(room_id, origin)
+        in_room = await self.auth.check_host_in_room(room_id, origin)
         if not in_room:
             raise AuthError(403, "Host not in room.")
 
@@ -289,8 +279,8 @@ class FederationServer(FederationBase):
         # in the cache so we could return it without waiting for the linearizer
         # - but that's non-trivial to get right, and anyway somewhat defeats
         # the point of the linearizer.
-        with (yield self._server_linearizer.queue((origin, room_id))):
-            resp = yield self._state_resp_cache.wrap(
+        with (await self._server_linearizer.queue((origin, room_id))):
+            resp = await self._state_resp_cache.wrap(
                 (room_id, event_id),
                 self._on_context_state_request_compute,
                 room_id,
@@ -299,65 +289,58 @@ class FederationServer(FederationBase):
 
         return 200, resp
 
-    @defer.inlineCallbacks
-    def on_state_ids_request(self, origin, room_id, event_id):
+    async def on_state_ids_request(self, origin, room_id, event_id):
         if not event_id:
             raise NotImplementedError("Specify an event")
 
         origin_host, _ = parse_server_name(origin)
-        yield self.check_server_matches_acl(origin_host, room_id)
+        await self.check_server_matches_acl(origin_host, room_id)
 
-        in_room = yield self.auth.check_host_in_room(room_id, origin)
+        in_room = await self.auth.check_host_in_room(room_id, origin)
         if not in_room:
             raise AuthError(403, "Host not in room.")
 
-        state_ids = yield self.handler.get_state_ids_for_pdu(room_id, event_id)
-        auth_chain_ids = yield self.store.get_auth_chain_ids(state_ids)
+        state_ids = await self.handler.get_state_ids_for_pdu(room_id, event_id)
+        auth_chain_ids = await self.store.get_auth_chain_ids(state_ids)
 
         return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids}
 
-    @defer.inlineCallbacks
-    def _on_context_state_request_compute(self, room_id, event_id):
-        pdus = yield self.handler.get_state_for_pdu(room_id, event_id)
-        auth_chain = yield self.store.get_auth_chain([pdu.event_id for pdu in pdus])
+    async def _on_context_state_request_compute(self, room_id, event_id):
+        pdus = await self.handler.get_state_for_pdu(room_id, event_id)
+        auth_chain = await self.store.get_auth_chain([pdu.event_id for pdu in pdus])
 
         return {
             "pdus": [pdu.get_pdu_json() for pdu in pdus],
             "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
         }
 
-    @defer.inlineCallbacks
-    @log_function
-    def on_pdu_request(self, origin, event_id):
-        pdu = yield self.handler.get_persisted_pdu(origin, event_id)
+    async def on_pdu_request(self, origin, event_id):
+        pdu = await self.handler.get_persisted_pdu(origin, event_id)
 
         if pdu:
             return 200, self._transaction_from_pdus([pdu]).get_dict()
         else:
             return 404, ""
 
-    @defer.inlineCallbacks
-    def on_query_request(self, query_type, args):
+    async def on_query_request(self, query_type, args):
         received_queries_counter.labels(query_type).inc()
-        resp = yield self.registry.on_query(query_type, args)
+        resp = await self.registry.on_query(query_type, args)
         return 200, resp
 
-    @defer.inlineCallbacks
-    def on_make_join_request(self, origin, room_id, user_id, supported_versions):
+    async def on_make_join_request(self, origin, room_id, user_id, supported_versions):
         origin_host, _ = parse_server_name(origin)
-        yield self.check_server_matches_acl(origin_host, room_id)
+        await self.check_server_matches_acl(origin_host, room_id)
 
-        room_version = yield self.store.get_room_version(room_id)
+        room_version = await self.store.get_room_version(room_id)
         if room_version not in supported_versions:
             logger.warn("Room version %s not in %s", room_version, supported_versions)
             raise IncompatibleRoomVersionError(room_version=room_version)
 
-        pdu = yield self.handler.on_make_join_request(origin, room_id, user_id)
+        pdu = await self.handler.on_make_join_request(origin, room_id, user_id)
         time_now = self._clock.time_msec()
         return {"event": pdu.get_pdu_json(time_now), "room_version": room_version}
 
-    @defer.inlineCallbacks
-    def on_invite_request(self, origin, content, room_version):
+    async def on_invite_request(self, origin, content, room_version):
         if room_version not in KNOWN_ROOM_VERSIONS:
             raise SynapseError(
                 400,
@@ -369,28 +352,27 @@ class FederationServer(FederationBase):
 
         pdu = event_from_pdu_json(content, format_ver)
         origin_host, _ = parse_server_name(origin)
-        yield self.check_server_matches_acl(origin_host, pdu.room_id)
-        pdu = yield self._check_sigs_and_hash(room_version, pdu)
-        ret_pdu = yield self.handler.on_invite_request(origin, pdu)
+        await self.check_server_matches_acl(origin_host, pdu.room_id)
+        pdu = await self._check_sigs_and_hash(room_version, pdu)
+        ret_pdu = await self.handler.on_invite_request(origin, pdu)
         time_now = self._clock.time_msec()
         return {"event": ret_pdu.get_pdu_json(time_now)}
 
-    @defer.inlineCallbacks
-    def on_send_join_request(self, origin, content, room_id):
+    async def on_send_join_request(self, origin, content, room_id):
         logger.debug("on_send_join_request: content: %s", content)
 
-        room_version = yield self.store.get_room_version(room_id)
+        room_version = await self.store.get_room_version(room_id)
         format_ver = room_version_to_event_format(room_version)
         pdu = event_from_pdu_json(content, format_ver)
 
         origin_host, _ = parse_server_name(origin)
-        yield self.check_server_matches_acl(origin_host, pdu.room_id)
+        await self.check_server_matches_acl(origin_host, pdu.room_id)
 
         logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures)
 
-        pdu = yield self._check_sigs_and_hash(room_version, pdu)
+        pdu = await self._check_sigs_and_hash(room_version, pdu)
 
-        res_pdus = yield self.handler.on_send_join_request(origin, pdu)
+        res_pdus = await self.handler.on_send_join_request(origin, pdu)
         time_now = self._clock.time_msec()
         return (
             200,
@@ -402,48 +384,44 @@ class FederationServer(FederationBase):
             },
         )
 
-    @defer.inlineCallbacks
-    def on_make_leave_request(self, origin, room_id, user_id):
+    async def on_make_leave_request(self, origin, room_id, user_id):
         origin_host, _ = parse_server_name(origin)
-        yield self.check_server_matches_acl(origin_host, room_id)
-        pdu = yield self.handler.on_make_leave_request(origin, room_id, user_id)
+        await self.check_server_matches_acl(origin_host, room_id)
+        pdu = await self.handler.on_make_leave_request(origin, room_id, user_id)
 
-        room_version = yield self.store.get_room_version(room_id)
+        room_version = await self.store.get_room_version(room_id)
 
         time_now = self._clock.time_msec()
         return {"event": pdu.get_pdu_json(time_now), "room_version": room_version}
 
-    @defer.inlineCallbacks
-    def on_send_leave_request(self, origin, content, room_id):
+    async def on_send_leave_request(self, origin, content, room_id):
         logger.debug("on_send_leave_request: content: %s", content)
 
-        room_version = yield self.store.get_room_version(room_id)
+        room_version = await self.store.get_room_version(room_id)
         format_ver = room_version_to_event_format(room_version)
         pdu = event_from_pdu_json(content, format_ver)
 
         origin_host, _ = parse_server_name(origin)
-        yield self.check_server_matches_acl(origin_host, pdu.room_id)
+        await self.check_server_matches_acl(origin_host, pdu.room_id)
 
         logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
 
-        pdu = yield self._check_sigs_and_hash(room_version, pdu)
+        pdu = await self._check_sigs_and_hash(room_version, pdu)
 
-        yield self.handler.on_send_leave_request(origin, pdu)
+        await self.handler.on_send_leave_request(origin, pdu)
         return 200, {}
 
-    @defer.inlineCallbacks
-    def on_event_auth(self, origin, room_id, event_id):
-        with (yield self._server_linearizer.queue((origin, room_id))):
+    async def on_event_auth(self, origin, room_id, event_id):
+        with (await self._server_linearizer.queue((origin, room_id))):
             origin_host, _ = parse_server_name(origin)
-            yield self.check_server_matches_acl(origin_host, room_id)
+            await self.check_server_matches_acl(origin_host, room_id)
 
             time_now = self._clock.time_msec()
-            auth_pdus = yield self.handler.on_event_auth(event_id)
+            auth_pdus = await self.handler.on_event_auth(event_id)
             res = {"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus]}
         return 200, res
 
-    @defer.inlineCallbacks
-    def on_query_auth_request(self, origin, content, room_id, event_id):
+    async def on_query_auth_request(self, origin, content, room_id, event_id):
         """
         Content is a dict with keys::
             auth_chain (list): A list of events that give the auth chain.
@@ -462,22 +440,22 @@ class FederationServer(FederationBase):
         Returns:
             Deferred: Results in `dict` with the same format as `content`
         """
-        with (yield self._server_linearizer.queue((origin, room_id))):
+        with (await self._server_linearizer.queue((origin, room_id))):
             origin_host, _ = parse_server_name(origin)
-            yield self.check_server_matches_acl(origin_host, room_id)
+            await self.check_server_matches_acl(origin_host, room_id)
 
-            room_version = yield self.store.get_room_version(room_id)
+            room_version = await self.store.get_room_version(room_id)
             format_ver = room_version_to_event_format(room_version)
 
             auth_chain = [
                 event_from_pdu_json(e, format_ver) for e in content["auth_chain"]
             ]
 
-            signed_auth = yield self._check_sigs_and_hash_and_fetch(
+            signed_auth = await self._check_sigs_and_hash_and_fetch(
                 origin, auth_chain, outlier=True, room_version=room_version
             )
 
-            ret = yield self.handler.on_query_auth(
+            ret = await self.handler.on_query_auth(
                 origin,
                 event_id,
                 room_id,
@@ -503,16 +481,14 @@ class FederationServer(FederationBase):
         return self.on_query_request("user_devices", user_id)
 
     @trace
-    @defer.inlineCallbacks
-    @log_function
-    def on_claim_client_keys(self, origin, content):
+    async def on_claim_client_keys(self, origin, content):
         query = []
         for user_id, device_keys in content.get("one_time_keys", {}).items():
             for device_id, algorithm in device_keys.items():
                 query.append((user_id, device_id, algorithm))
 
         log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
-        results = yield self.store.claim_e2e_one_time_keys(query)
+        results = await self.store.claim_e2e_one_time_keys(query)
 
         json_result = {}
         for user_id, device_keys in results.items():
@@ -536,14 +512,12 @@ class FederationServer(FederationBase):
 
         return {"one_time_keys": json_result}
 
-    @defer.inlineCallbacks
-    @log_function
-    def on_get_missing_events(
+    async def on_get_missing_events(
         self, origin, room_id, earliest_events, latest_events, limit
     ):
-        with (yield self._server_linearizer.queue((origin, room_id))):
+        with (await self._server_linearizer.queue((origin, room_id))):
             origin_host, _ = parse_server_name(origin)
-            yield self.check_server_matches_acl(origin_host, room_id)
+            await self.check_server_matches_acl(origin_host, room_id)
 
             logger.info(
                 "on_get_missing_events: earliest_events: %r, latest_events: %r,"
@@ -553,7 +527,7 @@ class FederationServer(FederationBase):
                 limit,
             )
 
-            missing_events = yield self.handler.on_get_missing_events(
+            missing_events = await self.handler.on_get_missing_events(
                 origin, room_id, earliest_events, latest_events, limit
             )
 
@@ -586,8 +560,7 @@ class FederationServer(FederationBase):
             destination=None,
         )
 
-    @defer.inlineCallbacks
-    def _handle_received_pdu(self, origin, pdu):
+    async def _handle_received_pdu(self, origin, pdu):
         """ Process a PDU received in a federation /send/ transaction.
 
         If the event is invalid, then this method throws a FederationError.
@@ -640,37 +613,34 @@ class FederationServer(FederationBase):
                 logger.info("Accepting join PDU %s from %s", pdu.event_id, origin)
 
         # We've already checked that we know the room version by this point
-        room_version = yield self.store.get_room_version(pdu.room_id)
+        room_version = await self.store.get_room_version(pdu.room_id)
 
         # Check signature.
         try:
-            pdu = yield self._check_sigs_and_hash(room_version, pdu)
+            pdu = await self._check_sigs_and_hash(room_version, pdu)
         except SynapseError as e:
             raise FederationError("ERROR", e.code, e.msg, affected=pdu.event_id)
 
-        yield self.handler.on_receive_pdu(origin, pdu, sent_to_us_directly=True)
+        await self.handler.on_receive_pdu(origin, pdu, sent_to_us_directly=True)
 
     def __str__(self):
         return "<ReplicationLayer(%s)>" % self.server_name
 
-    @defer.inlineCallbacks
-    def exchange_third_party_invite(
+    async def exchange_third_party_invite(
         self, sender_user_id, target_user_id, room_id, signed
     ):
-        ret = yield self.handler.exchange_third_party_invite(
+        ret = await self.handler.exchange_third_party_invite(
             sender_user_id, target_user_id, room_id, signed
         )
         return ret
 
-    @defer.inlineCallbacks
-    def on_exchange_third_party_invite_request(self, room_id, event_dict):
-        ret = yield self.handler.on_exchange_third_party_invite_request(
+    async def on_exchange_third_party_invite_request(self, room_id, event_dict):
+        ret = await self.handler.on_exchange_third_party_invite_request(
             room_id, event_dict
         )
         return ret
 
-    @defer.inlineCallbacks
-    def check_server_matches_acl(self, server_name, room_id):
+    async def check_server_matches_acl(self, server_name, room_id):
         """Check if the given server is allowed by the server ACLs in the room
 
         Args:
@@ -680,13 +650,13 @@ class FederationServer(FederationBase):
         Raises:
             AuthError if the server does not match the ACL
         """
-        state_ids = yield self.store.get_current_state_ids(room_id)
+        state_ids = await self.store.get_current_state_ids(room_id)
         acl_event_id = state_ids.get((EventTypes.ServerACL, ""))
 
         if not acl_event_id:
             return
 
-        acl_event = yield self.store.get_event(acl_event_id)
+        acl_event = await self.store.get_event(acl_event_id)
         if server_matches_acl_event(server_name, acl_event):
             return
 
@@ -799,15 +769,14 @@ class FederationHandlerRegistry(object):
 
         self.query_handlers[query_type] = handler
 
-    @defer.inlineCallbacks
-    def on_edu(self, edu_type, origin, content):
+    async def on_edu(self, edu_type, origin, content):
         handler = self.edu_handlers.get(edu_type)
         if not handler:
             logger.warn("No handler registered for EDU type %s", edu_type)
 
         with start_active_span_from_edu(content, "handle_edu"):
             try:
-                yield handler(origin, content)
+                await handler(origin, content)
             except SynapseError as e:
                 logger.info("Failed to handle edu %r: %r", edu_type, e)
             except Exception:
@@ -840,7 +809,7 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
 
         super(ReplicationFederationHandlerRegistry, self).__init__()
 
-    def on_edu(self, edu_type, origin, content):
+    async def on_edu(self, edu_type, origin, content):
         """Overrides FederationHandlerRegistry
         """
         if not self.config.use_presence and edu_type == "m.presence":
@@ -848,17 +817,17 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
 
         handler = self.edu_handlers.get(edu_type)
         if handler:
-            return super(ReplicationFederationHandlerRegistry, self).on_edu(
+            return await super(ReplicationFederationHandlerRegistry, self).on_edu(
                 edu_type, origin, content
             )
 
-        return self._send_edu(edu_type=edu_type, origin=origin, content=content)
+        return await self._send_edu(edu_type=edu_type, origin=origin, content=content)
 
-    def on_query(self, query_type, args):
+    async def on_query(self, query_type, args):
         """Overrides FederationHandlerRegistry
         """
         handler = self.query_handlers.get(query_type)
         if handler:
-            return handler(args)
+            return await handler(args)
 
-        return self._get_query_client(query_type=query_type, args=args)
+        return await self._get_query_client(query_type=query_type, args=args)
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 804dbca443..7659eaeb42 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -138,7 +138,7 @@ def concurrently_execute(func, args, limit):
     the number of concurrent executions.
 
     Args:
-        func (func): Function to execute, should return a deferred.
+        func (func): Function to execute, should return a deferred or coroutine.
         args (list): List of arguments to pass to func, each invocation of func
             gets a signle argument.
         limit (int): Maximum number of conccurent executions.
@@ -148,11 +148,10 @@ def concurrently_execute(func, args, limit):
     """
     it = iter(args)
 
-    @defer.inlineCallbacks
-    def _concurrently_execute_inner():
+    async def _concurrently_execute_inner():
         try:
             while True:
-                yield func(next(it))
+                await maybe_awaitable(func(next(it)))
         except StopIteration:
             pass