diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 487be7ce9c..a7f428a96c 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -44,6 +44,11 @@ class Auth(object):
def check(self, event, auth_events):
""" Checks if this event is correctly authed.
+ Args:
+ event: the event being checked.
+ auth_events (dict: event-key -> event): the existing room state.
+
+
Returns:
True if the auth checks pass.
"""
@@ -319,7 +324,7 @@ class Auth(object):
Returns:
tuple : of UserID and device string:
User ID object of the user making the request
- Client ID object of the client instance the user is using
+ ClientInfo object of the client instance the user is using
Raises:
AuthError if no user by that token exists or the token is invalid.
"""
@@ -352,7 +357,7 @@ class Auth(object):
)
return
except KeyError:
- pass # normal users won't have this query parameter set
+ pass # normal users won't have the user_id query parameter set.
user_info = yield self.get_user_by_token(access_token)
user = user_info["user"]
@@ -521,23 +526,22 @@ class Auth(object):
# Check state_key
if hasattr(event, "state_key"):
- if not event.state_key.startswith("_"):
- if event.state_key.startswith("@"):
- if event.state_key != event.user_id:
+ if event.state_key.startswith("@"):
+ if event.state_key != event.user_id:
+ raise AuthError(
+ 403,
+ "You are not allowed to set others state"
+ )
+ else:
+ sender_domain = UserID.from_string(
+ event.user_id
+ ).domain
+
+ if sender_domain != event.state_key:
raise AuthError(
403,
"You are not allowed to set others state"
)
- else:
- sender_domain = UserID.from_string(
- event.user_id
- ).domain
-
- if sender_domain != event.state_key:
- raise AuthError(
- 403,
- "You are not allowed to set others state"
- )
return True
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 49e27c9e11..f04493f92a 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -657,7 +657,8 @@ def run(hs):
if hs.config.daemonize:
- print hs.config.pid_file
+ if hs.config.print_pidfile:
+ print hs.config.pid_file
daemon = Daemonize(
app="synapse-homeserver",
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index d483c67c6a..73f6959959 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -139,11 +139,18 @@ class Config(object):
help="Generate a config file for the server name"
)
config_parser.add_argument(
+ "--generate-keys",
+ action="store_true",
+ help="Generate any missing key files then exit"
+ )
+ config_parser.add_argument(
"-H", "--server-name",
help="The server name to generate a config file for"
)
config_args, remaining_args = config_parser.parse_known_args(argv)
+ generate_keys = config_args.generate_keys
+
if config_args.generate_config:
if not config_args.config_path:
config_parser.error(
@@ -151,51 +158,40 @@ class Config(object):
" generated using \"--generate-config -H SERVER_NAME"
" -c CONFIG-FILE\""
)
-
- config_dir_path = os.path.dirname(config_args.config_path[0])
- config_dir_path = os.path.abspath(config_dir_path)
-
- server_name = config_args.server_name
- if not server_name:
- print "Must specify a server_name to a generate config for."
- sys.exit(1)
(config_path,) = config_args.config_path
- if not os.path.exists(config_dir_path):
- os.makedirs(config_dir_path)
- if os.path.exists(config_path):
- print "Config file %r already exists" % (config_path,)
- yaml_config = cls.read_config_file(config_path)
- yaml_name = yaml_config["server_name"]
- if server_name != yaml_name:
- print (
- "Config file %r has a different server_name: "
- " %r != %r" % (config_path, server_name, yaml_name)
- )
+ if not os.path.exists(config_path):
+ config_dir_path = os.path.dirname(config_path)
+ config_dir_path = os.path.abspath(config_dir_path)
+
+ server_name = config_args.server_name
+ if not server_name:
+ print "Must specify a server_name to a generate config for."
sys.exit(1)
- config_bytes, config = obj.generate_config(
- config_dir_path, server_name
+ if not os.path.exists(config_dir_path):
+ os.makedirs(config_dir_path)
+ with open(config_path, "wb") as config_file:
+ config_bytes, config = obj.generate_config(
+ config_dir_path, server_name
+ )
+ obj.invoke_all("generate_files", config)
+ config_file.write(config_bytes)
+ print (
+ "A config file has been generated in %r for server name"
+ " %r with corresponding SSL keys and self-signed"
+ " certificates. Please review this file and customise it"
+ " to your needs."
+ ) % (config_path, server_name)
+ print (
+ "If this server name is incorrect, you will need to"
+ " regenerate the SSL certificates"
)
- config.update(yaml_config)
- print "Generating any missing keys for %r" % (server_name,)
- obj.invoke_all("generate_files", config)
sys.exit(0)
- with open(config_path, "wb") as config_file:
- config_bytes, config = obj.generate_config(
- config_dir_path, server_name
- )
- obj.invoke_all("generate_files", config)
- config_file.write(config_bytes)
+ else:
print (
- "A config file has been generated in %s for server name"
- " '%s' with corresponding SSL keys and self-signed"
- " certificates. Please review this file and customise it to"
- " your needs."
- ) % (config_path, server_name)
- print (
- "If this server name is incorrect, you will need to regenerate"
- " the SSL certificates"
- )
- sys.exit(0)
+ "Config file %r already exists. Generating any missing key"
+ " files."
+ ) % (config_path,)
+ generate_keys = True
parser = argparse.ArgumentParser(
parents=[config_parser],
@@ -213,7 +209,7 @@ class Config(object):
" -c CONFIG-FILE\""
)
- config_dir_path = os.path.dirname(config_args.config_path[0])
+ config_dir_path = os.path.dirname(config_args.config_path[-1])
config_dir_path = os.path.abspath(config_dir_path)
specified_config = {}
@@ -226,6 +222,10 @@ class Config(object):
config.pop("log_config")
config.update(specified_config)
+ if generate_keys:
+ obj.invoke_all("generate_files", config)
+ sys.exit(0)
+
obj.invoke_all("read_config", config)
obj.invoke_all("read_arguments", args)
diff --git a/synapse/config/server.py b/synapse/config/server.py
index f4d4a87103..f9a3b5f15b 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -24,6 +24,7 @@ class ServerConfig(Config):
self.web_client = config["web_client"]
self.soft_file_limit = config["soft_file_limit"]
self.daemonize = config.get("daemonize")
+ self.print_pidfile = config.get("print_pidfile")
self.use_frozen_dicts = config.get("use_frozen_dicts", True)
self.listeners = config.get("listeners", [])
@@ -208,12 +209,18 @@ class ServerConfig(Config):
self.manhole = args.manhole
if args.daemonize is not None:
self.daemonize = args.daemonize
+ if args.print_pidfile is not None:
+ self.print_pidfile = args.print_pidfile
def add_arguments(self, parser):
server_group = parser.add_argument_group("server")
server_group.add_argument("-D", "--daemonize", action='store_true',
default=None,
help="Daemonize the home server")
+ server_group.add_argument("--print-pidfile", action='store_true',
+ default=None,
+ help="Print the path to the pidfile just"
+ " before daemonizing")
server_group.add_argument("--manhole", metavar="PORT", dest="manhole",
type=int,
help="Turn on the twisted telnet manhole"
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 6200e10775..c1095708a0 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -44,7 +44,7 @@ class IdentityHandler(BaseHandler):
http_client = SimpleHttpClient(self.hs)
# XXX: make this configurable!
# trustedIdServers = ['matrix.org', 'localhost:8090']
- trustedIdServers = ['matrix.org']
+ trustedIdServers = ['matrix.org', 'vector.im']
if 'id_server' in creds:
id_server = creds['id_server']
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index a1288b4252..f81d75017d 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -73,7 +73,8 @@ class RegistrationHandler(BaseHandler):
localpart : The local part of the user ID to register. If None,
one will be randomly generated.
password (str) : The password to assign to this user so they can
- login again.
+ login again. This can be None which means they cannot login again
+ via a password (e.g. the user is an application service user).
Returns:
A tuple of (user_id, access_token).
Raises:
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index ed47e701e7..854e17a473 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -16,7 +16,7 @@
from twisted.internet import defer, reactor, protocol
from twisted.internet.error import DNSLookupError
-from twisted.web.client import readBody, _AgentBase, _URI, HTTPConnectionPool
+from twisted.web.client import readBody, HTTPConnectionPool, Agent
from twisted.web.http_headers import Headers
from twisted.web._newclient import ResponseDone
@@ -55,41 +55,17 @@ incoming_responses_counter = metrics.register_counter(
)
-class MatrixFederationHttpAgent(_AgentBase):
-
- def __init__(self, reactor, pool=None):
- _AgentBase.__init__(self, reactor, pool)
-
- def request(self, destination, endpoint, method, path, params, query,
- headers, body_producer):
-
- outgoing_requests_counter.inc(method)
-
- host = b""
- port = 0
- fragment = b""
-
- parsed_URI = _URI(b"http", destination, host, port, path, params,
- query, fragment)
-
- # Set the connection pool key to be the destination.
- key = destination
-
- d = self._requestWithEndpoint(key, endpoint, method, parsed_URI,
- headers, body_producer,
- parsed_URI.originForm)
-
- def _cb(response):
- incoming_responses_counter.inc(method, response.code)
- return response
-
- def _eb(failure):
- incoming_responses_counter.inc(method, "ERR")
- return failure
+class MatrixFederationEndpointFactory(object):
+ def __init__(self, hs):
+ self.tls_context_factory = hs.tls_context_factory
- d.addCallbacks(_cb, _eb)
+ def endpointForURI(self, uri):
+ destination = uri.netloc
- return d
+ return matrix_federation_endpoint(
+ reactor, destination, timeout=10,
+ ssl_context_factory=self.tls_context_factory
+ )
class MatrixFederationHttpClient(object):
@@ -107,12 +83,18 @@ class MatrixFederationHttpClient(object):
self.server_name = hs.hostname
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 10
- self.agent = MatrixFederationHttpAgent(reactor, pool=pool)
+ self.agent = Agent.usingEndpointFactory(
+ reactor, MatrixFederationEndpointFactory(hs), pool=pool
+ )
self.clock = hs.get_clock()
self.version_string = hs.version_string
-
self._next_id = 1
+ def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
+ return urlparse.urlunparse(
+ ("matrix", destination, path_bytes, param_bytes, query_bytes, "")
+ )
+
@defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes,
body_callback, headers_dict={}, param_bytes=b"",
@@ -123,8 +105,8 @@ class MatrixFederationHttpClient(object):
headers_dict[b"User-Agent"] = [self.version_string]
headers_dict[b"Host"] = [destination]
- url_bytes = urlparse.urlunparse(
- ("", "", path_bytes, param_bytes, query_bytes, "",)
+ url_bytes = self._create_url(
+ destination, path_bytes, param_bytes, query_bytes
)
txn_id = "%s-O-%s" % (method, self._next_id)
@@ -139,8 +121,8 @@ class MatrixFederationHttpClient(object):
# (once we have reliable transactions in place)
retries_left = 5
- endpoint = preserve_context_over_fn(
- self._getEndpoint, reactor, destination
+ http_url_bytes = urlparse.urlunparse(
+ ("", "", path_bytes, param_bytes, query_bytes, "")
)
log_result = None
@@ -148,17 +130,14 @@ class MatrixFederationHttpClient(object):
while True:
producer = None
if body_callback:
- producer = body_callback(method, url_bytes, headers_dict)
+ producer = body_callback(method, http_url_bytes, headers_dict)
try:
def send_request():
- request_deferred = self.agent.request(
- destination,
- endpoint,
+ request_deferred = preserve_context_over_fn(
+ self.agent.request,
method,
- path_bytes,
- param_bytes,
- query_bytes,
+ url_bytes,
Headers(headers_dict),
producer
)
@@ -452,12 +431,6 @@ class MatrixFederationHttpClient(object):
defer.returnValue((length, headers))
- def _getEndpoint(self, reactor, destination):
- return matrix_federation_endpoint(
- reactor, destination, timeout=10,
- ssl_context_factory=self.hs.tls_context_factory
- )
-
class _ReadBodyToFileProtocol(protocol.Protocol):
def __init__(self, stream, deferred, max_size):
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 9233ea3da9..0be9772991 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -18,8 +18,12 @@ from __future__ import absolute_import
import logging
from resource import getrusage, getpagesize, RUSAGE_SELF
+import functools
import os
import stat
+import time
+
+from twisted.internet import reactor
from .metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric
@@ -144,3 +148,28 @@ def _process_fds():
return counts
get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"])
+
+reactor_metrics = get_metrics_for("reactor")
+tick_time = reactor_metrics.register_distribution("tick_time")
+pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
+
+
+def runUntilCurrentTimer(func):
+
+ @functools.wraps(func)
+ def f(*args, **kwargs):
+ pending_calls = len(reactor.getDelayedCalls())
+ start = time.time() * 1000
+ ret = func(*args, **kwargs)
+ end = time.time() * 1000
+ tick_time.inc_by(end - start)
+ pending_calls_metric.inc_by(pending_calls)
+ return ret
+
+ return f
+
+
+if hasattr(reactor, "runUntilCurrent"):
+ # runUntilCurrent is called when we have pending calls. It is called once
+ # per iteratation after fd polling.
+ reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 115bee8c41..fa06480ad1 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
REQUIREMENTS = {
"syutil>=0.0.7": ["syutil>=0.0.7"],
- "Twisted==14.0.2": ["twisted==14.0.2"],
+ "Twisted>=15.1.0": ["twisted>=15.1.0"],
"service_identity>=1.0.0": ["service_identity>=1.0.0"],
"pyopenssl>=0.14": ["OpenSSL>=0.14"],
"pyyaml": ["yaml"],
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 0c737d73b8..b5926f9ca6 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -19,7 +19,7 @@ from synapse.api.constants import LoginType
from synapse.api.errors import SynapseError, Codes
from synapse.http.servlet import RestServlet
-from ._base import client_v2_pattern, parse_request_allow_empty
+from ._base import client_v2_pattern, parse_json_dict_from_request
import logging
import hmac
@@ -55,30 +55,55 @@ class RegisterRestServlet(RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
yield run_on_reactor()
+ body = parse_json_dict_from_request(request)
- body = parse_request_allow_empty(request)
- # we do basic sanity checks here because the auth
- # layer will store these in sessions
+ # we do basic sanity checks here because the auth layer will store these
+ # in sessions. Pull out the username/password provided to us.
+ desired_password = None
if 'password' in body:
- if ((not isinstance(body['password'], str) and
- not isinstance(body['password'], unicode)) or
+ if (not isinstance(body['password'], basestring) or
len(body['password']) > 512):
raise SynapseError(400, "Invalid password")
+ desired_password = body["password"]
+ desired_username = None
if 'username' in body:
- if ((not isinstance(body['username'], str) and
- not isinstance(body['username'], unicode)) or
+ if (not isinstance(body['username'], basestring) or
len(body['username']) > 512):
raise SynapseError(400, "Invalid username")
desired_username = body['username']
- yield self.registration_handler.check_username(desired_username)
-
- is_using_shared_secret = False
- is_application_server = False
- service = None
+ appservice = None
if 'access_token' in request.args:
- service = yield self.auth.get_appservice_by_req(request)
+ appservice = yield self.auth.get_appservice_by_req(request)
+
+ # fork off as soon as possible for ASes and shared secret auth which
+ # have completely different registration flows to normal users
+
+ # == Application Service Registration ==
+ if appservice:
+ result = yield self._do_appservice_registration(
+ desired_username, request.args["access_token"][0]
+ )
+ defer.returnValue((200, result)) # we throw for non 200 responses
+ return
+
+ # == Shared Secret Registration == (e.g. create new user scripts)
+ if 'mac' in body:
+ # FIXME: Should we really be determining if this is shared secret
+ # auth based purely on the 'mac' key?
+ result = yield self._do_shared_secret_registration(
+ desired_username, desired_password, body["mac"]
+ )
+ defer.returnValue((200, result)) # we throw for non 200 responses
+ return
+
+ # == Normal User Registration == (everyone else)
+ if self.hs.config.disable_registration:
+ raise SynapseError(403, "Registration has been disabled")
+
+ if desired_username is not None:
+ yield self.registration_handler.check_username(desired_username)
if self.hs.config.enable_registration_captcha:
flows = [
@@ -91,39 +116,20 @@ class RegisterRestServlet(RestServlet):
[LoginType.EMAIL_IDENTITY]
]
- result = None
- if service:
- is_application_server = True
- params = body
- elif 'mac' in body:
- # Check registration-specific shared secret auth
- if 'username' not in body:
- raise SynapseError(400, "", Codes.MISSING_PARAM)
- self._check_shared_secret_auth(
- body['username'], body['mac']
- )
- is_using_shared_secret = True
- params = body
- else:
- authed, result, params = yield self.auth_handler.check_auth(
- flows, body, self.hs.get_ip_from_request(request)
- )
-
- if not authed:
- defer.returnValue((401, result))
-
- can_register = (
- not self.hs.config.disable_registration
- or is_application_server
- or is_using_shared_secret
+ authed, result, params = yield self.auth_handler.check_auth(
+ flows, body, self.hs.get_ip_from_request(request)
)
- if not can_register:
- raise SynapseError(403, "Registration has been disabled")
+ if not authed:
+ defer.returnValue((401, result))
+ return
+
+ # NB: This may be from the auth handler and NOT from the POST
if 'password' not in params:
- raise SynapseError(400, "", Codes.MISSING_PARAM)
- desired_username = params['username'] if 'username' in params else None
- new_password = params['password']
+ raise SynapseError(400, "Missing password.", Codes.MISSING_PARAM)
+
+ desired_username = params.get("username", None)
+ new_password = params.get("password", None)
(user_id, token) = yield self.registration_handler.register(
localpart=desired_username,
@@ -156,18 +162,21 @@ class RegisterRestServlet(RestServlet):
else:
logger.info("bind_email not specified: not binding email")
- result = {
- "user_id": user_id,
- "access_token": token,
- "home_server": self.hs.hostname,
- }
-
+ result = self._create_registration_details(user_id, token)
defer.returnValue((200, result))
def on_OPTIONS(self, _):
return 200, {}
- def _check_shared_secret_auth(self, username, mac):
+ @defer.inlineCallbacks
+ def _do_appservice_registration(self, username, as_token):
+ (user_id, token) = yield self.registration_handler.appservice_register(
+ username, as_token
+ )
+ defer.returnValue(self._create_registration_details(user_id, token))
+
+ @defer.inlineCallbacks
+ def _do_shared_secret_registration(self, username, password, mac):
if not self.hs.config.registration_shared_secret:
raise SynapseError(400, "Shared secret registration is not enabled")
@@ -183,13 +192,23 @@ class RegisterRestServlet(RestServlet):
digestmod=sha1,
).hexdigest()
- if compare_digest(want_mac, got_mac):
- return True
- else:
+ if not compare_digest(want_mac, got_mac):
raise SynapseError(
403, "HMAC incorrect",
)
+ (user_id, token) = yield self.registration_handler.register(
+ localpart=username, password=password
+ )
+ defer.returnValue(self._create_registration_details(user_id, token))
+
+ def _create_registration_details(self, user_id, token):
+ return {
+ "user_id": user_id,
+ "access_token": token,
+ "home_server": self.hs.hostname,
+ }
+
def register_servlets(hs, http_server):
RegisterRestServlet(hs).register(http_server)
diff --git a/synapse/rest/media/v1/base_resource.py b/synapse/rest/media/v1/base_resource.py
index c43ae0314b..84e1961a21 100644
--- a/synapse/rest/media/v1/base_resource.py
+++ b/synapse/rest/media/v1/base_resource.py
@@ -244,43 +244,52 @@ class BaseMediaResource(Resource):
)
return
- scales = set()
- crops = set()
- for r_width, r_height, r_method, r_type in requirements:
- if r_method == "scale":
- t_width, t_height = thumbnailer.aspect(r_width, r_height)
- scales.add((
- min(m_width, t_width), min(m_height, t_height), r_type,
+ local_thumbnails = []
+
+ def generate_thumbnails():
+ scales = set()
+ crops = set()
+ for r_width, r_height, r_method, r_type in requirements:
+ if r_method == "scale":
+ t_width, t_height = thumbnailer.aspect(r_width, r_height)
+ scales.add((
+ min(m_width, t_width), min(m_height, t_height), r_type,
+ ))
+ elif r_method == "crop":
+ crops.add((r_width, r_height, r_type))
+
+ for t_width, t_height, t_type in scales:
+ t_method = "scale"
+ t_path = self.filepaths.local_media_thumbnail(
+ media_id, t_width, t_height, t_type, t_method
+ )
+ self._makedirs(t_path)
+ t_len = thumbnailer.scale(t_path, t_width, t_height, t_type)
+
+ local_thumbnails.append((
+ media_id, t_width, t_height, t_type, t_method, t_len
))
- elif r_method == "crop":
- crops.add((r_width, r_height, r_type))
- for t_width, t_height, t_type in scales:
- t_method = "scale"
- t_path = self.filepaths.local_media_thumbnail(
- media_id, t_width, t_height, t_type, t_method
- )
- self._makedirs(t_path)
- t_len = thumbnailer.scale(t_path, t_width, t_height, t_type)
- yield self.store.store_local_thumbnail(
- media_id, t_width, t_height, t_type, t_method, t_len
- )
+ for t_width, t_height, t_type in crops:
+ if (t_width, t_height, t_type) in scales:
+ # If the aspect ratio of the cropped thumbnail matches a purely
+ # scaled one then there is no point in calculating a separate
+ # thumbnail.
+ continue
+ t_method = "crop"
+ t_path = self.filepaths.local_media_thumbnail(
+ media_id, t_width, t_height, t_type, t_method
+ )
+ self._makedirs(t_path)
+ t_len = thumbnailer.crop(t_path, t_width, t_height, t_type)
+ local_thumbnails.append((
+ media_id, t_width, t_height, t_type, t_method, t_len
+ ))
- for t_width, t_height, t_type in crops:
- if (t_width, t_height, t_type) in scales:
- # If the aspect ratio of the cropped thumbnail matches a purely
- # scaled one then there is no point in calculating a separate
- # thumbnail.
- continue
- t_method = "crop"
- t_path = self.filepaths.local_media_thumbnail(
- media_id, t_width, t_height, t_type, t_method
- )
- self._makedirs(t_path)
- t_len = thumbnailer.crop(t_path, t_width, t_height, t_type)
- yield self.store.store_local_thumbnail(
- media_id, t_width, t_height, t_type, t_method, t_len
- )
+ yield threads.deferToThread(generate_thumbnails)
+
+ for l in local_thumbnails:
+ yield self.store.store_local_thumbnail(*l)
defer.returnValue({
"width": m_width,
diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py
index 4a9b6d8eeb..61f88e486e 100644
--- a/synapse/rest/media/v1/thumbnail_resource.py
+++ b/synapse/rest/media/v1/thumbnail_resource.py
@@ -162,11 +162,12 @@ class ThumbnailResource(BaseMediaResource):
t_method = info["thumbnail_method"]
if t_method == "scale" or t_method == "crop":
aspect_quality = abs(d_w * t_h - d_h * t_w)
+ min_quality = 0 if d_w <= t_w and d_h <= t_h else 1
size_quality = abs((d_w - t_w) * (d_h - t_h))
type_quality = desired_type != info["thumbnail_type"]
length_quality = info["thumbnail_length"]
info_list.append((
- aspect_quality, size_quality, type_quality,
+ aspect_quality, min_quality, size_quality, type_quality,
length_quality, info
))
if info_list:
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 71d5d92500..c6ce65b4cc 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -99,7 +99,7 @@ class DataStore(RoomMemberStore, RoomStore,
key = (user.to_string(), access_token, device_id, ip)
try:
- last_seen = self.client_ip_last_seen.get(*key)
+ last_seen = self.client_ip_last_seen.get(key)
except KeyError:
last_seen = None
@@ -107,7 +107,7 @@ class DataStore(RoomMemberStore, RoomStore,
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
defer.returnValue(None)
- self.client_ip_last_seen.prefill(*key + (now,))
+ self.client_ip_last_seen.prefill(key, now)
# It's safe not to lock here: a) no unique constraint,
# b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
@@ -354,6 +354,11 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
)
logger.debug("Running script %s", relative_path)
module.run_upgrade(cur, database_engine)
+ elif ext == ".pyc":
+ # Sometimes .pyc files turn up anyway even though we've
+ # disabled their generation; e.g. from distribution package
+ # installers. Silently skip it
+ pass
elif ext == ".sql":
# A plain old .sql file, just read and execute it
logger.debug("Applying schema %s", relative_path)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 8f812f0fd7..73eea157a4 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -15,6 +15,7 @@
import logging
from synapse.api.errors import StoreError
+from synapse.util.async import ObservableDeferred
from synapse.util.logutils import log_function
from synapse.util.logcontext import preserve_context_over_fn, LoggingContext
from synapse.util.lrucache import LruCache
@@ -27,6 +28,7 @@ from twisted.internet import defer
from collections import namedtuple, OrderedDict
import functools
+import inspect
import sys
import time
import threading
@@ -55,9 +57,12 @@ cache_counter = metrics.register_cache(
)
+_CacheSentinel = object()
+
+
class Cache(object):
- def __init__(self, name, max_entries=1000, keylen=1, lru=False):
+ def __init__(self, name, max_entries=1000, keylen=1, lru=True):
if lru:
self.cache = LruCache(max_size=max_entries)
self.max_entries = None
@@ -81,45 +86,44 @@ class Cache(object):
"Cache objects can only be accessed from the main thread"
)
- def get(self, *keyargs):
- if len(keyargs) != self.keylen:
- raise ValueError("Expected a key to have %d items", self.keylen)
-
- if keyargs in self.cache:
+ def get(self, key, default=_CacheSentinel):
+ val = self.cache.get(key, _CacheSentinel)
+ if val is not _CacheSentinel:
cache_counter.inc_hits(self.name)
- return self.cache[keyargs]
+ return val
cache_counter.inc_misses(self.name)
- raise KeyError()
- def update(self, sequence, *args):
+ if default is _CacheSentinel:
+ raise KeyError()
+ else:
+ return default
+
+ def update(self, sequence, key, value):
self.check_thread()
if self.sequence == sequence:
# Only update the cache if the caches sequence number matches the
# number that the cache had before the SELECT was started (SYN-369)
- self.prefill(*args)
-
- def prefill(self, *args): # because I can't *keyargs, value
- keyargs = args[:-1]
- value = args[-1]
-
- if len(keyargs) != self.keylen:
- raise ValueError("Expected a key to have %d items", self.keylen)
+ self.prefill(key, value)
+ def prefill(self, key, value):
if self.max_entries is not None:
while len(self.cache) >= self.max_entries:
self.cache.popitem(last=False)
- self.cache[keyargs] = value
+ self.cache[key] = value
- def invalidate(self, *keyargs):
+ def invalidate(self, key):
self.check_thread()
- if len(keyargs) != self.keylen:
- raise ValueError("Expected a key to have %d items", self.keylen)
+ if not isinstance(key, tuple):
+ raise TypeError(
+ "The cache key must be a tuple not %r" % (type(key),)
+ )
+
# Increment the sequence number so that any SELECT statements that
# raced with the INSERT don't update the cache (SYN-369)
self.sequence += 1
- self.cache.pop(keyargs, None)
+ self.cache.pop(key, None)
def invalidate_all(self):
self.check_thread()
@@ -130,6 +134,9 @@ class Cache(object):
class CacheDescriptor(object):
""" A method decorator that applies a memoizing cache around the function.
+ This caches deferreds, rather than the results themselves. Deferreds that
+ fail are removed from the cache.
+
The function is presumed to take zero or more arguments, which are used in
a tuple as the key for the cache. Hits are served directly from the cache;
misses use the function body to generate the value.
@@ -141,58 +148,92 @@ class CacheDescriptor(object):
which can be used to insert values into the cache specifically, without
calling the calculation function.
"""
- def __init__(self, orig, max_entries=1000, num_args=1, lru=False):
+ def __init__(self, orig, max_entries=1000, num_args=1, lru=True,
+ inlineCallbacks=False):
self.orig = orig
+ if inlineCallbacks:
+ self.function_to_call = defer.inlineCallbacks(orig)
+ else:
+ self.function_to_call = orig
+
self.max_entries = max_entries
self.num_args = num_args
self.lru = lru
- def __get__(self, obj, objtype=None):
- cache = Cache(
+ self.arg_names = inspect.getargspec(orig).args[1:num_args+1]
+
+ if len(self.arg_names) < self.num_args:
+ raise Exception(
+ "Not enough explicit positional arguments to key off of for %r."
+ " (@cached cannot key off of *args or **kwars)"
+ % (orig.__name__,)
+ )
+
+ self.cache = Cache(
name=self.orig.__name__,
max_entries=self.max_entries,
keylen=self.num_args,
lru=self.lru,
)
+ def __get__(self, obj, objtype=None):
+
@functools.wraps(self.orig)
- @defer.inlineCallbacks
- def wrapped(*keyargs):
+ def wrapped(*args, **kwargs):
+ arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs)
+ cache_key = tuple(arg_dict[arg_nm] for arg_nm in self.arg_names)
try:
- cached_result = cache.get(*keyargs[:self.num_args])
+ cached_result_d = self.cache.get(cache_key)
+
+ observer = cached_result_d.observe()
if DEBUG_CACHES:
- actual_result = yield self.orig(obj, *keyargs)
- if actual_result != cached_result:
- logger.error(
- "Stale cache entry %s%r: cached: %r, actual %r",
- self.orig.__name__, keyargs,
- cached_result, actual_result,
- )
- raise ValueError("Stale cache entry")
- defer.returnValue(cached_result)
+ @defer.inlineCallbacks
+ def check_result(cached_result):
+ actual_result = yield self.function_to_call(obj, *args, **kwargs)
+ if actual_result != cached_result:
+ logger.error(
+ "Stale cache entry %s%r: cached: %r, actual %r",
+ self.orig.__name__, cache_key,
+ cached_result, actual_result,
+ )
+ raise ValueError("Stale cache entry")
+ defer.returnValue(cached_result)
+ observer.addCallback(check_result)
+
+ return observer
except KeyError:
# Get the sequence number of the cache before reading from the
# database so that we can tell if the cache is invalidated
# while the SELECT is executing (SYN-369)
- sequence = cache.sequence
+ sequence = self.cache.sequence
+
+ ret = defer.maybeDeferred(
+ self.function_to_call,
+ obj, *args, **kwargs
+ )
+
+ def onErr(f):
+ self.cache.invalidate(cache_key)
+ return f
- ret = yield self.orig(obj, *keyargs)
+ ret.addErrback(onErr)
- cache.update(sequence, *keyargs[:self.num_args] + (ret,))
+ ret = ObservableDeferred(ret, consumeErrors=True)
+ self.cache.update(sequence, cache_key, ret)
- defer.returnValue(ret)
+ return ret.observe()
- wrapped.invalidate = cache.invalidate
- wrapped.invalidate_all = cache.invalidate_all
- wrapped.prefill = cache.prefill
+ wrapped.invalidate = self.cache.invalidate
+ wrapped.invalidate_all = self.cache.invalidate_all
+ wrapped.prefill = self.cache.prefill
obj.__dict__[self.orig.__name__] = wrapped
return wrapped
-def cached(max_entries=1000, num_args=1, lru=False):
+def cached(max_entries=1000, num_args=1, lru=True):
return lambda orig: CacheDescriptor(
orig,
max_entries=max_entries,
@@ -201,6 +242,16 @@ def cached(max_entries=1000, num_args=1, lru=False):
)
+def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False):
+ return lambda orig: CacheDescriptor(
+ orig,
+ max_entries=max_entries,
+ num_args=num_args,
+ lru=lru,
+ inlineCallbacks=True,
+ )
+
+
class LoggingTransaction(object):
"""An object that almost-transparently proxies for the 'txn' object
passed to the constructor. Adds logging and metrics to the .execute()
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 2b2bdf8615..f3947bbe89 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -104,7 +104,7 @@ class DirectoryStore(SQLBaseStore):
},
desc="create_room_alias_association",
)
- self.get_aliases_for_room.invalidate(room_id)
+ self.get_aliases_for_room.invalidate((room_id,))
@defer.inlineCallbacks
def delete_room_alias(self, room_alias):
@@ -114,7 +114,7 @@ class DirectoryStore(SQLBaseStore):
room_alias,
)
- self.get_aliases_for_room.invalidate(room_id)
+ self.get_aliases_for_room.invalidate((room_id,))
defer.returnValue(room_id)
def _delete_room_alias_txn(self, txn, room_alias):
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 45b86c94e8..910b6598a7 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -362,7 +362,7 @@ class EventFederationStore(SQLBaseStore):
for room_id in events_by_room:
txn.call_after(
- self.get_latest_event_ids_in_room.invalidate, room_id
+ self.get_latest_event_ids_in_room.invalidate, (room_id,)
)
def get_backfill_events(self, room_id, event_list, limit):
@@ -505,4 +505,4 @@ class EventFederationStore(SQLBaseStore):
query = "DELETE FROM event_forward_extremities WHERE room_id = ?"
txn.execute(query, (room_id,))
- txn.call_after(self.get_latest_event_ids_in_room.invalidate, room_id)
+ txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,))
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ed7ea38804..5b64918024 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -162,8 +162,8 @@ class EventsStore(SQLBaseStore):
if current_state:
txn.call_after(self.get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
- txn.call_after(self.get_users_in_room.invalidate, event.room_id)
- txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id)
+ txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
+ txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_room_name_and_aliases, event.room_id)
self._simple_delete_txn(
@@ -430,13 +430,13 @@ class EventsStore(SQLBaseStore):
if not context.rejected:
txn.call_after(
self.get_current_state_for_key.invalidate,
- event.room_id, event.type, event.state_key
- )
+ (event.room_id, event.type, event.state_key,)
+ )
if event.type in [EventTypes.Name, EventTypes.Aliases]:
txn.call_after(
self.get_room_name_and_aliases.invalidate,
- event.room_id
+ (event.room_id,)
)
self._simple_upsert_txn(
@@ -567,8 +567,9 @@ class EventsStore(SQLBaseStore):
def _invalidate_get_event_cache(self, event_id):
for check_redacted in (False, True):
for get_prev_content in (False, True):
- self._get_event_cache.invalidate(event_id, check_redacted,
- get_prev_content)
+ self._get_event_cache.invalidate(
+ (event_id, check_redacted, get_prev_content)
+ )
def _get_event_txn(self, txn, event_id, check_redacted=True,
get_prev_content=False, allow_rejected=False):
@@ -589,7 +590,7 @@ class EventsStore(SQLBaseStore):
for event_id in events:
try:
ret = self._get_event_cache.get(
- event_id, check_redacted, get_prev_content
+ (event_id, check_redacted, get_prev_content,)
)
if allow_rejected or not ret.rejected_reason:
@@ -822,7 +823,7 @@ class EventsStore(SQLBaseStore):
ev.unsigned["prev_content"] = prev.get_dict()["content"]
self._get_event_cache.prefill(
- ev.event_id, check_redacted, get_prev_content, ev
+ (ev.event_id, check_redacted, get_prev_content), ev
)
defer.returnValue(ev)
@@ -879,7 +880,7 @@ class EventsStore(SQLBaseStore):
ev.unsigned["prev_content"] = prev.get_dict()["content"]
self._get_event_cache.prefill(
- ev.event_id, check_redacted, get_prev_content, ev
+ (ev.event_id, check_redacted, get_prev_content), ev
)
return ev
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 940a5f7e08..49b8e37cfd 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from _base import SQLBaseStore, cached
+from _base import SQLBaseStore, cachedInlineCallbacks
from twisted.internet import defer
@@ -71,8 +71,7 @@ class KeyStore(SQLBaseStore):
desc="store_server_certificate",
)
- @cached()
- @defer.inlineCallbacks
+ @cachedInlineCallbacks()
def get_all_server_verify_keys(self, server_name):
rows = yield self._simple_select_list(
table="server_signature_keys",
@@ -132,7 +131,7 @@ class KeyStore(SQLBaseStore):
desc="store_server_verify_key",
)
- self.get_all_server_verify_keys.invalidate(server_name)
+ self.get_all_server_verify_keys.invalidate((server_name,))
def store_server_keys_json(self, server_name, key_id, from_server,
ts_now_ms, ts_expires_ms, key_json_bytes):
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index fefcf6bce0..576cf670cc 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -98,7 +98,7 @@ class PresenceStore(SQLBaseStore):
updatevalues={"accepted": True},
desc="set_presence_list_accepted",
)
- self.get_presence_list_accepted.invalidate(observer_localpart)
+ self.get_presence_list_accepted.invalidate((observer_localpart,))
defer.returnValue(result)
def get_presence_list(self, observer_localpart, accepted=None):
@@ -133,4 +133,4 @@ class PresenceStore(SQLBaseStore):
"observed_user_id": observed_userid},
desc="del_presence_list",
)
- self.get_presence_list_accepted.invalidate(observer_localpart)
+ self.get_presence_list_accepted.invalidate((observer_localpart,))
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 4cac118d17..9b88ca7b39 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore, cached
+from ._base import SQLBaseStore, cachedInlineCallbacks
from twisted.internet import defer
import logging
@@ -23,8 +23,7 @@ logger = logging.getLogger(__name__)
class PushRuleStore(SQLBaseStore):
- @cached()
- @defer.inlineCallbacks
+ @cachedInlineCallbacks()
def get_push_rules_for_user(self, user_name):
rows = yield self._simple_select_list(
table=PushRuleTable.table_name,
@@ -41,8 +40,7 @@ class PushRuleStore(SQLBaseStore):
defer.returnValue(rows)
- @cached()
- @defer.inlineCallbacks
+ @cachedInlineCallbacks()
def get_push_rules_enabled_for_user(self, user_name):
results = yield self._simple_select_list(
table=PushRuleEnableTable.table_name,
@@ -153,11 +151,11 @@ class PushRuleStore(SQLBaseStore):
txn.execute(sql, (user_name, priority_class, new_rule_priority))
txn.call_after(
- self.get_push_rules_for_user.invalidate, user_name
+ self.get_push_rules_for_user.invalidate, (user_name,)
)
txn.call_after(
- self.get_push_rules_enabled_for_user.invalidate, user_name
+ self.get_push_rules_enabled_for_user.invalidate, (user_name,)
)
self._simple_insert_txn(
@@ -189,10 +187,10 @@ class PushRuleStore(SQLBaseStore):
new_rule['priority'] = new_prio
txn.call_after(
- self.get_push_rules_for_user.invalidate, user_name
+ self.get_push_rules_for_user.invalidate, (user_name,)
)
txn.call_after(
- self.get_push_rules_enabled_for_user.invalidate, user_name
+ self.get_push_rules_enabled_for_user.invalidate, (user_name,)
)
self._simple_insert_txn(
@@ -218,8 +216,8 @@ class PushRuleStore(SQLBaseStore):
desc="delete_push_rule",
)
- self.get_push_rules_for_user.invalidate(user_name)
- self.get_push_rules_enabled_for_user.invalidate(user_name)
+ self.get_push_rules_for_user.invalidate((user_name,))
+ self.get_push_rules_enabled_for_user.invalidate((user_name,))
@defer.inlineCallbacks
def set_push_rule_enabled(self, user_name, rule_id, enabled):
@@ -240,10 +238,10 @@ class PushRuleStore(SQLBaseStore):
{'id': new_id},
)
txn.call_after(
- self.get_push_rules_for_user.invalidate, user_name
+ self.get_push_rules_for_user.invalidate, (user_name,)
)
txn.call_after(
- self.get_push_rules_enabled_for_user.invalidate, user_name
+ self.get_push_rules_enabled_for_user.invalidate, (user_name,)
)
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 7a6af98d98..b79d6683ca 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore, cached
+from ._base import SQLBaseStore, cachedInlineCallbacks
from twisted.internet import defer
@@ -128,8 +128,7 @@ class ReceiptsStore(SQLBaseStore):
def get_max_receipt_stream_id(self):
return self._receipts_id_gen.get_max_token(self)
- @cached
- @defer.inlineCallbacks
+ @cachedInlineCallbacks()
def get_graph_receipts_for_room(self, room_id):
"""Get receipts for sending to remote servers.
"""
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 90e2606be2..4eaa088b36 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -131,7 +131,7 @@ class RegistrationStore(SQLBaseStore):
user_id
)
for r in rows:
- self.get_user_by_token.invalidate(r)
+ self.get_user_by_token.invalidate((r,))
@cached()
def get_user_by_token(self, token):
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 4612a8aa83..dd5bc2c8fb 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
-from ._base import SQLBaseStore, cached
+from ._base import SQLBaseStore, cachedInlineCallbacks
import collections
import logging
@@ -186,8 +186,7 @@ class RoomStore(SQLBaseStore):
}
)
- @cached()
- @defer.inlineCallbacks
+ @cachedInlineCallbacks()
def get_room_name_and_aliases(self, room_id):
def f(txn):
sql = (
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 4db07f6fb4..9f14f38f24 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -54,9 +54,9 @@ class RoomMemberStore(SQLBaseStore):
)
for event in events:
- txn.call_after(self.get_rooms_for_user.invalidate, event.state_key)
- txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id)
- txn.call_after(self.get_users_in_room.invalidate, event.room_id)
+ txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,))
+ txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
+ txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.
@@ -78,7 +78,7 @@ class RoomMemberStore(SQLBaseStore):
lambda events: events[0] if events else None
)
- @cached()
+ @cached(max_entries=5000)
def get_users_in_room(self, room_id):
def f(txn):
@@ -154,7 +154,7 @@ class RoomMemberStore(SQLBaseStore):
RoomsForUser(**r) for r in self.cursor_to_dict(txn)
]
- @cached()
+ @cached(max_entries=5000)
def get_joined_hosts_for_room(self, room_id):
return self.runInteraction(
"get_joined_hosts_for_room",
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 47bec65497..7ce51b9bdc 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore, cached
+from ._base import SQLBaseStore, cachedInlineCallbacks
from twisted.internet import defer
@@ -91,7 +91,6 @@ class StateStore(SQLBaseStore):
defer.returnValue(dict(state_list))
- @cached(num_args=1)
def _fetch_events_for_group(self, key, events):
return self._get_events(
events, get_prev_content=False
@@ -189,8 +188,7 @@ class StateStore(SQLBaseStore):
events = yield self._get_events(event_ids, get_prev_content=False)
defer.returnValue(events)
- @cached(num_args=3)
- @defer.inlineCallbacks
+ @cachedInlineCallbacks(num_args=3)
def get_current_state_for_key(self, room_id, event_type, state_key):
def f(txn):
sql = (
diff --git a/synapse/types.py b/synapse/types.py
index dd1b10d646..e190374cbd 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -178,7 +178,7 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
Live tokens start with an "s" followed by the "stream_ordering" id of the
event it comes after. Historic tokens start with a "t" followed by the
- "topological_ordering" id of the event it comes after, follewed by "-",
+ "topological_ordering" id of the event it comes after, followed by "-",
followed by the "stream_ordering" id of the event it comes after.
"""
__slots__ = []
@@ -211,4 +211,5 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
return "s%d" % (self.stream,)
+# token_id is the primary key ID of the access token, not the access token itself.
ClientInfo = namedtuple("ClientInfo", ("device_id", "token_id"))
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 5a1d545c96..7bf2d38bb8 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -51,7 +51,7 @@ class ObservableDeferred(object):
object.__setattr__(self, "_observers", set())
def callback(r):
- self._result = (True, r)
+ object.__setattr__(self, "_result", (True, r))
while self._observers:
try:
self._observers.pop().callback(r)
@@ -60,7 +60,7 @@ class ObservableDeferred(object):
return r
def errback(f):
- self._result = (False, f)
+ object.__setattr__(self, "_result", (False, f))
while self._observers:
try:
self._observers.pop().errback(f)
@@ -97,3 +97,8 @@ class ObservableDeferred(object):
def __setattr__(self, name, value):
setattr(self._deferred, name, value)
+
+ def __repr__(self):
+ return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % (
+ id(self), self._result, self._deferred,
+ )
|