path: root/synapse/appservice
Diffstat (limited to 'synapse/appservice')
-rw-r--r--  synapse/appservice/__init__.py   | 59
-rw-r--r--  synapse/appservice/api.py        | 39
-rw-r--r--  synapse/appservice/scheduler.py  | 43
3 files changed, 104 insertions, 37 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index b989007314..57ed8a3ca2 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -12,13 +12,16 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from synapse.api.constants import EventTypes
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+import logging
+import re
+
+from six import string_types
 
 from twisted.internet import defer
 
-import logging
-import re
+from synapse.api.constants import EventTypes
+from synapse.types import GroupID, get_domain_from_id
+from synapse.util.caches.descriptors import cachedInlineCallbacks
 
 logger = logging.getLogger(__name__)
 
@@ -81,14 +84,17 @@ class ApplicationService(object):
     # values.
     NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS]
 
-    def __init__(self, token, url=None, namespaces=None, hs_token=None,
-                 sender=None, id=None, protocols=None, rate_limited=True):
+    def __init__(self, token, hostname, url=None, namespaces=None, hs_token=None,
+                 sender=None, id=None, protocols=None, rate_limited=True,
+                 ip_range_whitelist=None):
         self.token = token
         self.url = url
         self.hs_token = hs_token
         self.sender = sender
+        self.server_name = hostname
         self.namespaces = self._check_namespaces(namespaces)
         self.id = id
+        self.ip_range_whitelist = ip_range_whitelist
 
         if "|" in self.id:
             raise Exception("application service ID cannot contain '|' character")
@@ -125,8 +131,26 @@ class ApplicationService(object):
                     raise ValueError(
                         "Expected bool for 'exclusive' in ns '%s'" % ns
                     )
+                group_id = regex_obj.get("group_id")
+                if group_id:
+                    if not isinstance(group_id, str):
+                        raise ValueError(
+                            "Expected string for 'group_id' in ns '%s'" % ns
+                        )
+                    try:
+                        GroupID.from_string(group_id)
+                    except Exception:
+                        raise ValueError(
+                            "Expected valid group ID for 'group_id' in ns '%s'" % ns
+                        )
+
+                    if get_domain_from_id(group_id) != self.server_name:
+                        raise ValueError(
+                            "Expected 'group_id' to be this host in ns '%s'" % ns
+                        )
+
                 regex = regex_obj.get("regex")
-                if isinstance(regex, basestring):
+                if isinstance(regex, string_types):
                     regex_obj["regex"] = re.compile(regex)  # Pre-compile regex
                 else:
                     raise ValueError(
@@ -251,8 +275,27 @@ class ApplicationService(object):
             if regex_obj["exclusive"]
         ]
 
+    def get_groups_for_user(self, user_id):
+        """Get the groups that this user is associated with by this AS
+
+        Args:
+            user_id (str): The ID of the user.
+
+        Returns:
+            iterable[str]: an iterable that yields group_id strings.
+        """
+        return (
+            regex_obj["group_id"]
+            for regex_obj in self.namespaces[ApplicationService.NS_USERS]
+            if "group_id" in regex_obj and regex_obj["regex"].match(user_id)
+        )
+
     def is_rate_limited(self):
         return self.rate_limited
 
     def __str__(self):
-        return "ApplicationService: %s" % (self.__dict__,)
+        # copy dictionary and redact token fields so they don't get logged
+        dict_copy = self.__dict__.copy()
+        dict_copy["token"] = "<redacted>"
+        dict_copy["hs_token"] = "<redacted>"
+        return "ApplicationService: %s" % (dict_copy,)
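The __init__.py changes above add an optional "group_id" key to users-namespace regexes: any user matched by such a regex is associated with that group, and the group must belong to the appservice's own server (checked via get_domain_from_id). Below is a minimal standalone sketch, not synapse code, of the matching behaviour that the new get_groups_for_user helper implements; the namespace snippet, user IDs, and group ID are hypothetical examples.

import re

# Hypothetical users namespace, as it would look after _check_namespaces has
# pre-compiled the regex and validated the group_id.
namespaces = {
    "users": [
        {
            "exclusive": True,
            "regex": re.compile(r"@irc_.*:example\.org"),
            "group_id": "+irc:example.org",
        },
    ],
}

def get_groups_for_user(user_id):
    """Yield the group_id of every users-namespace regex that matches user_id."""
    return (
        regex_obj["group_id"]
        for regex_obj in namespaces["users"]
        if "group_id" in regex_obj and regex_obj["regex"].match(user_id)
    )

print(list(get_groups_for_user("@irc_alice:example.org")))  # ['+irc:example.org']
print(list(get_groups_for_user("@bob:example.org")))        # []
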
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 6893610e71..6980e5890e 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -12,20 +12,39 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import urllib
+
+from prometheus_client import Counter
+
 from twisted.internet import defer
 
 from synapse.api.constants import ThirdPartyEntityKind
 from synapse.api.errors import CodeMessageException
-from synapse.http.client import SimpleHttpClient
 from synapse.events.utils import serialize_event
-from synapse.util.caches.response_cache import ResponseCache
+from synapse.http.client import SimpleHttpClient
 from synapse.types import ThirdPartyInstanceID
-
-import logging
-import urllib
+from synapse.util.caches.response_cache import ResponseCache
 
 logger = logging.getLogger(__name__)
 
+sent_transactions_counter = Counter(
+    "synapse_appservice_api_sent_transactions",
+    "Number of /transactions/ requests sent",
+    ["service"]
+)
+
+failed_transactions_counter = Counter(
+    "synapse_appservice_api_failed_transactions",
+    "Number of /transactions/ requests that failed to send",
+    ["service"]
+)
+
+sent_events_counter = Counter(
+    "synapse_appservice_api_sent_events",
+    "Number of events sent to the AS",
+    ["service"]
+)
 
 HOUR_IN_MS = 60 * 60 * 1000
 
@@ -72,7 +91,8 @@ class ApplicationServiceApi(SimpleHttpClient):
         super(ApplicationServiceApi, self).__init__(hs)
         self.clock = hs.get_clock()
 
-        self.protocol_meta_cache = ResponseCache(hs, timeout_ms=HOUR_IN_MS)
+        self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta",
+                                                 timeout_ms=HOUR_IN_MS)
 
     @defer.inlineCallbacks
     def query_user(self, service, user_id):
@@ -192,9 +212,7 @@ class ApplicationServiceApi(SimpleHttpClient):
                 defer.returnValue(None)
 
         key = (service.id, protocol)
-        return self.protocol_meta_cache.get(key) or (
-            self.protocol_meta_cache.set(key, _get())
-        )
+        return self.protocol_meta_cache.wrap(key, _get)
 
     @defer.inlineCallbacks
     def push_bulk(self, service, events, txn_id=None):
@@ -220,12 +238,15 @@ class ApplicationServiceApi(SimpleHttpClient):
                 args={
                     "access_token": service.hs_token
                 })
+            sent_transactions_counter.labels(service.id).inc()
+            sent_events_counter.labels(service.id).inc(len(events))
             defer.returnValue(True)
             return
         except CodeMessageException as e:
             logger.warning("push_bulk to %s received %s", uri, e.code)
         except Exception as ex:
             logger.warning("push_bulk to %s threw exception %s", uri, ex)
+        failed_transactions_counter.labels(service.id).inc()
         defer.returnValue(False)
 
     def _serialize(self, events):
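The api.py hunk above introduces Prometheus counters labelled by appservice ID, so each AS gets its own per-service time series for sent transactions, sent events, and failed transactions. The following standalone sketch (assuming the prometheus_client package is installed) shows how such labelled counters are incremented and exposed; the service ID used is hypothetical.

from prometheus_client import Counter, generate_latest

sent_transactions_counter = Counter(
    "synapse_appservice_api_sent_transactions",
    "Number of /transactions/ requests sent",
    ["service"],
)
sent_events_counter = Counter(
    "synapse_appservice_api_sent_events",
    "Number of events sent to the AS",
    ["service"],
)

# One child counter is created per label value, so each appservice ID gets
# its own time series.
sent_transactions_counter.labels("irc_bridge").inc()
sent_events_counter.labels("irc_bridge").inc(3)  # one transaction carrying 3 events

# Dump the default registry in the Prometheus text exposition format.
print(generate_latest().decode("utf-8"))
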
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 68a9de17b8..2430814796 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -48,14 +48,14 @@ UP & quit           +---------- YES                       SUCCESS
 This is all tied together by the AppServiceScheduler which DIs the required
 components.
 """
+import logging
+
 from twisted.internet import defer
 
 from synapse.appservice import ApplicationServiceState
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import Measure
 
-import logging
-
 logger = logging.getLogger(__name__)
 
 
@@ -106,7 +106,7 @@ class _ServiceQueuer(object):
     def enqueue(self, service, event):
         # if this service isn't being sent something
         self.queued_events.setdefault(service.id, []).append(event)
-        preserve_fn(self._send_request)(service)
+        run_in_background(self._send_request, service)
 
     @defer.inlineCallbacks
     def _send_request(self, service):
@@ -123,7 +123,7 @@ class _ServiceQueuer(object):
                 with Measure(self.clock, "servicequeuer.send"):
                     try:
                         yield self.txn_ctrl.send(service, events)
-                    except:
+                    except Exception:
                         logger.exception("AS request failed")
         finally:
             self.requests_in_flight.discard(service.id)
@@ -152,10 +152,10 @@ class _TransactionController(object):
                 if sent:
                     yield txn.complete(self.store)
                 else:
-                    preserve_fn(self._start_recoverer)(service)
-        except Exception as e:
-            logger.exception(e)
-            preserve_fn(self._start_recoverer)(service)
+                    run_in_background(self._start_recoverer, service)
+        except Exception:
+            logger.exception("Error creating appservice transaction")
+            run_in_background(self._start_recoverer, service)
 
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):
@@ -176,17 +176,20 @@ class _TransactionController(object):
 
     @defer.inlineCallbacks
     def _start_recoverer(self, service):
-        yield self.store.set_appservice_state(
-            service,
-            ApplicationServiceState.DOWN
-        )
-        logger.info(
-            "Application service falling behind. Starting recoverer. AS ID %s",
-            service.id
-        )
-        recoverer = self.recoverer_fn(service, self.on_recovered)
-        self.add_recoverers([recoverer])
-        recoverer.recover()
+        try:
+            yield self.store.set_appservice_state(
+                service,
+                ApplicationServiceState.DOWN
+            )
+            logger.info(
+                "Application service falling behind. Starting recoverer. AS ID %s",
+                service.id
+            )
+            recoverer = self.recoverer_fn(service, self.on_recovered)
+            self.add_recoverers([recoverer])
+            recoverer.recover()
+        except Exception:
+            logger.exception("Error starting AS recoverer")
 
     @defer.inlineCallbacks
     def _is_service_up(self, service):
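The scheduler.py changes replace preserve_fn with run_in_background and wrap _start_recoverer in try/except, so a failure while starting a recoverer is logged instead of being lost in a fire-and-forget Deferred. The sketch below is a simplified plain-Twisted illustration of that pattern; it deliberately does not use synapse's real run_in_background (which also manages log contexts), and the names are illustrative.

import logging

from twisted.internet import defer, task

logger = logging.getLogger(__name__)

@defer.inlineCallbacks
def start_recoverer(service_id):
    try:
        # Placeholder for: mark the AS as DOWN, create a recoverer, recover().
        yield defer.succeed(None)
        logger.info("Started recoverer for AS %s", service_id)
    except Exception:
        # Swallow and log, mirroring the try/except added to _start_recoverer.
        logger.exception("Error starting AS recoverer")

def run_in_background(f, *args):
    # Simplified stand-in for synapse.util.logcontext.run_in_background:
    # start f immediately and hand back its Deferred without waiting on it.
    return defer.maybeDeferred(f, *args)

def main(reactor):
    run_in_background(start_recoverer, "example_as")
    return task.deferLater(reactor, 0, lambda: None)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    task.react(main)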