diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index b989007314..57ed8a3ca2 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -12,13 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.api.constants import EventTypes
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+import logging
+import re
+
+from six import string_types
from twisted.internet import defer
-import logging
-import re
+from synapse.api.constants import EventTypes
+from synapse.types import GroupID, get_domain_from_id
+from synapse.util.caches.descriptors import cachedInlineCallbacks
logger = logging.getLogger(__name__)
@@ -81,14 +84,17 @@ class ApplicationService(object):
# values.
NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS]
- def __init__(self, token, url=None, namespaces=None, hs_token=None,
- sender=None, id=None, protocols=None, rate_limited=True):
+ def __init__(self, token, hostname, url=None, namespaces=None, hs_token=None,
+ sender=None, id=None, protocols=None, rate_limited=True,
+ ip_range_whitelist=None):
self.token = token
self.url = url
self.hs_token = hs_token
self.sender = sender
+ self.server_name = hostname
self.namespaces = self._check_namespaces(namespaces)
self.id = id
+ self.ip_range_whitelist = ip_range_whitelist
if "|" in self.id:
raise Exception("application service ID cannot contain '|' character")
@@ -125,8 +131,26 @@ class ApplicationService(object):
raise ValueError(
"Expected bool for 'exclusive' in ns '%s'" % ns
)
+ group_id = regex_obj.get("group_id")
+ if group_id:
+ if not isinstance(group_id, str):
+ raise ValueError(
+ "Expected string for 'group_id' in ns '%s'" % ns
+ )
+ try:
+ GroupID.from_string(group_id)
+ except Exception:
+ raise ValueError(
+ "Expected valid group ID for 'group_id' in ns '%s'" % ns
+ )
+
+ if get_domain_from_id(group_id) != self.server_name:
+ raise ValueError(
+ "Expected 'group_id' to be this host in ns '%s'" % ns
+ )
+
regex = regex_obj.get("regex")
- if isinstance(regex, basestring):
+ if isinstance(regex, string_types):
regex_obj["regex"] = re.compile(regex) # Pre-compile regex
else:
raise ValueError(
@@ -251,8 +275,27 @@ class ApplicationService(object):
if regex_obj["exclusive"]
]
+ def get_groups_for_user(self, user_id):
+ """Get the groups that this user is associated with by this AS
+
+ Args:
+ user_id (str): The ID of the user.
+
+ Returns:
+ iterable[str]: an iterable that yields group_id strings.
+ """
+ return (
+ regex_obj["group_id"]
+ for regex_obj in self.namespaces[ApplicationService.NS_USERS]
+ if "group_id" in regex_obj and regex_obj["regex"].match(user_id)
+ )
+
def is_rate_limited(self):
return self.rate_limited
def __str__(self):
- return "ApplicationService: %s" % (self.__dict__,)
+ # copy dictionary and redact token fields so they don't get logged
+ dict_copy = self.__dict__.copy()
+ dict_copy["token"] = "<redacted>"
+ dict_copy["hs_token"] = "<redacted>"
+ return "ApplicationService: %s" % (dict_copy,)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 6893610e71..6980e5890e 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -12,20 +12,39 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import urllib
+
+from prometheus_client import Counter
+
from twisted.internet import defer
from synapse.api.constants import ThirdPartyEntityKind
from synapse.api.errors import CodeMessageException
-from synapse.http.client import SimpleHttpClient
from synapse.events.utils import serialize_event
-from synapse.util.caches.response_cache import ResponseCache
+from synapse.http.client import SimpleHttpClient
from synapse.types import ThirdPartyInstanceID
-
-import logging
-import urllib
+from synapse.util.caches.response_cache import ResponseCache
logger = logging.getLogger(__name__)
+sent_transactions_counter = Counter(
+ "synapse_appservice_api_sent_transactions",
+ "Number of /transactions/ requests sent",
+ ["service"]
+)
+
+failed_transactions_counter = Counter(
+ "synapse_appservice_api_failed_transactions",
+ "Number of /transactions/ requests that failed to send",
+ ["service"]
+)
+
+sent_events_counter = Counter(
+ "synapse_appservice_api_sent_events",
+ "Number of events sent to the AS",
+ ["service"]
+)
HOUR_IN_MS = 60 * 60 * 1000
@@ -72,7 +91,8 @@ class ApplicationServiceApi(SimpleHttpClient):
super(ApplicationServiceApi, self).__init__(hs)
self.clock = hs.get_clock()
- self.protocol_meta_cache = ResponseCache(hs, timeout_ms=HOUR_IN_MS)
+ self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta",
+ timeout_ms=HOUR_IN_MS)
@defer.inlineCallbacks
def query_user(self, service, user_id):
@@ -192,9 +212,7 @@ class ApplicationServiceApi(SimpleHttpClient):
defer.returnValue(None)
key = (service.id, protocol)
- return self.protocol_meta_cache.get(key) or (
- self.protocol_meta_cache.set(key, _get())
- )
+ return self.protocol_meta_cache.wrap(key, _get)
@defer.inlineCallbacks
def push_bulk(self, service, events, txn_id=None):
@@ -220,12 +238,15 @@ class ApplicationServiceApi(SimpleHttpClient):
args={
"access_token": service.hs_token
})
+ sent_transactions_counter.labels(service.id).inc()
+ sent_events_counter.labels(service.id).inc(len(events))
defer.returnValue(True)
return
except CodeMessageException as e:
logger.warning("push_bulk to %s received %s", uri, e.code)
except Exception as ex:
logger.warning("push_bulk to %s threw exception %s", uri, ex)
+ failed_transactions_counter.labels(service.id).inc()
defer.returnValue(False)
def _serialize(self, events):
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 68a9de17b8..2430814796 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -48,14 +48,14 @@ UP & quit +---------- YES SUCCESS
This is all tied together by the AppServiceScheduler which DIs the required
components.
"""
+import logging
+
from twisted.internet import defer
from synapse.appservice import ApplicationServiceState
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure
-import logging
-
logger = logging.getLogger(__name__)
@@ -106,7 +106,7 @@ class _ServiceQueuer(object):
def enqueue(self, service, event):
# if this service isn't being sent something
self.queued_events.setdefault(service.id, []).append(event)
- preserve_fn(self._send_request)(service)
+ run_in_background(self._send_request, service)
@defer.inlineCallbacks
def _send_request(self, service):
@@ -123,7 +123,7 @@ class _ServiceQueuer(object):
with Measure(self.clock, "servicequeuer.send"):
try:
yield self.txn_ctrl.send(service, events)
- except:
+ except Exception:
logger.exception("AS request failed")
finally:
self.requests_in_flight.discard(service.id)
@@ -152,10 +152,10 @@ class _TransactionController(object):
if sent:
yield txn.complete(self.store)
else:
- preserve_fn(self._start_recoverer)(service)
- except Exception as e:
- logger.exception(e)
- preserve_fn(self._start_recoverer)(service)
+ run_in_background(self._start_recoverer, service)
+ except Exception:
+ logger.exception("Error creating appservice transaction")
+ run_in_background(self._start_recoverer, service)
@defer.inlineCallbacks
def on_recovered(self, recoverer):
@@ -176,17 +176,20 @@ class _TransactionController(object):
@defer.inlineCallbacks
def _start_recoverer(self, service):
- yield self.store.set_appservice_state(
- service,
- ApplicationServiceState.DOWN
- )
- logger.info(
- "Application service falling behind. Starting recoverer. AS ID %s",
- service.id
- )
- recoverer = self.recoverer_fn(service, self.on_recovered)
- self.add_recoverers([recoverer])
- recoverer.recover()
+ try:
+ yield self.store.set_appservice_state(
+ service,
+ ApplicationServiceState.DOWN
+ )
+ logger.info(
+ "Application service falling behind. Starting recoverer. AS ID %s",
+ service.id
+ )
+ recoverer = self.recoverer_fn(service, self.on_recovered)
+ self.add_recoverers([recoverer])
+ recoverer.recover()
+ except Exception:
+ logger.exception("Error starting AS recoverer")
@defer.inlineCallbacks
def _is_service_up(self, service):
|