diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e3f0d99a3f..0b85b377e3 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -20,6 +20,7 @@ import sys
from six import iteritems
+import psutil
from prometheus_client import Gauge
from twisted.application import service
@@ -502,7 +503,6 @@ def run(hs):
def performance_stats_init():
try:
- import psutil
process = psutil.Process()
# Ensure we can fetch both, and make the initial request for cpu_percent
# so the next request will use this as the initial point.
@@ -510,12 +510,9 @@ def run(hs):
process.cpu_percent(interval=None)
logger.info("report_stats can use psutil")
stats_process.append(process)
- except (ImportError, AttributeError):
- logger.warn(
- "report_stats enabled but psutil is not installed or incorrect version."
- " Disabling reporting of memory/cpu stats."
- " Ensuring psutil is available will help matrix.org track performance"
- " changes across releases."
+ except AttributeError:
+ logger.warning(
+ "Unable to read memory/cpu stats. Disabling reporting."
)
def generate_user_daily_visit_stats():
@@ -530,10 +527,13 @@ def run(hs):
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
# monthly active user limiting functionality
- clock.looping_call(
- hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
- )
- hs.get_datastore().reap_monthly_active_users()
+ def reap_monthly_active_users():
+ return run_as_background_process(
+ "reap_monthly_active_users",
+ hs.get_datastore().reap_monthly_active_users,
+ )
+ clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
+ reap_monthly_active_users()
@defer.inlineCallbacks
def generate_monthly_active_users():
@@ -547,12 +547,23 @@ def run(hs):
registered_reserved_users_mau_gauge.set(float(reserved_count))
max_mau_gauge.set(float(hs.config.max_mau_value))
- hs.get_datastore().initialise_reserved_users(
- hs.config.mau_limits_reserved_threepids
+ def start_generate_monthly_active_users():
+ return run_as_background_process(
+ "generate_monthly_active_users",
+ generate_monthly_active_users,
+ )
+
+ # XXX is this really supposed to be a background process? it looks
+ # like it needs to complete before some of the other stuff runs.
+ run_as_background_process(
+ "initialise_reserved_users",
+ hs.get_datastore().initialise_reserved_users,
+ hs.config.mau_limits_reserved_threepids,
)
- generate_monthly_active_users()
+
+ start_generate_monthly_active_users()
if hs.config.limit_usage_by_mau:
- clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
+ clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
# End of monthly active user settings
if hs.config.report_stats:
@@ -568,7 +579,7 @@ def run(hs):
clock.call_later(5 * 60, start_phone_stats_home)
if hs.config.daemonize and hs.config.print_pidfile:
- print (hs.config.pid_file)
+ print(hs.config.pid_file)
_base.start_reactor(
"synapse-homeserver",
diff --git a/synapse/config/__main__.py b/synapse/config/__main__.py
index 8fccf573ee..79fe9c3dac 100644
--- a/synapse/config/__main__.py
+++ b/synapse/config/__main__.py
@@ -28,7 +28,7 @@ if __name__ == "__main__":
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
- print (getattr(config, key))
+ print(getattr(config, key))
sys.exit(0)
else:
sys.stderr.write("Unknown command %r\n" % (action,))
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 3d2e90dd5b..14dae65ea0 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -106,10 +106,7 @@ class Config(object):
@classmethod
def check_file(cls, file_path, config_name):
if file_path is None:
- raise ConfigError(
- "Missing config for %s."
- % (config_name,)
- )
+ raise ConfigError("Missing config for %s." % (config_name,))
try:
os.stat(file_path)
except OSError as e:
@@ -128,9 +125,7 @@ class Config(object):
if e.errno != errno.EEXIST:
raise
if not os.path.isdir(dir_path):
- raise ConfigError(
- "%s is not a directory" % (dir_path,)
- )
+ raise ConfigError("%s is not a directory" % (dir_path,))
return dir_path
@classmethod
@@ -156,21 +151,20 @@ class Config(object):
return results
def generate_config(
- self,
- config_dir_path,
- server_name,
- is_generating_file,
- report_stats=None,
+ self, config_dir_path, server_name, is_generating_file, report_stats=None
):
default_config = "# vim:ft=yaml\n"
- default_config += "\n\n".join(dedent(conf) for conf in self.invoke_all(
- "default_config",
- config_dir_path=config_dir_path,
- server_name=server_name,
- is_generating_file=is_generating_file,
- report_stats=report_stats,
- ))
+ default_config += "\n\n".join(
+ dedent(conf)
+ for conf in self.invoke_all(
+ "default_config",
+ config_dir_path=config_dir_path,
+ server_name=server_name,
+ is_generating_file=is_generating_file,
+ report_stats=report_stats,
+ )
+ )
config = yaml.load(default_config)
@@ -178,23 +172,22 @@ class Config(object):
@classmethod
def load_config(cls, description, argv):
- config_parser = argparse.ArgumentParser(
- description=description,
- )
+ config_parser = argparse.ArgumentParser(description=description)
config_parser.add_argument(
- "-c", "--config-path",
+ "-c",
+ "--config-path",
action="append",
metavar="CONFIG_FILE",
help="Specify config file. Can be given multiple times and"
- " may specify directories containing *.yaml files."
+ " may specify directories containing *.yaml files.",
)
config_parser.add_argument(
"--keys-directory",
metavar="DIRECTORY",
help="Where files such as certs and signing keys are stored when"
- " their location is given explicitly in the config."
- " Defaults to the directory containing the last config file",
+ " their location is given explicitly in the config."
+ " Defaults to the directory containing the last config file",
)
config_args = config_parser.parse_args(argv)
@@ -203,9 +196,7 @@ class Config(object):
obj = cls()
obj.read_config_files(
- config_files,
- keys_directory=config_args.keys_directory,
- generate_keys=False,
+ config_files, keys_directory=config_args.keys_directory, generate_keys=False
)
return obj
@@ -213,38 +204,38 @@ class Config(object):
def load_or_generate_config(cls, description, argv):
config_parser = argparse.ArgumentParser(add_help=False)
config_parser.add_argument(
- "-c", "--config-path",
+ "-c",
+ "--config-path",
action="append",
metavar="CONFIG_FILE",
help="Specify config file. Can be given multiple times and"
- " may specify directories containing *.yaml files."
+ " may specify directories containing *.yaml files.",
)
config_parser.add_argument(
"--generate-config",
action="store_true",
- help="Generate a config file for the server name"
+ help="Generate a config file for the server name",
)
config_parser.add_argument(
"--report-stats",
action="store",
help="Whether the generated config reports anonymized usage statistics",
- choices=["yes", "no"]
+ choices=["yes", "no"],
)
config_parser.add_argument(
"--generate-keys",
action="store_true",
- help="Generate any missing key files then exit"
+ help="Generate any missing key files then exit",
)
config_parser.add_argument(
"--keys-directory",
metavar="DIRECTORY",
help="Used with 'generate-*' options to specify where files such as"
- " certs and signing keys should be stored in, unless explicitly"
- " specified in the config."
+ " certs and signing keys should be stored in, unless explicitly"
+ " specified in the config.",
)
config_parser.add_argument(
- "-H", "--server-name",
- help="The server name to generate a config file for"
+ "-H", "--server-name", help="The server name to generate a config file for"
)
config_args, remaining_args = config_parser.parse_known_args(argv)
@@ -257,8 +248,8 @@ class Config(object):
if config_args.generate_config:
if config_args.report_stats is None:
config_parser.error(
- "Please specify either --report-stats=yes or --report-stats=no\n\n" +
- MISSING_REPORT_STATS_SPIEL
+ "Please specify either --report-stats=yes or --report-stats=no\n\n"
+ + MISSING_REPORT_STATS_SPIEL
)
if not config_files:
config_parser.error(
@@ -287,26 +278,32 @@ class Config(object):
config_dir_path=config_dir_path,
server_name=server_name,
report_stats=(config_args.report_stats == "yes"),
- is_generating_file=True
+ is_generating_file=True,
)
obj.invoke_all("generate_files", config)
config_file.write(config_str)
- print((
- "A config file has been generated in %r for server name"
- " %r with corresponding SSL keys and self-signed"
- " certificates. Please review this file and customise it"
- " to your needs."
- ) % (config_path, server_name))
+ print(
+ (
+ "A config file has been generated in %r for server name"
+ " %r with corresponding SSL keys and self-signed"
+ " certificates. Please review this file and customise it"
+ " to your needs."
+ )
+ % (config_path, server_name)
+ )
print(
"If this server name is incorrect, you will need to"
" regenerate the SSL certificates"
)
return
else:
- print((
- "Config file %r already exists. Generating any missing key"
- " files."
- ) % (config_path,))
+ print(
+ (
+ "Config file %r already exists. Generating any missing key"
+ " files."
+ )
+ % (config_path,)
+ )
generate_keys = True
parser = argparse.ArgumentParser(
@@ -338,8 +335,7 @@ class Config(object):
return obj
- def read_config_files(self, config_files, keys_directory=None,
- generate_keys=False):
+ def read_config_files(self, config_files, keys_directory=None, generate_keys=False):
if not keys_directory:
keys_directory = os.path.dirname(config_files[-1])
@@ -364,8 +360,9 @@ class Config(object):
if "report_stats" not in config:
raise ConfigError(
- MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS + "\n" +
- MISSING_REPORT_STATS_SPIEL
+ MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS
+ + "\n"
+ + MISSING_REPORT_STATS_SPIEL
)
if generate_keys:
@@ -399,16 +396,16 @@ def find_config_files(search_paths):
for entry in os.listdir(config_path):
entry_path = os.path.join(config_path, entry)
if not os.path.isfile(entry_path):
- print (
- "Found subdirectory in config directory: %r. IGNORING."
- ) % (entry_path, )
+ err = "Found subdirectory in config directory: %r. IGNORING."
+ print(err % (entry_path,))
continue
if not entry.endswith(".yaml"):
- print (
- "Found file in config directory that does not"
- " end in '.yaml': %r. IGNORING."
- ) % (entry_path, )
+ err = (
+ "Found file in config directory that does not end in "
+ "'.yaml': %r. IGNORING."
+ )
+ print(err % (entry_path,))
continue
files.append(entry_path)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2a5eab124f..329e3c7d71 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -22,7 +22,7 @@ import bcrypt
import pymacaroons
from canonicaljson import json
-from twisted.internet import defer, threads
+from twisted.internet import defer
from twisted.web.client import PartialDownloadError
import synapse.util.stringutils as stringutils
@@ -37,8 +37,8 @@ from synapse.api.errors import (
)
from synapse.module_api import ModuleApi
from synapse.types import UserID
+from synapse.util import logcontext
from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable
from ._base import BaseHandler
@@ -884,11 +884,7 @@ class AuthHandler(BaseHandler):
bcrypt.gensalt(self.bcrypt_rounds),
).decode('ascii')
- return make_deferred_yieldable(
- threads.deferToThreadPool(
- self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
- ),
- )
+ return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash)
def validate_hash(self, password, stored_hash):
"""Validates that self.hash(password) == stored_hash.
@@ -913,13 +909,7 @@ class AuthHandler(BaseHandler):
if not isinstance(stored_hash, bytes):
stored_hash = stored_hash.encode('ascii')
- return make_deferred_yieldable(
- threads.deferToThreadPool(
- self.hs.get_reactor(),
- self.hs.get_reactor().getThreadPool(),
- _do_validate_hash,
- ),
- )
+ return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
else:
return defer.succeed(False)
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index b078df4a76..75fe50c42c 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -17,8 +17,8 @@ import logging
from twisted.internet import defer
from synapse.api.errors import SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID, create_requester
-from synapse.util.logcontext import run_in_background
from ._base import BaseHandler
@@ -121,7 +121,7 @@ class DeactivateAccountHandler(BaseHandler):
None
"""
if not self._user_parter_running:
- run_in_background(self._user_parter_loop)
+ run_as_background_process("user_parter_loop", self._user_parter_loop)
@defer.inlineCallbacks
def _user_parter_loop(self):
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 18741c5fac..02f12f6645 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -80,42 +80,60 @@ class DirectoryHandler(BaseHandler):
)
@defer.inlineCallbacks
- def create_association(self, user_id, room_alias, room_id, servers=None):
- # association creation for human users
- # TODO(erikj): Do user auth.
+ def create_association(self, requester, room_alias, room_id, servers=None,
+ send_event=True):
+ """Attempt to create a new alias
- if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
- raise SynapseError(
- 403, "This user is not permitted to create this alias",
- )
+ Args:
+ requester (Requester)
+ room_alias (RoomAlias)
+ room_id (str)
+ servers (list[str]|None): List of servers that other servers
+ should try and join via
+ send_event (bool): Whether to send an updated m.room.aliases event
- can_create = yield self.can_modify_alias(
- room_alias,
- user_id=user_id
- )
- if not can_create:
- raise SynapseError(
- 400, "This alias is reserved by an application service.",
- errcode=Codes.EXCLUSIVE
- )
- yield self._create_association(room_alias, room_id, servers, creator=user_id)
+ Returns:
+ Deferred
+ """
- @defer.inlineCallbacks
- def create_appservice_association(self, service, room_alias, room_id,
- servers=None):
- if not service.is_interested_in_alias(room_alias.to_string()):
- raise SynapseError(
- 400, "This application service has not reserved"
- " this kind of alias.", errcode=Codes.EXCLUSIVE
+ user_id = requester.user.to_string()
+
+ service = requester.app_service
+ if service:
+ if not service.is_interested_in_alias(room_alias.to_string()):
+ raise SynapseError(
+ 400, "This application service has not reserved"
+ " this kind of alias.", errcode=Codes.EXCLUSIVE
+ )
+ else:
+ if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
+ raise AuthError(
+ 403, "This user is not permitted to create this alias",
+ )
+
+ can_create = yield self.can_modify_alias(
+ room_alias,
+ user_id=user_id
)
+ if not can_create:
+ raise AuthError(
+ 400, "This alias is reserved by an application service.",
+ errcode=Codes.EXCLUSIVE
+ )
- # association creation for app services
- yield self._create_association(room_alias, room_id, servers)
+ yield self._create_association(room_alias, room_id, servers, creator=user_id)
+ if send_event:
+ yield self.send_room_alias_update_event(
+ requester,
+ room_id
+ )
@defer.inlineCallbacks
- def delete_association(self, requester, user_id, room_alias):
+ def delete_association(self, requester, room_alias):
# association deletion for human users
+ user_id = requester.user.to_string()
+
try:
can_delete = yield self._user_can_delete_alias(room_alias, user_id)
except StoreError as e:
@@ -143,7 +161,6 @@ class DirectoryHandler(BaseHandler):
try:
yield self.send_room_alias_update_event(
requester,
- requester.user.to_string(),
room_id
)
@@ -261,7 +278,7 @@ class DirectoryHandler(BaseHandler):
)
@defer.inlineCallbacks
- def send_room_alias_update_event(self, requester, user_id, room_id):
+ def send_room_alias_update_event(self, requester, room_id):
aliases = yield self.store.get_aliases_for_room(room_id)
yield self.event_creation_handler.create_and_send_nonmember_event(
@@ -270,7 +287,7 @@ class DirectoryHandler(BaseHandler):
"type": EventTypes.Aliases,
"state_key": self.hs.hostname,
"room_id": room_id,
- "sender": user_id,
+ "sender": requester.user.to_string(),
"content": {"aliases": aliases},
},
ratelimit=False
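
Note: a hedged usage sketch for the unified entry point. After this change a single `create_association` call covers both ordinary users and application services, branching internally on `requester.app_service`; `hs`, `requester` and the alias/room values below are hypothetical.

```python
from twisted.internet import defer

from synapse.types import RoomAlias


@defer.inlineCallbacks
def add_alias(hs, requester, alias_str, room_id):
    directory_handler = hs.get_handlers().directory_handler
    yield directory_handler.create_association(
        requester=requester,
        room_alias=RoomAlias.from_string(alias_str),
        room_id=room_id,
        servers=[hs.hostname],
        # room creation passes send_event=False and sends m.room.aliases itself
        send_event=True,
    )
```
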
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 53e5e2648b..173315af6c 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -20,7 +20,7 @@ from six import iteritems
from twisted.internet import defer
-from synapse.api.errors import SynapseError
+from synapse.api.errors import HttpResponseException, SynapseError
from synapse.types import get_domain_from_id
logger = logging.getLogger(__name__)
@@ -37,9 +37,23 @@ def _create_rerouter(func_name):
)
else:
destination = get_domain_from_id(group_id)
- return getattr(self.transport_client, func_name)(
+ d = getattr(self.transport_client, func_name)(
destination, group_id, *args, **kwargs
)
+
+ # Capture errors returned by the remote homeserver and
+ # re-throw specific errors as SynapseErrors. This is so
+ # when the remote end responds with things like 403 Not
+ # In Group, we can communicate that to the client instead
+ # of a 500.
+ def h(failure):
+ failure.trap(HttpResponseException)
+ e = failure.value
+ if e.code == 403:
+ raise e.to_synapse_error()
+ return failure
+ d.addErrback(h)
+ return d
return f
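
Note: a generic illustration of the errback pattern above (the wrapper name is assumed): trap `HttpResponseException`, convert a remote 403 into a `SynapseError` the client can understand, and let every other failure propagate unchanged.

```python
from synapse.api.errors import HttpResponseException


def convert_remote_403(d):
    def h(failure):
        failure.trap(HttpResponseException)
        e = failure.value
        if e.code == 403:
            # surface the remote 403 to the client instead of a 500
            raise e.to_synapse_error()
        return failure

    d.addErrback(h)
    return d
```
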
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index c3f820b975..ab1571b27b 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -190,10 +190,11 @@ class RoomCreationHandler(BaseHandler):
if room_alias:
directory_handler = self.hs.get_handlers().directory_handler
yield directory_handler.create_association(
- user_id=user_id,
+ requester=requester,
room_id=room_id,
room_alias=room_alias,
servers=[self.hs.hostname],
+ send_event=False,
)
preset_config = config.get(
@@ -289,7 +290,7 @@ class RoomCreationHandler(BaseHandler):
if room_alias:
result["room_alias"] = room_alias.to_string()
yield directory_handler.send_room_alias_update_event(
- requester, user_id, room_id
+ requester, room_id
)
defer.returnValue(result)
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d8413d6aa7..f11b430126 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -20,6 +20,7 @@ from six import iteritems
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.roommember import ProfileInfo
from synapse.types import get_localpart_from_id
from synapse.util.metrics import Measure
@@ -98,7 +99,6 @@ class UserDirectoryHandler(object):
"""
return self.store.search_user_dir(user_id, search_term, limit)
- @defer.inlineCallbacks
def notify_new_event(self):
"""Called when there may be more deltas to process
"""
@@ -108,11 +108,15 @@ class UserDirectoryHandler(object):
if self._is_processing:
return
+ @defer.inlineCallbacks
+ def process():
+ try:
+ yield self._unsafe_process()
+ finally:
+ self._is_processing = False
+
self._is_processing = True
- try:
- yield self._unsafe_process()
- finally:
- self._is_processing = False
+ run_as_background_process("user_directory.notify_new_event", process)
@defer.inlineCallbacks
def handle_local_profile_change(self, user_id, profile):
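
Note: a sketch of the single-flight pattern above, using a hypothetical handler: `notify_new_event` returns immediately, the work runs as a named background process, and `_is_processing` guarantees at most one run at a time.

```python
from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process


class ExampleHandler(object):
    def __init__(self):
        self._is_processing = False

    def notify_new_event(self):
        if self._is_processing:
            return

        @defer.inlineCallbacks
        def process():
            try:
                yield self._unsafe_process()
            finally:
                self._is_processing = False

        self._is_processing = True
        run_as_background_process("example.notify_new_event", process)

    @defer.inlineCallbacks
    def _unsafe_process(self):
        yield defer.succeed(None)  # stand-in for the real delta processing
```
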
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index fcc02fc77d..24b6110c20 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -230,7 +230,7 @@ class MatrixFederationHttpClient(object):
Returns:
Deferred: resolves with the http response object on success.
- Fails with ``HTTPRequestException``: if we get an HTTP response
+ Fails with ``HttpResponseException``: if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -480,7 +480,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
- Fails with ``HTTPRequestException`` if we get an HTTP response
+ Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -534,7 +534,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
- Fails with ``HTTPRequestException`` if we get an HTTP response
+ Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -589,7 +589,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
- Fails with ``HTTPRequestException`` if we get an HTTP response
+ Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -640,7 +640,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
- Fails with ``HTTPRequestException`` if we get an HTTP response
+ Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -684,7 +684,7 @@ class MatrixFederationHttpClient(object):
Deferred: resolves with an (int,dict) tuple of the file length and
a dict of the response headers.
- Fails with ``HTTPRequestException`` if we get an HTTP response code
+ Fails with ``HttpResponseException`` if we get an HTTP response code
>= 300
Fails with ``NotRetryingDestination`` if we are not yet ready
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index f51184b50d..943876456b 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -53,6 +53,7 @@ REQUIREMENTS = {
"pillow>=3.1.2": ["PIL"],
"pydenticon>=0.2": ["pydenticon"],
"sortedcontainers>=1.4.4": ["sortedcontainers"],
+ "psutil>=2.0.0": ["psutil>=2.0.0"],
"pysaml2>=3.0.0": ["saml2"],
"pymacaroons-pynacl>=0.9.3": ["pymacaroons"],
"msgpack-python>=0.4.2": ["msgpack"],
@@ -79,9 +80,6 @@ CONDITIONAL_REQUIREMENTS = {
"matrix-synapse-ldap3": {
"matrix-synapse-ldap3>=0.1": ["ldap_auth_provider"],
},
- "psutil": {
- "psutil>=2.0.0": ["psutil>=2.0.0"],
- },
"postgres": {
"psycopg2>=2.6": ["psycopg2"]
}
diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py
index 97733f3026..0220acf644 100644
--- a/synapse/rest/client/v1/directory.py
+++ b/synapse/rest/client/v1/directory.py
@@ -74,38 +74,11 @@ class ClientDirectoryServer(ClientV1RestServlet):
if room is None:
raise SynapseError(400, "Room does not exist")
- dir_handler = self.handlers.directory_handler
+ requester = yield self.auth.get_user_by_req(request)
- try:
- # try to auth as a user
- requester = yield self.auth.get_user_by_req(request)
- try:
- user_id = requester.user.to_string()
- yield dir_handler.create_association(
- user_id, room_alias, room_id, servers
- )
- yield dir_handler.send_room_alias_update_event(
- requester,
- user_id,
- room_id
- )
- except SynapseError as e:
- raise e
- except Exception:
- logger.exception("Failed to create association")
- raise
- except AuthError:
- # try to auth as an application service
- service = yield self.auth.get_appservice_by_req(request)
- yield dir_handler.create_appservice_association(
- service, room_alias, room_id, servers
- )
- logger.info(
- "Application service at %s created alias %s pointing to %s",
- service.url,
- room_alias.to_string(),
- room_id
- )
+ yield self.handlers.directory_handler.create_association(
+ requester, room_alias, room_id, servers
+ )
defer.returnValue((200, {}))
@@ -135,7 +108,7 @@ class ClientDirectoryServer(ClientV1RestServlet):
room_alias = RoomAlias.from_string(room_alias)
yield dir_handler.delete_association(
- requester, user.to_string(), room_alias
+ requester, room_alias
)
logger.info(
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index a828ff4438..08b1867fab 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -25,7 +25,7 @@ from six.moves.urllib import parse as urlparse
import twisted.internet.error
import twisted.web.http
-from twisted.internet import defer, threads
+from twisted.internet import defer
from twisted.web.resource import Resource
from synapse.api.errors import (
@@ -36,8 +36,8 @@ from synapse.api.errors import (
)
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import logcontext
from synapse.util.async_helpers import Linearizer
-from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import is_ascii, random_string
@@ -492,10 +492,11 @@ class MediaRepository(object):
))
thumbnailer = Thumbnailer(input_path)
- t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+ t_byte_source = yield logcontext.defer_to_thread(
+ self.hs.get_reactor(),
self._generate_thumbnail,
thumbnailer, t_width, t_height, t_method, t_type
- ))
+ )
if t_byte_source:
try:
@@ -534,10 +535,11 @@ class MediaRepository(object):
))
thumbnailer = Thumbnailer(input_path)
- t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+ t_byte_source = yield logcontext.defer_to_thread(
+ self.hs.get_reactor(),
self._generate_thumbnail,
thumbnailer, t_width, t_height, t_method, t_type
- ))
+ )
if t_byte_source:
try:
@@ -620,15 +622,17 @@ class MediaRepository(object):
for (t_width, t_height, t_type), t_method in iteritems(thumbnails):
# Generate the thumbnail
if t_method == "crop":
- t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+ t_byte_source = yield logcontext.defer_to_thread(
+ self.hs.get_reactor(),
thumbnailer.crop,
t_width, t_height, t_type,
- ))
+ )
elif t_method == "scale":
- t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+ t_byte_source = yield logcontext.defer_to_thread(
+ self.hs.get_reactor(),
thumbnailer.scale,
t_width, t_height, t_type,
- ))
+ )
else:
logger.error("Unrecognized method: %r", t_method)
continue
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index a6189224ee..896078fe76 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -21,9 +21,10 @@ import sys
import six
-from twisted.internet import defer, threads
+from twisted.internet import defer
from twisted.protocols.basic import FileSender
+from synapse.util import logcontext
from synapse.util.file_consumer import BackgroundFileConsumer
from synapse.util.logcontext import make_deferred_yieldable
@@ -64,9 +65,10 @@ class MediaStorage(object):
with self.store_into_file(file_info) as (f, fname, finish_cb):
# Write to the main repository
- yield make_deferred_yieldable(threads.deferToThread(
+ yield logcontext.defer_to_thread(
+ self.hs.get_reactor(),
_write_file_synchronously, source, f,
- ))
+ )
yield finish_cb()
defer.returnValue(fname)
diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py
index 7b9f8b4d79..5aa03031f6 100644
--- a/synapse/rest/media/v1/storage_provider.py
+++ b/synapse/rest/media/v1/storage_provider.py
@@ -17,9 +17,10 @@ import logging
import os
import shutil
-from twisted.internet import defer, threads
+from twisted.internet import defer
from synapse.config._base import Config
+from synapse.util import logcontext
from synapse.util.logcontext import run_in_background
from .media_storage import FileResponder
@@ -120,7 +121,8 @@ class FileStorageProviderBackend(StorageProvider):
if not os.path.exists(dirname):
os.makedirs(dirname)
- return threads.deferToThread(
+ return logcontext.defer_to_thread(
+ self.hs.get_reactor(),
shutil.copyfile, primary_fname, backup_fname,
)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index be61147b9b..d9d0255d0b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -18,7 +18,7 @@ import threading
import time
from six import PY2, iteritems, iterkeys, itervalues
-from six.moves import intern, range
+from six.moves import builtins, intern, range
from canonicaljson import json
from prometheus_client import Histogram
@@ -1233,7 +1233,7 @@ def db_to_json(db_content):
# psycopg2 on Python 2 returns buffer objects, which we need to cast to
# bytes to decode
- if PY2 and isinstance(db_content, buffer):
+ if PY2 and isinstance(db_content, builtins.buffer):
db_content = bytes(db_content)
# Decode it to a Unicode string before feeding it to json.loads, so we
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index a1331c1a61..8af17921e3 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
- db_binary_type = buffer
+ db_binary_type = six.moves.builtins.buffer
else:
db_binary_type = memoryview
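
Note: this same two-line shim recurs in the storage files below. A minimal illustration of what it does (the helper is assumed, and the motivation is a guess: a bare `buffer` reference is an undefined name under Python 3, which trips linters even inside a PY2-only branch):

```python
import six

if six.PY2:
    # py2 sqlite/psycopg2 only accept buffer as the binary type
    db_binary_type = six.moves.builtins.buffer
else:
    db_binary_type = memoryview


def encode_for_db(value):
    # wrap bytes in the driver's preferred binary type before storing
    return db_binary_type(value)
```
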
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index c7987bfcdd..2743b52bad 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -29,7 +29,7 @@ from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
if six.PY2:
- db_binary_type = buffer
+ db_binary_type = six.moves.builtins.buffer
else:
db_binary_type = memoryview
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 5623391f6e..158e9dbe7b 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -27,7 +27,7 @@ from ._base import SQLBaseStore
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
- db_binary_type = buffer
+ db_binary_type = six.moves.builtins.buffer
else:
db_binary_type = memoryview
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index a3032cdce9..d8bf953ec0 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -30,7 +30,7 @@ from ._base import SQLBaseStore, db_to_json
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
- db_binary_type = buffer
+ db_binary_type = six.moves.builtins.buffer
else:
db_binary_type = memoryview
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index f2bde74dc5..625aedc940 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -15,6 +15,8 @@
import logging
+from six import integer_types
+
from sortedcontainers import SortedDict
from synapse.util import caches
@@ -47,7 +49,7 @@ class StreamChangeCache(object):
def has_entity_changed(self, entity, stream_pos):
"""Returns True if the entity may have been updated since stream_pos
"""
- assert type(stream_pos) is int or type(stream_pos) is long
+ assert type(stream_pos) in integer_types
if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 89224b26cc..4c6e92beb8 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -25,7 +25,7 @@ See doc/log_contexts.rst for details on how this works.
import logging
import threading
-from twisted.internet import defer
+from twisted.internet import defer, threads
logger = logging.getLogger(__name__)
@@ -562,58 +562,76 @@ def _set_context_cb(result, context):
return result
-# modules to ignore in `logcontext_tracer`
-_to_ignore = [
- "synapse.util.logcontext",
- "synapse.http.server",
- "synapse.storage._base",
- "synapse.util.async_helpers",
-]
+def defer_to_thread(reactor, f, *args, **kwargs):
+ """
+ Calls the function `f` using a thread from the reactor's default threadpool and
+ returns the result as a Deferred.
+
+ Creates a new logcontext for `f`, as a child of the current logcontext (so
+ that its CPU usage metrics will get attributed to the current logcontext).
+ `f` should preserve the logcontext it is given.
+
+ The result deferred follows the Synapse logcontext rules: you should `yield`
+ on it.
+
+ Args:
+ reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+ the Deferred's callbacks will be invoked, and whose threadpool we should
+ use for the function.
+
+ Normally this will be hs.get_reactor().
+
+ f (callable): The function to call.
+ args: positional arguments to pass to f.
-def logcontext_tracer(frame, event, arg):
- """A tracer that logs whenever a logcontext "unexpectedly" changes within
- a function. Probably inaccurate.
+ kwargs: keyword arguments to pass to f.
- Use by calling `sys.settrace(logcontext_tracer)` in the main thread.
+ Returns:
+ Deferred: A Deferred which fires a callback with the result of `f`, or an
+ errback if `f` throws an exception.
"""
- if event == 'call':
- name = frame.f_globals["__name__"]
- if name.startswith("synapse"):
- if name == "synapse.util.logcontext":
- if frame.f_code.co_name in ["__enter__", "__exit__"]:
- tracer = frame.f_back.f_trace
- if tracer:
- tracer.just_changed = True
-
- tracer = frame.f_trace
- if tracer:
- return tracer
-
- if not any(name.startswith(ig) for ig in _to_ignore):
- return LineTracer()
-
-
-class LineTracer(object):
- __slots__ = ["context", "just_changed"]
-
- def __init__(self):
- self.context = LoggingContext.current_context()
- self.just_changed = False
-
- def __call__(self, frame, event, arg):
- if event in 'line':
- if self.just_changed:
- self.context = LoggingContext.current_context()
- self.just_changed = False
- else:
- c = LoggingContext.current_context()
- if c != self.context:
- logger.info(
- "Context changed! %s -> %s, %s, %s",
- self.context, c,
- frame.f_code.co_filename, frame.f_lineno
- )
- self.context = c
+ return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
- return self
+
+def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
+ """
+ A wrapper for twisted.internet.threads.deferToThreadPool, which handles
+ logcontexts correctly.
+
+ Calls the function `f` using a thread from the given threadpool and returns
+ the result as a Deferred.
+
+ Creates a new logcontext for `f`, as a child of the current logcontext (so
+ that its CPU usage metrics will get attributed to the current logcontext).
+ `f` should preserve the logcontext it is given.
+
+ The result deferred follows the Synapse logcontext rules: you should `yield`
+ on it.
+
+ Args:
+ reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+ the Deferred's callbacks will be invoked. Normally this will be hs.get_reactor().
+
+ threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for
+ running `f`. Normally this will be hs.get_reactor().getThreadPool().
+
+ f (callable): The function to call.
+
+ args: positional arguments to pass to f.
+
+ kwargs: keyword arguments to pass to f.
+
+ Returns:
+ Deferred: A Deferred which fires a callback with the result of `f`, or an
+ errback if `f` throws an exception.
+ """
+ logcontext = LoggingContext.current_context()
+
+ def g():
+ with LoggingContext(parent_context=logcontext):
+ return f(*args, **kwargs)
+
+ return make_deferred_yieldable(
+ threads.deferToThreadPool(reactor, threadpool, g)
+ )