diff options
Diffstat (limited to 'synapse/appservice')
-rw-r--r-- | synapse/appservice/__init__.py | 59 | ||||
-rw-r--r-- | synapse/appservice/api.py | 39 | ||||
-rw-r--r-- | synapse/appservice/scheduler.py | 43 |
3 files changed, 104 insertions, 37 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index b989007314..57ed8a3ca2 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -12,13 +12,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from synapse.api.constants import EventTypes -from synapse.util.caches.descriptors import cachedInlineCallbacks +import logging +import re + +from six import string_types from twisted.internet import defer -import logging -import re +from synapse.api.constants import EventTypes +from synapse.types import GroupID, get_domain_from_id +from synapse.util.caches.descriptors import cachedInlineCallbacks logger = logging.getLogger(__name__) @@ -81,14 +84,17 @@ class ApplicationService(object): # values. NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS] - def __init__(self, token, url=None, namespaces=None, hs_token=None, - sender=None, id=None, protocols=None, rate_limited=True): + def __init__(self, token, hostname, url=None, namespaces=None, hs_token=None, + sender=None, id=None, protocols=None, rate_limited=True, + ip_range_whitelist=None): self.token = token self.url = url self.hs_token = hs_token self.sender = sender + self.server_name = hostname self.namespaces = self._check_namespaces(namespaces) self.id = id + self.ip_range_whitelist = ip_range_whitelist if "|" in self.id: raise Exception("application service ID cannot contain '|' character") @@ -125,8 +131,26 @@ class ApplicationService(object): raise ValueError( "Expected bool for 'exclusive' in ns '%s'" % ns ) + group_id = regex_obj.get("group_id") + if group_id: + if not isinstance(group_id, str): + raise ValueError( + "Expected string for 'group_id' in ns '%s'" % ns + ) + try: + GroupID.from_string(group_id) + except Exception: + raise ValueError( + "Expected valid group ID for 'group_id' in ns '%s'" % ns + ) + + if get_domain_from_id(group_id) != self.server_name: + raise ValueError( + "Expected 'group_id' to be this host in ns '%s'" % ns + ) + regex = regex_obj.get("regex") - if isinstance(regex, basestring): + if isinstance(regex, string_types): regex_obj["regex"] = re.compile(regex) # Pre-compile regex else: raise ValueError( @@ -251,8 +275,27 @@ class ApplicationService(object): if regex_obj["exclusive"] ] + def get_groups_for_user(self, user_id): + """Get the groups that this user is associated with by this AS + + Args: + user_id (str): The ID of the user. + + Returns: + iterable[str]: an iterable that yields group_id strings. + """ + return ( + regex_obj["group_id"] + for regex_obj in self.namespaces[ApplicationService.NS_USERS] + if "group_id" in regex_obj and regex_obj["regex"].match(user_id) + ) + def is_rate_limited(self): return self.rate_limited def __str__(self): - return "ApplicationService: %s" % (self.__dict__,) + # copy dictionary and redact token fields so they don't get logged + dict_copy = self.__dict__.copy() + dict_copy["token"] = "<redacted>" + dict_copy["hs_token"] = "<redacted>" + return "ApplicationService: %s" % (dict_copy,) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 6893610e71..6980e5890e 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -12,20 +12,39 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import urllib + +from prometheus_client import Counter + from twisted.internet import defer from synapse.api.constants import ThirdPartyEntityKind from synapse.api.errors import CodeMessageException -from synapse.http.client import SimpleHttpClient from synapse.events.utils import serialize_event -from synapse.util.caches.response_cache import ResponseCache +from synapse.http.client import SimpleHttpClient from synapse.types import ThirdPartyInstanceID - -import logging -import urllib +from synapse.util.caches.response_cache import ResponseCache logger = logging.getLogger(__name__) +sent_transactions_counter = Counter( + "synapse_appservice_api_sent_transactions", + "Number of /transactions/ requests sent", + ["service"] +) + +failed_transactions_counter = Counter( + "synapse_appservice_api_failed_transactions", + "Number of /transactions/ requests that failed to send", + ["service"] +) + +sent_events_counter = Counter( + "synapse_appservice_api_sent_events", + "Number of events sent to the AS", + ["service"] +) HOUR_IN_MS = 60 * 60 * 1000 @@ -72,7 +91,8 @@ class ApplicationServiceApi(SimpleHttpClient): super(ApplicationServiceApi, self).__init__(hs) self.clock = hs.get_clock() - self.protocol_meta_cache = ResponseCache(hs, timeout_ms=HOUR_IN_MS) + self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta", + timeout_ms=HOUR_IN_MS) @defer.inlineCallbacks def query_user(self, service, user_id): @@ -192,9 +212,7 @@ class ApplicationServiceApi(SimpleHttpClient): defer.returnValue(None) key = (service.id, protocol) - return self.protocol_meta_cache.get(key) or ( - self.protocol_meta_cache.set(key, _get()) - ) + return self.protocol_meta_cache.wrap(key, _get) @defer.inlineCallbacks def push_bulk(self, service, events, txn_id=None): @@ -220,12 +238,15 @@ class ApplicationServiceApi(SimpleHttpClient): args={ "access_token": service.hs_token }) + sent_transactions_counter.labels(service.id).inc() + sent_events_counter.labels(service.id).inc(len(events)) defer.returnValue(True) return except CodeMessageException as e: logger.warning("push_bulk to %s received %s", uri, e.code) except Exception as ex: logger.warning("push_bulk to %s threw exception %s", uri, ex) + failed_transactions_counter.labels(service.id).inc() defer.returnValue(False) def _serialize(self, events): diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 68a9de17b8..2430814796 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -48,14 +48,14 @@ UP & quit +---------- YES SUCCESS This is all tied together by the AppServiceScheduler which DIs the required components. """ +import logging + from twisted.internet import defer from synapse.appservice import ApplicationServiceState -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure -import logging - logger = logging.getLogger(__name__) @@ -106,7 +106,7 @@ class _ServiceQueuer(object): def enqueue(self, service, event): # if this service isn't being sent something self.queued_events.setdefault(service.id, []).append(event) - preserve_fn(self._send_request)(service) + run_in_background(self._send_request, service) @defer.inlineCallbacks def _send_request(self, service): @@ -123,7 +123,7 @@ class _ServiceQueuer(object): with Measure(self.clock, "servicequeuer.send"): try: yield self.txn_ctrl.send(service, events) - except: + except Exception: logger.exception("AS request failed") finally: self.requests_in_flight.discard(service.id) @@ -152,10 +152,10 @@ class _TransactionController(object): if sent: yield txn.complete(self.store) else: - preserve_fn(self._start_recoverer)(service) - except Exception as e: - logger.exception(e) - preserve_fn(self._start_recoverer)(service) + run_in_background(self._start_recoverer, service) + except Exception: + logger.exception("Error creating appservice transaction") + run_in_background(self._start_recoverer, service) @defer.inlineCallbacks def on_recovered(self, recoverer): @@ -176,17 +176,20 @@ class _TransactionController(object): @defer.inlineCallbacks def _start_recoverer(self, service): - yield self.store.set_appservice_state( - service, - ApplicationServiceState.DOWN - ) - logger.info( - "Application service falling behind. Starting recoverer. AS ID %s", - service.id - ) - recoverer = self.recoverer_fn(service, self.on_recovered) - self.add_recoverers([recoverer]) - recoverer.recover() + try: + yield self.store.set_appservice_state( + service, + ApplicationServiceState.DOWN + ) + logger.info( + "Application service falling behind. Starting recoverer. AS ID %s", + service.id + ) + recoverer = self.recoverer_fn(service, self.on_recovered) + self.add_recoverers([recoverer]) + recoverer.recover() + except Exception: + logger.exception("Error starting AS recoverer") @defer.inlineCallbacks def _is_service_up(self, service): |