summary refs log tree commit diff
path: root/synapse/appservice
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/appservice')
-rw-r--r--synapse/appservice/api.py32
-rw-r--r--synapse/appservice/scheduler.py13
2 files changed, 28 insertions, 17 deletions
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py

index de7a94bf26..b1523be208 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py
@@ -40,6 +40,7 @@ from synapse.appservice import ( from synapse.events import EventBase from synapse.events.utils import SerializeEventConfig, serialize_event from synapse.http.client import SimpleHttpClient, is_unknown_endpoint +from synapse.logging import opentracing from synapse.types import DeviceListUpdates, JsonDict, ThirdPartyInstanceID from synapse.util.caches.response_cache import ResponseCache @@ -125,6 +126,17 @@ class ApplicationServiceApi(SimpleHttpClient): hs.get_clock(), "as_protocol_meta", timeout_ms=HOUR_IN_MS ) + def _get_headers(self, service: "ApplicationService") -> Dict[bytes, List[bytes]]: + """This makes sure we have always the auth header and opentracing headers set.""" + + # This is also ensured before in the functions. However this is needed to please + # the typechecks. + assert service.hs_token is not None + + headers = {b"Authorization": [b"Bearer " + service.hs_token.encode("ascii")]} + opentracing.inject_header_dict(headers, check_destination=False) + return headers + async def query_user(self, service: "ApplicationService", user_id: str) -> bool: if service.url is None: return False @@ -136,10 +148,11 @@ class ApplicationServiceApi(SimpleHttpClient): args = None if self.config.use_appservice_legacy_authorization: args = {"access_token": service.hs_token} + response = await self.get_json( f"{service.url}{APP_SERVICE_PREFIX}/users/{urllib.parse.quote(user_id)}", args, - headers={"Authorization": [f"Bearer {service.hs_token}"]}, + headers=self._get_headers(service), ) if response is not None: # just an empty json object return True @@ -162,10 +175,11 @@ class ApplicationServiceApi(SimpleHttpClient): args = None if self.config.use_appservice_legacy_authorization: args = {"access_token": service.hs_token} + response = await self.get_json( f"{service.url}{APP_SERVICE_PREFIX}/rooms/{urllib.parse.quote(alias)}", args, - headers={"Authorization": [f"Bearer {service.hs_token}"]}, + headers=self._get_headers(service), ) if response is not None: # just an empty json object return True @@ -203,10 +217,11 @@ class ApplicationServiceApi(SimpleHttpClient): **fields, b"access_token": service.hs_token, } + response = await self.get_json( f"{service.url}{APP_SERVICE_PREFIX}/thirdparty/{kind}/{urllib.parse.quote(protocol)}", args=args, - headers={"Authorization": [f"Bearer {service.hs_token}"]}, + headers=self._get_headers(service), ) if not isinstance(response, list): logger.warning( @@ -243,10 +258,11 @@ class ApplicationServiceApi(SimpleHttpClient): args = None if self.config.use_appservice_legacy_authorization: args = {"access_token": service.hs_token} + info = await self.get_json( f"{service.url}{APP_SERVICE_PREFIX}/thirdparty/protocol/{urllib.parse.quote(protocol)}", args, - headers={"Authorization": [f"Bearer {service.hs_token}"]}, + headers=self._get_headers(service), ) if not _is_valid_3pe_metadata(info): @@ -283,7 +299,7 @@ class ApplicationServiceApi(SimpleHttpClient): await self.post_json_get_json( uri=f"{service.url}{APP_SERVICE_PREFIX}/ping", post_json={"transaction_id": txn_id}, - headers={"Authorization": [f"Bearer {service.hs_token}"]}, + headers=self._get_headers(service), ) async def push_bulk( @@ -364,7 +380,7 @@ class ApplicationServiceApi(SimpleHttpClient): f"{service.url}{APP_SERVICE_PREFIX}/transactions/{urllib.parse.quote(str(txn_id))}", json_body=body, args=args, - headers={"Authorization": [f"Bearer {service.hs_token}"]}, + headers=self._get_headers(service), ) if logger.isEnabledFor(logging.DEBUG): logger.debug( @@ -437,7 +453,7 @@ class ApplicationServiceApi(SimpleHttpClient): response = await self.post_json_get_json( uri, body, - headers={"Authorization": [f"Bearer {service.hs_token}"]}, + headers=self._get_headers(service), ) except HttpResponseException as e: # The appservice doesn't support this endpoint. @@ -498,7 +514,7 @@ class ApplicationServiceApi(SimpleHttpClient): response = await self.post_json_get_json( uri, query, - headers={"Authorization": [f"Bearer {service.hs_token}"]}, + headers=self._get_headers(service), ) except HttpResponseException as e: # The appservice doesn't support this endpoint. diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 3a319b0d42..79f95f7653 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py
@@ -200,9 +200,7 @@ class _ServiceQueuer: if service.id in self.requests_in_flight: return - run_as_background_process( - "as-sender-%s" % (service.id,), self._send_request, service - ) + run_as_background_process("as-sender", self._send_request, service) async def _send_request(self, service: ApplicationService) -> None: # sanity-check: we shouldn't get here if this service already has a sender @@ -478,14 +476,11 @@ class _Recoverer: self.backoff_counter = 1 def recover(self) -> None: - def _retry() -> None: - run_as_background_process( - "as-recoverer-%s" % (self.service.id,), self.retry - ) - delay = 2**self.backoff_counter logger.info("Scheduling retries on %s in %fs", self.service.id, delay) - self.clock.call_later(delay, _retry) + self.clock.call_later( + delay, run_as_background_process, "as-recoverer", self.retry + ) def _backoff(self) -> None: # cap the backoff to be around 8.5min => (2^9) = 512 secs