diff --git a/synapse/__init__.py b/synapse/__init__.py
index ca113db434..faa183a99e 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,4 +17,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.31.0"
+__version__ = "0.31.2"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 06fa38366d..66639b0089 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -655,7 +655,7 @@ class Auth(object):
auth_events[(EventTypes.PowerLevels, "")] = power_level_event
send_level = event_auth.get_send_level(
- EventTypes.Aliases, "", auth_events
+ EventTypes.Aliases, "", power_level_event,
)
user_level = event_auth.get_user_power_level(user_id, auth_events)
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index dd114dee07..4319ddce03 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -23,6 +23,7 @@ from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
+from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
@@ -62,7 +63,7 @@ class AppserviceServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
- resources[METRICS_PREFIX] = MetricsResource(self)
+ resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
root_resource = create_resource_tree(resources, NoResource())
@@ -97,7 +98,7 @@ class AppserviceServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 85dada7f9f..654ddb8414 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -122,7 +122,7 @@ class ClientReaderServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 5ca77c0f1a..441467093a 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -138,7 +138,7 @@ class EventCreatorServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 2a1995d0cd..b2415cc671 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -111,7 +111,7 @@ class FederationReaderServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 81ad574043..13d2b70053 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -125,7 +125,7 @@ class FederationSenderServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 5a164a7a95..d2bae4ad03 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -176,7 +176,7 @@ class FrontendProxyServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 714f98a3e0..ae5fc751d5 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -266,7 +266,7 @@ class SynapseHomeServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
@@ -318,11 +318,6 @@ def setup(config_options):
# check any extra requirements we have now we have a config
check_requirements(config)
- version_string = "Synapse/" + get_version_string(synapse)
-
- logger.info("Server hostname: %s", config.server_name)
- logger.info("Server version: %s", version_string)
-
events.USE_FROZEN_DICTS = config.use_frozen_dicts
tls_server_context_factory = context_factory.ServerContextFactory(config)
@@ -335,7 +330,7 @@ def setup(config_options):
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
config=config,
- version_string=version_string,
+ version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 006bba80a8..19a682cce3 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -118,7 +118,7 @@ class MediaRepositoryServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 64df47f9cc..13cfbd08b0 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -128,7 +128,7 @@ class PusherServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 6808d6d3e0..82f06ea185 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -305,7 +305,7 @@ class SynchrotronServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 712dfa870e..56ae086128 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -171,6 +171,10 @@ def main():
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
+ cache_factors = config.get("synctl_cache_factors", {})
+ for cache_name, factor in cache_factors.iteritems():
+ os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
+
worker_configfiles = []
if options.worker:
start_stop_synapse = False
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index ada1c13cec..f5726e3df6 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -150,7 +150,7 @@ class UserDirectoryServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 5fdb579723..d1c598622a 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -292,4 +292,8 @@ class ApplicationService(object):
return self.rate_limited
def __str__(self):
- return "ApplicationService: %s" % (self.__dict__,)
+ # copy dictionary and redact token fields so they don't get logged
+ dict_copy = self.__dict__.copy()
+ dict_copy["token"] = "<redacted>"
+ dict_copy["hs_token"] = "<redacted>"
+ return "ApplicationService: %s" % (dict_copy,)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 00efff1464..47251fb6ad 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -24,8 +24,27 @@ from synapse.types import ThirdPartyInstanceID
import logging
import urllib
+from prometheus_client import Counter
+
logger = logging.getLogger(__name__)
+sent_transactions_counter = Counter(
+ "synapse_appservice_api_sent_transactions",
+ "Number of /transactions/ requests sent",
+ ["service"]
+)
+
+failed_transactions_counter = Counter(
+ "synapse_appservice_api_failed_transactions",
+ "Number of /transactions/ requests that failed to send",
+ ["service"]
+)
+
+sent_events_counter = Counter(
+ "synapse_appservice_api_sent_events",
+ "Number of events sent to the AS",
+ ["service"]
+)
HOUR_IN_MS = 60 * 60 * 1000
@@ -219,12 +238,15 @@ class ApplicationServiceApi(SimpleHttpClient):
args={
"access_token": service.hs_token
})
+ sent_transactions_counter.labels(service.id).inc()
+ sent_events_counter.labels(service.id).inc(len(events))
defer.returnValue(True)
return
except CodeMessageException as e:
logger.warning("push_bulk to %s received %s", uri, e.code)
except Exception as ex:
logger.warning("push_bulk to %s threw exception %s", uri, ex)
+ failed_transactions_counter.labels(service.id).inc()
defer.returnValue(False)
def _serialize(self, events):
diff --git a/synapse/config/consent_config.py b/synapse/config/consent_config.py
index 8f6ed73328..e22c731aad 100644
--- a/synapse/config/consent_config.py
+++ b/synapse/config/consent_config.py
@@ -18,6 +18,9 @@ from ._base import Config
DEFAULT_CONFIG = """\
# User Consent configuration
#
+# for detailed instructions, see
+# https://github.com/matrix-org/synapse/blob/master/docs/consent_tracking.md
+#
# Parts of this section are required if enabling the 'consent' resource under
# 'listeners', in particular 'template_dir' and 'version'.
#
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 6a7228dc2f..557c270fbe 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -12,17 +12,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-from ._base import Config
-from synapse.util.logcontext import LoggingContextFilter
-from twisted.logger import globalLogBeginner, STDLibLogObserver
import logging
import logging.config
-import yaml
-from string import Template
import os
import signal
+from string import Template
+import sys
+from twisted.logger import STDLibLogObserver, globalLogBeginner
+import yaml
+
+import synapse
+from synapse.util.logcontext import LoggingContextFilter
+from synapse.util.versionstring import get_version_string
+from ._base import Config
DEFAULT_LOG_CONFIG = Template("""
version: 1
@@ -202,6 +205,15 @@ def setup_logging(config, use_worker_options=False):
if getattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, sighup)
+ # make sure that the first thing we log is a thing we can grep backwards
+ # for
+ logging.warn("***** STARTING SERVER *****")
+ logging.warn(
+ "Server %s version %s",
+ sys.argv[0], get_version_string(synapse),
+ )
+ logging.info("Server hostname: %s", config.server_name)
+
# It's critical to point twisted's internal logging somewhere, otherwise it
# stacks up and leaks kup to 64K object;
# see: https://twistedmatrix.com/trac/ticket/8164
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 22ee0fc93f..9b17ef0a08 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -27,10 +27,12 @@ from synapse.util.metrics import Measure
from twisted.internet import defer
from signedjson.sign import (
- verify_signed_json, signature_ids, sign_json, encode_canonical_json
+ verify_signed_json, signature_ids, sign_json, encode_canonical_json,
+ SignatureVerifyException,
)
from signedjson.key import (
- is_signing_algorithm_supported, decode_verify_key_bytes
+ is_signing_algorithm_supported, decode_verify_key_bytes,
+ encode_verify_key_base64,
)
from unpaddedbase64 import decode_base64, encode_base64
@@ -56,7 +58,7 @@ Attributes:
key_ids(set(str)): The set of key_ids to that could be used to verify the
JSON object
json_object(dict): The JSON object to verify.
- deferred(twisted.internet.defer.Deferred):
+ deferred(Deferred[str, str, nacl.signing.VerifyKey]):
A deferred (server_name, key_id, verify_key) tuple that resolves when
a verify key has been fetched. The deferreds' callbacks are run with no
logcontext.
@@ -736,6 +738,17 @@ class Keyring(object):
@defer.inlineCallbacks
def _handle_key_deferred(verify_request):
+ """Waits for the key to become available, and then performs a verification
+
+ Args:
+ verify_request (VerifyKeyRequest):
+
+ Returns:
+ Deferred[None]
+
+ Raises:
+ SynapseError if there was a problem performing the verification
+ """
server_name = verify_request.server_name
try:
with PreserveLoggingContext():
@@ -768,11 +781,17 @@ def _handle_key_deferred(verify_request):
))
try:
verify_signed_json(json_object, server_name, verify_key)
- except Exception:
+ except SignatureVerifyException as e:
+ logger.debug(
+ "Error verifying signature for %s:%s:%s with key %s: %s",
+ server_name, verify_key.alg, verify_key.version,
+ encode_verify_key_base64(verify_key),
+ str(e),
+ )
raise SynapseError(
401,
- "Invalid signature for server %s with key %s:%s" % (
- server_name, verify_key.alg, verify_key.version
+ "Invalid signature for server %s with key %s:%s: %s" % (
+ server_name, verify_key.alg, verify_key.version, str(e),
),
Codes.UNAUTHORIZED,
)
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index eaf9cecde6..f512d88145 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -34,9 +34,11 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
event: the event being checked.
auth_events (dict: event-key -> event): the existing room state.
+ Raises:
+ AuthError if the checks fail
Returns:
- True if the auth checks pass.
+ if the auth checks pass.
"""
if do_size_check:
_check_size_limits(event)
@@ -71,7 +73,7 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
# Oh, we don't know what the state of the room was, so we
# are trusting that this is allowed (at least for now)
logger.warn("Trusting event: %s", event.event_id)
- return True
+ return
if event.type == EventTypes.Create:
room_id_domain = get_domain_from_id(event.room_id)
@@ -81,7 +83,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
"Creation event's room_id domain does not match sender's"
)
# FIXME
- return True
+ logger.debug("Allowing! %s", event)
+ return
creation_event = auth_events.get((EventTypes.Create, ""), None)
@@ -118,7 +121,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
403,
"Alias event's state_key does not match sender's domain"
)
- return True
+ logger.debug("Allowing! %s", event)
+ return
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
@@ -127,14 +131,9 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
)
if event.type == EventTypes.Member:
- allowed = _is_membership_change_allowed(
- event, auth_events
- )
- if allowed:
- logger.debug("Allowing! %s", event)
- else:
- logger.debug("Denying! %s", event)
- return allowed
+ _is_membership_change_allowed(event, auth_events)
+ logger.debug("Allowing! %s", event)
+ return
_check_event_sender_in_room(event, auth_events)
@@ -153,7 +152,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
)
)
else:
- return True
+ logger.debug("Allowing! %s", event)
+ return
_can_send_event(event, auth_events)
@@ -200,7 +200,7 @@ def _is_membership_change_allowed(event, auth_events):
create = auth_events.get(key)
if create and event.prev_events[0][0] == create.event_id:
if create.content["creator"] == event.state_key:
- return True
+ return
target_user_id = event.state_key
@@ -265,13 +265,13 @@ def _is_membership_change_allowed(event, auth_events):
raise AuthError(
403, "%s is banned from the room" % (target_user_id,)
)
- return True
+ return
if Membership.JOIN != membership:
if (caller_invited
and Membership.LEAVE == membership
and target_user_id == event.user_id):
- return True
+ return
if not caller_in_room: # caller isn't joined
raise AuthError(
@@ -334,8 +334,6 @@ def _is_membership_change_allowed(event, auth_events):
else:
raise AuthError(500, "Unknown membership %s" % membership)
- return True
-
def _check_event_sender_in_room(event, auth_events):
key = (EventTypes.Member, event.user_id, )
@@ -355,35 +353,46 @@ def _check_joined_room(member, user_id, room_id):
))
-def get_send_level(etype, state_key, auth_events):
- key = (EventTypes.PowerLevels, "", )
- send_level_event = auth_events.get(key)
- send_level = None
- if send_level_event:
- send_level = send_level_event.content.get("events", {}).get(
- etype
- )
- if send_level is None:
- if state_key is not None:
- send_level = send_level_event.content.get(
- "state_default", 50
- )
- else:
- send_level = send_level_event.content.get(
- "events_default", 0
- )
+def get_send_level(etype, state_key, power_levels_event):
+ """Get the power level required to send an event of a given type
+
+ The federation spec [1] refers to this as "Required Power Level".
+
+ https://matrix.org/docs/spec/server_server/unstable.html#definitions
- if send_level:
- send_level = int(send_level)
+ Args:
+ etype (str): type of event
+ state_key (str|None): state_key of state event, or None if it is not
+ a state event.
+ power_levels_event (synapse.events.EventBase|None): power levels event
+ in force at this point in the room
+ Returns:
+ int: power level required to send this event.
+ """
+
+ if power_levels_event:
+ power_levels_content = power_levels_event.content
else:
- send_level = 0
+ power_levels_content = {}
+
+ # see if we have a custom level for this event type
+ send_level = power_levels_content.get("events", {}).get(etype)
+
+ # otherwise, fall back to the state_default/events_default.
+ if send_level is None:
+ if state_key is not None:
+ send_level = power_levels_content.get("state_default", 50)
+ else:
+ send_level = power_levels_content.get("events_default", 0)
- return send_level
+ return int(send_level)
def _can_send_event(event, auth_events):
+ power_levels_event = _get_power_level_event(auth_events)
+
send_level = get_send_level(
- event.type, event.get("state_key", None), auth_events
+ event.type, event.get("state_key"), power_levels_event,
)
user_level = get_user_power_level(event.user_id, auth_events)
@@ -524,13 +533,22 @@ def _check_power_levels(event, auth_events):
def _get_power_level_event(auth_events):
- key = (EventTypes.PowerLevels, "", )
- return auth_events.get(key)
+ return auth_events.get((EventTypes.PowerLevels, ""))
def get_user_power_level(user_id, auth_events):
- power_level_event = _get_power_level_event(auth_events)
+ """Get a user's power level
+
+ Args:
+ user_id (str): user's id to look up in power_levels
+ auth_events (dict[(str, str), synapse.events.EventBase]):
+ state in force at this point in the room (or rather, a subset of
+ it including at least the create event and power levels event.
+ Returns:
+ int: the user's power level in this room.
+ """
+ power_level_event = _get_power_level_event(auth_events)
if power_level_event:
level = power_level_event.content.get("users", {}).get(user_id)
if not level:
@@ -541,6 +559,11 @@ def get_user_power_level(user_id, auth_events):
else:
return int(level)
else:
+ # if there is no power levels event, the creator gets 100 and everyone
+ # else gets 0.
+
+ # some things which call this don't pass the create event: hack around
+ # that.
key = (EventTypes.Create, "", )
create_event = auth_events.get(key)
if (create_event is not None and
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 9f1142b5a9..1d5c0f3797 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -35,7 +35,7 @@ from synapse.storage.presence import UserPresenceState
from synapse.util.metrics import Measure
from synapse.metrics import LaterGauge
-from blist import sorteddict
+from sortedcontainers import SortedDict
from collections import namedtuple
import logging
@@ -55,19 +55,19 @@ class FederationRemoteSendQueue(object):
self.is_mine_id = hs.is_mine_id
self.presence_map = {} # Pending presence map user_id -> UserPresenceState
- self.presence_changed = sorteddict() # Stream position -> user_id
+ self.presence_changed = SortedDict() # Stream position -> user_id
self.keyed_edu = {} # (destination, key) -> EDU
- self.keyed_edu_changed = sorteddict() # stream position -> (destination, key)
+ self.keyed_edu_changed = SortedDict() # stream position -> (destination, key)
- self.edus = sorteddict() # stream position -> Edu
+ self.edus = SortedDict() # stream position -> Edu
- self.failures = sorteddict() # stream position -> (destination, Failure)
+ self.failures = SortedDict() # stream position -> (destination, Failure)
- self.device_messages = sorteddict() # stream position -> destination
+ self.device_messages = SortedDict() # stream position -> destination
self.pos = 1
- self.pos_time = sorteddict()
+ self.pos_time = SortedDict()
# EVERYTHING IS SAD. In particular, python only makes new scopes when
# we make a new function, so we need to make a new function so the inner
@@ -98,7 +98,7 @@ class FederationRemoteSendQueue(object):
now = self.clock.time_msec()
keys = self.pos_time.keys()
- time = keys.bisect_left(now - FIVE_MINUTES_AGO)
+ time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
if not keys[:time]:
return
@@ -113,7 +113,7 @@ class FederationRemoteSendQueue(object):
with Measure(self.clock, "send_queue._clear"):
# Delete things out of presence maps
keys = self.presence_changed.keys()
- i = keys.bisect_left(position_to_delete)
+ i = self.presence_changed.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_changed[key]
@@ -131,7 +131,7 @@ class FederationRemoteSendQueue(object):
# Delete things out of keyed edus
keys = self.keyed_edu_changed.keys()
- i = keys.bisect_left(position_to_delete)
+ i = self.keyed_edu_changed.bisect_left(position_to_delete)
for key in keys[:i]:
del self.keyed_edu_changed[key]
@@ -145,19 +145,19 @@ class FederationRemoteSendQueue(object):
# Delete things out of edu map
keys = self.edus.keys()
- i = keys.bisect_left(position_to_delete)
+ i = self.edus.bisect_left(position_to_delete)
for key in keys[:i]:
del self.edus[key]
# Delete things out of failure map
keys = self.failures.keys()
- i = keys.bisect_left(position_to_delete)
+ i = self.failures.bisect_left(position_to_delete)
for key in keys[:i]:
del self.failures[key]
# Delete things out of device map
keys = self.device_messages.keys()
- i = keys.bisect_left(position_to_delete)
+ i = self.device_messages.bisect_left(position_to_delete)
for key in keys[:i]:
del self.device_messages[key]
@@ -250,13 +250,12 @@ class FederationRemoteSendQueue(object):
self._clear_queue_before_pos(federation_ack)
# Fetch changed presence
- keys = self.presence_changed.keys()
- i = keys.bisect_right(from_token)
- j = keys.bisect_right(to_token) + 1
+ i = self.presence_changed.bisect_right(from_token)
+ j = self.presence_changed.bisect_right(to_token) + 1
dest_user_ids = [
(pos, user_id)
- for pos in keys[i:j]
- for user_id in self.presence_changed[pos]
+ for pos, user_id_list in self.presence_changed.items()[i:j]
+ for user_id in user_id_list
]
for (key, user_id) in dest_user_ids:
@@ -265,13 +264,12 @@ class FederationRemoteSendQueue(object):
)))
# Fetch changes keyed edus
- keys = self.keyed_edu_changed.keys()
- i = keys.bisect_right(from_token)
- j = keys.bisect_right(to_token) + 1
+ i = self.keyed_edu_changed.bisect_right(from_token)
+ j = self.keyed_edu_changed.bisect_right(to_token) + 1
# We purposefully clobber based on the key here, python dict comprehensions
# always use the last value, so this will correctly point to the last
# stream position.
- keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
+ keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
for ((destination, edu_key), pos) in iteritems(keyed_edus):
rows.append((pos, KeyedEduRow(
@@ -280,19 +278,17 @@ class FederationRemoteSendQueue(object):
)))
# Fetch changed edus
- keys = self.edus.keys()
- i = keys.bisect_right(from_token)
- j = keys.bisect_right(to_token) + 1
- edus = ((k, self.edus[k]) for k in keys[i:j])
+ i = self.edus.bisect_right(from_token)
+ j = self.edus.bisect_right(to_token) + 1
+ edus = self.edus.items()[i:j]
for (pos, edu) in edus:
rows.append((pos, EduRow(edu)))
# Fetch changed failures
- keys = self.failures.keys()
- i = keys.bisect_right(from_token)
- j = keys.bisect_right(to_token) + 1
- failures = ((k, self.failures[k]) for k in keys[i:j])
+ i = self.failures.bisect_right(from_token)
+ j = self.failures.bisect_right(to_token) + 1
+ failures = self.failures.items()[i:j]
for (pos, (destination, failure)) in failures:
rows.append((pos, FailureRow(
@@ -301,10 +297,9 @@ class FederationRemoteSendQueue(object):
)))
# Fetch changed device messages
- keys = self.device_messages.keys()
- i = keys.bisect_right(from_token)
- j = keys.bisect_right(to_token) + 1
- device_messages = {self.device_messages[k]: k for k in keys[i:j]}
+ i = self.device_messages.bisect_right(from_token)
+ j = self.device_messages.bisect_right(to_token) + 1
+ device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
for (destination, pos) in iteritems(device_messages):
rows.append((pos, DeviceRow(
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index f0aeb5a0d3..d72b057e28 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -21,7 +21,6 @@ from .units import Transaction, Edu
from synapse.api.errors import HttpResponseException, FederationDeniedError
from synapse.util import logcontext, PreserveLoggingContext
-from synapse.util.async import run_on_reactor
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
@@ -42,8 +41,11 @@ import logging
logger = logging.getLogger(__name__)
-sent_pdus_destination_dist = Counter(
- "synapse_federation_transaction_queue_sent_pdu_destinations", ""
+sent_pdus_destination_dist_count = Counter(
+ "synapse_federation_client_sent_pdu_destinations:count", ""
+)
+sent_pdus_destination_dist_total = Counter(
+ "synapse_federation_client_sent_pdu_destinations:total", ""
)
@@ -280,7 +282,8 @@ class TransactionQueue(object):
if not destinations:
return
- sent_pdus_destination_dist.inc(len(destinations))
+ sent_pdus_destination_dist_total.inc(len(destinations))
+ sent_pdus_destination_dist_count.inc()
for destination in destinations:
self.pending_pdus_by_dest.setdefault(destination, []).append(
@@ -451,9 +454,6 @@ class TransactionQueue(object):
# hence why we throw the result away.
yield get_retry_limiter(destination, self.clock, self.store)
- # XXX: what's this for?
- yield run_on_reactor()
-
pending_pdus = []
while True:
device_message_edus, device_stream_id, dev_list_id = (
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 3c0051586d..a131b7f73f 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
from twisted.internet import defer, threads
from ._base import BaseHandler
@@ -23,7 +24,6 @@ from synapse.api.errors import (
)
from synapse.module_api import ModuleApi
from synapse.types import UserID
-from synapse.util.async import run_on_reactor
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable
@@ -33,6 +33,7 @@ import logging
import bcrypt
import pymacaroons
import simplejson
+import attr
import synapse.util.stringutils as stringutils
@@ -423,15 +424,11 @@ class AuthHandler(BaseHandler):
def _check_msisdn(self, authdict, _):
return self._check_threepid('msisdn', authdict)
- @defer.inlineCallbacks
def _check_dummy_auth(self, authdict, _):
- yield run_on_reactor()
- defer.returnValue(True)
+ return defer.succeed(True)
@defer.inlineCallbacks
def _check_threepid(self, medium, authdict):
- yield run_on_reactor()
-
if 'threepid_creds' not in authdict:
raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
@@ -825,6 +822,15 @@ class AuthHandler(BaseHandler):
if medium == 'email':
address = address.lower()
+ identity_handler = self.hs.get_handlers().identity_handler
+ yield identity_handler.unbind_threepid(
+ user_id,
+ {
+ 'medium': medium,
+ 'address': address,
+ },
+ )
+
ret = yield self.store.user_delete_threepid(
user_id, medium, address,
)
@@ -849,7 +855,11 @@ class AuthHandler(BaseHandler):
return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
bcrypt.gensalt(self.bcrypt_rounds))
- return make_deferred_yieldable(threads.deferToThread(_do_hash))
+ return make_deferred_yieldable(
+ threads.deferToThreadPool(
+ self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
+ ),
+ )
def validate_hash(self, password, stored_hash):
"""Validates that self.hash(password) == stored_hash.
@@ -869,16 +879,21 @@ class AuthHandler(BaseHandler):
)
if stored_hash:
- return make_deferred_yieldable(threads.deferToThread(_do_validate_hash))
+ return make_deferred_yieldable(
+ threads.deferToThreadPool(
+ self.hs.get_reactor(),
+ self.hs.get_reactor().getThreadPool(),
+ _do_validate_hash,
+ ),
+ )
else:
return defer.succeed(False)
-class MacaroonGeneartor(object):
- def __init__(self, hs):
- self.clock = hs.get_clock()
- self.server_name = hs.config.server_name
- self.macaroon_secret_key = hs.config.macaroon_secret_key
+@attr.s
+class MacaroonGenerator(object):
+
+ hs = attr.ib()
def generate_access_token(self, user_id, extra_caveats=None):
extra_caveats = extra_caveats or []
@@ -896,7 +911,7 @@ class MacaroonGeneartor(object):
def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
macaroon = self._generate_base_macaroon(user_id)
macaroon.add_first_party_caveat("type = login")
- now = self.clock.time_msec()
+ now = self.hs.get_clock().time_msec()
expiry = now + duration_in_ms
macaroon.add_first_party_caveat("time < %d" % (expiry,))
return macaroon.serialize()
@@ -908,9 +923,9 @@ class MacaroonGeneartor(object):
def _generate_base_macaroon(self, user_id):
macaroon = pymacaroons.Macaroon(
- location=self.server_name,
+ location=self.hs.config.server_name,
identifier="key",
- key=self.macaroon_secret_key)
+ key=self.hs.config.macaroon_secret_key)
macaroon.add_first_party_caveat("gen = 1")
macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
return macaroon
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index c5e92f6214..8ec5ba2012 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -17,6 +17,7 @@ from twisted.internet import defer, reactor
from ._base import BaseHandler
from synapse.types import UserID, create_requester
from synapse.util.logcontext import run_in_background
+from synapse.api.errors import SynapseError
import logging
@@ -30,6 +31,7 @@ class DeactivateAccountHandler(BaseHandler):
self._auth_handler = hs.get_auth_handler()
self._device_handler = hs.get_device_handler()
self._room_member_handler = hs.get_room_member_handler()
+ self._identity_handler = hs.get_handlers().identity_handler
self.user_directory_handler = hs.get_user_directory_handler()
# Flag that indicates whether the process to part users from rooms is running
@@ -52,14 +54,35 @@ class DeactivateAccountHandler(BaseHandler):
# FIXME: Theoretically there is a race here wherein user resets
# password using threepid.
- # first delete any devices belonging to the user, which will also
+ # delete threepids first. We remove these from the IS so if this fails,
+ # leave the user still active so they can try again.
+ # Ideally we would prevent password resets and then do this in the
+ # background thread.
+ threepids = yield self.store.user_get_threepids(user_id)
+ for threepid in threepids:
+ try:
+ yield self._identity_handler.unbind_threepid(
+ user_id,
+ {
+ 'medium': threepid['medium'],
+ 'address': threepid['address'],
+ },
+ )
+ except Exception:
+ # Do we want this to be a fatal error or should we carry on?
+ logger.exception("Failed to remove threepid from ID server")
+ raise SynapseError(400, "Failed to remove threepid from ID server")
+ yield self.store.user_delete_threepid(
+ user_id, threepid['medium'], threepid['address'],
+ )
+
+ # delete any devices belonging to the user, which will also
# delete corresponding access tokens.
yield self._device_handler.delete_all_devices_for_user(user_id)
# then delete any remaining access tokens which weren't associated with
# a device.
yield self._auth_handler.delete_access_tokens_for_user(user_id)
- yield self.store.user_delete_threepids(user_id)
yield self.store.user_set_password_hash(user_id, None)
# Add the user to a table of users pending deactivation (ie.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 495ac4c648..af94bf33bc 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -39,7 +39,7 @@ from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError, logcontext
from synapse.util.metrics import measure_func
from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor, Linearizer
+from synapse.util.async import Linearizer
from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import (
compute_event_signature, add_hashes_and_signatures,
@@ -1381,8 +1381,6 @@ class FederationHandler(BaseHandler):
def get_state_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
- yield run_on_reactor()
-
state_groups = yield self.store.get_state_groups(
room_id, [event_id]
)
@@ -1425,8 +1423,6 @@ class FederationHandler(BaseHandler):
def get_state_ids_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
- yield run_on_reactor()
-
state_groups = yield self.store.get_state_groups_ids(
room_id, [event_id]
)
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 91a0898860..f00dfe1d3e 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -26,7 +27,6 @@ from synapse.api.errors import (
MatrixCodeMessageException, CodeMessageException
)
from ._base import BaseHandler
-from synapse.util.async import run_on_reactor
from synapse.api.errors import SynapseError, Codes
logger = logging.getLogger(__name__)
@@ -38,6 +38,7 @@ class IdentityHandler(BaseHandler):
super(IdentityHandler, self).__init__(hs)
self.http_client = hs.get_simple_http_client()
+ self.federation_http_client = hs.get_http_client()
self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers)
self.trust_any_id_server_just_for_testing_do_not_use = (
@@ -60,8 +61,6 @@ class IdentityHandler(BaseHandler):
@defer.inlineCallbacks
def threepid_from_creds(self, creds):
- yield run_on_reactor()
-
if 'id_server' in creds:
id_server = creds['id_server']
elif 'idServer' in creds:
@@ -104,7 +103,6 @@ class IdentityHandler(BaseHandler):
@defer.inlineCallbacks
def bind_threepid(self, creds, mxid):
- yield run_on_reactor()
logger.debug("binding threepid %r to %s", creds, mxid)
data = None
@@ -139,9 +137,53 @@ class IdentityHandler(BaseHandler):
defer.returnValue(data)
@defer.inlineCallbacks
- def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
- yield run_on_reactor()
+ def unbind_threepid(self, mxid, threepid):
+ """
+ Removes a binding from an identity server
+ Args:
+ mxid (str): Matrix user ID of binding to be removed
+ threepid (dict): Dict with medium & address of binding to be removed
+
+ Returns:
+ Deferred[bool]: True on success, otherwise False
+ """
+ logger.debug("unbinding threepid %r from %s", threepid, mxid)
+ if not self.trusted_id_servers:
+ logger.warn("Can't unbind threepid: no trusted ID servers set in config")
+ defer.returnValue(False)
+
+ # We don't track what ID server we added 3pids on (perhaps we ought to)
+ # but we assume that any of the servers in the trusted list are in the
+ # same ID server federation, so we can pick any one of them to send the
+ # deletion request to.
+ id_server = next(iter(self.trusted_id_servers))
+
+ url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
+ content = {
+ "mxid": mxid,
+ "threepid": threepid,
+ }
+ headers = {}
+ # we abuse the federation http client to sign the request, but we have to send it
+ # using the normal http client since we don't want the SRV lookup and want normal
+ # 'browser-like' HTTPS.
+ self.federation_http_client.sign_request(
+ destination=None,
+ method='POST',
+ url_bytes='/_matrix/identity/api/v1/3pid/unbind'.encode('ascii'),
+ headers_dict=headers,
+ content=content,
+ destination_is=id_server,
+ )
+ yield self.http_client.post_json_get_json(
+ url,
+ content,
+ headers,
+ )
+ defer.returnValue(True)
+ @defer.inlineCallbacks
+ def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
if not self._should_trust_id_server(id_server):
raise SynapseError(
400, "Untrusted ID server '%s'" % id_server,
@@ -176,8 +218,6 @@ class IdentityHandler(BaseHandler):
self, id_server, country, phone_number,
client_secret, send_attempt, **kwargs
):
- yield run_on_reactor()
-
if not self._should_trust_id_server(id_server):
raise SynapseError(
400, "Untrusted ID server '%s'" % id_server,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 1cb81b6cf8..7b9946ab91 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -36,7 +36,7 @@ from synapse.events.validator import EventValidator
from synapse.types import (
UserID, RoomAlias, RoomStreamToken,
)
-from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
+from synapse.util.async import ReadWriteLock, Limiter
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
from synapse.util.frozenutils import frozendict_json_encoder
@@ -806,6 +806,7 @@ class EventCreationHandler(object):
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
+ self.hs.get_clock(),
self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
@@ -959,9 +960,7 @@ class EventCreationHandler(object):
event_stream_id, max_stream_id
)
- @defer.inlineCallbacks
def _notify():
- yield run_on_reactor()
try:
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 7e52adda3c..e76ef5426d 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -24,7 +24,7 @@ from synapse.api.errors import (
from synapse.http.client import CaptchaServerHttpClient
from synapse import types
from synapse.types import UserID, create_requester, RoomID, RoomAlias
-from synapse.util.async import run_on_reactor, Linearizer
+from synapse.util.async import Linearizer
from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler
@@ -139,7 +139,6 @@ class RegistrationHandler(BaseHandler):
Raises:
RegistrationError if there was a problem registering.
"""
- yield run_on_reactor()
password_hash = None
if password:
password_hash = yield self.auth_handler().hash(password)
@@ -431,8 +430,6 @@ class RegistrationHandler(BaseHandler):
Raises:
RegistrationError if there was a problem registering.
"""
- yield run_on_reactor()
-
if localpart is None:
raise SynapseError(400, "Request must include user id")
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 2abd63ad05..ab72963d87 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -23,7 +23,7 @@ from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
from synapse.api.constants import (
EventTypes, JoinRules, RoomCreationPreset
)
-from synapse.api.errors import AuthError, StoreError, SynapseError
+from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
from synapse.util import stringutils
from synapse.visibility import filter_events_for_client
@@ -115,7 +115,11 @@ class RoomCreationHandler(BaseHandler):
)
if mapping:
- raise SynapseError(400, "Room alias already taken")
+ raise SynapseError(
+ 400,
+ "Room alias already taken",
+ Codes.ROOM_IN_USE
+ )
else:
room_alias = None
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index a39f0f7343..7e4a114d4f 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -19,7 +19,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.storage.roommember import ProfileInfo
from synapse.util.metrics import Measure
-from synapse.util.async import sleep
from synapse.types import get_localpart_from_id
from six import iteritems
@@ -174,7 +173,7 @@ class UserDirectoryHandler(object):
logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
yield self._handle_initial_room(room_id)
num_processed_rooms += 1
- yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+ yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
logger.info("Processed all rooms.")
@@ -188,7 +187,7 @@ class UserDirectoryHandler(object):
logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
yield self._handle_local_user(user_id)
num_processed_users += 1
- yield sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
+ yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
logger.info("Processed all users")
@@ -236,7 +235,7 @@ class UserDirectoryHandler(object):
count = 0
for user_id in user_ids:
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+ yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
if not self.is_mine_id(user_id):
count += 1
@@ -251,7 +250,7 @@ class UserDirectoryHandler(object):
continue
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+ yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
count += 1
user_set = (user_id, other_user_id)
diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index 054372e179..58ef8d3ce4 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -13,6 +13,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import re
+
from twisted.internet.defer import CancelledError
from twisted.python import failure
@@ -34,3 +36,14 @@ def cancelled_to_request_timed_out_error(value, timeout):
value.trap(CancelledError)
raise RequestTimedOutError()
return value
+
+
+ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
+
+
+def redact_uri(uri):
+ """Strips access tokens from the uri replaces with <redacted>"""
+ return ACCESS_TOKEN_RE.sub(
+ br'\1<redacted>\3',
+ uri
+ )
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 4d4eee3d64..46ffb41de1 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -19,7 +19,7 @@ from OpenSSL.SSL import VERIFY_NONE
from synapse.api.errors import (
CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
)
-from synapse.http import cancelled_to_request_timed_out_error
+from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.util.async import add_timeout_to_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
@@ -90,31 +90,32 @@ class SimpleHttpClient(object):
# counters to it
outgoing_requests_counter.labels(method).inc()
- logger.info("Sending request %s %s", method, uri)
+ # log request but strip `access_token` (AS requests for example include this)
+ logger.info("Sending request %s %s", method, redact_uri(uri))
try:
request_deferred = self.agent.request(
method, uri, *args, **kwargs
)
add_timeout_to_deferred(
- request_deferred,
- 60, cancelled_to_request_timed_out_error,
+ request_deferred, 60, self.hs.get_reactor(),
+ cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(request_deferred)
incoming_responses_counter.labels(method, response.code).inc()
logger.info(
"Received response to %s %s: %s",
- method, uri, response.code
+ method, redact_uri(uri), response.code
)
defer.returnValue(response)
except Exception as e:
incoming_responses_counter.labels(method, "ERR").inc()
logger.info(
"Error sending request to %s %s: %s %s",
- method, uri, type(e).__name__, e.message
+ method, redact_uri(uri), type(e).__name__, e.message
)
- raise e
+ raise
@defer.inlineCallbacks
def post_urlencoded_get_json(self, uri, args={}, headers=None):
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 821aed362b..4e0399e762 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -22,7 +22,7 @@ from twisted.web._newclient import ResponseDone
from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
import synapse.metrics
-from synapse.util.async import sleep, add_timeout_to_deferred
+from synapse.util.async import add_timeout_to_deferred
from synapse.util import logcontext
from synapse.util.logcontext import make_deferred_yieldable
import synapse.util.retryutils
@@ -193,6 +193,7 @@ class MatrixFederationHttpClient(object):
add_timeout_to_deferred(
request_deferred,
timeout / 1000. if timeout else 60,
+ self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(
@@ -234,7 +235,7 @@ class MatrixFederationHttpClient(object):
delay = min(delay, 2)
delay *= random.uniform(0.8, 1.4)
- yield sleep(delay)
+ yield self.clock.sleep(delay)
retries_left -= 1
else:
raise
@@ -260,14 +261,35 @@ class MatrixFederationHttpClient(object):
defer.returnValue(response)
def sign_request(self, destination, method, url_bytes, headers_dict,
- content=None):
+ content=None, destination_is=None):
+ """
+ Signs a request by adding an Authorization header to headers_dict
+ Args:
+ destination (bytes|None): The desination home server of the request.
+ May be None if the destination is an identity server, in which case
+ destination_is must be non-None.
+ method (bytes): The HTTP method of the request
+ url_bytes (bytes): The URI path of the request
+ headers_dict (dict): Dictionary of request headers to append to
+ content (bytes): The body of the request
+ destination_is (bytes): As 'destination', but if the destination is an
+ identity server
+
+ Returns:
+ None
+ """
request = {
"method": method,
"uri": url_bytes,
"origin": self.server_name,
- "destination": destination,
}
+ if destination is not None:
+ request["destination"] = destination
+
+ if destination_is is not None:
+ request["destination_is"] = destination_is
+
if content is not None:
request["content"] = content
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index dc06f6c443..1b711ca2de 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -117,13 +117,17 @@ def _get_in_flight_counts():
Returns:
dict[tuple[str, str], int]
"""
- for rm in _in_flight_requests:
+ # Cast to a list to prevent it changing while the Prometheus
+ # thread is collecting metrics
+ reqs = list(_in_flight_requests)
+
+ for rm in reqs:
rm.update_metrics()
# Map from (method, name) -> int, the number of in flight requests of that
# type
counts = {}
- for rm in _in_flight_requests:
+ for rm in reqs:
key = (rm.method, rm.name,)
counts[key] = counts.get(key, 0) + 1
@@ -131,7 +135,7 @@ def _get_in_flight_counts():
LaterGauge(
- "synapse_http_request_metrics_in_flight_requests_count",
+ "synapse_http_server_in_flight_requests_count",
"",
["method", "servlet"],
_get_in_flight_counts,
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 60299657b9..74a752d6cf 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -14,18 +14,16 @@
import contextlib
import logging
-import re
import time
from twisted.web.server import Site, Request
+from synapse.http import redact_uri
from synapse.http.request_metrics import RequestMetrics
from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
-ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
-
_next_request_seq = 0
@@ -69,10 +67,7 @@ class SynapseRequest(Request):
return "%s-%i" % (self.method, self.request_seq)
def get_redacted_uri(self):
- return ACCESS_TOKEN_RE.sub(
- br'\1<redacted>\3',
- self.uri
- )
+ return redact_uri(self.uri)
def get_user_agent(self):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
@@ -104,16 +99,18 @@ class SynapseRequest(Request):
db_txn_count = context.db_txn_count
db_txn_duration_sec = context.db_txn_duration_sec
db_sched_duration_sec = context.db_sched_duration_sec
+ evt_db_fetch_count = context.evt_db_fetch_count
except Exception:
ru_utime, ru_stime = (0, 0)
db_txn_count, db_txn_duration_sec = (0, 0)
+ evt_db_fetch_count = 0
end_time = time.time()
self.site.access_logger.info(
"%s - %s - {%s}"
" Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
- " %sB %s \"%s %s %s\" \"%s\"",
+ " %sB %s \"%s %s %s\" \"%s\" [%d dbevts]",
self.getClientIP(),
self.site.site_tag,
self.authenticated_entity,
@@ -129,6 +126,7 @@ class SynapseRequest(Request):
self.get_redacted_uri(),
self.clientproto,
self.get_user_agent(),
+ evt_db_fetch_count,
)
try:
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 429e79c472..7d6e0232ed 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -62,7 +62,7 @@ class LaterGauge(object):
calls = self.caller()
except Exception:
logger.exception(
- "Exception running callback for LaterGuage(%s)",
+ "Exception running callback for LaterGauge(%s)",
self.name,
)
yield g
@@ -140,7 +140,7 @@ gc_time = Histogram(
class GCCounts(object):
def collect(self):
- cm = GaugeMetricFamily("python_gc_counts", "GC cycle counts", labels=["gen"])
+ cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
cm.add_metric([str(n)], m)
@@ -190,6 +190,22 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"
# finished being processed.
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
+last_ticked = time.time()
+
+
+class ReactorLastSeenMetric(object):
+
+ def collect(self):
+ cm = GaugeMetricFamily(
+ "python_twisted_reactor_last_seen",
+ "Seconds since the Twisted reactor was last seen",
+ )
+ cm.add_metric([], time.time() - last_ticked)
+ yield cm
+
+
+REGISTRY.register(ReactorLastSeenMetric())
+
def runUntilCurrentTimer(func):
@@ -222,6 +238,11 @@ def runUntilCurrentTimer(func):
tick_time.observe(end - start)
pending_calls_metric.observe(num_pending)
+ # Update the time we last ticked, for the metric to test whether
+ # Synapse's reactor has frozen
+ global last_ticked
+ last_ticked = end
+
if running_on_pypy:
return ret
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 6dce20a284..3c0622a294 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -161,6 +161,7 @@ class Notifier(object):
self.user_to_user_stream = {}
self.room_to_user_streams = {}
+ self.hs = hs
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
self.pending_new_room_events = []
@@ -340,6 +341,7 @@ class Notifier(object):
add_timeout_to_deferred(
listener.deferred,
(end_time - now) / 1000.,
+ self.hs.get_reactor(),
)
with PreserveLoggingContext():
yield listener.deferred
@@ -561,6 +563,7 @@ class Notifier(object):
add_timeout_to_deferred(
listener.deferred.addTimeout,
(end_time - now) / 1000.,
+ self.hs.get_reactor(),
)
try:
with PreserveLoggingContext():
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 750d11ca38..36bb5bbc65 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,7 +19,6 @@ import logging
from twisted.internet import defer
from synapse.push.pusher import PusherFactory
-from synapse.util.async import run_on_reactor
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
@@ -125,7 +124,6 @@ class PusherPool:
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_id, max_stream_id):
- yield run_on_reactor()
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
@@ -151,7 +149,6 @@ class PusherPool:
@defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
- yield run_on_reactor()
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 478c497722..faf6dfdb8d 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -50,14 +50,16 @@ REQUIREMENTS = {
"bcrypt": ["bcrypt>=3.1.0"],
"pillow": ["PIL"],
"pydenticon": ["pydenticon"],
- "blist": ["blist"],
+ "sortedcontainers": ["sortedcontainers"],
"pysaml2>=3.0.0": ["saml2>=3.0.0"],
"pymacaroons-pynacl": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
"six": ["six"],
"prometheus_client": ["prometheus_client"],
+ "attr": ["attr"],
}
+
CONDITIONAL_REQUIREMENTS = {
"web_client": {
"matrix_angular_sdk>=0.6.8": ["syweb>=0.6.8"],
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index a9baa2c1c3..f080f96cc1 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -21,7 +21,6 @@ from synapse.api.errors import (
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
-from synapse.util.async import sleep
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.metrics import Measure
from synapse.types import Requester, UserID
@@ -33,11 +32,12 @@ logger = logging.getLogger(__name__)
@defer.inlineCallbacks
-def send_event_to_master(client, host, port, requester, event, context,
+def send_event_to_master(clock, client, host, port, requester, event, context,
ratelimit, extra_users):
"""Send event to be handled on the master
Args:
+ clock (synapse.util.Clock)
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
@@ -77,7 +77,7 @@ def send_event_to_master(client, host, port, requester, event, context,
# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
- yield sleep(1)
+ yield clock.sleep(1)
except MatrixCodeMessageException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index c870475cd1..171a698e14 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -564,11 +564,13 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# The following simply registers metrics for the replication connections
pending_commands = LaterGauge(
- "pending_commands", "", ["name", "conn_id"],
+ "synapse_replication_tcp_protocol_pending_commands",
+ "",
+ ["name", "conn_id"],
lambda: {
- (p.name, p.conn_id): len(p.pending_commands)
- for p in connected_connections
- })
+ (p.name, p.conn_id): len(p.pending_commands) for p in connected_connections
+ },
+)
def transport_buffer_size(protocol):
@@ -579,11 +581,13 @@ def transport_buffer_size(protocol):
transport_send_buffer = LaterGauge(
- "synapse_replication_tcp_transport_send_buffer", "", ["name", "conn_id"],
+ "synapse_replication_tcp_protocol_transport_send_buffer",
+ "",
+ ["name", "conn_id"],
lambda: {
- (p.name, p.conn_id): transport_buffer_size(p)
- for p in connected_connections
- })
+ (p.name, p.conn_id): transport_buffer_size(p) for p in connected_connections
+ },
+)
def transport_kernel_read_buffer_size(protocol, read=True):
@@ -602,37 +606,50 @@ def transport_kernel_read_buffer_size(protocol, read=True):
tcp_transport_kernel_send_buffer = LaterGauge(
- "synapse_replication_tcp_transport_kernel_send_buffer", "", ["name", "conn_id"],
+ "synapse_replication_tcp_protocol_transport_kernel_send_buffer",
+ "",
+ ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
- })
+ },
+)
tcp_transport_kernel_read_buffer = LaterGauge(
- "synapse_replication_tcp_transport_kernel_read_buffer", "", ["name", "conn_id"],
+ "synapse_replication_tcp_protocol_transport_kernel_read_buffer",
+ "",
+ ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
- })
+ },
+)
tcp_inbound_commands = LaterGauge(
- "synapse_replication_tcp_inbound_commands", "", ["command", "name", "conn_id"],
+ "synapse_replication_tcp_protocol_inbound_commands",
+ "",
+ ["command", "name", "conn_id"],
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
for k, count in iteritems(p.inbound_commands_counter)
- })
+ },
+)
tcp_outbound_commands = LaterGauge(
- "synapse_replication_tcp_outbound_commands", "", ["command", "name", "conn_id"],
+ "synapse_replication_tcp_protocol_outbound_commands",
+ "",
+ ["command", "name", "conn_id"],
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
for k, count in iteritems(p.outbound_commands_counter)
- })
+ },
+)
# number of updates received for each RDATA stream
-inbound_rdata_count = Counter("synapse_replication_tcp_inbound_rdata_count", "",
- ["stream_name"])
+inbound_rdata_count = Counter(
+ "synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
+)
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 6835a7bba2..b8665a45eb 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -169,16 +169,12 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
yield self.store.find_first_stream_ordering_after_ts(ts)
)
- room_event_after_stream_ordering = (
+ r = (
yield self.store.get_room_event_after_stream_ordering(
room_id, stream_ordering,
)
)
- if room_event_after_stream_ordering:
- token = yield self.store.get_topological_token_for_event(
- room_event_after_stream_ordering,
- )
- else:
+ if not r:
logger.warn(
"[purge] purging events not possible: No event found "
"(received_ts %i => stream_ordering %i)",
@@ -189,8 +185,10 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
"there is no event to be purged",
errcode=Codes.NOT_FOUND,
)
+ (stream, topo, _event_id) = r
+ token = "t%d-%d" % (topo, stream)
logger.info(
- "[purge] purging up to token %d (received_ts %i => "
+ "[purge] purging up to token %s (received_ts %i => "
"stream_ordering %i)",
token, ts, stream_ordering,
)
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py
index 9b3022e0b0..c10320dedf 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1/register.py
@@ -24,8 +24,6 @@ import synapse.util.stringutils as stringutils
from synapse.http.servlet import parse_json_object_from_request
from synapse.types import create_requester
-from synapse.util.async import run_on_reactor
-
from hashlib import sha1
import hmac
import logging
@@ -272,7 +270,6 @@ class RegisterRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def _do_password(self, request, register_json, session):
- yield run_on_reactor()
if (self.hs.config.enable_registration_captcha and
not session[LoginType.RECAPTCHA]):
# captcha should've been done by this stage!
@@ -333,8 +330,6 @@ class RegisterRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def _do_shared_secret(self, request, register_json, session):
- yield run_on_reactor()
-
if not isinstance(register_json.get("mac", None), string_types):
raise SynapseError(400, "Expected mac.")
if not isinstance(register_json.get("user", None), string_types):
@@ -423,8 +418,6 @@ class CreateUserRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def _do_create(self, requester, user_json):
- yield run_on_reactor()
-
if "localpart" not in user_json:
raise SynapseError(400, "Expected 'localpart' key.")
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 30523995af..e1281cfbb6 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -24,7 +24,6 @@ from synapse.http.servlet import (
RestServlet, assert_params_in_request,
parse_json_object_from_request,
)
-from synapse.util.async import run_on_reactor
from synapse.util.msisdn import phone_number_to_msisdn
from synapse.util.threepids import check_3pid_allowed
from ._base import client_v2_patterns, interactive_auth_handler
@@ -300,8 +299,6 @@ class ThreepidRestServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request):
- yield run_on_reactor()
-
requester = yield self.auth.get_user_by_req(request)
threepids = yield self.datastore.user_get_threepids(
@@ -312,8 +309,6 @@ class ThreepidRestServlet(RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
- yield run_on_reactor()
-
body = parse_json_object_from_request(request)
threePidCreds = body.get('threePidCreds')
@@ -365,8 +360,6 @@ class ThreepidDeleteRestServlet(RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
- yield run_on_reactor()
-
body = parse_json_object_from_request(request)
required = ['medium', 'address']
@@ -381,9 +374,16 @@ class ThreepidDeleteRestServlet(RestServlet):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
- yield self.auth_handler.delete_threepid(
- user_id, body['medium'], body['address']
- )
+ try:
+ yield self.auth_handler.delete_threepid(
+ user_id, body['medium'], body['address']
+ )
+ except Exception:
+ # NB. This endpoint should succeed if there is nothing to
+ # delete, so it should only throw if something is wrong
+ # that we ought to care about.
+ logger.exception("Failed to remove threepid")
+ raise SynapseError(500, "Failed to remove threepid")
defer.returnValue((200, {}))
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 5cab00aea9..97e7c0f7c6 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -32,7 +32,6 @@ from ._base import client_v2_patterns, interactive_auth_handler
import logging
import hmac
from hashlib import sha1
-from synapse.util.async import run_on_reactor
from synapse.util.ratelimitutils import FederationRateLimiter
from six import string_types
@@ -191,8 +190,6 @@ class RegisterRestServlet(RestServlet):
@interactive_auth_handler
@defer.inlineCallbacks
def on_POST(self, request):
- yield run_on_reactor()
-
body = parse_json_object_from_request(request)
kind = "user"
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 2ac767d2dc..218ba7a083 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -58,6 +58,7 @@ UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000
class MediaRepository(object):
def __init__(self, hs):
+ self.hs = hs
self.auth = hs.get_auth()
self.client = MatrixFederationHttpClient(hs)
self.clock = hs.get_clock()
@@ -94,7 +95,7 @@ class MediaRepository(object):
storage_providers.append(provider)
self.media_storage = MediaStorage(
- self.primary_base_path, self.filepaths, storage_providers,
+ self.hs, self.primary_base_path, self.filepaths, storage_providers,
)
self.clock.looping_call(
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index d23fe10b07..d6b8ebbedb 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -37,13 +37,15 @@ class MediaStorage(object):
"""Responsible for storing/fetching files from local sources.
Args:
+ hs (synapse.server.Homeserver)
local_media_directory (str): Base path where we store media on disk
filepaths (MediaFilePaths)
storage_providers ([StorageProvider]): List of StorageProvider that are
used to fetch and store files.
"""
- def __init__(self, local_media_directory, filepaths, storage_providers):
+ def __init__(self, hs, local_media_directory, filepaths, storage_providers):
+ self.hs = hs
self.local_media_directory = local_media_directory
self.filepaths = filepaths
self.storage_providers = storage_providers
@@ -175,7 +177,8 @@ class MediaStorage(object):
res = yield provider.fetch(path, file_info)
if res:
with res:
- consumer = BackgroundFileConsumer(open(local_path, "w"))
+ consumer = BackgroundFileConsumer(
+ open(local_path, "w"), self.hs.get_reactor())
yield res.write_to_consumer(consumer)
yield consumer.wait()
defer.returnValue(local_path)
diff --git a/synapse/server.py b/synapse/server.py
index 58dbf78437..c29c19289a 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -40,7 +40,7 @@ from synapse.federation.transport.client import TransportLayerClient
from synapse.federation.transaction_queue import TransactionQueue
from synapse.handlers import Handlers
from synapse.handlers.appservice import ApplicationServicesHandler
-from synapse.handlers.auth import AuthHandler, MacaroonGeneartor
+from synapse.handlers.auth import AuthHandler, MacaroonGenerator
from synapse.handlers.deactivate_account import DeactivateAccountHandler
from synapse.handlers.devicemessage import DeviceMessageHandler
from synapse.handlers.device import DeviceHandler
@@ -165,15 +165,19 @@ class HomeServer(object):
'server_notices_sender',
]
- def __init__(self, hostname, **kwargs):
+ def __init__(self, hostname, reactor=None, **kwargs):
"""
Args:
hostname : The hostname for the server.
"""
+ if not reactor:
+ from twisted.internet import reactor
+
+ self._reactor = reactor
self.hostname = hostname
self._building = {}
- self.clock = Clock()
+ self.clock = Clock(reactor)
self.distributor = Distributor()
self.ratelimiter = Ratelimiter()
@@ -186,6 +190,12 @@ class HomeServer(object):
self.datastore = DataStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
+ def get_reactor(self):
+ """
+ Fetch the Twisted reactor in use by this HomeServer.
+ """
+ return self._reactor
+
def get_ip_from_request(self, request):
# X-Forwarded-For is handled by our custom request type.
return request.getClientIP()
@@ -261,7 +271,7 @@ class HomeServer(object):
return AuthHandler(self)
def build_macaroon_generator(self):
- return MacaroonGeneartor(self)
+ return MacaroonGenerator(self)
def build_device_handler(self):
return DeviceHandler(self)
@@ -328,6 +338,7 @@ class HomeServer(object):
return adbapi.ConnectionPool(
name,
+ cp_reactor=self.get_reactor(),
**self.db_config.get("args", {})
)
diff --git a/synapse/state.py b/synapse/state.py
index 216418f58d..8098db94b4 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -694,10 +694,10 @@ def _create_auth_events_from_maps(unconflicted_state, conflicted_state, state_ma
return auth_events
-def _resolve_with_state(unconflicted_state_ids, conflicted_state_ds, auth_event_ids,
+def _resolve_with_state(unconflicted_state_ids, conflicted_state_ids, auth_event_ids,
state_map):
conflicted_state = {}
- for key, event_ids in iteritems(conflicted_state_ds):
+ for key, event_ids in iteritems(conflicted_state_ids):
events = [state_map[ev_id] for ev_id in event_ids if ev_id in state_map]
if len(events) > 1:
conflicted_state[key] = events
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 8af325a9f5..b7e9c716c8 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import synapse.util.async
from ._base import SQLBaseStore
from . import engines
@@ -92,7 +91,7 @@ class BackgroundUpdateStore(SQLBaseStore):
logger.info("Starting background schema updates")
while True:
- yield synapse.util.async.sleep(
+ yield self.hs.get_clock().sleep(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
try:
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index ce338514e8..968d2fed22 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -15,7 +15,7 @@
import logging
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from ._base import Cache
from . import background_updates
@@ -70,7 +70,9 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
self._client_ip_looper = self._clock.looping_call(
self._update_client_ips_batch, 5 * 1000
)
- reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch)
+ self.hs.get_reactor().addSystemEventTrigger(
+ "before", "shutdown", self._update_client_ips_batch
+ )
def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
now=None):
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index d0350ee5fe..c4a0208ce4 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -16,7 +16,6 @@
from synapse.storage._base import SQLBaseStore, LoggingTransaction
from twisted.internet import defer
-from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks
import logging
@@ -800,7 +799,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
)
if caught_up:
break
- yield sleep(5)
+ yield self.hs.get_clock().sleep(5)
finally:
self._doing_notif_rotation = False
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 32d9d00ffb..f6a6e46b43 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -14,13 +14,14 @@
# limitations under the License.
from ._base import SQLBaseStore
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
from synapse.util.logcontext import (
PreserveLoggingContext, make_deferred_yieldable, run_in_background,
+ LoggingContext,
)
from synapse.util.metrics import Measure
from synapse.api.errors import SynapseError
@@ -145,6 +146,9 @@ class EventsWorkerStore(SQLBaseStore):
missing_events_ids = [e for e in event_ids if e not in event_entry_map]
if missing_events_ids:
+ log_ctx = LoggingContext.current_context()
+ log_ctx.record_event_fetch(len(missing_events_ids))
+
missing_events = yield self._enqueue_events(
missing_events_ids,
check_redacted=check_redacted,
@@ -265,7 +269,7 @@ class EventsWorkerStore(SQLBaseStore):
except Exception:
logger.exception("Failed to callback")
with PreserveLoggingContext():
- reactor.callFromThread(fire, event_list, row_dict)
+ self.hs.get_reactor().callFromThread(fire, event_list, row_dict)
except Exception as e:
logger.exception("do_fetch")
@@ -278,7 +282,7 @@ class EventsWorkerStore(SQLBaseStore):
if event_list:
with PreserveLoggingContext():
- reactor.callFromThread(fire, event_list)
+ self.hs.get_reactor().callFromThread(fire, event_list)
@defer.inlineCallbacks
def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index c241167fbe..9c9cf46e7f 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -460,15 +460,6 @@ class RegistrationStore(RegistrationWorkerStore,
defer.returnValue(ret['user_id'])
defer.returnValue(None)
- def user_delete_threepids(self, user_id):
- return self._simple_delete(
- "user_threepids",
- keyvalues={
- "user_id": user_id,
- },
- desc="user_delete_threepids",
- )
-
def user_delete_threepid(self, user_id, medium, address):
return self._simple_delete(
"user_threepids",
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 7bfc3d91b5..48a88f755e 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -578,7 +578,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
)
txn.execute(sql, (user_id, room_id))
- txn.call_after(self.was_forgotten_at.invalidate_all)
txn.call_after(self.did_forget.invalidate, (user_id, room_id))
self._invalidate_cache_and_stream(
txn, self.who_forgot_in_room, (room_id,)
@@ -609,31 +608,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
count = yield self.runInteraction("did_forget_membership", f)
defer.returnValue(count == 0)
- @cachedInlineCallbacks(num_args=3)
- def was_forgotten_at(self, user_id, room_id, event_id):
- """Returns whether user_id has elected to discard history for room_id at
- event_id.
-
- event_id must be a membership event."""
- def f(txn):
- sql = (
- "SELECT"
- " forgotten"
- " FROM"
- " room_memberships"
- " WHERE"
- " user_id = ?"
- " AND"
- " room_id = ?"
- " AND"
- " event_id = ?"
- )
- txn.execute(sql, (user_id, room_id, event_id))
- rows = txn.fetchall()
- return rows[0][0]
- forgot = yield self.runInteraction("did_forget_membership_at", f)
- defer.returnValue(forgot == 1)
-
@defer.inlineCallbacks
def _background_add_membership_profile(self, progress, batch_size):
target_min_stream_id = progress.get(
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 85c8fffc19..986a20400c 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -23,7 +23,7 @@ from twisted.internet import defer
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine
-from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR
+from synapse.util.caches import intern_string, get_cache_factor_for
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.stringutils import to_ascii
@@ -57,7 +57,7 @@ class StateGroupWorkerStore(SQLBaseStore):
super(StateGroupWorkerStore, self).__init__(db_conn, hs)
self._state_group_cache = DictionaryCache(
- "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
+ "*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache")
)
@cached(max_entries=100000, iterable=True)
@@ -272,7 +272,7 @@ class StateGroupWorkerStore(SQLBaseStore):
for typ in types:
if typ[1] is None:
where_clauses.append("(type = ?)")
- where_args.extend(typ[0])
+ where_args.append(typ[0])
wildcard_types = True
else:
where_clauses.append("(type = ? AND state_key = ?)")
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index fc11e26623..2a3df7c71d 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -13,15 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.logcontext import PreserveLoggingContext
-
-from twisted.internet import defer, reactor, task
-
-import time
import logging
-
from itertools import islice
+import attr
+from twisted.internet import defer, task
+
+from synapse.util.logcontext import PreserveLoggingContext
+
logger = logging.getLogger(__name__)
@@ -31,16 +30,24 @@ def unwrapFirstError(failure):
return failure.value.subFailure
+@attr.s
class Clock(object):
- """A small utility that obtains current time-of-day so that time may be
- mocked during unit-tests.
-
- TODO(paul): Also move the sleep() functionality into it
"""
+ A Clock wraps a Twisted reactor and provides utilities on top of it.
+ """
+ _reactor = attr.ib()
+
+ @defer.inlineCallbacks
+ def sleep(self, seconds):
+ d = defer.Deferred()
+ with PreserveLoggingContext():
+ self._reactor.callLater(seconds, d.callback, seconds)
+ res = yield d
+ defer.returnValue(res)
def time(self):
"""Returns the current system time in seconds since epoch."""
- return time.time()
+ return self._reactor.seconds()
def time_msec(self):
"""Returns the current system time in miliseconds since epoch."""
@@ -56,6 +63,7 @@ class Clock(object):
msec(float): How long to wait between calls in milliseconds.
"""
call = task.LoopingCall(f)
+ call.clock = self._reactor
call.start(msec / 1000.0, now=False)
return call
@@ -73,7 +81,7 @@ class Clock(object):
callback(*args, **kwargs)
with PreserveLoggingContext():
- return reactor.callLater(delay, wrapped_callback, *args, **kwargs)
+ return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs)
def cancel_call_later(self, timer, ignore_errs=False):
try:
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 9dd4e6b5bc..1668df4ce6 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -13,15 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from twisted.internet.defer import CancelledError
from twisted.python import failure
from .logcontext import (
PreserveLoggingContext, make_deferred_yieldable, run_in_background
)
-from synapse.util import logcontext, unwrapFirstError
+from synapse.util import logcontext, unwrapFirstError, Clock
from contextlib import contextmanager
@@ -32,22 +31,6 @@ from six.moves import range
logger = logging.getLogger(__name__)
-@defer.inlineCallbacks
-def sleep(seconds):
- d = defer.Deferred()
- with PreserveLoggingContext():
- reactor.callLater(seconds, d.callback, seconds)
- res = yield d
- defer.returnValue(res)
-
-
-def run_on_reactor():
- """ This will cause the rest of the function to be invoked upon the next
- iteration of the main loop
- """
- return sleep(0)
-
-
class ObservableDeferred(object):
"""Wraps a deferred object so that we can add observer deferreds. These
observer deferreds do not affect the callback chain of the original
@@ -180,13 +163,18 @@ class Linearizer(object):
# do some work.
"""
- def __init__(self, name=None):
+ def __init__(self, name=None, clock=None):
if name is None:
self.name = id(self)
else:
self.name = name
self.key_to_defer = {}
+ if not clock:
+ from twisted.internet import reactor
+ clock = Clock(reactor)
+ self._clock = clock
+
@defer.inlineCallbacks
def queue(self, key):
# If there is already a deferred in the queue, we pull it out so that
@@ -227,7 +215,7 @@ class Linearizer(object):
# the context manager, but it needs to happen while we hold the
# lock, and the context manager's exit code must be synchronous,
# so actually this is the only sensible place.
- yield run_on_reactor()
+ yield self._clock.sleep(0)
else:
logger.info("Acquired uncontended linearizer lock %r for key %r",
@@ -404,7 +392,7 @@ class DeferredTimeoutError(Exception):
"""
-def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
+def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
"""
Add a timeout to a deferred by scheduling it to be cancelled after
timeout seconds.
@@ -419,6 +407,7 @@ def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
Args:
deferred (defer.Deferred): deferred to be timed out
timeout (Number): seconds to time out after
+ reactor (twisted.internet.reactor): the Twisted reactor to use
on_timeout_cancel (callable): A callable which is called immediately
after the deferred times out, and not if this deferred is
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 183faf75a1..900575eb3c 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -22,6 +22,16 @@ import six
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
+
+def get_cache_factor_for(cache_name):
+ env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper()
+ factor = os.environ.get(env_var)
+ if factor:
+ return float(factor)
+
+ return CACHE_SIZE_FACTOR
+
+
caches_by_name = {}
collectors_by_name = {}
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index fc1874b65b..65a1042de1 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -17,7 +17,7 @@ import logging
from synapse.util.async import ObservableDeferred
from synapse.util import unwrapFirstError, logcontext
-from synapse.util.caches import CACHE_SIZE_FACTOR
+from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
from synapse.util.stringutils import to_ascii
@@ -313,7 +313,7 @@ class CacheDescriptor(_CacheDescriptorBase):
orig, num_args=num_args, inlineCallbacks=inlineCallbacks,
cache_context=cache_context)
- max_entries = int(max_entries * CACHE_SIZE_FACTOR)
+ max_entries = int(max_entries * get_cache_factor_for(orig.__name__))
self.max_entries = max_entries
self.tree = tree
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index a7fe0397fa..817118e30f 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
+from synapse.util import caches
-from blist import sorteddict
+from sortedcontainers import SortedDict
import logging
@@ -32,16 +32,18 @@ class StreamChangeCache(object):
entities that may have changed since that position. If position key is too
old then the cache will simply return all given entities.
"""
- def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
- self._max_size = int(max_size * CACHE_SIZE_FACTOR)
+
+ def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None):
+ self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
self._entity_to_key = {}
- self._cache = sorteddict()
+ self._cache = SortedDict()
self._earliest_known_stream_pos = current_stream_pos
self.name = name
- self.metrics = register_cache("cache", self.name, self._cache)
+ self.metrics = caches.register_cache("cache", self.name, self._cache)
- for entity, stream_pos in prefilled_cache.items():
- self.entity_has_changed(entity, stream_pos)
+ if prefilled_cache:
+ for entity, stream_pos in prefilled_cache.items():
+ self.entity_has_changed(entity, stream_pos)
def has_entity_changed(self, entity, stream_pos):
"""Returns True if the entity may have been updated since stream_pos
@@ -65,22 +67,25 @@ class StreamChangeCache(object):
return False
def get_entities_changed(self, entities, stream_pos):
- """Returns subset of entities that have had new things since the
- given position. If the position is too old it will just return the given list.
+ """
+ Returns subset of entities that have had new things since the given
+ position. Entities unknown to the cache will be returned. If the
+ position is too old it will just return the given list.
"""
assert type(stream_pos) is int
if stream_pos >= self._earliest_known_stream_pos:
- keys = self._cache.keys()
- i = keys.bisect_right(stream_pos)
+ not_known_entities = set(entities) - set(self._entity_to_key)
- result = set(
- self._cache[k] for k in keys[i:]
- ).intersection(entities)
+ result = (
+ set(self._cache.values()[self._cache.bisect_right(stream_pos) :])
+ .intersection(entities)
+ .union(not_known_entities)
+ )
self.metrics.inc_hits()
else:
- result = entities
+ result = set(entities)
self.metrics.inc_misses()
return result
@@ -90,12 +95,13 @@ class StreamChangeCache(object):
"""
assert type(stream_pos) is int
+ if not self._cache:
+ # If we have no cache, nothing can have changed.
+ return False
+
if stream_pos >= self._earliest_known_stream_pos:
self.metrics.inc_hits()
- keys = self._cache.keys()
- i = keys.bisect_right(stream_pos)
-
- return i < len(keys)
+ return self._cache.bisect_right(stream_pos) < len(self._cache)
else:
self.metrics.inc_misses()
return True
@@ -107,10 +113,7 @@ class StreamChangeCache(object):
assert type(stream_pos) is int
if stream_pos >= self._earliest_known_stream_pos:
- keys = self._cache.keys()
- i = keys.bisect_right(stream_pos)
-
- return [self._cache[k] for k in keys[i:]]
+ return self._cache.values()[self._cache.bisect_right(stream_pos) :]
else:
return None
@@ -129,8 +132,10 @@ class StreamChangeCache(object):
self._entity_to_key[entity] = stream_pos
while len(self._cache) > self._max_size:
- k, r = self._cache.popitem()
- self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
+ k, r = self._cache.popitem(0)
+ self._earliest_known_stream_pos = max(
+ k, self._earliest_known_stream_pos,
+ )
self._entity_to_key.pop(r, None)
def get_max_pos_of_last_change(self, entity):
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 3380970e4e..c78801015b 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import threads, reactor
+from twisted.internet import threads
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
@@ -27,6 +27,7 @@ class BackgroundFileConsumer(object):
Args:
file_obj (file): The file like object to write to. Closed when
finished.
+ reactor (twisted.internet.reactor): the Twisted reactor to use
"""
# For PushProducers pause if we have this many unwritten slices
@@ -34,9 +35,11 @@ class BackgroundFileConsumer(object):
# And resume once the size of the queue is less than this
_RESUME_ON_QUEUE_SIZE = 2
- def __init__(self, file_obj):
+ def __init__(self, file_obj, reactor):
self._file_obj = file_obj
+ self._reactor = reactor
+
# Producer we're registered with
self._producer = None
@@ -71,7 +74,10 @@ class BackgroundFileConsumer(object):
self._producer = producer
self.streaming = streaming
self._finished_deferred = run_in_background(
- threads.deferToThread, self._writer
+ threads.deferToThreadPool,
+ self._reactor,
+ self._reactor.getThreadPool(),
+ self._writer,
)
if not streaming:
self._producer.resumeProducing()
@@ -109,7 +115,7 @@ class BackgroundFileConsumer(object):
# producer.
if self._producer and self._paused_producer:
if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE:
- reactor.callFromThread(self._resume_paused_producer)
+ self._reactor.callFromThread(self._resume_paused_producer)
bytes = self._bytes_queue.get()
@@ -121,7 +127,7 @@ class BackgroundFileConsumer(object):
# If its a pull producer then we need to explicitly ask for
# more stuff.
if not self.streaming and self._producer:
- reactor.callFromThread(self._producer.resumeProducing)
+ self._reactor.callFromThread(self._producer.resumeProducing)
except Exception as e:
self._write_exception = e
raise
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index a58c723403..df2b71b791 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -60,6 +60,7 @@ class LoggingContext(object):
__slots__ = [
"previous_context", "name", "ru_stime", "ru_utime",
"db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec",
+ "evt_db_fetch_count",
"usage_start",
"main_thread", "alive",
"request", "tag",
@@ -90,6 +91,9 @@ class LoggingContext(object):
def add_database_scheduled(self, sched_sec):
pass
+ def record_event_fetch(self, event_count):
+ pass
+
def __nonzero__(self):
return False
__bool__ = __nonzero__ # python3
@@ -109,6 +113,9 @@ class LoggingContext(object):
# sec spent waiting for db txns to be scheduled
self.db_sched_duration_sec = 0
+ # number of events this thread has fetched from the db
+ self.evt_db_fetch_count = 0
+
# If alive has the thread resource usage when the logcontext last
# became active.
self.usage_start = None
@@ -243,6 +250,14 @@ class LoggingContext(object):
"""
self.db_sched_duration_sec += sched_sec
+ def record_event_fetch(self, event_count):
+ """Record a number of events being fetched from the db
+
+ Args:
+ event_count (int): number of events being fetched
+ """
+ self.evt_db_fetch_count += event_count
+
class LoggingContextFilter(logging.Filter):
"""Logging filter that adds values from the current logging context to each
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 0ab63c3d7d..c5a45cef7c 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -17,7 +17,6 @@ from twisted.internet import defer
from synapse.api.errors import LimitExceededError
-from synapse.util.async import sleep
from synapse.util.logcontext import (
run_in_background, make_deferred_yieldable,
PreserveLoggingContext,
@@ -153,7 +152,7 @@ class _PerHostRatelimiter(object):
"Ratelimit [%s]: sleeping req",
id(request_id),
)
- ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0)
+ ret_defer = run_in_background(self.clock.sleep, self.sleep_msec / 1000.0)
self.sleeping_requests.add(request_id)
|