From a3f11567d930b7da0db068c3b313f6f4abbf12a1 Mon Sep 17 00:00:00 2001 From: Dagfinn Ilmari Mannsåker Date: Tue, 16 Jun 2020 13:51:47 +0100 Subject: Replace all remaining six usage with native Python 3 equivalents (#7704) --- synapse/appservice/api.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'synapse/appservice/api.py') diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 57174da021..da9a5e86d4 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -13,8 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging - -from six.moves import urllib +import urllib from prometheus_client import Counter -- cgit 1.5.1 From 6d687ebba11c701698df1f3da6fccec3b486c25a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 6 Jul 2020 07:40:35 -0400 Subject: Convert the appservice handler to async/await. (#7775) --- changelog.d/7775.misc | 1 + synapse/appservice/api.py | 1 - synapse/handlers/appservice.py | 74 +++++++++++++++++---------------------- tests/handlers/test_appservice.py | 68 ++++++++++++++++++----------------- 4 files changed, 68 insertions(+), 76 deletions(-) create mode 100644 changelog.d/7775.misc (limited to 'synapse/appservice/api.py') diff --git a/changelog.d/7775.misc b/changelog.d/7775.misc new file mode 100644 index 0000000000..af6fdb782f --- /dev/null +++ b/changelog.d/7775.misc @@ -0,0 +1 @@ +Convert the appserver handler to async/await. diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index da9a5e86d4..f92bfb420b 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -98,7 +98,6 @@ class ApplicationServiceApi(SimpleHttpClient): if service.url is None: return False uri = service.url + ("/users/%s" % urllib.parse.quote(user_id)) - response = None try: response = yield self.get_json(uri, {"access_token": service.hs_token}) if response is not None: # just an empty json object diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 904c96eeec..92d4c6e16c 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -48,8 +48,7 @@ class ApplicationServicesHandler(object): self.current_max = 0 self.is_processing = False - @defer.inlineCallbacks - def notify_interested_services(self, current_id): + async def notify_interested_services(self, current_id): """Notifies (pushes) all application services interested in this event. Pushing is done asynchronously, so this method won't block for any @@ -74,7 +73,7 @@ class ApplicationServicesHandler(object): ( upper_bound, events, - ) = yield self.store.get_new_events_for_appservice( + ) = await self.store.get_new_events_for_appservice( self.current_max, limit ) @@ -85,10 +84,9 @@ class ApplicationServicesHandler(object): for event in events: events_by_room.setdefault(event.room_id, []).append(event) - @defer.inlineCallbacks - def handle_event(event): + async def handle_event(event): # Gather interested services - services = yield self._get_services_for_event(event) + services = await self._get_services_for_event(event) if len(services) == 0: return # no services need notifying @@ -96,9 +94,9 @@ class ApplicationServicesHandler(object): # query API for all services which match that user regex. # This needs to block as these user queries need to be # made BEFORE pushing the event. - yield self._check_user_exists(event.sender) + await self._check_user_exists(event.sender) if event.type == EventTypes.Member: - yield self._check_user_exists(event.state_key) + await self._check_user_exists(event.state_key) if not self.started_scheduler: @@ -115,17 +113,16 @@ class ApplicationServicesHandler(object): self.scheduler.submit_event_for_as(service, event) now = self.clock.time_msec() - ts = yield self.store.get_received_ts(event.event_id) + ts = await self.store.get_received_ts(event.event_id) synapse.metrics.event_processing_lag_by_event.labels( "appservice_sender" ).observe((now - ts) / 1000) - @defer.inlineCallbacks - def handle_room_events(events): + async def handle_room_events(events): for event in events: - yield handle_event(event) + await handle_event(event) - yield make_deferred_yieldable( + await make_deferred_yieldable( defer.gatherResults( [ run_in_background(handle_room_events, evs) @@ -135,10 +132,10 @@ class ApplicationServicesHandler(object): ) ) - yield self.store.set_appservice_last_pos(upper_bound) + await self.store.set_appservice_last_pos(upper_bound) now = self.clock.time_msec() - ts = yield self.store.get_received_ts(events[-1].event_id) + ts = await self.store.get_received_ts(events[-1].event_id) synapse.metrics.event_processing_positions.labels( "appservice_sender" @@ -161,8 +158,7 @@ class ApplicationServicesHandler(object): finally: self.is_processing = False - @defer.inlineCallbacks - def query_user_exists(self, user_id): + async def query_user_exists(self, user_id): """Check if any application service knows this user_id exists. Args: @@ -170,15 +166,14 @@ class ApplicationServicesHandler(object): Returns: True if this user exists on at least one application service. """ - user_query_services = yield self._get_services_for_user(user_id=user_id) + user_query_services = self._get_services_for_user(user_id=user_id) for user_service in user_query_services: - is_known_user = yield self.appservice_api.query_user(user_service, user_id) + is_known_user = await self.appservice_api.query_user(user_service, user_id) if is_known_user: return True return False - @defer.inlineCallbacks - def query_room_alias_exists(self, room_alias): + async def query_room_alias_exists(self, room_alias): """Check if an application service knows this room alias exists. Args: @@ -193,19 +188,18 @@ class ApplicationServicesHandler(object): s for s in services if (s.is_interested_in_alias(room_alias_str)) ] for alias_service in alias_query_services: - is_known_alias = yield self.appservice_api.query_alias( + is_known_alias = await self.appservice_api.query_alias( alias_service, room_alias_str ) if is_known_alias: # the alias exists now so don't query more ASes. - result = yield self.store.get_association_from_room_alias(room_alias) + result = await self.store.get_association_from_room_alias(room_alias) return result - @defer.inlineCallbacks - def query_3pe(self, kind, protocol, fields): - services = yield self._get_services_for_3pn(protocol) + async def query_3pe(self, kind, protocol, fields): + services = self._get_services_for_3pn(protocol) - results = yield make_deferred_yieldable( + results = await make_deferred_yieldable( defer.DeferredList( [ run_in_background( @@ -224,8 +218,7 @@ class ApplicationServicesHandler(object): return ret - @defer.inlineCallbacks - def get_3pe_protocols(self, only_protocol=None): + async def get_3pe_protocols(self, only_protocol=None): services = self.store.get_app_services() protocols = {} @@ -238,7 +231,7 @@ class ApplicationServicesHandler(object): if p not in protocols: protocols[p] = [] - info = yield self.appservice_api.get_3pe_protocol(s, p) + info = await self.appservice_api.get_3pe_protocol(s, p) if info is not None: protocols[p].append(info) @@ -263,8 +256,7 @@ class ApplicationServicesHandler(object): return protocols - @defer.inlineCallbacks - def _get_services_for_event(self, event): + async def _get_services_for_event(self, event): """Retrieve a list of application services interested in this event. Args: @@ -280,7 +272,7 @@ class ApplicationServicesHandler(object): # inside of a list comprehension anymore. interested_list = [] for s in services: - if (yield s.is_interested(event, self.store)): + if await s.is_interested(event, self.store): interested_list.append(s) return interested_list @@ -288,21 +280,20 @@ class ApplicationServicesHandler(object): def _get_services_for_user(self, user_id): services = self.store.get_app_services() interested_list = [s for s in services if (s.is_interested_in_user(user_id))] - return defer.succeed(interested_list) + return interested_list def _get_services_for_3pn(self, protocol): services = self.store.get_app_services() interested_list = [s for s in services if s.is_interested_in_protocol(protocol)] - return defer.succeed(interested_list) + return interested_list - @defer.inlineCallbacks - def _is_unknown_user(self, user_id): + async def _is_unknown_user(self, user_id): if not self.is_mine_id(user_id): # we don't know if they are unknown or not since it isn't one of our # users. We can't poke ASes. return False - user_info = yield self.store.get_user_by_id(user_id) + user_info = await self.store.get_user_by_id(user_id) if user_info: return False @@ -311,10 +302,9 @@ class ApplicationServicesHandler(object): service_list = [s for s in services if s.sender == user_id] return len(service_list) == 0 - @defer.inlineCallbacks - def _check_user_exists(self, user_id): - unknown_user = yield self._is_unknown_user(user_id) + async def _check_user_exists(self, user_id): + unknown_user = await self._is_unknown_user(user_id) if unknown_user: - exists = yield self.query_user_exists(user_id) + exists = await self.query_user_exists(user_id) return exists return True diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index ba7148ec01..ebabe9a7d6 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -32,10 +32,11 @@ class AppServiceHandlerTestCase(unittest.TestCase): self.mock_as_api = Mock() self.mock_scheduler = Mock() hs = Mock() - hs.get_datastore = Mock(return_value=self.mock_store) - self.mock_store.get_received_ts.return_value = 0 - hs.get_application_service_api = Mock(return_value=self.mock_as_api) - hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler) + hs.get_datastore.return_value = self.mock_store + self.mock_store.get_received_ts.return_value = defer.succeed(0) + self.mock_store.set_appservice_last_pos.return_value = defer.succeed(None) + hs.get_application_service_api.return_value = self.mock_as_api + hs.get_application_service_scheduler.return_value = self.mock_scheduler hs.get_clock.return_value = MockClock() self.handler = ApplicationServicesHandler(hs) @@ -48,18 +49,18 @@ class AppServiceHandlerTestCase(unittest.TestCase): self._mkservice(is_interested=False), ] - self.mock_store.get_app_services = Mock(return_value=services) - self.mock_store.get_user_by_id = Mock(return_value=[]) + self.mock_as_api.query_user.return_value = defer.succeed(True) + self.mock_store.get_app_services.return_value = services + self.mock_store.get_user_by_id.return_value = defer.succeed([]) event = Mock( sender="@someone:anywhere", type="m.room.message", room_id="!foo:bar" ) self.mock_store.get_new_events_for_appservice.side_effect = [ - (0, [event]), - (0, []), + defer.succeed((0, [event])), + defer.succeed((0, [])), ] - self.mock_as_api.push = Mock() - yield self.handler.notify_interested_services(0) + yield defer.ensureDeferred(self.handler.notify_interested_services(0)) self.mock_scheduler.submit_event_for_as.assert_called_once_with( interested_service, event ) @@ -68,36 +69,34 @@ class AppServiceHandlerTestCase(unittest.TestCase): def test_query_user_exists_unknown_user(self): user_id = "@someone:anywhere" services = [self._mkservice(is_interested=True)] - services[0].is_interested_in_user = Mock(return_value=True) - self.mock_store.get_app_services = Mock(return_value=services) - self.mock_store.get_user_by_id = Mock(return_value=None) + services[0].is_interested_in_user.return_value = True + self.mock_store.get_app_services.return_value = services + self.mock_store.get_user_by_id.return_value = defer.succeed(None) event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar") - self.mock_as_api.push = Mock() - self.mock_as_api.query_user = Mock() + self.mock_as_api.query_user.return_value = defer.succeed(True) self.mock_store.get_new_events_for_appservice.side_effect = [ - (0, [event]), - (0, []), + defer.succeed((0, [event])), + defer.succeed((0, [])), ] - yield self.handler.notify_interested_services(0) + yield defer.ensureDeferred(self.handler.notify_interested_services(0)) self.mock_as_api.query_user.assert_called_once_with(services[0], user_id) @defer.inlineCallbacks def test_query_user_exists_known_user(self): user_id = "@someone:anywhere" services = [self._mkservice(is_interested=True)] - services[0].is_interested_in_user = Mock(return_value=True) - self.mock_store.get_app_services = Mock(return_value=services) - self.mock_store.get_user_by_id = Mock(return_value={"name": user_id}) + services[0].is_interested_in_user.return_value = True + self.mock_store.get_app_services.return_value = services + self.mock_store.get_user_by_id.return_value = defer.succeed({"name": user_id}) event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar") - self.mock_as_api.push = Mock() - self.mock_as_api.query_user = Mock() + self.mock_as_api.query_user.return_value = defer.succeed(True) self.mock_store.get_new_events_for_appservice.side_effect = [ - (0, [event]), - (0, []), + defer.succeed((0, [event])), + defer.succeed((0, [])), ] - yield self.handler.notify_interested_services(0) + yield defer.ensureDeferred(self.handler.notify_interested_services(0)) self.assertFalse( self.mock_as_api.query_user.called, "query_user called when it shouldn't have been.", @@ -107,7 +106,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): def test_query_room_alias_exists(self): room_alias_str = "#foo:bar" room_alias = Mock() - room_alias.to_string = Mock(return_value=room_alias_str) + room_alias.to_string.return_value = room_alias_str room_id = "!alpha:bet" servers = ["aperture"] @@ -118,12 +117,15 @@ class AppServiceHandlerTestCase(unittest.TestCase): self._mkservice_alias(is_interested_in_alias=False), ] - self.mock_store.get_app_services = Mock(return_value=services) - self.mock_store.get_association_from_room_alias = Mock( - return_value=Mock(room_id=room_id, servers=servers) + self.mock_as_api.query_alias.return_value = defer.succeed(True) + self.mock_store.get_app_services.return_value = services + self.mock_store.get_association_from_room_alias.return_value = defer.succeed( + Mock(room_id=room_id, servers=servers) ) - result = yield self.handler.query_room_alias_exists(room_alias) + result = yield defer.ensureDeferred( + self.handler.query_room_alias_exists(room_alias) + ) self.mock_as_api.query_alias.assert_called_once_with( interested_service, room_alias_str @@ -133,14 +135,14 @@ class AppServiceHandlerTestCase(unittest.TestCase): def _mkservice(self, is_interested): service = Mock() - service.is_interested = Mock(return_value=is_interested) + service.is_interested.return_value = defer.succeed(is_interested) service.token = "mock_service_token" service.url = "mock_service_url" return service def _mkservice_alias(self, is_interested_in_alias): service = Mock() - service.is_interested_in_alias = Mock(return_value=is_interested_in_alias) + service.is_interested_in_alias.return_value = is_interested_in_alias service.token = "mock_service_token" service.url = "mock_service_url" return service -- cgit 1.5.1 From 1bca21e1da3853d64561d39eea8b487db378ce22 Mon Sep 17 00:00:00 2001 From: Sorunome Date: Fri, 10 Jul 2020 19:44:56 +0200 Subject: Include room states on invite events sent to ASes (#6455) --- changelog.d/6455.feature | 1 + synapse/appservice/api.py | 20 ++++++++++++++++---- 2 files changed, 17 insertions(+), 4 deletions(-) create mode 100644 changelog.d/6455.feature (limited to 'synapse/appservice/api.py') diff --git a/changelog.d/6455.feature b/changelog.d/6455.feature new file mode 100644 index 0000000000..eb286cb70f --- /dev/null +++ b/changelog.d/6455.feature @@ -0,0 +1 @@ +Include room states on invite events that are sent to application services. Contributed by @Sorunome. diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index f92bfb420b..1e0e4d497d 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -19,7 +19,7 @@ from prometheus_client import Counter from twisted.internet import defer -from synapse.api.constants import ThirdPartyEntityKind +from synapse.api.constants import EventTypes, ThirdPartyEntityKind from synapse.api.errors import CodeMessageException from synapse.events.utils import serialize_event from synapse.http.client import SimpleHttpClient @@ -207,7 +207,7 @@ class ApplicationServiceApi(SimpleHttpClient): if service.url is None: return True - events = self._serialize(events) + events = self._serialize(service, events) if txn_id is None: logger.warning( @@ -233,6 +233,18 @@ class ApplicationServiceApi(SimpleHttpClient): failed_transactions_counter.labels(service.id).inc() return False - def _serialize(self, events): + def _serialize(self, service, events): time_now = self.clock.time_msec() - return [serialize_event(e, time_now, as_client_event=True) for e in events] + return [ + serialize_event( + e, + time_now, + as_client_event=True, + is_invite=( + e.type == EventTypes.Member + and e.membership == "invite" + and service.is_interested_in_user(e.state_key) + ), + ) + for e in events + ] -- cgit 1.5.1 From 4cce8ef74ec233d8e49361bee705f2e38de2e11e Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 30 Jul 2020 07:27:39 -0400 Subject: Convert appservice to async. (#7973) --- changelog.d/7973.misc | 1 + synapse/appservice/__init__.py | 31 ++++++------- synapse/appservice/api.py | 21 ++++----- synapse/appservice/scheduler.py | 49 +++++++++----------- synapse/handlers/appservice.py | 10 ++--- tests/appservice/test_appservice.py | 89 +++++++++++++++++++++++++------------ tests/appservice/test_scheduler.py | 19 ++++---- tests/handlers/test_appservice.py | 5 ++- 8 files changed, 122 insertions(+), 103 deletions(-) create mode 100644 changelog.d/7973.misc (limited to 'synapse/appservice/api.py') diff --git a/changelog.d/7973.misc b/changelog.d/7973.misc new file mode 100644 index 0000000000..dfe4c03171 --- /dev/null +++ b/changelog.d/7973.misc @@ -0,0 +1 @@ +Convert various parts of the codebase to async/await. diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 0323256472..1ffdc1ed95 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -15,11 +15,9 @@ import logging import re -from twisted.internet import defer - from synapse.api.constants import EventTypes from synapse.types import GroupID, get_domain_from_id -from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.descriptors import cached logger = logging.getLogger(__name__) @@ -43,7 +41,7 @@ class AppServiceTransaction(object): Args: as_api(ApplicationServiceApi): The API to use to send. Returns: - A Deferred which resolves to True if the transaction was sent. + An Awaitable which resolves to True if the transaction was sent. """ return as_api.push_bulk( service=self.service, events=self.events, txn_id=self.id @@ -172,8 +170,7 @@ class ApplicationService(object): return regex_obj["exclusive"] return False - @defer.inlineCallbacks - def _matches_user(self, event, store): + async def _matches_user(self, event, store): if not event: return False @@ -188,12 +185,12 @@ class ApplicationService(object): if not store: return False - does_match = yield self._matches_user_in_member_list(event.room_id, store) + does_match = await self._matches_user_in_member_list(event.room_id, store) return does_match - @cachedInlineCallbacks(num_args=1, cache_context=True) - def _matches_user_in_member_list(self, room_id, store, cache_context): - member_list = yield store.get_users_in_room( + @cached(num_args=1, cache_context=True) + async def _matches_user_in_member_list(self, room_id, store, cache_context): + member_list = await store.get_users_in_room( room_id, on_invalidate=cache_context.invalidate ) @@ -208,35 +205,33 @@ class ApplicationService(object): return self.is_interested_in_room(event.room_id) return False - @defer.inlineCallbacks - def _matches_aliases(self, event, store): + async def _matches_aliases(self, event, store): if not store or not event: return False - alias_list = yield store.get_aliases_for_room(event.room_id) + alias_list = await store.get_aliases_for_room(event.room_id) for alias in alias_list: if self.is_interested_in_alias(alias): return True return False - @defer.inlineCallbacks - def is_interested(self, event, store=None): + async def is_interested(self, event, store=None) -> bool: """Check if this service is interested in this event. Args: event(Event): The event to check. store(DataStore) Returns: - bool: True if this service would like to know about this event. + True if this service would like to know about this event. """ # Do cheap checks first if self._matches_room_id(event): return True - if (yield self._matches_aliases(event, store)): + if await self._matches_aliases(event, store): return True - if (yield self._matches_user(event, store)): + if await self._matches_user(event, store): return True return False diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 1e0e4d497d..db578bda79 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -93,13 +93,12 @@ class ApplicationServiceApi(SimpleHttpClient): hs, "as_protocol_meta", timeout_ms=HOUR_IN_MS ) - @defer.inlineCallbacks - def query_user(self, service, user_id): + async def query_user(self, service, user_id): if service.url is None: return False uri = service.url + ("/users/%s" % urllib.parse.quote(user_id)) try: - response = yield self.get_json(uri, {"access_token": service.hs_token}) + response = await self.get_json(uri, {"access_token": service.hs_token}) if response is not None: # just an empty json object return True except CodeMessageException as e: @@ -110,14 +109,12 @@ class ApplicationServiceApi(SimpleHttpClient): logger.warning("query_user to %s threw exception %s", uri, ex) return False - @defer.inlineCallbacks - def query_alias(self, service, alias): + async def query_alias(self, service, alias): if service.url is None: return False uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias)) - response = None try: - response = yield self.get_json(uri, {"access_token": service.hs_token}) + response = await self.get_json(uri, {"access_token": service.hs_token}) if response is not None: # just an empty json object return True except CodeMessageException as e: @@ -128,8 +125,7 @@ class ApplicationServiceApi(SimpleHttpClient): logger.warning("query_alias to %s threw exception %s", uri, ex) return False - @defer.inlineCallbacks - def query_3pe(self, service, kind, protocol, fields): + async def query_3pe(self, service, kind, protocol, fields): if kind == ThirdPartyEntityKind.USER: required_field = "userid" elif kind == ThirdPartyEntityKind.LOCATION: @@ -146,7 +142,7 @@ class ApplicationServiceApi(SimpleHttpClient): urllib.parse.quote(protocol), ) try: - response = yield self.get_json(uri, fields) + response = await self.get_json(uri, fields) if not isinstance(response, list): logger.warning( "query_3pe to %s returned an invalid response %r", uri, response @@ -202,8 +198,7 @@ class ApplicationServiceApi(SimpleHttpClient): key = (service.id, protocol) return self.protocol_meta_cache.wrap(key, _get) - @defer.inlineCallbacks - def push_bulk(self, service, events, txn_id=None): + async def push_bulk(self, service, events, txn_id=None): if service.url is None: return True @@ -218,7 +213,7 @@ class ApplicationServiceApi(SimpleHttpClient): uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id)) try: - yield self.put_json( + await self.put_json( uri=uri, json_body={"events": events}, args={"access_token": service.hs_token}, diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 9998f822f1..d5204b1314 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -50,8 +50,6 @@ components. """ import logging -from twisted.internet import defer - from synapse.appservice import ApplicationServiceState from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process @@ -73,12 +71,11 @@ class ApplicationServiceScheduler(object): self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api) self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock) - @defer.inlineCallbacks - def start(self): + async def start(self): logger.info("Starting appservice scheduler") # check for any DOWN ASes and start recoverers for them. - services = yield self.store.get_appservices_by_state( + services = await self.store.get_appservices_by_state( ApplicationServiceState.DOWN ) @@ -117,8 +114,7 @@ class _ServiceQueuer(object): "as-sender-%s" % (service.id,), self._send_request, service ) - @defer.inlineCallbacks - def _send_request(self, service): + async def _send_request(self, service): # sanity-check: we shouldn't get here if this service already has a sender # running. assert service.id not in self.requests_in_flight @@ -130,7 +126,7 @@ class _ServiceQueuer(object): if not events: return try: - yield self.txn_ctrl.send(service, events) + await self.txn_ctrl.send(service, events) except Exception: logger.exception("AS request failed") finally: @@ -162,36 +158,33 @@ class _TransactionController(object): # for UTs self.RECOVERER_CLASS = _Recoverer - @defer.inlineCallbacks - def send(self, service, events): + async def send(self, service, events): try: - txn = yield self.store.create_appservice_txn(service=service, events=events) - service_is_up = yield self._is_service_up(service) + txn = await self.store.create_appservice_txn(service=service, events=events) + service_is_up = await self._is_service_up(service) if service_is_up: - sent = yield txn.send(self.as_api) + sent = await txn.send(self.as_api) if sent: - yield txn.complete(self.store) + await txn.complete(self.store) else: run_in_background(self._on_txn_fail, service) except Exception: logger.exception("Error creating appservice transaction") run_in_background(self._on_txn_fail, service) - @defer.inlineCallbacks - def on_recovered(self, recoverer): + async def on_recovered(self, recoverer): logger.info( "Successfully recovered application service AS ID %s", recoverer.service.id ) self.recoverers.pop(recoverer.service.id) logger.info("Remaining active recoverers: %s", len(self.recoverers)) - yield self.store.set_appservice_state( + await self.store.set_appservice_state( recoverer.service, ApplicationServiceState.UP ) - @defer.inlineCallbacks - def _on_txn_fail(self, service): + async def _on_txn_fail(self, service): try: - yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN) + await self.store.set_appservice_state(service, ApplicationServiceState.DOWN) self.start_recoverer(service) except Exception: logger.exception("Error starting AS recoverer") @@ -211,9 +204,8 @@ class _TransactionController(object): recoverer.recover() logger.info("Now %i active recoverers", len(self.recoverers)) - @defer.inlineCallbacks - def _is_service_up(self, service): - state = yield self.store.get_appservice_state(service) + async def _is_service_up(self, service): + state = await self.store.get_appservice_state(service) return state == ApplicationServiceState.UP or state is None @@ -254,25 +246,24 @@ class _Recoverer(object): self.backoff_counter += 1 self.recover() - @defer.inlineCallbacks - def retry(self): + async def retry(self): logger.info("Starting retries on %s", self.service.id) try: while True: - txn = yield self.store.get_oldest_unsent_txn(self.service) + txn = await self.store.get_oldest_unsent_txn(self.service) if not txn: # nothing left: we're done! - self.callback(self) + await self.callback(self) return logger.info( "Retrying transaction %s for AS ID %s", txn.id, txn.service.id ) - sent = yield txn.send(self.as_api) + sent = await txn.send(self.as_api) if not sent: break - yield txn.complete(self.store) + await txn.complete(self.store) # reset the backoff counter and then process the next transaction self.backoff_counter = 1 diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 92d4c6e16c..fbc56c351b 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -27,7 +27,6 @@ from synapse.metrics import ( event_processing_loop_room_count, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.util import log_failure from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -100,10 +99,11 @@ class ApplicationServicesHandler(object): if not self.started_scheduler: - def start_scheduler(): - return self.scheduler.start().addErrback( - log_failure, "Application Services Failure" - ) + async def start_scheduler(): + try: + return self.scheduler.start() + except Exception: + logger.error("Application Services Failure") run_as_background_process("as_scheduler", start_scheduler) self.started_scheduler = True diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py index 4003869ed6..236b608d58 100644 --- a/tests/appservice/test_appservice.py +++ b/tests/appservice/test_appservice.py @@ -50,13 +50,17 @@ class ApplicationServiceTestCase(unittest.TestCase): def test_regex_user_id_prefix_match(self): self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*")) self.event.sender = "@irc_foobar:matrix.org" - self.assertTrue((yield self.service.is_interested(self.event))) + self.assertTrue( + (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ) @defer.inlineCallbacks def test_regex_user_id_prefix_no_match(self): self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*")) self.event.sender = "@someone_else:matrix.org" - self.assertFalse((yield self.service.is_interested(self.event))) + self.assertFalse( + (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ) @defer.inlineCallbacks def test_regex_room_member_is_checked(self): @@ -64,7 +68,9 @@ class ApplicationServiceTestCase(unittest.TestCase): self.event.sender = "@someone_else:matrix.org" self.event.type = "m.room.member" self.event.state_key = "@irc_foobar:matrix.org" - self.assertTrue((yield self.service.is_interested(self.event))) + self.assertTrue( + (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ) @defer.inlineCallbacks def test_regex_room_id_match(self): @@ -72,7 +78,9 @@ class ApplicationServiceTestCase(unittest.TestCase): _regex("!some_prefix.*some_suffix:matrix.org") ) self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org" - self.assertTrue((yield self.service.is_interested(self.event))) + self.assertTrue( + (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ) @defer.inlineCallbacks def test_regex_room_id_no_match(self): @@ -80,19 +88,26 @@ class ApplicationServiceTestCase(unittest.TestCase): _regex("!some_prefix.*some_suffix:matrix.org") ) self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org" - self.assertFalse((yield self.service.is_interested(self.event))) + self.assertFalse( + (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ) @defer.inlineCallbacks def test_regex_alias_match(self): self.service.namespaces[ApplicationService.NS_ALIASES].append( _regex("#irc_.*:matrix.org") ) - self.store.get_aliases_for_room.return_value = [ - "#irc_foobar:matrix.org", - "#athing:matrix.org", - ] - self.store.get_users_in_room.return_value = [] - self.assertTrue((yield self.service.is_interested(self.event, self.store))) + self.store.get_aliases_for_room.return_value = defer.succeed( + ["#irc_foobar:matrix.org", "#athing:matrix.org"] + ) + self.store.get_users_in_room.return_value = defer.succeed([]) + self.assertTrue( + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) + ) def test_non_exclusive_alias(self): self.service.namespaces[ApplicationService.NS_ALIASES].append( @@ -135,12 +150,17 @@ class ApplicationServiceTestCase(unittest.TestCase): self.service.namespaces[ApplicationService.NS_ALIASES].append( _regex("#irc_.*:matrix.org") ) - self.store.get_aliases_for_room.return_value = [ - "#xmpp_foobar:matrix.org", - "#athing:matrix.org", - ] - self.store.get_users_in_room.return_value = [] - self.assertFalse((yield self.service.is_interested(self.event, self.store))) + self.store.get_aliases_for_room.return_value = defer.succeed( + ["#xmpp_foobar:matrix.org", "#athing:matrix.org"] + ) + self.store.get_users_in_room.return_value = defer.succeed([]) + self.assertFalse( + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) + ) @defer.inlineCallbacks def test_regex_multiple_matches(self): @@ -149,9 +169,17 @@ class ApplicationServiceTestCase(unittest.TestCase): ) self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*")) self.event.sender = "@irc_foobar:matrix.org" - self.store.get_aliases_for_room.return_value = ["#irc_barfoo:matrix.org"] - self.store.get_users_in_room.return_value = [] - self.assertTrue((yield self.service.is_interested(self.event, self.store))) + self.store.get_aliases_for_room.return_value = defer.succeed( + ["#irc_barfoo:matrix.org"] + ) + self.store.get_users_in_room.return_value = defer.succeed([]) + self.assertTrue( + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) + ) @defer.inlineCallbacks def test_interested_in_self(self): @@ -161,19 +189,24 @@ class ApplicationServiceTestCase(unittest.TestCase): self.event.type = "m.room.member" self.event.content = {"membership": "invite"} self.event.state_key = self.service.sender - self.assertTrue((yield self.service.is_interested(self.event))) + self.assertTrue( + (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ) @defer.inlineCallbacks def test_member_list_match(self): self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*")) - self.store.get_users_in_room.return_value = [ - "@alice:here", - "@irc_fo:here", # AS user - "@bob:here", - ] - self.store.get_aliases_for_room.return_value = [] + # Note that @irc_fo:here is the AS user. + self.store.get_users_in_room.return_value = defer.succeed( + ["@alice:here", "@irc_fo:here", "@bob:here"] + ) + self.store.get_aliases_for_room.return_value = defer.succeed([]) self.event.sender = "@xmpp_foobar:matrix.org" self.assertTrue( - (yield self.service.is_interested(event=self.event, store=self.store)) + ( + yield defer.ensureDeferred( + self.service.is_interested(event=self.event, store=self.store) + ) + ) ) diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 52f89d3f83..68a4caabbf 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -25,6 +25,7 @@ from synapse.appservice.scheduler import ( from synapse.logging.context import make_deferred_yieldable from tests import unittest +from tests.test_utils import make_awaitable from ..utils import MockClock @@ -52,11 +53,11 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.store.get_appservice_state = Mock( return_value=defer.succeed(ApplicationServiceState.UP) ) - txn.send = Mock(return_value=defer.succeed(True)) + txn.send = Mock(return_value=make_awaitable(True)) self.store.create_appservice_txn = Mock(return_value=defer.succeed(txn)) # actual call - self.txnctrl.send(service, events) + self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events))) self.store.create_appservice_txn.assert_called_once_with( service=service, events=events # txn made and saved @@ -77,7 +78,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.store.create_appservice_txn = Mock(return_value=defer.succeed(txn)) # actual call - self.txnctrl.send(service, events) + self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events))) self.store.create_appservice_txn.assert_called_once_with( service=service, events=events # txn made and saved @@ -98,11 +99,11 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): return_value=defer.succeed(ApplicationServiceState.UP) ) self.store.set_appservice_state = Mock(return_value=defer.succeed(True)) - txn.send = Mock(return_value=defer.succeed(False)) # fails to send + txn.send = Mock(return_value=make_awaitable(False)) # fails to send self.store.create_appservice_txn = Mock(return_value=defer.succeed(txn)) # actual call - self.txnctrl.send(service, events) + self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events))) self.store.create_appservice_txn.assert_called_once_with( service=service, events=events @@ -144,7 +145,8 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.recoverer.recover() # shouldn't have called anything prior to waiting for exp backoff self.assertEquals(0, self.store.get_oldest_unsent_txn.call_count) - txn.send = Mock(return_value=True) + txn.send = Mock(return_value=make_awaitable(True)) + txn.complete.return_value = make_awaitable(None) # wait for exp backoff self.clock.advance_time(2) self.assertEquals(1, txn.send.call_count) @@ -169,7 +171,8 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.recoverer.recover() self.assertEquals(0, self.store.get_oldest_unsent_txn.call_count) - txn.send = Mock(return_value=False) + txn.send = Mock(return_value=make_awaitable(False)) + txn.complete.return_value = make_awaitable(None) self.clock.advance_time(2) self.assertEquals(1, txn.send.call_count) self.assertEquals(0, txn.complete.call_count) @@ -182,7 +185,7 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.assertEquals(3, txn.send.call_count) self.assertEquals(0, txn.complete.call_count) self.assertEquals(0, self.callback.call_count) - txn.send = Mock(return_value=True) # successfully send the txn + txn.send = Mock(return_value=make_awaitable(True)) # successfully send the txn pop_txn = True # returns the txn the first time, then no more. self.clock.advance_time(16) self.assertEquals(1, txn.send.call_count) # new mock reset call count diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index ebabe9a7d6..628f7d8db0 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -19,6 +19,7 @@ from twisted.internet import defer from synapse.handlers.appservice import ApplicationServicesHandler +from tests.test_utils import make_awaitable from tests.utils import MockClock from .. import unittest @@ -117,7 +118,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): self._mkservice_alias(is_interested_in_alias=False), ] - self.mock_as_api.query_alias.return_value = defer.succeed(True) + self.mock_as_api.query_alias.return_value = make_awaitable(True) self.mock_store.get_app_services.return_value = services self.mock_store.get_association_from_room_alias.return_value = defer.succeed( Mock(room_id=room_id, servers=servers) @@ -135,7 +136,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): def _mkservice(self, is_interested): service = Mock() - service.is_interested.return_value = defer.succeed(is_interested) + service.is_interested.return_value = make_awaitable(is_interested) service.token = "mock_service_token" service.url = "mock_service_url" return service -- cgit 1.5.1 From 88a3ff12f0fa41610968ea820afe44cca70efea7 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 4 Aug 2020 07:22:04 -0400 Subject: Convert the SimpleHttpClient to async. (#8016) --- changelog.d/8016.misc | 1 + synapse/appservice/api.py | 2 +- synapse/http/client.py | 55 +++++++++++++++++++++-------------------------- 3 files changed, 26 insertions(+), 32 deletions(-) create mode 100644 changelog.d/8016.misc (limited to 'synapse/appservice/api.py') diff --git a/changelog.d/8016.misc b/changelog.d/8016.misc new file mode 100644 index 0000000000..dfe4c03171 --- /dev/null +++ b/changelog.d/8016.misc @@ -0,0 +1 @@ +Convert various parts of the codebase to async/await. diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index db578bda79..e72a0b9ac0 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -175,7 +175,7 @@ class ApplicationServiceApi(SimpleHttpClient): urllib.parse.quote(protocol), ) try: - info = yield self.get_json(uri, {}) + info = yield defer.ensureDeferred(self.get_json(uri, {})) if not _is_valid_3pe_metadata(info): logger.warning( diff --git a/synapse/http/client.py b/synapse/http/client.py index 155b7460d4..529532a063 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -284,8 +284,7 @@ class SimpleHttpClient(object): ip_blacklist=self._ip_blacklist, ) - @defer.inlineCallbacks - def request(self, method, uri, data=None, headers=None): + async def request(self, method, uri, data=None, headers=None): """ Args: method (str): HTTP method to use. @@ -330,7 +329,7 @@ class SimpleHttpClient(object): self.hs.get_reactor(), cancelled_to_request_timed_out_error, ) - response = yield make_deferred_yieldable(request_deferred) + response = await make_deferred_yieldable(request_deferred) incoming_responses_counter.labels(method, response.code).inc() logger.info( @@ -353,8 +352,7 @@ class SimpleHttpClient(object): set_tag("error_reason", e.args[0]) raise - @defer.inlineCallbacks - def post_urlencoded_get_json(self, uri, args={}, headers=None): + async def post_urlencoded_get_json(self, uri, args={}, headers=None): """ Args: uri (str): @@ -363,7 +361,7 @@ class SimpleHttpClient(object): header name to a list of values for that header Returns: - Deferred[object]: parsed json + object: parsed json Raises: HttpResponseException: On a non-2xx HTTP response. @@ -386,11 +384,11 @@ class SimpleHttpClient(object): if headers: actual_headers.update(headers) - response = yield self.request( + response = await self.request( "POST", uri, headers=Headers(actual_headers), data=query_bytes ) - body = yield make_deferred_yieldable(readBody(response)) + body = await make_deferred_yieldable(readBody(response)) if 200 <= response.code < 300: return json.loads(body.decode("utf-8")) @@ -399,8 +397,7 @@ class SimpleHttpClient(object): response.code, response.phrase.decode("ascii", errors="replace"), body ) - @defer.inlineCallbacks - def post_json_get_json(self, uri, post_json, headers=None): + async def post_json_get_json(self, uri, post_json, headers=None): """ Args: @@ -410,7 +407,7 @@ class SimpleHttpClient(object): header name to a list of values for that header Returns: - Deferred[object]: parsed json + object: parsed json Raises: HttpResponseException: On a non-2xx HTTP response. @@ -429,11 +426,11 @@ class SimpleHttpClient(object): if headers: actual_headers.update(headers) - response = yield self.request( + response = await self.request( "POST", uri, headers=Headers(actual_headers), data=json_str ) - body = yield make_deferred_yieldable(readBody(response)) + body = await make_deferred_yieldable(readBody(response)) if 200 <= response.code < 300: return json.loads(body.decode("utf-8")) @@ -442,8 +439,7 @@ class SimpleHttpClient(object): response.code, response.phrase.decode("ascii", errors="replace"), body ) - @defer.inlineCallbacks - def get_json(self, uri, args={}, headers=None): + async def get_json(self, uri, args={}, headers=None): """ Gets some json from the given URI. Args: @@ -455,7 +451,7 @@ class SimpleHttpClient(object): headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from header name to a list of values for that header Returns: - Deferred: Succeeds when we get *any* 2xx HTTP response, with the + Succeeds when we get *any* 2xx HTTP response, with the HTTP body as JSON. Raises: HttpResponseException On a non-2xx HTTP response. @@ -466,11 +462,10 @@ class SimpleHttpClient(object): if headers: actual_headers.update(headers) - body = yield self.get_raw(uri, args, headers=headers) + body = await self.get_raw(uri, args, headers=headers) return json.loads(body.decode("utf-8")) - @defer.inlineCallbacks - def put_json(self, uri, json_body, args={}, headers=None): + async def put_json(self, uri, json_body, args={}, headers=None): """ Puts some json to the given URI. Args: @@ -483,7 +478,7 @@ class SimpleHttpClient(object): headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from header name to a list of values for that header Returns: - Deferred: Succeeds when we get *any* 2xx HTTP response, with the + Succeeds when we get *any* 2xx HTTP response, with the HTTP body as JSON. Raises: HttpResponseException On a non-2xx HTTP response. @@ -504,11 +499,11 @@ class SimpleHttpClient(object): if headers: actual_headers.update(headers) - response = yield self.request( + response = await self.request( "PUT", uri, headers=Headers(actual_headers), data=json_str ) - body = yield make_deferred_yieldable(readBody(response)) + body = await make_deferred_yieldable(readBody(response)) if 200 <= response.code < 300: return json.loads(body.decode("utf-8")) @@ -517,8 +512,7 @@ class SimpleHttpClient(object): response.code, response.phrase.decode("ascii", errors="replace"), body ) - @defer.inlineCallbacks - def get_raw(self, uri, args={}, headers=None): + async def get_raw(self, uri, args={}, headers=None): """ Gets raw text from the given URI. Args: @@ -530,7 +524,7 @@ class SimpleHttpClient(object): headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from header name to a list of values for that header Returns: - Deferred: Succeeds when we get *any* 2xx HTTP response, with the + Succeeds when we get *any* 2xx HTTP response, with the HTTP body as bytes. Raises: HttpResponseException on a non-2xx HTTP response. @@ -543,9 +537,9 @@ class SimpleHttpClient(object): if headers: actual_headers.update(headers) - response = yield self.request("GET", uri, headers=Headers(actual_headers)) + response = await self.request("GET", uri, headers=Headers(actual_headers)) - body = yield make_deferred_yieldable(readBody(response)) + body = await make_deferred_yieldable(readBody(response)) if 200 <= response.code < 300: return body @@ -557,8 +551,7 @@ class SimpleHttpClient(object): # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient. # The two should be factored out. - @defer.inlineCallbacks - def get_file(self, url, output_stream, max_size=None, headers=None): + async def get_file(self, url, output_stream, max_size=None, headers=None): """GETs a file from a given URL Args: url (str): The URL to GET @@ -574,7 +567,7 @@ class SimpleHttpClient(object): if headers: actual_headers.update(headers) - response = yield self.request("GET", url, headers=Headers(actual_headers)) + response = await self.request("GET", url, headers=Headers(actual_headers)) resp_headers = dict(response.headers.getAllRawHeaders()) @@ -598,7 +591,7 @@ class SimpleHttpClient(object): # straight back in again try: - length = yield make_deferred_yieldable( + length = await make_deferred_yieldable( _readBodyToFile(response, output_stream, max_size) ) except SynapseError: -- cgit 1.5.1