diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 57ed8a3ca2..b26a31dd54 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -48,9 +48,7 @@ class AppServiceTransaction(object):
A Deferred which resolves to True if the transaction was sent.
"""
return as_api.push_bulk(
- service=self.service,
- events=self.events,
- txn_id=self.id
+ service=self.service, events=self.events, txn_id=self.id
)
def complete(self, store):
@@ -64,10 +62,7 @@ class AppServiceTransaction(object):
Returns:
A Deferred which resolves to True if the transaction was completed.
"""
- return store.complete_appservice_txn(
- service=self.service,
- txn_id=self.id
- )
+ return store.complete_appservice_txn(service=self.service, txn_id=self.id)
class ApplicationService(object):
@@ -76,6 +71,7 @@ class ApplicationService(object):
Provides methods to check if this service is "interested" in events.
"""
+
NS_USERS = "users"
NS_ALIASES = "aliases"
NS_ROOMS = "rooms"
@@ -84,9 +80,19 @@ class ApplicationService(object):
# values.
NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS]
- def __init__(self, token, hostname, url=None, namespaces=None, hs_token=None,
- sender=None, id=None, protocols=None, rate_limited=True,
- ip_range_whitelist=None):
+ def __init__(
+ self,
+ token,
+ hostname,
+ url=None,
+ namespaces=None,
+ hs_token=None,
+ sender=None,
+ id=None,
+ protocols=None,
+ rate_limited=True,
+ ip_range_whitelist=None,
+ ):
self.token = token
self.url = url
self.hs_token = hs_token
@@ -128,9 +134,7 @@ class ApplicationService(object):
if not isinstance(regex_obj, dict):
raise ValueError("Expected dict regex for ns '%s'" % ns)
if not isinstance(regex_obj.get("exclusive"), bool):
- raise ValueError(
- "Expected bool for 'exclusive' in ns '%s'" % ns
- )
+ raise ValueError("Expected bool for 'exclusive' in ns '%s'" % ns)
group_id = regex_obj.get("group_id")
if group_id:
if not isinstance(group_id, str):
@@ -153,9 +157,7 @@ class ApplicationService(object):
if isinstance(regex, string_types):
regex_obj["regex"] = re.compile(regex) # Pre-compile regex
else:
- raise ValueError(
- "Expected string for 'regex' in ns '%s'" % ns
- )
+ raise ValueError("Expected string for 'regex' in ns '%s'" % ns)
return namespaces
def _matches_regex(self, test_string, namespace_key):
@@ -178,8 +180,9 @@ class ApplicationService(object):
if self.is_interested_in_user(event.sender):
defer.returnValue(True)
# also check m.room.member state key
- if (event.type == EventTypes.Member and
- self.is_interested_in_user(event.state_key)):
+ if event.type == EventTypes.Member and self.is_interested_in_user(
+ event.state_key
+ ):
defer.returnValue(True)
if not store:
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 9ccc5a80fc..571881775b 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -32,19 +32,17 @@ logger = logging.getLogger(__name__)
sent_transactions_counter = Counter(
"synapse_appservice_api_sent_transactions",
"Number of /transactions/ requests sent",
- ["service"]
+ ["service"],
)
failed_transactions_counter = Counter(
"synapse_appservice_api_failed_transactions",
"Number of /transactions/ requests that failed to send",
- ["service"]
+ ["service"],
)
sent_events_counter = Counter(
- "synapse_appservice_api_sent_events",
- "Number of events sent to the AS",
- ["service"]
+ "synapse_appservice_api_sent_events", "Number of events sent to the AS", ["service"]
)
HOUR_IN_MS = 60 * 60 * 1000
@@ -92,8 +90,9 @@ class ApplicationServiceApi(SimpleHttpClient):
super(ApplicationServiceApi, self).__init__(hs)
self.clock = hs.get_clock()
- self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta",
- timeout_ms=HOUR_IN_MS)
+ self.protocol_meta_cache = ResponseCache(
+ hs, "as_protocol_meta", timeout_ms=HOUR_IN_MS
+ )
@defer.inlineCallbacks
def query_user(self, service, user_id):
@@ -102,9 +101,7 @@ class ApplicationServiceApi(SimpleHttpClient):
uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
response = None
try:
- response = yield self.get_json(uri, {
- "access_token": service.hs_token
- })
+ response = yield self.get_json(uri, {"access_token": service.hs_token})
if response is not None: # just an empty json object
defer.returnValue(True)
except CodeMessageException as e:
@@ -123,9 +120,7 @@ class ApplicationServiceApi(SimpleHttpClient):
uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
response = None
try:
- response = yield self.get_json(uri, {
- "access_token": service.hs_token
- })
+ response = yield self.get_json(uri, {"access_token": service.hs_token})
if response is not None: # just an empty json object
defer.returnValue(True)
except CodeMessageException as e:
@@ -144,9 +139,7 @@ class ApplicationServiceApi(SimpleHttpClient):
elif kind == ThirdPartyEntityKind.LOCATION:
required_field = "alias"
else:
- raise ValueError(
- "Unrecognised 'kind' argument %r to query_3pe()", kind
- )
+ raise ValueError("Unrecognised 'kind' argument %r to query_3pe()", kind)
if service.url is None:
defer.returnValue([])
@@ -154,14 +147,13 @@ class ApplicationServiceApi(SimpleHttpClient):
service.url,
APP_SERVICE_PREFIX,
kind,
- urllib.parse.quote(protocol)
+ urllib.parse.quote(protocol),
)
try:
response = yield self.get_json(uri, fields)
if not isinstance(response, list):
logger.warning(
- "query_3pe to %s returned an invalid response %r",
- uri, response
+ "query_3pe to %s returned an invalid response %r", uri, response
)
defer.returnValue([])
@@ -171,8 +163,7 @@ class ApplicationServiceApi(SimpleHttpClient):
ret.append(r)
else:
logger.warning(
- "query_3pe to %s returned an invalid result %r",
- uri, r
+ "query_3pe to %s returned an invalid result %r", uri, r
)
defer.returnValue(ret)
@@ -189,27 +180,27 @@ class ApplicationServiceApi(SimpleHttpClient):
uri = "%s%s/thirdparty/protocol/%s" % (
service.url,
APP_SERVICE_PREFIX,
- urllib.parse.quote(protocol)
+ urllib.parse.quote(protocol),
)
try:
info = yield self.get_json(uri, {})
if not _is_valid_3pe_metadata(info):
- logger.warning("query_3pe_protocol to %s did not return a"
- " valid result", uri)
+ logger.warning(
+ "query_3pe_protocol to %s did not return a" " valid result", uri
+ )
defer.returnValue(None)
for instance in info.get("instances", []):
network_id = instance.get("network_id", None)
if network_id is not None:
instance["instance_id"] = ThirdPartyInstanceID(
- service.id, network_id,
+ service.id, network_id
).to_string()
defer.returnValue(info)
except Exception as ex:
- logger.warning("query_3pe_protocol to %s threw exception %s",
- uri, ex)
+ logger.warning("query_3pe_protocol to %s threw exception %s", uri, ex)
defer.returnValue(None)
key = (service.id, protocol)
@@ -223,22 +214,19 @@ class ApplicationServiceApi(SimpleHttpClient):
events = self._serialize(events)
if txn_id is None:
- logger.warning("push_bulk: Missing txn ID sending events to %s",
- service.url)
+ logger.warning(
+ "push_bulk: Missing txn ID sending events to %s", service.url
+ )
txn_id = str(0)
txn_id = str(txn_id)
- uri = service.url + ("/transactions/%s" %
- urllib.parse.quote(txn_id))
+ uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id))
try:
yield self.put_json(
uri=uri,
- json_body={
- "events": events
- },
- args={
- "access_token": service.hs_token
- })
+ json_body={"events": events},
+ args={"access_token": service.hs_token},
+ )
sent_transactions_counter.labels(service.id).inc()
sent_events_counter.labels(service.id).inc(len(events))
defer.returnValue(True)
@@ -252,6 +240,4 @@ class ApplicationServiceApi(SimpleHttpClient):
def _serialize(self, events):
time_now = self.clock.time_msec()
- return [
- serialize_event(e, time_now, as_client_event=True) for e in events
- ]
+ return [serialize_event(e, time_now, as_client_event=True) for e in events]
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 685f15c061..b54bf5411f 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -112,15 +112,14 @@ class _ServiceQueuer(object):
return
run_as_background_process(
- "as-sender-%s" % (service.id, ),
- self._send_request, service,
+ "as-sender-%s" % (service.id,), self._send_request, service
)
@defer.inlineCallbacks
def _send_request(self, service):
# sanity-check: we shouldn't get here if this service already has a sender
# running.
- assert(service.id not in self.requests_in_flight)
+ assert service.id not in self.requests_in_flight
self.requests_in_flight.add(service.id)
try:
@@ -137,7 +136,6 @@ class _ServiceQueuer(object):
class _TransactionController(object):
-
def __init__(self, clock, store, as_api, recoverer_fn):
self.clock = clock
self.store = store
@@ -149,10 +147,7 @@ class _TransactionController(object):
@defer.inlineCallbacks
def send(self, service, events):
try:
- txn = yield self.store.create_appservice_txn(
- service=service,
- events=events
- )
+ txn = yield self.store.create_appservice_txn(service=service, events=events)
service_is_up = yield self._is_service_up(service)
if service_is_up:
sent = yield txn.send(self.as_api)
@@ -167,12 +162,12 @@ class _TransactionController(object):
@defer.inlineCallbacks
def on_recovered(self, recoverer):
self.recoverers.remove(recoverer)
- logger.info("Successfully recovered application service AS ID %s",
- recoverer.service.id)
+ logger.info(
+ "Successfully recovered application service AS ID %s", recoverer.service.id
+ )
logger.info("Remaining active recoverers: %s", len(self.recoverers))
yield self.store.set_appservice_state(
- recoverer.service,
- ApplicationServiceState.UP
+ recoverer.service, ApplicationServiceState.UP
)
def add_recoverers(self, recoverers):
@@ -184,13 +179,10 @@ class _TransactionController(object):
@defer.inlineCallbacks
def _start_recoverer(self, service):
try:
- yield self.store.set_appservice_state(
- service,
- ApplicationServiceState.DOWN
- )
+ yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
logger.info(
"Application service falling behind. Starting recoverer. AS ID %s",
- service.id
+ service.id,
)
recoverer = self.recoverer_fn(service, self.on_recovered)
self.add_recoverers([recoverer])
@@ -205,19 +197,16 @@ class _TransactionController(object):
class _Recoverer(object):
-
@staticmethod
@defer.inlineCallbacks
def start(clock, store, as_api, callback):
- services = yield store.get_appservices_by_state(
- ApplicationServiceState.DOWN
- )
- recoverers = [
- _Recoverer(clock, store, as_api, s, callback) for s in services
- ]
+ services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
+ recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
for r in recoverers:
- logger.info("Starting recoverer for AS ID %s which was marked as "
- "DOWN", r.service.id)
+ logger.info(
+ "Starting recoverer for AS ID %s which was marked as " "DOWN",
+ r.service.id,
+ )
r.recover()
defer.returnValue(recoverers)
@@ -232,9 +221,9 @@ class _Recoverer(object):
def recover(self):
def _retry():
run_as_background_process(
- "as-recoverer-%s" % (self.service.id,),
- self.retry,
+ "as-recoverer-%s" % (self.service.id,), self.retry
)
+
self.clock.call_later((2 ** self.backoff_counter), _retry)
def _backoff(self):
@@ -248,8 +237,9 @@ class _Recoverer(object):
try:
txn = yield self.store.get_oldest_unsent_txn(self.service)
if txn:
- logger.info("Retrying transaction %s for AS ID %s",
- txn.id, txn.service.id)
+ logger.info(
+ "Retrying transaction %s for AS ID %s", txn.id, txn.service.id
+ )
sent = yield txn.send(self.as_api)
if sent:
yield txn.complete(self.store)
|