summary refs log tree commit diff
path: root/synapse/appservice
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-06-26 22:34:41 +0100
committerRichard van der Hoff <richard@matrix.org>2019-06-26 22:34:41 +0100
commita4daa899ec4cd195fc10936f68df5c78314b366c (patch)
tree35e88ff388b0f7652773a79930b732aa04f16bde /synapse/appservice
parentchangelog (diff)
parentImprove docs on choosing server_name (#5558) (diff)
downloadsynapse-a4daa899ec4cd195fc10936f68df5c78314b366c.tar.xz
Merge branch 'develop' into rav/saml2_client
Diffstat (limited to 'synapse/appservice')
-rw-r--r--synapse/appservice/__init__.py39
-rw-r--r--synapse/appservice/api.py66
-rw-r--r--synapse/appservice/scheduler.py50
3 files changed, 67 insertions, 88 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 57ed8a3ca2..b26a31dd54 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -48,9 +48,7 @@ class AppServiceTransaction(object):
             A Deferred which resolves to True if the transaction was sent.
         """
         return as_api.push_bulk(
-            service=self.service,
-            events=self.events,
-            txn_id=self.id
+            service=self.service, events=self.events, txn_id=self.id
         )
 
     def complete(self, store):
@@ -64,10 +62,7 @@ class AppServiceTransaction(object):
         Returns:
             A Deferred which resolves to True if the transaction was completed.
         """
-        return store.complete_appservice_txn(
-            service=self.service,
-            txn_id=self.id
-        )
+        return store.complete_appservice_txn(service=self.service, txn_id=self.id)
 
 
 class ApplicationService(object):
@@ -76,6 +71,7 @@ class ApplicationService(object):
 
     Provides methods to check if this service is "interested" in events.
     """
+
     NS_USERS = "users"
     NS_ALIASES = "aliases"
     NS_ROOMS = "rooms"
@@ -84,9 +80,19 @@ class ApplicationService(object):
     # values.
     NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS]
 
-    def __init__(self, token, hostname, url=None, namespaces=None, hs_token=None,
-                 sender=None, id=None, protocols=None, rate_limited=True,
-                 ip_range_whitelist=None):
+    def __init__(
+        self,
+        token,
+        hostname,
+        url=None,
+        namespaces=None,
+        hs_token=None,
+        sender=None,
+        id=None,
+        protocols=None,
+        rate_limited=True,
+        ip_range_whitelist=None,
+    ):
         self.token = token
         self.url = url
         self.hs_token = hs_token
@@ -128,9 +134,7 @@ class ApplicationService(object):
                 if not isinstance(regex_obj, dict):
                     raise ValueError("Expected dict regex for ns '%s'" % ns)
                 if not isinstance(regex_obj.get("exclusive"), bool):
-                    raise ValueError(
-                        "Expected bool for 'exclusive' in ns '%s'" % ns
-                    )
+                    raise ValueError("Expected bool for 'exclusive' in ns '%s'" % ns)
                 group_id = regex_obj.get("group_id")
                 if group_id:
                     if not isinstance(group_id, str):
@@ -153,9 +157,7 @@ class ApplicationService(object):
                 if isinstance(regex, string_types):
                     regex_obj["regex"] = re.compile(regex)  # Pre-compile regex
                 else:
-                    raise ValueError(
-                        "Expected string for 'regex' in ns '%s'" % ns
-                    )
+                    raise ValueError("Expected string for 'regex' in ns '%s'" % ns)
         return namespaces
 
     def _matches_regex(self, test_string, namespace_key):
@@ -178,8 +180,9 @@ class ApplicationService(object):
         if self.is_interested_in_user(event.sender):
             defer.returnValue(True)
         # also check m.room.member state key
-        if (event.type == EventTypes.Member and
-                self.is_interested_in_user(event.state_key)):
+        if event.type == EventTypes.Member and self.is_interested_in_user(
+            event.state_key
+        ):
             defer.returnValue(True)
 
         if not store:
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 9ccc5a80fc..571881775b 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -32,19 +32,17 @@ logger = logging.getLogger(__name__)
 sent_transactions_counter = Counter(
     "synapse_appservice_api_sent_transactions",
     "Number of /transactions/ requests sent",
-    ["service"]
+    ["service"],
 )
 
 failed_transactions_counter = Counter(
     "synapse_appservice_api_failed_transactions",
     "Number of /transactions/ requests that failed to send",
-    ["service"]
+    ["service"],
 )
 
 sent_events_counter = Counter(
-    "synapse_appservice_api_sent_events",
-    "Number of events sent to the AS",
-    ["service"]
+    "synapse_appservice_api_sent_events", "Number of events sent to the AS", ["service"]
 )
 
 HOUR_IN_MS = 60 * 60 * 1000
@@ -92,8 +90,9 @@ class ApplicationServiceApi(SimpleHttpClient):
         super(ApplicationServiceApi, self).__init__(hs)
         self.clock = hs.get_clock()
 
-        self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta",
-                                                 timeout_ms=HOUR_IN_MS)
+        self.protocol_meta_cache = ResponseCache(
+            hs, "as_protocol_meta", timeout_ms=HOUR_IN_MS
+        )
 
     @defer.inlineCallbacks
     def query_user(self, service, user_id):
@@ -102,9 +101,7 @@ class ApplicationServiceApi(SimpleHttpClient):
         uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
         response = None
         try:
-            response = yield self.get_json(uri, {
-                "access_token": service.hs_token
-            })
+            response = yield self.get_json(uri, {"access_token": service.hs_token})
             if response is not None:  # just an empty json object
                 defer.returnValue(True)
         except CodeMessageException as e:
@@ -123,9 +120,7 @@ class ApplicationServiceApi(SimpleHttpClient):
         uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
         response = None
         try:
-            response = yield self.get_json(uri, {
-                "access_token": service.hs_token
-            })
+            response = yield self.get_json(uri, {"access_token": service.hs_token})
             if response is not None:  # just an empty json object
                 defer.returnValue(True)
         except CodeMessageException as e:
@@ -144,9 +139,7 @@ class ApplicationServiceApi(SimpleHttpClient):
         elif kind == ThirdPartyEntityKind.LOCATION:
             required_field = "alias"
         else:
-            raise ValueError(
-                "Unrecognised 'kind' argument %r to query_3pe()", kind
-            )
+            raise ValueError("Unrecognised 'kind' argument %r to query_3pe()", kind)
         if service.url is None:
             defer.returnValue([])
 
@@ -154,14 +147,13 @@ class ApplicationServiceApi(SimpleHttpClient):
             service.url,
             APP_SERVICE_PREFIX,
             kind,
-            urllib.parse.quote(protocol)
+            urllib.parse.quote(protocol),
         )
         try:
             response = yield self.get_json(uri, fields)
             if not isinstance(response, list):
                 logger.warning(
-                    "query_3pe to %s returned an invalid response %r",
-                    uri, response
+                    "query_3pe to %s returned an invalid response %r", uri, response
                 )
                 defer.returnValue([])
 
@@ -171,8 +163,7 @@ class ApplicationServiceApi(SimpleHttpClient):
                     ret.append(r)
                 else:
                     logger.warning(
-                        "query_3pe to %s returned an invalid result %r",
-                        uri, r
+                        "query_3pe to %s returned an invalid result %r", uri, r
                     )
 
             defer.returnValue(ret)
@@ -189,27 +180,27 @@ class ApplicationServiceApi(SimpleHttpClient):
             uri = "%s%s/thirdparty/protocol/%s" % (
                 service.url,
                 APP_SERVICE_PREFIX,
-                urllib.parse.quote(protocol)
+                urllib.parse.quote(protocol),
             )
             try:
                 info = yield self.get_json(uri, {})
 
                 if not _is_valid_3pe_metadata(info):
-                    logger.warning("query_3pe_protocol to %s did not return a"
-                                   " valid result", uri)
+                    logger.warning(
+                        "query_3pe_protocol to %s did not return a" " valid result", uri
+                    )
                     defer.returnValue(None)
 
                 for instance in info.get("instances", []):
                     network_id = instance.get("network_id", None)
                     if network_id is not None:
                         instance["instance_id"] = ThirdPartyInstanceID(
-                            service.id, network_id,
+                            service.id, network_id
                         ).to_string()
 
                 defer.returnValue(info)
             except Exception as ex:
-                logger.warning("query_3pe_protocol to %s threw exception %s",
-                               uri, ex)
+                logger.warning("query_3pe_protocol to %s threw exception %s", uri, ex)
                 defer.returnValue(None)
 
         key = (service.id, protocol)
@@ -223,22 +214,19 @@ class ApplicationServiceApi(SimpleHttpClient):
         events = self._serialize(events)
 
         if txn_id is None:
-            logger.warning("push_bulk: Missing txn ID sending events to %s",
-                           service.url)
+            logger.warning(
+                "push_bulk: Missing txn ID sending events to %s", service.url
+            )
             txn_id = str(0)
         txn_id = str(txn_id)
 
-        uri = service.url + ("/transactions/%s" %
-                             urllib.parse.quote(txn_id))
+        uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id))
         try:
             yield self.put_json(
                 uri=uri,
-                json_body={
-                    "events": events
-                },
-                args={
-                    "access_token": service.hs_token
-                })
+                json_body={"events": events},
+                args={"access_token": service.hs_token},
+            )
             sent_transactions_counter.labels(service.id).inc()
             sent_events_counter.labels(service.id).inc(len(events))
             defer.returnValue(True)
@@ -252,6 +240,4 @@ class ApplicationServiceApi(SimpleHttpClient):
 
     def _serialize(self, events):
         time_now = self.clock.time_msec()
-        return [
-            serialize_event(e, time_now, as_client_event=True) for e in events
-        ]
+        return [serialize_event(e, time_now, as_client_event=True) for e in events]
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 685f15c061..b54bf5411f 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -112,15 +112,14 @@ class _ServiceQueuer(object):
             return
 
         run_as_background_process(
-            "as-sender-%s" % (service.id, ),
-            self._send_request, service,
+            "as-sender-%s" % (service.id,), self._send_request, service
         )
 
     @defer.inlineCallbacks
     def _send_request(self, service):
         # sanity-check: we shouldn't get here if this service already has a sender
         # running.
-        assert(service.id not in self.requests_in_flight)
+        assert service.id not in self.requests_in_flight
 
         self.requests_in_flight.add(service.id)
         try:
@@ -137,7 +136,6 @@ class _ServiceQueuer(object):
 
 
 class _TransactionController(object):
-
     def __init__(self, clock, store, as_api, recoverer_fn):
         self.clock = clock
         self.store = store
@@ -149,10 +147,7 @@ class _TransactionController(object):
     @defer.inlineCallbacks
     def send(self, service, events):
         try:
-            txn = yield self.store.create_appservice_txn(
-                service=service,
-                events=events
-            )
+            txn = yield self.store.create_appservice_txn(service=service, events=events)
             service_is_up = yield self._is_service_up(service)
             if service_is_up:
                 sent = yield txn.send(self.as_api)
@@ -167,12 +162,12 @@ class _TransactionController(object):
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):
         self.recoverers.remove(recoverer)
-        logger.info("Successfully recovered application service AS ID %s",
-                    recoverer.service.id)
+        logger.info(
+            "Successfully recovered application service AS ID %s", recoverer.service.id
+        )
         logger.info("Remaining active recoverers: %s", len(self.recoverers))
         yield self.store.set_appservice_state(
-            recoverer.service,
-            ApplicationServiceState.UP
+            recoverer.service, ApplicationServiceState.UP
         )
 
     def add_recoverers(self, recoverers):
@@ -184,13 +179,10 @@ class _TransactionController(object):
     @defer.inlineCallbacks
     def _start_recoverer(self, service):
         try:
-            yield self.store.set_appservice_state(
-                service,
-                ApplicationServiceState.DOWN
-            )
+            yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
             logger.info(
                 "Application service falling behind. Starting recoverer. AS ID %s",
-                service.id
+                service.id,
             )
             recoverer = self.recoverer_fn(service, self.on_recovered)
             self.add_recoverers([recoverer])
@@ -205,19 +197,16 @@ class _TransactionController(object):
 
 
 class _Recoverer(object):
-
     @staticmethod
     @defer.inlineCallbacks
     def start(clock, store, as_api, callback):
-        services = yield store.get_appservices_by_state(
-            ApplicationServiceState.DOWN
-        )
-        recoverers = [
-            _Recoverer(clock, store, as_api, s, callback) for s in services
-        ]
+        services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
+        recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
         for r in recoverers:
-            logger.info("Starting recoverer for AS ID %s which was marked as "
-                        "DOWN", r.service.id)
+            logger.info(
+                "Starting recoverer for AS ID %s which was marked as " "DOWN",
+                r.service.id,
+            )
             r.recover()
         defer.returnValue(recoverers)
 
@@ -232,9 +221,9 @@ class _Recoverer(object):
     def recover(self):
         def _retry():
             run_as_background_process(
-                "as-recoverer-%s" % (self.service.id,),
-                self.retry,
+                "as-recoverer-%s" % (self.service.id,), self.retry
             )
+
         self.clock.call_later((2 ** self.backoff_counter), _retry)
 
     def _backoff(self):
@@ -248,8 +237,9 @@ class _Recoverer(object):
         try:
             txn = yield self.store.get_oldest_unsent_txn(self.service)
             if txn:
-                logger.info("Retrying transaction %s for AS ID %s",
-                            txn.id, txn.service.id)
+                logger.info(
+                    "Retrying transaction %s for AS ID %s", txn.id, txn.service.id
+                )
                 sent = yield txn.send(self.as_api)
                 if sent:
                     yield txn.complete(self.store)