diff --git a/synapse/__init__.py b/synapse/__init__.py
index 43c5821ade..1ddbbbebfb 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -27,4 +27,4 @@ try:
except ImportError:
pass
-__version__ = "0.33.6"
+__version__ = "0.33.7"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 2e7f98404d..48b903374d 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -59,6 +59,7 @@ class Codes(object):
RESOURCE_LIMIT_EXCEEDED = "M_RESOURCE_LIMIT_EXCEEDED"
UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION"
INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
+ WRONG_ROOM_KEYS_VERSION = "M_WRONG_ROOM_KEYS_VERSION"
class CodeMessageException(RuntimeError):
@@ -312,6 +313,20 @@ class LimitExceededError(SynapseError):
)
+class RoomKeysVersionError(SynapseError):
+ """A client has tried to upload to a non-current version of the room_keys store
+ """
+ def __init__(self, current_version):
+ """
+ Args:
+ current_version (str): the current version of the store they should have used
+ """
+ super(RoomKeysVersionError, self).__init__(
+ 403, "Wrong room_keys version", Codes.WRONG_ROOM_KEYS_VERSION
+ )
+ self.current_version = current_version
+
+
class IncompatibleRoomVersionError(SynapseError):
"""A server is trying to join a room whose version it does not support."""
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index eed8c67e6a..677c0bdd4c 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -172,7 +172,10 @@ USER_FILTER_SCHEMA = {
# events a lot easier as we can then use a negative lookbehind
# assertion to split '\.' If we allowed \\ then it would
# incorrectly split '\\.' See synapse.events.utils.serialize_event
- "pattern": "^((?!\\\).)*$"
+ #
+ # Note that because this is a regular expression, we have to escape
+ # each backslash in the pattern.
+ "pattern": r"^((?!\\\\).)*$"
}
}
},
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 7c866e246a..18584226e9 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -17,6 +17,7 @@ import gc
import logging
import sys
+import psutil
from daemonize import Daemonize
from twisted.internet import error, reactor
@@ -24,12 +25,6 @@ from twisted.internet import error, reactor
from synapse.util import PreserveLoggingContext
from synapse.util.rlimit import change_resource_limit
-try:
- import affinity
-except Exception:
- affinity = None
-
-
logger = logging.getLogger(__name__)
@@ -89,15 +84,20 @@ def start_reactor(
with PreserveLoggingContext():
logger.info("Running")
if cpu_affinity is not None:
- if not affinity:
- quit_with_error(
- "Missing package 'affinity' required for cpu_affinity\n"
- "option\n\n"
- "Install by running:\n\n"
- " pip install affinity\n\n"
- )
- logger.info("Setting CPU affinity to %s" % cpu_affinity)
- affinity.set_process_affinity_mask(0, cpu_affinity)
+ # Turn the bitmask into bits, reverse it so we go from 0 up
+ mask_to_bits = bin(cpu_affinity)[2:][::-1]
+
+ cpus = []
+ cpu_num = 0
+
+ for i in mask_to_bits:
+ if i == "1":
+ cpus.append(cpu_num)
+ cpu_num += 1
+
+ p = psutil.Process()
+ p.cpu_affinity(cpus)
+
change_resource_limit(soft_file_limit)
if gc_thresholds:
gc.set_threshold(*gc_thresholds)
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 9060ab14f6..e4a68715aa 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -178,6 +178,9 @@ def start(config_options):
setup_logging(config, use_worker_options=True)
+ # This should only be done on the user directory worker or the master
+ config.update_user_directory = False
+
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index fc4b25de1c..f5c61dec5b 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -68,7 +68,7 @@ class PresenceStatusStubServlet(ClientV1RestServlet):
"Authorization": auth_headers,
}
result = yield self.http_client.get_json(
- self.main_uri + request.uri,
+ self.main_uri + request.uri.decode('ascii'),
headers=headers,
)
defer.returnValue((200, result))
@@ -125,7 +125,7 @@ class KeyUploadServlet(RestServlet):
"Authorization": auth_headers,
}
result = yield self.http_client.post_json_get_json(
- self.main_uri + request.uri,
+ self.main_uri + request.uri.decode('ascii'),
body,
headers=headers,
)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e3f0d99a3f..0b85b377e3 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -20,6 +20,7 @@ import sys
from six import iteritems
+import psutil
from prometheus_client import Gauge
from twisted.application import service
@@ -502,7 +503,6 @@ def run(hs):
def performance_stats_init():
try:
- import psutil
process = psutil.Process()
# Ensure we can fetch both, and make the initial request for cpu_percent
# so the next request will use this as the initial point.
@@ -510,12 +510,9 @@ def run(hs):
process.cpu_percent(interval=None)
logger.info("report_stats can use psutil")
stats_process.append(process)
- except (ImportError, AttributeError):
- logger.warn(
- "report_stats enabled but psutil is not installed or incorrect version."
- " Disabling reporting of memory/cpu stats."
- " Ensuring psutil is available will help matrix.org track performance"
- " changes across releases."
+ except (AttributeError):
+ logger.warning(
+ "Unable to read memory/cpu stats. Disabling reporting."
)
def generate_user_daily_visit_stats():
@@ -530,10 +527,13 @@ def run(hs):
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
# monthly active user limiting functionality
- clock.looping_call(
- hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
- )
- hs.get_datastore().reap_monthly_active_users()
+ def reap_monthly_active_users():
+ return run_as_background_process(
+ "reap_monthly_active_users",
+ hs.get_datastore().reap_monthly_active_users,
+ )
+ clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
+ reap_monthly_active_users()
@defer.inlineCallbacks
def generate_monthly_active_users():
@@ -547,12 +547,23 @@ def run(hs):
registered_reserved_users_mau_gauge.set(float(reserved_count))
max_mau_gauge.set(float(hs.config.max_mau_value))
- hs.get_datastore().initialise_reserved_users(
- hs.config.mau_limits_reserved_threepids
+ def start_generate_monthly_active_users():
+ return run_as_background_process(
+ "generate_monthly_active_users",
+ generate_monthly_active_users,
+ )
+
+ # XXX is this really supposed to be a background process? it looks
+ # like it needs to complete before some of the other stuff runs.
+ run_as_background_process(
+ "initialise_reserved_users",
+ hs.get_datastore().initialise_reserved_users,
+ hs.config.mau_limits_reserved_threepids,
)
- generate_monthly_active_users()
+
+ start_generate_monthly_active_users()
if hs.config.limit_usage_by_mau:
- clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
+ clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
# End of monthly active user settings
if hs.config.report_stats:
@@ -568,7 +579,7 @@ def run(hs):
clock.call_later(5 * 60, start_phone_stats_home)
if hs.config.daemonize and hs.config.print_pidfile:
- print (hs.config.pid_file)
+ print(hs.config.pid_file)
_base.start_reactor(
"synapse-homeserver",
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 630dcda478..83b0863f00 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage._base import __func__
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
@@ -49,31 +50,31 @@ class PusherSlaveStore(
SlavedAccountDataStore
):
update_pusher_last_stream_ordering_and_success = (
- DataStore.update_pusher_last_stream_ordering_and_success.__func__
+ __func__(DataStore.update_pusher_last_stream_ordering_and_success)
)
update_pusher_failing_since = (
- DataStore.update_pusher_failing_since.__func__
+ __func__(DataStore.update_pusher_failing_since)
)
update_pusher_last_stream_ordering = (
- DataStore.update_pusher_last_stream_ordering.__func__
+ __func__(DataStore.update_pusher_last_stream_ordering)
)
get_throttle_params_by_room = (
- DataStore.get_throttle_params_by_room.__func__
+ __func__(DataStore.get_throttle_params_by_room)
)
set_throttle_params = (
- DataStore.set_throttle_params.__func__
+ __func__(DataStore.set_throttle_params)
)
get_time_of_last_push_action_before = (
- DataStore.get_time_of_last_push_action_before.__func__
+ __func__(DataStore.get_time_of_last_push_action_before)
)
get_profile_displayname = (
- DataStore.get_profile_displayname.__func__
+ __func__(DataStore.get_profile_displayname)
)
@@ -160,11 +161,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events":
- self.pusher_pool.on_new_notifications(
+ yield self.pusher_pool.on_new_notifications(
token, token,
)
elif stream_name == "receipts":
- self.pusher_pool.on_new_receipts(
+ yield self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows)
)
except Exception:
@@ -182,7 +183,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
def start_pusher(self, user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey)
logger.info("Starting pusher %r / %r", user_id, key)
- return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id)
+ return self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
def start(config_options):
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 9a7fc6ee9d..3926c7f263 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -33,7 +33,7 @@ from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
-from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -147,7 +147,7 @@ class SynchrotronPresence(object):
and haven't come back yet. If there are poke the master about them.
"""
now = self.clock.time_msec()
- for user_id, last_sync_ms in self.users_going_offline.items():
+ for user_id, last_sync_ms in list(self.users_going_offline.items()):
if now - last_sync_ms > 10 * 1000:
self.users_going_offline.pop(user_id, None)
self.send_user_sync(user_id, False, last_sync_ms)
@@ -156,9 +156,9 @@ class SynchrotronPresence(object):
# TODO Hows this supposed to work?
pass
- get_states = PresenceHandler.get_states.__func__
- get_state = PresenceHandler.get_state.__func__
- current_state_for_users = PresenceHandler.current_state_for_users.__func__
+ get_states = __func__(PresenceHandler.get_states)
+ get_state = __func__(PresenceHandler.get_state)
+ current_state_for_users = __func__(PresenceHandler.current_state_for_users)
def user_syncing(self, user_id, affect_presence):
if affect_presence:
@@ -208,7 +208,7 @@ class SynchrotronPresence(object):
) for row in rows]
for state in states:
- self.user_to_current_state[row.user_id] = state
+ self.user_to_current_state[state.user_id] = state
stream_id = token
yield self.notify_from_replication(states, stream_id)
diff --git a/synapse/config/__main__.py b/synapse/config/__main__.py
index 8fccf573ee..79fe9c3dac 100644
--- a/synapse/config/__main__.py
+++ b/synapse/config/__main__.py
@@ -28,7 +28,7 @@ if __name__ == "__main__":
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
- print (getattr(config, key))
+ print(getattr(config, key))
sys.exit(0)
else:
sys.stderr.write("Unknown command %r\n" % (action,))
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 3d2e90dd5b..14dae65ea0 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -106,10 +106,7 @@ class Config(object):
@classmethod
def check_file(cls, file_path, config_name):
if file_path is None:
- raise ConfigError(
- "Missing config for %s."
- % (config_name,)
- )
+ raise ConfigError("Missing config for %s." % (config_name,))
try:
os.stat(file_path)
except OSError as e:
@@ -128,9 +125,7 @@ class Config(object):
if e.errno != errno.EEXIST:
raise
if not os.path.isdir(dir_path):
- raise ConfigError(
- "%s is not a directory" % (dir_path,)
- )
+ raise ConfigError("%s is not a directory" % (dir_path,))
return dir_path
@classmethod
@@ -156,21 +151,20 @@ class Config(object):
return results
def generate_config(
- self,
- config_dir_path,
- server_name,
- is_generating_file,
- report_stats=None,
+ self, config_dir_path, server_name, is_generating_file, report_stats=None
):
default_config = "# vim:ft=yaml\n"
- default_config += "\n\n".join(dedent(conf) for conf in self.invoke_all(
- "default_config",
- config_dir_path=config_dir_path,
- server_name=server_name,
- is_generating_file=is_generating_file,
- report_stats=report_stats,
- ))
+ default_config += "\n\n".join(
+ dedent(conf)
+ for conf in self.invoke_all(
+ "default_config",
+ config_dir_path=config_dir_path,
+ server_name=server_name,
+ is_generating_file=is_generating_file,
+ report_stats=report_stats,
+ )
+ )
config = yaml.load(default_config)
@@ -178,23 +172,22 @@ class Config(object):
@classmethod
def load_config(cls, description, argv):
- config_parser = argparse.ArgumentParser(
- description=description,
- )
+ config_parser = argparse.ArgumentParser(description=description)
config_parser.add_argument(
- "-c", "--config-path",
+ "-c",
+ "--config-path",
action="append",
metavar="CONFIG_FILE",
help="Specify config file. Can be given multiple times and"
- " may specify directories containing *.yaml files."
+ " may specify directories containing *.yaml files.",
)
config_parser.add_argument(
"--keys-directory",
metavar="DIRECTORY",
help="Where files such as certs and signing keys are stored when"
- " their location is given explicitly in the config."
- " Defaults to the directory containing the last config file",
+ " their location is given explicitly in the config."
+ " Defaults to the directory containing the last config file",
)
config_args = config_parser.parse_args(argv)
@@ -203,9 +196,7 @@ class Config(object):
obj = cls()
obj.read_config_files(
- config_files,
- keys_directory=config_args.keys_directory,
- generate_keys=False,
+ config_files, keys_directory=config_args.keys_directory, generate_keys=False
)
return obj
@@ -213,38 +204,38 @@ class Config(object):
def load_or_generate_config(cls, description, argv):
config_parser = argparse.ArgumentParser(add_help=False)
config_parser.add_argument(
- "-c", "--config-path",
+ "-c",
+ "--config-path",
action="append",
metavar="CONFIG_FILE",
help="Specify config file. Can be given multiple times and"
- " may specify directories containing *.yaml files."
+ " may specify directories containing *.yaml files.",
)
config_parser.add_argument(
"--generate-config",
action="store_true",
- help="Generate a config file for the server name"
+ help="Generate a config file for the server name",
)
config_parser.add_argument(
"--report-stats",
action="store",
help="Whether the generated config reports anonymized usage statistics",
- choices=["yes", "no"]
+ choices=["yes", "no"],
)
config_parser.add_argument(
"--generate-keys",
action="store_true",
- help="Generate any missing key files then exit"
+ help="Generate any missing key files then exit",
)
config_parser.add_argument(
"--keys-directory",
metavar="DIRECTORY",
help="Used with 'generate-*' options to specify where files such as"
- " certs and signing keys should be stored in, unless explicitly"
- " specified in the config."
+ " certs and signing keys should be stored in, unless explicitly"
+ " specified in the config.",
)
config_parser.add_argument(
- "-H", "--server-name",
- help="The server name to generate a config file for"
+ "-H", "--server-name", help="The server name to generate a config file for"
)
config_args, remaining_args = config_parser.parse_known_args(argv)
@@ -257,8 +248,8 @@ class Config(object):
if config_args.generate_config:
if config_args.report_stats is None:
config_parser.error(
- "Please specify either --report-stats=yes or --report-stats=no\n\n" +
- MISSING_REPORT_STATS_SPIEL
+ "Please specify either --report-stats=yes or --report-stats=no\n\n"
+ + MISSING_REPORT_STATS_SPIEL
)
if not config_files:
config_parser.error(
@@ -287,26 +278,32 @@ class Config(object):
config_dir_path=config_dir_path,
server_name=server_name,
report_stats=(config_args.report_stats == "yes"),
- is_generating_file=True
+ is_generating_file=True,
)
obj.invoke_all("generate_files", config)
config_file.write(config_str)
- print((
- "A config file has been generated in %r for server name"
- " %r with corresponding SSL keys and self-signed"
- " certificates. Please review this file and customise it"
- " to your needs."
- ) % (config_path, server_name))
+ print(
+ (
+ "A config file has been generated in %r for server name"
+ " %r with corresponding SSL keys and self-signed"
+ " certificates. Please review this file and customise it"
+ " to your needs."
+ )
+ % (config_path, server_name)
+ )
print(
"If this server name is incorrect, you will need to"
" regenerate the SSL certificates"
)
return
else:
- print((
- "Config file %r already exists. Generating any missing key"
- " files."
- ) % (config_path,))
+ print(
+ (
+ "Config file %r already exists. Generating any missing key"
+ " files."
+ )
+ % (config_path,)
+ )
generate_keys = True
parser = argparse.ArgumentParser(
@@ -338,8 +335,7 @@ class Config(object):
return obj
- def read_config_files(self, config_files, keys_directory=None,
- generate_keys=False):
+ def read_config_files(self, config_files, keys_directory=None, generate_keys=False):
if not keys_directory:
keys_directory = os.path.dirname(config_files[-1])
@@ -364,8 +360,9 @@ class Config(object):
if "report_stats" not in config:
raise ConfigError(
- MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS + "\n" +
- MISSING_REPORT_STATS_SPIEL
+ MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS
+ + "\n"
+ + MISSING_REPORT_STATS_SPIEL
)
if generate_keys:
@@ -399,16 +396,16 @@ def find_config_files(search_paths):
for entry in os.listdir(config_path):
entry_path = os.path.join(config_path, entry)
if not os.path.isfile(entry_path):
- print (
- "Found subdirectory in config directory: %r. IGNORING."
- ) % (entry_path, )
+ err = "Found subdirectory in config directory: %r. IGNORING."
+ print(err % (entry_path,))
continue
if not entry.endswith(".yaml"):
- print (
- "Found file in config directory that does not"
- " end in '.yaml': %r. IGNORING."
- ) % (entry_path, )
+ err = (
+ "Found file in config directory that does not end in "
+ "'.yaml': %r. IGNORING."
+ )
+ print(err % (entry_path,))
continue
files.append(entry_path)
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index fe156b6930..93d70cff14 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -13,10 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import print_function
+
# This file can't be called email.py because if it is, we cannot:
import email.utils
+import logging
+import os
+
+import pkg_resources
-from ._base import Config
+from ._base import Config, ConfigError
+
+logger = logging.getLogger(__name__)
class EmailConfig(Config):
@@ -38,7 +46,6 @@ class EmailConfig(Config):
"smtp_host",
"smtp_port",
"notif_from",
- "template_dir",
"notif_template_html",
"notif_template_text",
]
@@ -62,9 +69,26 @@ class EmailConfig(Config):
self.email_smtp_host = email_config["smtp_host"]
self.email_smtp_port = email_config["smtp_port"]
self.email_notif_from = email_config["notif_from"]
- self.email_template_dir = email_config["template_dir"]
self.email_notif_template_html = email_config["notif_template_html"]
self.email_notif_template_text = email_config["notif_template_text"]
+
+ template_dir = email_config.get("template_dir")
+ # we need an absolute path, because we change directory after starting (and
+ # we don't yet know what auxilliary templates like mail.css we will need).
+ # (Note that loading as package_resources with jinja.PackageLoader doesn't
+ # work for the same reason.)
+ if not template_dir:
+ template_dir = pkg_resources.resource_filename(
+ 'synapse', 'res/templates'
+ )
+ template_dir = os.path.abspath(template_dir)
+
+ for f in self.email_notif_template_text, self.email_notif_template_html:
+ p = os.path.join(template_dir, f)
+ if not os.path.isfile(p):
+ raise ConfigError("Unable to find email template file %s" % (p, ))
+ self.email_template_dir = template_dir
+
self.email_notif_for_new_users = email_config.get(
"notif_for_new_users", True
)
@@ -113,7 +137,9 @@ class EmailConfig(Config):
# require_transport_security: False
# notif_from: "Your Friendly %(app)s Home Server <noreply@example.com>"
# app_name: Matrix
- # template_dir: res/templates
+ # # if template_dir is unset, uses the example templates that are part of
+ # # the Synapse distribution.
+ # #template_dir: res/templates
# notif_template_html: notif_mail.html
# notif_template_text: notif_mail.txt
# notif_for_new_users: True
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index fc909c1fac..06c62ab62c 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -178,7 +178,7 @@ class ContentRepositoryConfig(Config):
def default_config(self, **kwargs):
media_store = self.default_path("media_store")
uploads_path = self.default_path("uploads")
- return """
+ return r"""
# Directory where uploaded images and attachments are stored.
media_store_path: "%(media_store)s"
diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py
index 57d4665e84..080c81f14b 100644
--- a/synapse/crypto/keyclient.py
+++ b/synapse/crypto/keyclient.py
@@ -55,7 +55,7 @@ def fetch_server_key(server_name, tls_client_options_factory, path=KEY_API_V1):
raise IOError("Cannot get key for %r" % server_name)
except (ConnectError, DomainError) as e:
logger.warn("Error getting key for %r: %s", server_name, e)
- except Exception as e:
+ except Exception:
logger.exception("Error getting key for %r", server_name)
raise IOError("Cannot get key for %r" % server_name)
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index af3eee95b9..d4d4474847 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -690,7 +690,7 @@ def auth_types_for_event(event):
auth_types = []
auth_types.append((EventTypes.PowerLevels, "", ))
- auth_types.append((EventTypes.Member, event.user_id, ))
+ auth_types.append((EventTypes.Member, event.sender, ))
auth_types.append((EventTypes.Create, "", ))
if event.type == EventTypes.Member:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 819e8f7331..af0107a46e 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -507,19 +507,19 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
@log_function
def on_get_missing_events(self, origin, room_id, earliest_events,
- latest_events, limit, min_depth):
+ latest_events, limit):
with (yield self._server_linearizer.queue((origin, room_id))):
origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id)
logger.info(
"on_get_missing_events: earliest_events: %r, latest_events: %r,"
- " limit: %d, min_depth: %d",
- earliest_events, latest_events, limit, min_depth
+ " limit: %d",
+ earliest_events, latest_events, limit,
)
missing_events = yield self.handler.on_get_missing_events(
- origin, room_id, earliest_events, latest_events, limit, min_depth
+ origin, room_id, earliest_events, latest_events, limit,
)
if len(missing_events) < 5:
@@ -800,7 +800,7 @@ class FederationHandlerRegistry(object):
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
- except Exception as e:
+ except Exception:
logger.exception("Failed to handle edu %r", edu_type)
def on_query(self, query_type, args):
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 98b5950800..3fdd63be95 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -633,14 +633,6 @@ class TransactionQueue(object):
transaction, json_data_cb
)
code = 200
-
- if response:
- for e_id, r in response.get("pdus", {}).items():
- if "error" in r:
- logger.warn(
- "Transaction returned error for %s: %s",
- e_id, r,
- )
except HttpResponseException as e:
code = e.code
response = e.response
@@ -657,19 +649,24 @@ class TransactionQueue(object):
destination, txn_id, code
)
- logger.debug("TX [%s] Sent transaction", destination)
- logger.debug("TX [%s] Marking as delivered...", destination)
-
yield self.transaction_actions.delivered(
transaction, code, response
)
- logger.debug("TX [%s] Marked as delivered", destination)
+ logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
- if code != 200:
+ if code == 200:
+ for e_id, r in response.get("pdus", {}).items():
+ if "error" in r:
+ logger.warn(
+ "TX [%s] {%s} Remote returned error for %s: %s",
+ destination, txn_id, e_id, r,
+ )
+ else:
for p in pdus:
- logger.info(
- "Failed to send event %s to %s", p.event_id, destination
+ logger.warn(
+ "TX [%s] {%s} Failed to send event %s",
+ destination, txn_id, p.event_id,
)
success = False
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 2ab973d6c8..edba5a9808 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -143,9 +143,17 @@ class TransportLayerClient(object):
transaction (Transaction)
Returns:
- Deferred: Results of the deferred is a tuple in the form of
- (response_code, response_body) where the response_body is a
- python dict decoded from json
+ Deferred: Succeeds when we get a 2xx HTTP response. The result
+ will be the decoded JSON body.
+
+ Fails with ``HTTPRequestException`` if we get an HTTP response
+ code >= 300.
+
+ Fails with ``NotRetryingDestination`` if we are not yet ready
+ to retry this server.
+
+ Fails with ``FederationDeniedError`` if this destination
+ is not on our federation whitelist
"""
logger.debug(
"send_data dest=%s, txid=%s",
@@ -170,11 +178,6 @@ class TransportLayerClient(object):
backoff_on_404=True, # If we get a 404 the other side has gone
)
- logger.debug(
- "send_data dest=%s, txid=%s, got response: 200",
- transaction.destination, transaction.transaction_id,
- )
-
defer.returnValue(response)
@defer.inlineCallbacks
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 2f874b4838..7288d49074 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -560,7 +560,6 @@ class FederationGetMissingEventsServlet(BaseFederationServlet):
@defer.inlineCallbacks
def on_POST(self, origin, content, query, room_id):
limit = int(content.get("limit", 10))
- min_depth = int(content.get("min_depth", 0))
earliest_events = content.get("earliest_events", [])
latest_events = content.get("latest_events", [])
@@ -569,7 +568,6 @@ class FederationGetMissingEventsServlet(BaseFederationServlet):
room_id=room_id,
earliest_events=earliest_events,
latest_events=latest_events,
- min_depth=min_depth,
limit=limit,
)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index f0f89af7dc..17eedf4dbf 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -28,6 +28,7 @@ from synapse.metrics import (
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import log_failure
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.metrics import Measure
@@ -36,17 +37,6 @@ logger = logging.getLogger(__name__)
events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
-def log_failure(failure):
- logger.error(
- "Application Services Failure",
- exc_info=(
- failure.type,
- failure.value,
- failure.getTracebackObject()
- )
- )
-
-
class ApplicationServicesHandler(object):
def __init__(self, hs):
@@ -112,7 +102,10 @@ class ApplicationServicesHandler(object):
if not self.started_scheduler:
def start_scheduler():
- return self.scheduler.start().addErrback(log_failure)
+ return self.scheduler.start().addErrback(
+ log_failure, "Application Services Failure",
+ )
+
run_as_background_process("as_scheduler", start_scheduler)
self.started_scheduler = True
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2a5eab124f..329e3c7d71 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -22,7 +22,7 @@ import bcrypt
import pymacaroons
from canonicaljson import json
-from twisted.internet import defer, threads
+from twisted.internet import defer
from twisted.web.client import PartialDownloadError
import synapse.util.stringutils as stringutils
@@ -37,8 +37,8 @@ from synapse.api.errors import (
)
from synapse.module_api import ModuleApi
from synapse.types import UserID
+from synapse.util import logcontext
from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable
from ._base import BaseHandler
@@ -884,11 +884,7 @@ class AuthHandler(BaseHandler):
bcrypt.gensalt(self.bcrypt_rounds),
).decode('ascii')
- return make_deferred_yieldable(
- threads.deferToThreadPool(
- self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
- ),
- )
+ return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash)
def validate_hash(self, password, stored_hash):
"""Validates that self.hash(password) == stored_hash.
@@ -913,13 +909,7 @@ class AuthHandler(BaseHandler):
if not isinstance(stored_hash, bytes):
stored_hash = stored_hash.encode('ascii')
- return make_deferred_yieldable(
- threads.deferToThreadPool(
- self.hs.get_reactor(),
- self.hs.get_reactor().getThreadPool(),
- _do_validate_hash,
- ),
- )
+ return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
else:
return defer.succeed(False)
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index b078df4a76..75fe50c42c 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -17,8 +17,8 @@ import logging
from twisted.internet import defer
from synapse.api.errors import SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID, create_requester
-from synapse.util.logcontext import run_in_background
from ._base import BaseHandler
@@ -121,7 +121,7 @@ class DeactivateAccountHandler(BaseHandler):
None
"""
if not self._user_parter_running:
- run_in_background(self._user_parter_loop)
+ run_as_background_process("user_parter_loop", self._user_parter_loop)
@defer.inlineCallbacks
def _user_parter_loop(self):
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 18741c5fac..02f12f6645 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -80,42 +80,60 @@ class DirectoryHandler(BaseHandler):
)
@defer.inlineCallbacks
- def create_association(self, user_id, room_alias, room_id, servers=None):
- # association creation for human users
- # TODO(erikj): Do user auth.
+ def create_association(self, requester, room_alias, room_id, servers=None,
+ send_event=True):
+ """Attempt to create a new alias
- if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
- raise SynapseError(
- 403, "This user is not permitted to create this alias",
- )
+ Args:
+ requester (Requester)
+ room_alias (RoomAlias)
+ room_id (str)
+ servers (list[str]|None): List of servers that others servers
+ should try and join via
+ send_event (bool): Whether to send an updated m.room.aliases event
- can_create = yield self.can_modify_alias(
- room_alias,
- user_id=user_id
- )
- if not can_create:
- raise SynapseError(
- 400, "This alias is reserved by an application service.",
- errcode=Codes.EXCLUSIVE
- )
- yield self._create_association(room_alias, room_id, servers, creator=user_id)
+ Returns:
+ Deferred
+ """
- @defer.inlineCallbacks
- def create_appservice_association(self, service, room_alias, room_id,
- servers=None):
- if not service.is_interested_in_alias(room_alias.to_string()):
- raise SynapseError(
- 400, "This application service has not reserved"
- " this kind of alias.", errcode=Codes.EXCLUSIVE
+ user_id = requester.user.to_string()
+
+ service = requester.app_service
+ if service:
+ if not service.is_interested_in_alias(room_alias.to_string()):
+ raise SynapseError(
+ 400, "This application service has not reserved"
+ " this kind of alias.", errcode=Codes.EXCLUSIVE
+ )
+ else:
+ if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
+ raise AuthError(
+ 403, "This user is not permitted to create this alias",
+ )
+
+ can_create = yield self.can_modify_alias(
+ room_alias,
+ user_id=user_id
)
+ if not can_create:
+ raise AuthError(
+ 400, "This alias is reserved by an application service.",
+ errcode=Codes.EXCLUSIVE
+ )
- # association creation for app services
- yield self._create_association(room_alias, room_id, servers)
+ yield self._create_association(room_alias, room_id, servers, creator=user_id)
+ if send_event:
+ yield self.send_room_alias_update_event(
+ requester,
+ room_id
+ )
@defer.inlineCallbacks
- def delete_association(self, requester, user_id, room_alias):
+ def delete_association(self, requester, room_alias):
# association deletion for human users
+ user_id = requester.user.to_string()
+
try:
can_delete = yield self._user_can_delete_alias(room_alias, user_id)
except StoreError as e:
@@ -143,7 +161,6 @@ class DirectoryHandler(BaseHandler):
try:
yield self.send_room_alias_update_event(
requester,
- requester.user.to_string(),
room_id
)
@@ -261,7 +278,7 @@ class DirectoryHandler(BaseHandler):
)
@defer.inlineCallbacks
- def send_room_alias_update_event(self, requester, user_id, room_id):
+ def send_room_alias_update_event(self, requester, room_id):
aliases = yield self.store.get_aliases_for_room(room_id)
yield self.event_creation_handler.create_and_send_nonmember_event(
@@ -270,7 +287,7 @@ class DirectoryHandler(BaseHandler):
"type": EventTypes.Aliases,
"state_key": self.hs.hostname,
"room_id": room_id,
- "sender": user_id,
+ "sender": requester.user.to_string(),
"content": {"aliases": aliases},
},
ratelimit=False
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
new file mode 100644
index 0000000000..5edb3cfe04
--- /dev/null
+++ b/synapse/handlers/e2e_room_keys.py
@@ -0,0 +1,289 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017, 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from six import iteritems
+
+from twisted.internet import defer
+
+from synapse.api.errors import RoomKeysVersionError, StoreError, SynapseError
+from synapse.util.async_helpers import Linearizer
+
+logger = logging.getLogger(__name__)
+
+
+class E2eRoomKeysHandler(object):
+ """
+ Implements an optional realtime backup mechanism for encrypted E2E megolm room keys.
+ This gives a way for users to store and recover their megolm keys if they lose all
+ their clients. It should also extend easily to future room key mechanisms.
+ The actual payload of the encrypted keys is completely opaque to the handler.
+ """
+
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+
+ # Used to lock whenever a client is uploading key data. This prevents collisions
+ # between clients trying to upload the details of a new session, given all
+ # clients belonging to a user will receive and try to upload a new session at
+ # roughly the same time. Also used to lock out uploads when the key is being
+ # changed.
+ self._upload_linearizer = Linearizer("upload_room_keys_lock")
+
+ @defer.inlineCallbacks
+ def get_room_keys(self, user_id, version, room_id=None, session_id=None):
+ """Bulk get the E2E room keys for a given backup, optionally filtered to a given
+ room, or a given session.
+ See EndToEndRoomKeyStore.get_e2e_room_keys for full details.
+
+ Args:
+ user_id(str): the user whose keys we're getting
+ version(str): the version ID of the backup we're getting keys from
+ room_id(string): room ID to get keys for, for None to get keys for all rooms
+ session_id(string): session ID to get keys for, for None to get keys for all
+ sessions
+ Returns:
+ A deferred list of dicts giving the session_data and message metadata for
+ these room keys.
+ """
+
+ # we deliberately take the lock to get keys so that changing the version
+ # works atomically
+ with (yield self._upload_linearizer.queue(user_id)):
+ results = yield self.store.get_e2e_room_keys(
+ user_id, version, room_id, session_id
+ )
+
+ if results['rooms'] == {}:
+ raise SynapseError(404, "No room_keys found")
+
+ defer.returnValue(results)
+
+ @defer.inlineCallbacks
+ def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
+ """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
+ room or a given session.
+ See EndToEndRoomKeyStore.delete_e2e_room_keys for full details.
+
+ Args:
+ user_id(str): the user whose backup we're deleting
+ version(str): the version ID of the backup we're deleting
+ room_id(string): room ID to delete keys for, for None to delete keys for all
+ rooms
+ session_id(string): session ID to delete keys for, for None to delete keys
+ for all sessions
+ Returns:
+ A deferred of the deletion transaction
+ """
+
+ # lock for consistency with uploading
+ with (yield self._upload_linearizer.queue(user_id)):
+ yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
+
+ @defer.inlineCallbacks
+ def upload_room_keys(self, user_id, version, room_keys):
+ """Bulk upload a list of room keys into a given backup version, asserting
+ that the given version is the current backup version. room_keys are merged
+ into the current backup as described in RoomKeysServlet.on_PUT().
+
+ Args:
+ user_id(str): the user whose backup we're setting
+ version(str): the version ID of the backup we're updating
+ room_keys(dict): a nested dict describing the room_keys we're setting:
+
+ {
+ "rooms": {
+ "!abc:matrix.org": {
+ "sessions": {
+ "c0ff33": {
+ "first_message_index": 1,
+ "forwarded_count": 1,
+ "is_verified": false,
+ "session_data": "SSBBTSBBIEZJU0gK"
+ }
+ }
+ }
+ }
+ }
+
+ Raises:
+ SynapseError: with code 404 if there are no versions defined
+ RoomKeysVersionError: if the uploaded version is not the current version
+ """
+
+ # TODO: Validate the JSON to make sure it has the right keys.
+
+ # XXX: perhaps we should use a finer grained lock here?
+ with (yield self._upload_linearizer.queue(user_id)):
+
+ # Check that the version we're trying to upload is the current version
+ try:
+ version_info = yield self.store.get_e2e_room_keys_version_info(user_id)
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Version '%s' not found" % (version,))
+ else:
+ raise
+
+ if version_info['version'] != version:
+ # Check that the version we're trying to upload actually exists
+ try:
+ version_info = yield self.store.get_e2e_room_keys_version_info(
+ user_id, version,
+ )
+ # if we get this far, the version must exist
+ raise RoomKeysVersionError(current_version=version_info['version'])
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Version '%s' not found" % (version,))
+ else:
+ raise
+
+ # go through the room_keys.
+ # XXX: this should/could be done concurrently, given we're in a lock.
+ for room_id, room in iteritems(room_keys['rooms']):
+ for session_id, session in iteritems(room['sessions']):
+ yield self._upload_room_key(
+ user_id, version, room_id, session_id, session
+ )
+
+ @defer.inlineCallbacks
+ def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
+ """Upload a given room_key for a given room and session into a given
+ version of the backup. Merges the key with any which might already exist.
+
+ Args:
+ user_id(str): the user whose backup we're setting
+ version(str): the version ID of the backup we're updating
+ room_id(str): the ID of the room whose keys we're setting
+ session_id(str): the session whose room_key we're setting
+ room_key(dict): the room_key being set
+ """
+
+ # get the room_key for this particular row
+ current_room_key = None
+ try:
+ current_room_key = yield self.store.get_e2e_room_key(
+ user_id, version, room_id, session_id
+ )
+ except StoreError as e:
+ if e.code == 404:
+ pass
+ else:
+ raise
+
+ if self._should_replace_room_key(current_room_key, room_key):
+ yield self.store.set_e2e_room_key(
+ user_id, version, room_id, session_id, room_key
+ )
+
+ @staticmethod
+ def _should_replace_room_key(current_room_key, room_key):
+ """
+ Determine whether to replace a given current_room_key (if any)
+ with a newly uploaded room_key backup
+
+ Args:
+ current_room_key (dict): Optional, the current room_key dict if any
+ room_key (dict): The new room_key dict which may or may not be fit to
+ replace the current_room_key
+
+ Returns:
+ True if current_room_key should be replaced by room_key in the backup
+ """
+
+ if current_room_key:
+ # spelt out with if/elifs rather than nested boolean expressions
+ # purely for legibility.
+
+ if room_key['is_verified'] and not current_room_key['is_verified']:
+ return True
+ elif (
+ room_key['first_message_index'] <
+ current_room_key['first_message_index']
+ ):
+ return True
+ elif room_key['forwarded_count'] < current_room_key['forwarded_count']:
+ return True
+ else:
+ return False
+ return True
+
+ @defer.inlineCallbacks
+ def create_version(self, user_id, version_info):
+ """Create a new backup version. This automatically becomes the new
+ backup version for the user's keys; previous backups will no longer be
+ writeable to.
+
+ Args:
+ user_id(str): the user whose backup version we're creating
+ version_info(dict): metadata about the new version being created
+
+ {
+ "algorithm": "m.megolm_backup.v1",
+ "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K"
+ }
+
+ Returns:
+ A deferred of a string that gives the new version number.
+ """
+
+ # TODO: Validate the JSON to make sure it has the right keys.
+
+ # lock everyone out until we've switched version
+ with (yield self._upload_linearizer.queue(user_id)):
+ new_version = yield self.store.create_e2e_room_keys_version(
+ user_id, version_info
+ )
+ defer.returnValue(new_version)
+
+ @defer.inlineCallbacks
+ def get_version_info(self, user_id, version=None):
+ """Get the info about a given version of the user's backup
+
+ Args:
+ user_id(str): the user whose current backup version we're querying
+ version(str): Optional; if None gives the most recent version
+ otherwise a historical one.
+ Raises:
+ StoreError: code 404 if the requested backup version doesn't exist
+ Returns:
+ A deferred of a info dict that gives the info about the new version.
+
+ {
+ "version": "1234",
+ "algorithm": "m.megolm_backup.v1",
+ "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K"
+ }
+ """
+
+ with (yield self._upload_linearizer.queue(user_id)):
+ res = yield self.store.get_e2e_room_keys_version_info(user_id, version)
+ defer.returnValue(res)
+
+ @defer.inlineCallbacks
+ def delete_version(self, user_id, version=None):
+ """Deletes a given version of the user's e2e_room_keys backup
+
+ Args:
+ user_id(str): the user whose current backup version we're deleting
+ version(str): the version id of the backup being deleted
+ Raises:
+ StoreError: code 404 if this backup version doesn't exist
+ """
+
+ with (yield self._upload_linearizer.queue(user_id)):
+ yield self.store.delete_e2e_room_keys_version(user_id, version)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 45d955e6f5..cd5b9bbb19 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -53,7 +53,7 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
-from synapse.state import resolve_events_with_factory
+from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
from synapse.util.async_helpers import Linearizer
@@ -309,8 +309,8 @@ class FederationHandler(BaseHandler):
if sent_to_us_directly:
logger.warn(
- "[%s %s] Failed to fetch %d prev events: rejecting",
- room_id, event_id, len(prevs - seen),
+ "[%s %s] Rejecting: failed to fetch %d prev events: %s",
+ room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
)
raise FederationError(
"ERROR",
@@ -384,24 +384,24 @@ class FederationHandler(BaseHandler):
for x in remote_state:
event_map[x.event_id] = x
- # Resolve any conflicting state
- @defer.inlineCallbacks
- def fetch(ev_ids):
- fetched = yield self.store.get_events(
- ev_ids, get_prev_content=False, check_redacted=False,
- )
- # add any events we fetch here to the `event_map` so that we
- # can use them to build the state event list below.
- event_map.update(fetched)
- defer.returnValue(fetched)
-
room_version = yield self.store.get_room_version(room_id)
- state_map = yield resolve_events_with_factory(
- room_version, state_maps, event_map, fetch,
+ state_map = yield resolve_events_with_store(
+ room_version, state_maps, event_map,
+ state_res_store=StateResolutionStore(self.store),
)
- # we need to give _process_received_pdu the actual state events
+ # We need to give _process_received_pdu the actual state events
# rather than event ids, so generate that now.
+
+ # First though we need to fetch all the events that are in
+ # state_map, so we can build up the state below.
+ evs = yield self.store.get_events(
+ list(state_map.values()),
+ get_prev_content=False,
+ check_redacted=False,
+ )
+ event_map.update(evs)
+
state = [
event_map[e] for e in six.itervalues(state_map)
]
@@ -452,8 +452,8 @@ class FederationHandler(BaseHandler):
latest |= seen
logger.info(
- "[%s %s]: Requesting %d prev_events: %s",
- room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
+ "[%s %s]: Requesting missing events between %s and %s",
+ room_id, event_id, shortstr(latest), event_id,
)
# XXX: we set timeout to 10s to help workaround
@@ -1852,7 +1852,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def on_get_missing_events(self, origin, room_id, earliest_events,
- latest_events, limit, min_depth):
+ latest_events, limit):
in_room = yield self.auth.check_host_in_room(
room_id,
origin
@@ -1861,14 +1861,12 @@ class FederationHandler(BaseHandler):
raise AuthError(403, "Host not in room.")
limit = min(limit, 20)
- min_depth = max(min_depth, 0)
missing_events = yield self.store.get_missing_events(
room_id=room_id,
earliest_events=earliest_events,
latest_events=latest_events,
limit=limit,
- min_depth=min_depth,
)
missing_events = yield filter_events_for_server(
@@ -2522,7 +2520,7 @@ class FederationHandler(BaseHandler):
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
- self._notify_persisted_event(event, max_stream_id)
+ yield self._notify_persisted_event(event, max_stream_id)
def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the
@@ -2555,7 +2553,7 @@ class FederationHandler(BaseHandler):
extra_users=extra_users
)
- self.pusher_pool.on_new_notifications(
+ return self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 53e5e2648b..173315af6c 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -20,7 +20,7 @@ from six import iteritems
from twisted.internet import defer
-from synapse.api.errors import SynapseError
+from synapse.api.errors import HttpResponseException, SynapseError
from synapse.types import get_domain_from_id
logger = logging.getLogger(__name__)
@@ -37,9 +37,23 @@ def _create_rerouter(func_name):
)
else:
destination = get_domain_from_id(group_id)
- return getattr(self.transport_client, func_name)(
+ d = getattr(self.transport_client, func_name)(
destination, group_id, *args, **kwargs
)
+
+ # Capture errors returned by the remote homeserver and
+ # re-throw specific errors as SynapseErrors. This is so
+ # when the remote end responds with things like 403 Not
+ # In Group, we can communicate that to the client instead
+ # of a 500.
+ def h(failure):
+ failure.trap(HttpResponseException)
+ e = failure.value
+ if e.code == 403:
+ raise e.to_synapse_error()
+ return failure
+ d.addErrback(h)
+ return d
return f
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4954b23a0d..6c4fcfb10a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -779,7 +779,7 @@ class EventCreationHandler(object):
event, context=context
)
- self.pusher_pool.on_new_notifications(
+ yield self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a6f3181f09..4c2690ba26 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -119,7 +119,7 @@ class ReceiptsHandler(BaseHandler):
"receipt_key", max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
- self.hs.get_pusherpool().on_new_receipts(
+ yield self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids,
)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index c3f820b975..ab1571b27b 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -190,10 +190,11 @@ class RoomCreationHandler(BaseHandler):
if room_alias:
directory_handler = self.hs.get_handlers().directory_handler
yield directory_handler.create_association(
- user_id=user_id,
+ requester=requester,
room_id=room_id,
room_alias=room_alias,
servers=[self.hs.hostname],
+ send_event=False,
)
preset_config = config.get(
@@ -289,7 +290,7 @@ class RoomCreationHandler(BaseHandler):
if room_alias:
result["room_alias"] = room_alias.to_string()
yield directory_handler.send_room_alias_update_event(
- requester, user_id, room_id
+ requester, room_id
)
defer.returnValue(result)
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 38e1737ec9..dc88620885 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -16,7 +16,7 @@
import logging
from collections import namedtuple
-from six import iteritems
+from six import PY3, iteritems
from six.moves import range
import msgpack
@@ -444,9 +444,16 @@ class RoomListNextBatch(namedtuple("RoomListNextBatch", (
@classmethod
def from_token(cls, token):
+ if PY3:
+ # The argument raw=False is only available on new versions of
+ # msgpack, and only really needed on Python 3. Gate it behind
+ # a PY3 check to avoid causing issues on Debian-packaged versions.
+ decoded = msgpack.loads(decode_base64(token), raw=False)
+ else:
+ decoded = msgpack.loads(decode_base64(token))
return RoomListNextBatch(**{
cls.REVERSE_KEY_DICT[key]: val
- for key, val in msgpack.loads(decode_base64(token)).items()
+ for key, val in decoded.items()
})
def to_token(self):
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 67b8ca28c7..351892a94f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,6 +20,8 @@ import logging
from six import iteritems, itervalues
+from prometheus_client import Counter
+
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
@@ -36,6 +38,19 @@ from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
+
+# Counts the number of times we returned a non-empty sync. `type` is one of
+# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
+# "true" or "false" depending on if the request asked for lazy loaded members or
+# not.
+non_empty_sync_counter = Counter(
+ "synapse_handlers_sync_nonempty_total",
+ "Count of non empty sync responses. type is initial_sync/full_state_sync"
+ "/incremental_sync. lazy_loaded indicates if lazy loaded members were "
+ "enabled for that request.",
+ ["type", "lazy_loaded"],
+)
+
# Store the cache that tracks which lazy-loaded members have been sent to a given
# client for no more than 30 minutes.
LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
@@ -227,14 +242,16 @@ class SyncHandler(object):
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
full_state):
+ if since_token is None:
+ sync_type = "initial_sync"
+ elif full_state:
+ sync_type = "full_state_sync"
+ else:
+ sync_type = "incremental_sync"
+
context = LoggingContext.current_context()
if context:
- if since_token is None:
- context.tag = "initial_sync"
- elif full_state:
- context.tag = "full_state_sync"
- else:
- context.tag = "incremental_sync"
+ context.tag = sync_type
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
@@ -242,7 +259,6 @@ class SyncHandler(object):
result = yield self.current_sync_for_user(
sync_config, since_token, full_state=full_state,
)
- defer.returnValue(result)
else:
def current_sync_callback(before_token, after_token):
return self.current_sync_for_user(sync_config, since_token)
@@ -251,7 +267,15 @@ class SyncHandler(object):
sync_config.user.to_string(), timeout, current_sync_callback,
from_token=since_token,
)
- defer.returnValue(result)
+
+ if result:
+ if sync_config.filter_collection.lazy_load_members():
+ lazy_loaded = "true"
+ else:
+ lazy_loaded = "false"
+ non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
+
+ defer.returnValue(result)
def current_sync_for_user(self, sync_config, since_token=None,
full_state=False):
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d8413d6aa7..f11b430126 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -20,6 +20,7 @@ from six import iteritems
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.roommember import ProfileInfo
from synapse.types import get_localpart_from_id
from synapse.util.metrics import Measure
@@ -98,7 +99,6 @@ class UserDirectoryHandler(object):
"""
return self.store.search_user_dir(user_id, search_term, limit)
- @defer.inlineCallbacks
def notify_new_event(self):
"""Called when there may be more deltas to process
"""
@@ -108,11 +108,15 @@ class UserDirectoryHandler(object):
if self._is_processing:
return
+ @defer.inlineCallbacks
+ def process():
+ try:
+ yield self._unsafe_process()
+ finally:
+ self._is_processing = False
+
self._is_processing = True
- try:
- yield self._unsafe_process()
- finally:
- self._is_processing = False
+ run_as_background_process("user_directory.notify_new_event", process)
@defer.inlineCallbacks
def handle_local_profile_change(self, user_id, profile):
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 14b12cd1c4..24b6110c20 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -195,7 +195,7 @@ class MatrixFederationHttpClient(object):
)
self.clock = hs.get_clock()
self._store = hs.get_datastore()
- self.version_string = hs.version_string.encode('ascii')
+ self.version_string_bytes = hs.version_string.encode('ascii')
self.default_timeout = 60
def schedule(x):
@@ -230,7 +230,7 @@ class MatrixFederationHttpClient(object):
Returns:
Deferred: resolves with the http response object on success.
- Fails with ``HTTPRequestException``: if we get an HTTP response
+ Fails with ``HttpResponseException``: if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -261,8 +261,8 @@ class MatrixFederationHttpClient(object):
ignore_backoff=ignore_backoff,
)
- method = request.method
- destination = request.destination
+ method_bytes = request.method.encode("ascii")
+ destination_bytes = request.destination.encode("ascii")
path_bytes = request.path.encode("ascii")
if request.query:
query_bytes = encode_query_args(request.query)
@@ -270,8 +270,8 @@ class MatrixFederationHttpClient(object):
query_bytes = b""
headers_dict = {
- "User-Agent": [self.version_string],
- "Host": [request.destination],
+ b"User-Agent": [self.version_string_bytes],
+ b"Host": [destination_bytes],
}
with limiter:
@@ -282,50 +282,51 @@ class MatrixFederationHttpClient(object):
else:
retries_left = MAX_SHORT_RETRIES
- url = urllib.parse.urlunparse((
- b"matrix", destination.encode("ascii"),
+ url_bytes = urllib.parse.urlunparse((
+ b"matrix", destination_bytes,
path_bytes, None, query_bytes, b"",
- )).decode('ascii')
+ ))
+ url_str = url_bytes.decode('ascii')
- http_url = urllib.parse.urlunparse((
+ url_to_sign_bytes = urllib.parse.urlunparse((
b"", b"",
path_bytes, None, query_bytes, b"",
- )).decode('ascii')
+ ))
while True:
try:
json = request.get_json()
if json:
- data = encode_canonical_json(json)
- headers_dict["Content-Type"] = ["application/json"]
+ headers_dict[b"Content-Type"] = [b"application/json"]
self.sign_request(
- destination, method, http_url, headers_dict, json
+ destination_bytes, method_bytes, url_to_sign_bytes,
+ headers_dict, json,
)
- else:
- data = None
- self.sign_request(destination, method, http_url, headers_dict)
-
- logger.info(
- "{%s} [%s] Sending request: %s %s",
- request.txn_id, destination, method, url
- )
-
- if data:
+ data = encode_canonical_json(json)
producer = FileBodyProducer(
BytesIO(data),
- cooperator=self._cooperator
+ cooperator=self._cooperator,
)
else:
producer = None
+ self.sign_request(
+ destination_bytes, method_bytes, url_to_sign_bytes,
+ headers_dict,
+ )
- request_deferred = treq.request(
- method,
- url,
+ logger.info(
+ "{%s} [%s] Sending request: %s %s",
+ request.txn_id, request.destination, request.method,
+ url_str,
+ )
+
+ # we don't want all the fancy cookie and redirect handling that
+ # treq.request gives: just use the raw Agent.
+ request_deferred = self.agent.request(
+ method_bytes,
+ url_bytes,
headers=Headers(headers_dict),
- data=producer,
- agent=self.agent,
- reactor=self.hs.get_reactor(),
- unbuffered=True
+ bodyProducer=producer,
)
request_deferred = timeout_deferred(
@@ -344,9 +345,9 @@ class MatrixFederationHttpClient(object):
logger.warn(
"{%s} [%s] Request failed: %s %s: %s",
request.txn_id,
- destination,
- method,
- url,
+ request.destination,
+ request.method,
+ url_str,
_flatten_response_never_received(e),
)
@@ -366,7 +367,7 @@ class MatrixFederationHttpClient(object):
logger.debug(
"{%s} [%s] Waiting %ss before re-sending...",
request.txn_id,
- destination,
+ request.destination,
delay,
)
@@ -378,7 +379,7 @@ class MatrixFederationHttpClient(object):
logger.info(
"{%s} [%s] Got response headers: %d %s",
request.txn_id,
- destination,
+ request.destination,
response.code,
response.phrase.decode('ascii', errors='replace'),
)
@@ -411,8 +412,9 @@ class MatrixFederationHttpClient(object):
destination_is must be non-None.
method (bytes): The HTTP method of the request
url_bytes (bytes): The URI path of the request
- headers_dict (dict): Dictionary of request headers to append to
- content (bytes): The body of the request
+ headers_dict (dict[bytes, list[bytes]]): Dictionary of request headers to
+ append to
+ content (object): The body of the request
destination_is (bytes): As 'destination', but if the destination is an
identity server
@@ -478,7 +480,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
- Fails with ``HTTPRequestException`` if we get an HTTP response
+ Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -532,7 +534,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
- Fails with ``HTTPRequestException`` if we get an HTTP response
+ Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -587,7 +589,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
- Fails with ``HTTPRequestException`` if we get an HTTP response
+ Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -638,7 +640,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
- Fails with ``HTTPRequestException`` if we get an HTTP response
+ Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@@ -682,7 +684,7 @@ class MatrixFederationHttpClient(object):
Deferred: resolves with an (int,dict) tuple of the file length and
a dict of the response headers.
- Fails with ``HTTPRequestException`` if we get an HTTP response code
+ Fails with ``HttpResponseException`` if we get an HTTP response code
>= 300
Fails with ``NotRetryingDestination`` if we are not yet ready
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index fedb4e6b18..62045a918b 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -39,7 +39,8 @@ outgoing_responses_counter = Counter(
)
response_timer = Histogram(
- "synapse_http_server_response_time_seconds", "sec",
+ "synapse_http_server_response_time_seconds",
+ "sec",
["method", "servlet", "tag", "code"],
)
@@ -79,15 +80,11 @@ response_size = Counter(
# than when the response was written.
in_flight_requests_ru_utime = Counter(
- "synapse_http_server_in_flight_requests_ru_utime_seconds",
- "",
- ["method", "servlet"],
+ "synapse_http_server_in_flight_requests_ru_utime_seconds", "", ["method", "servlet"]
)
in_flight_requests_ru_stime = Counter(
- "synapse_http_server_in_flight_requests_ru_stime_seconds",
- "",
- ["method", "servlet"],
+ "synapse_http_server_in_flight_requests_ru_stime_seconds", "", ["method", "servlet"]
)
in_flight_requests_db_txn_count = Counter(
@@ -134,7 +131,7 @@ def _get_in_flight_counts():
# type
counts = {}
for rm in reqs:
- key = (rm.method, rm.name,)
+ key = (rm.method, rm.name)
counts[key] = counts.get(key, 0) + 1
return counts
@@ -175,7 +172,8 @@ class RequestMetrics(object):
if context != self.start_context:
logger.warn(
"Context have unexpectedly changed %r, %r",
- context, self.start_context
+ context,
+ self.start_context,
)
return
@@ -192,10 +190,10 @@ class RequestMetrics(object):
resource_usage = context.get_resource_usage()
response_ru_utime.labels(self.method, self.name, tag).inc(
- resource_usage.ru_utime,
+ resource_usage.ru_utime
)
response_ru_stime.labels(self.method, self.name, tag).inc(
- resource_usage.ru_stime,
+ resource_usage.ru_stime
)
response_db_txn_count.labels(self.method, self.name, tag).inc(
resource_usage.db_txn_count
@@ -222,8 +220,15 @@ class RequestMetrics(object):
diff = new_stats - self._request_stats
self._request_stats = new_stats
- in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime)
- in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime)
+ # max() is used since rapid use of ru_stime/ru_utime can end up with the
+ # count going backwards due to NTP, time smearing, fine-grained
+ # correction, or floating points. Who knows, really?
+ in_flight_requests_ru_utime.labels(self.method, self.name).inc(
+ max(diff.ru_utime, 0)
+ )
+ in_flight_requests_ru_stime.labels(self.method, self.name).inc(
+ max(diff.ru_stime, 0)
+ )
in_flight_requests_db_txn_count.labels(self.method, self.name).inc(
diff.db_txn_count
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 340b16ce25..de02b1017e 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -186,9 +186,9 @@ class Notifier(object):
def count_listeners():
all_user_streams = set()
- for x in self.room_to_user_streams.values():
+ for x in list(self.room_to_user_streams.values()):
all_user_streams |= x
- for x in self.user_to_user_stream.values():
+ for x in list(self.user_to_user_stream.values()):
all_user_streams.add(x)
return sum(stream.count_listeners() for stream in all_user_streams)
@@ -196,7 +196,7 @@ class Notifier(object):
LaterGauge(
"synapse_notifier_rooms", "", [],
- lambda: count(bool, self.room_to_user_streams.values()),
+ lambda: count(bool, list(self.room_to_user_streams.values())),
)
LaterGauge(
"synapse_notifier_users", "", [],
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index d746371420..f369124258 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -18,8 +18,7 @@ import logging
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
+from synapse.metrics.background_process_metrics import run_as_background_process
logger = logging.getLogger(__name__)
@@ -71,18 +70,11 @@ class EmailPusher(object):
# See httppusher
self.max_stream_ordering = None
- self.processing = False
+ self._is_processing = False
- @defer.inlineCallbacks
def on_started(self):
if self.mailer is not None:
- try:
- self.throttle_params = yield self.store.get_throttle_params_by_room(
- self.pusher_id
- )
- yield self._process()
- except Exception:
- logger.exception("Error starting email pusher")
+ self._start_processing()
def on_stop(self):
if self.timed_call:
@@ -92,43 +84,52 @@ class EmailPusher(object):
pass
self.timed_call = None
- @defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
- yield self._process()
+ self._start_processing()
def on_new_receipts(self, min_stream_id, max_stream_id):
# We could wake up and cancel the timer but there tend to be quite a
# lot of read receipts so it's probably less work to just let the
# timer fire
- return defer.succeed(None)
+ pass
- @defer.inlineCallbacks
def on_timer(self):
self.timed_call = None
- yield self._process()
+ self._start_processing()
+
+ def _start_processing(self):
+ if self._is_processing:
+ return
+
+ run_as_background_process("emailpush.process", self._process)
@defer.inlineCallbacks
def _process(self):
- if self.processing:
- return
+ # we should never get here if we are already processing
+ assert not self._is_processing
+
+ try:
+ self._is_processing = True
+
+ if self.throttle_params is None:
+ # this is our first loop: load up the throttle params
+ self.throttle_params = yield self.store.get_throttle_params_by_room(
+ self.pusher_id
+ )
- with LoggingContext("emailpush._process"):
- with Measure(self.clock, "emailpush._process"):
+ # if the max ordering changes while we're running _unsafe_process,
+ # call it again, and so on until we've caught up.
+ while True:
+ starting_max_ordering = self.max_stream_ordering
try:
- self.processing = True
- # if the max ordering changes while we're running _unsafe_process,
- # call it again, and so on until we've caught up.
- while True:
- starting_max_ordering = self.max_stream_ordering
- try:
- yield self._unsafe_process()
- except Exception:
- logger.exception("Exception processing notifs")
- if self.max_stream_ordering == starting_max_ordering:
- break
- finally:
- self.processing = False
+ yield self._unsafe_process()
+ except Exception:
+ logger.exception("Exception processing notifs")
+ if self.max_stream_ordering == starting_max_ordering:
+ break
+ finally:
+ self._is_processing = False
@defer.inlineCallbacks
def _unsafe_process(self):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 48abd5e4d6..6bd703632d 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -22,9 +22,8 @@ from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
from . import push_rule_evaluator, push_tools
@@ -61,7 +60,7 @@ class HttpPusher(object):
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.failing_since = pusherdict['failing_since']
self.timed_call = None
- self.processing = False
+ self._is_processing = False
# This is the highest stream ordering we know it's safe to process.
# When new events arrive, we'll be given a window of new events: we
@@ -92,34 +91,27 @@ class HttpPusher(object):
self.data_minus_url.update(self.data)
del self.data_minus_url['url']
- @defer.inlineCallbacks
def on_started(self):
- try:
- yield self._process()
- except Exception:
- logger.exception("Error starting http pusher")
+ self._start_processing()
- @defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
- yield self._process()
+ self._start_processing()
- @defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id):
# Note that the min here shouldn't be relied upon to be accurate.
# We could check the receipts are actually m.read receipts here,
# but currently that's the only type of receipt anyway...
- with LoggingContext("push.on_new_receipts"):
- with Measure(self.clock, "push.on_new_receipts"):
- badge = yield push_tools.get_badge_count(
- self.hs.get_datastore(), self.user_id
- )
- yield self._send_badge(badge)
+ run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
@defer.inlineCallbacks
+ def _update_badge(self):
+ badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
+ yield self._send_badge(badge)
+
def on_timer(self):
- yield self._process()
+ self._start_processing()
def on_stop(self):
if self.timed_call:
@@ -129,27 +121,31 @@ class HttpPusher(object):
pass
self.timed_call = None
+ def _start_processing(self):
+ if self._is_processing:
+ return
+
+ run_as_background_process("httppush.process", self._process)
+
@defer.inlineCallbacks
def _process(self):
- if self.processing:
- return
+ # we should never get here if we are already processing
+ assert not self._is_processing
- with LoggingContext("push._process"):
- with Measure(self.clock, "push._process"):
+ try:
+ self._is_processing = True
+ # if the max ordering changes while we're running _unsafe_process,
+ # call it again, and so on until we've caught up.
+ while True:
+ starting_max_ordering = self.max_stream_ordering
try:
- self.processing = True
- # if the max ordering changes while we're running _unsafe_process,
- # call it again, and so on until we've caught up.
- while True:
- starting_max_ordering = self.max_stream_ordering
- try:
- yield self._unsafe_process()
- except Exception:
- logger.exception("Exception processing notifs")
- if self.max_stream_ordering == starting_max_ordering:
- break
- finally:
- self.processing = False
+ yield self._unsafe_process()
+ except Exception:
+ logger.exception("Exception processing notifs")
+ if self.max_stream_ordering == starting_max_ordering:
+ break
+ finally:
+ self._is_processing = False
@defer.inlineCallbacks
def _unsafe_process(self):
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 1a5a10d974..16fb5e8471 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -526,8 +526,7 @@ def load_jinja2_templates(config):
Returns:
(notif_template_html, notif_template_text)
"""
- logger.info("loading jinja2")
-
+ logger.info("loading email templates from '%s'", config.email_template_dir)
loader = jinja2.FileSystemLoader(config.email_template_dir)
env = jinja2.Environment(loader=loader)
env.filters["format_ts"] = format_ts_filter
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 9f7d5ef217..5a4e73ccd6 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -20,24 +20,39 @@ from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push.pusher import PusherFactory
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
class PusherPool:
+ """
+ The pusher pool. This is responsible for dispatching notifications of new events to
+ the http and email pushers.
+
+ It provides three methods which are designed to be called by the rest of the
+ application: `start`, `on_new_notifications`, and `on_new_receipts`: each of these
+ delegates to each of the relevant pushers.
+
+ Note that it is expected that each pusher will have its own 'processing' loop which
+ will send out the notifications in the background, rather than blocking until the
+ notifications are sent; accordingly Pusher.on_started, Pusher.on_new_notifications and
+ Pusher.on_new_receipts are not expected to return deferreds.
+ """
def __init__(self, _hs):
self.hs = _hs
self.pusher_factory = PusherFactory(_hs)
- self.start_pushers = _hs.config.start_pushers
+ self._should_start_pushers = _hs.config.start_pushers
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
self.pushers = {}
- @defer.inlineCallbacks
def start(self):
- pushers = yield self.store.get_all_pushers()
- self._start_pushers(pushers)
+ """Starts the pushers off in a background process.
+ """
+ if not self._should_start_pushers:
+ logger.info("Not starting pushers because they are disabled in the config")
+ return
+ run_as_background_process("start_pushers", self._start_pushers)
@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
@@ -86,7 +101,7 @@ class PusherPool:
last_stream_ordering=last_stream_ordering,
profile_tag=profile_tag,
)
- yield self._refresh_pusher(app_id, pushkey, user_id)
+ yield self.start_pusher_by_id(app_id, pushkey, user_id)
@defer.inlineCallbacks
def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey,
@@ -123,45 +138,23 @@ class PusherPool:
p['app_id'], p['pushkey'], p['user_name'],
)
- def on_new_notifications(self, min_stream_id, max_stream_id):
- run_as_background_process(
- "on_new_notifications",
- self._on_new_notifications, min_stream_id, max_stream_id,
- )
-
@defer.inlineCallbacks
- def _on_new_notifications(self, min_stream_id, max_stream_id):
+ def on_new_notifications(self, min_stream_id, max_stream_id):
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
)
- deferreds = []
-
for u in users_affected:
if u in self.pushers:
for p in self.pushers[u].values():
- deferreds.append(
- run_in_background(
- p.on_new_notifications,
- min_stream_id, max_stream_id,
- )
- )
-
- yield make_deferred_yieldable(
- defer.gatherResults(deferreds, consumeErrors=True),
- )
+ p.on_new_notifications(min_stream_id, max_stream_id)
+
except Exception:
logger.exception("Exception in pusher on_new_notifications")
- def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
- run_as_background_process(
- "on_new_receipts",
- self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids,
- )
-
@defer.inlineCallbacks
- def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
+ def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive
@@ -171,26 +164,20 @@ class PusherPool:
# This returns a tuple, user_id is at index 3
users_affected = set([r[3] for r in updated_receipts])
- deferreds = []
-
for u in users_affected:
if u in self.pushers:
for p in self.pushers[u].values():
- deferreds.append(
- run_in_background(
- p.on_new_receipts,
- min_stream_id, max_stream_id,
- )
- )
-
- yield make_deferred_yieldable(
- defer.gatherResults(deferreds, consumeErrors=True),
- )
+ p.on_new_receipts(min_stream_id, max_stream_id)
+
except Exception:
logger.exception("Exception in pusher on_new_receipts")
@defer.inlineCallbacks
- def _refresh_pusher(self, app_id, pushkey, user_id):
+ def start_pusher_by_id(self, app_id, pushkey, user_id):
+ """Look up the details for the given pusher, and start it"""
+ if not self._should_start_pushers:
+ return
+
resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
app_id, pushkey
)
@@ -201,33 +188,49 @@ class PusherPool:
p = r
if p:
+ self._start_pusher(p)
- self._start_pushers([p])
+ @defer.inlineCallbacks
+ def _start_pushers(self):
+ """Start all the pushers
- def _start_pushers(self, pushers):
- if not self.start_pushers:
- logger.info("Not starting pushers because they are disabled in the config")
- return
+ Returns:
+ Deferred
+ """
+ pushers = yield self.store.get_all_pushers()
logger.info("Starting %d pushers", len(pushers))
for pusherdict in pushers:
- try:
- p = self.pusher_factory.create_pusher(pusherdict)
- except Exception:
- logger.exception("Couldn't start a pusher: caught Exception")
- continue
- if p:
- appid_pushkey = "%s:%s" % (
- pusherdict['app_id'],
- pusherdict['pushkey'],
- )
- byuser = self.pushers.setdefault(pusherdict['user_name'], {})
+ self._start_pusher(pusherdict)
+ logger.info("Started pushers")
- if appid_pushkey in byuser:
- byuser[appid_pushkey].on_stop()
- byuser[appid_pushkey] = p
- run_in_background(p.on_started)
+ def _start_pusher(self, pusherdict):
+ """Start the given pusher
- logger.info("Started pushers")
+ Args:
+ pusherdict (dict):
+
+ Returns:
+ None
+ """
+ try:
+ p = self.pusher_factory.create_pusher(pusherdict)
+ except Exception:
+ logger.exception("Couldn't start a pusher: caught Exception")
+ return
+
+ if not p:
+ return
+
+ appid_pushkey = "%s:%s" % (
+ pusherdict['app_id'],
+ pusherdict['pushkey'],
+ )
+ byuser = self.pushers.setdefault(pusherdict['user_name'], {})
+
+ if appid_pushkey in byuser:
+ byuser[appid_pushkey].on_stop()
+ byuser[appid_pushkey] = p
+ p.on_started()
@defer.inlineCallbacks
def remove_pusher(self, app_id, pushkey, user_id):
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index d4d983b00a..943876456b 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -53,9 +53,10 @@ REQUIREMENTS = {
"pillow>=3.1.2": ["PIL"],
"pydenticon>=0.2": ["pydenticon"],
"sortedcontainers>=1.4.4": ["sortedcontainers"],
+ "psutil>=2.0.0": ["psutil>=2.0.0"],
"pysaml2>=3.0.0": ["saml2"],
"pymacaroons-pynacl>=0.9.3": ["pymacaroons"],
- "msgpack-python>=0.3.0": ["msgpack"],
+ "msgpack-python>=0.4.2": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
"six>=1.10": ["six"],
@@ -79,12 +80,6 @@ CONDITIONAL_REQUIREMENTS = {
"matrix-synapse-ldap3": {
"matrix-synapse-ldap3>=0.1": ["ldap_auth_provider"],
},
- "psutil": {
- "psutil>=2.0.0": ["psutil>=2.0.0"],
- },
- "affinity": {
- "affinity": ["affinity"],
- },
"postgres": {
"psycopg2>=2.6": ["psycopg2"]
}
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 3f7be74e02..2d81d49e9a 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -15,6 +15,8 @@
import logging
+import six
+
from synapse.storage._base import SQLBaseStore
from synapse.storage.engines import PostgresEngine
@@ -23,6 +25,13 @@ from ._slaved_id_tracker import SlavedIdTracker
logger = logging.getLogger(__name__)
+def __func__(inp):
+ if six.PY3:
+ return inp
+ else:
+ return inp.__func__
+
+
class BaseSlavedStore(SQLBaseStore):
def __init__(self, db_conn, hs):
super(BaseSlavedStore, self).__init__(db_conn, hs)
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 87eaa53004..4f19fd35aa 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -17,7 +17,7 @@ from synapse.storage import DataStore
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.stream_change_cache import StreamChangeCache
-from ._base import BaseSlavedStore
+from ._base import BaseSlavedStore, __func__
from ._slaved_id_tracker import SlavedIdTracker
@@ -43,11 +43,11 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
expiry_ms=30 * 60 * 1000,
)
- get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
- get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
- get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__
- delete_messages_for_device = DataStore.delete_messages_for_device.__func__
- delete_device_msgs_for_remote = DataStore.delete_device_msgs_for_remote.__func__
+ get_to_device_stream_token = __func__(DataStore.get_to_device_stream_token)
+ get_new_messages_for_device = __func__(DataStore.get_new_messages_for_device)
+ get_new_device_msgs_for_remote = __func__(DataStore.get_new_device_msgs_for_remote)
+ delete_messages_for_device = __func__(DataStore.delete_messages_for_device)
+ delete_device_msgs_for_remote = __func__(DataStore.delete_device_msgs_for_remote)
def stream_positions(self):
result = super(SlavedDeviceInboxStore, self).stream_positions()
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 21b8c468fa..ec2fd561cc 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -13,23 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import six
-
from synapse.storage import DataStore
from synapse.storage.end_to_end_keys import EndToEndKeyStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
-from ._base import BaseSlavedStore
+from ._base import BaseSlavedStore, __func__
from ._slaved_id_tracker import SlavedIdTracker
-def __func__(inp):
- if six.PY3:
- return inp
- else:
- return inp.__func__
-
-
class SlavedDeviceStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedDeviceStore, self).__init__(db_conn, hs)
diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
index 5777f07c8d..e933b170bb 100644
--- a/synapse/replication/slave/storage/groups.py
+++ b/synapse/replication/slave/storage/groups.py
@@ -16,7 +16,7 @@
from synapse.storage import DataStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
-from ._base import BaseSlavedStore
+from ._base import BaseSlavedStore, __func__
from ._slaved_id_tracker import SlavedIdTracker
@@ -33,9 +33,9 @@ class SlavedGroupServerStore(BaseSlavedStore):
"_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(),
)
- get_groups_changes_for_user = DataStore.get_groups_changes_for_user.__func__
- get_group_stream_token = DataStore.get_group_stream_token.__func__
- get_all_groups_for_user = DataStore.get_all_groups_for_user.__func__
+ get_groups_changes_for_user = __func__(DataStore.get_groups_changes_for_user)
+ get_group_stream_token = __func__(DataStore.get_group_stream_token)
+ get_all_groups_for_user = __func__(DataStore.get_all_groups_for_user)
def stream_positions(self):
result = super(SlavedGroupServerStore, self).stream_positions()
diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py
index 05ed168463..8032f53fec 100644
--- a/synapse/replication/slave/storage/keys.py
+++ b/synapse/replication/slave/storage/keys.py
@@ -16,7 +16,7 @@
from synapse.storage import DataStore
from synapse.storage.keys import KeyStore
-from ._base import BaseSlavedStore
+from ._base import BaseSlavedStore, __func__
class SlavedKeyStore(BaseSlavedStore):
@@ -24,11 +24,11 @@ class SlavedKeyStore(BaseSlavedStore):
"_get_server_verify_key"
]
- get_server_verify_keys = DataStore.get_server_verify_keys.__func__
- store_server_verify_key = DataStore.store_server_verify_key.__func__
+ get_server_verify_keys = __func__(DataStore.get_server_verify_keys)
+ store_server_verify_key = __func__(DataStore.store_server_verify_key)
- get_server_certificate = DataStore.get_server_certificate.__func__
- store_server_certificate = DataStore.store_server_certificate.__func__
+ get_server_certificate = __func__(DataStore.get_server_certificate)
+ store_server_certificate = __func__(DataStore.store_server_certificate)
- get_server_keys_json = DataStore.get_server_keys_json.__func__
- store_server_keys_json = DataStore.store_server_keys_json.__func__
+ get_server_keys_json = __func__(DataStore.get_server_keys_json)
+ store_server_keys_json = __func__(DataStore.store_server_keys_json)
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index 80b744082a..92447b00d4 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -17,7 +17,7 @@ from synapse.storage import DataStore
from synapse.storage.presence import PresenceStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
-from ._base import BaseSlavedStore
+from ._base import BaseSlavedStore, __func__
from ._slaved_id_tracker import SlavedIdTracker
@@ -34,8 +34,8 @@ class SlavedPresenceStore(BaseSlavedStore):
"PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
)
- _get_active_presence = DataStore._get_active_presence.__func__
- take_presence_startup_info = DataStore.take_presence_startup_info.__func__
+ _get_active_presence = __func__(DataStore._get_active_presence)
+ take_presence_startup_info = __func__(DataStore.take_presence_startup_info)
_get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
diff --git a/synapse/res/templates/mail-Vector.css b/synapse/res/templates/mail-Vector.css
new file mode 100644
index 0000000000..6a3e36eda1
--- /dev/null
+++ b/synapse/res/templates/mail-Vector.css
@@ -0,0 +1,7 @@
+.header {
+ border-bottom: 4px solid #e4f7ed ! important;
+}
+
+.notif_link a, .footer a {
+ color: #76CFA6 ! important;
+}
diff --git a/synapse/res/templates/mail.css b/synapse/res/templates/mail.css
new file mode 100644
index 0000000000..5ab3e1b06d
--- /dev/null
+++ b/synapse/res/templates/mail.css
@@ -0,0 +1,156 @@
+body {
+ margin: 0px;
+}
+
+pre, code {
+ word-break: break-word;
+ white-space: pre-wrap;
+}
+
+#page {
+ font-family: 'Open Sans', Helvetica, Arial, Sans-Serif;
+ font-color: #454545;
+ font-size: 12pt;
+ width: 100%;
+ padding: 20px;
+}
+
+#inner {
+ width: 640px;
+}
+
+.header {
+ width: 100%;
+ height: 87px;
+ color: #454545;
+ border-bottom: 4px solid #e5e5e5;
+}
+
+.logo {
+ text-align: right;
+ margin-left: 20px;
+}
+
+.salutation {
+ padding-top: 10px;
+ font-weight: bold;
+}
+
+.summarytext {
+}
+
+.room {
+ width: 100%;
+ color: #454545;
+ border-bottom: 1px solid #e5e5e5;
+}
+
+.room_header td {
+ padding-top: 38px;
+ padding-bottom: 10px;
+ border-bottom: 1px solid #e5e5e5;
+}
+
+.room_name {
+ vertical-align: middle;
+ font-size: 18px;
+ font-weight: bold;
+}
+
+.room_header h2 {
+ margin-top: 0px;
+ margin-left: 75px;
+ font-size: 20px;
+}
+
+.room_avatar {
+ width: 56px;
+ line-height: 0px;
+ text-align: center;
+ vertical-align: middle;
+}
+
+.room_avatar img {
+ width: 48px;
+ height: 48px;
+ object-fit: cover;
+ border-radius: 24px;
+}
+
+.notif {
+ border-bottom: 1px solid #e5e5e5;
+ margin-top: 16px;
+ padding-bottom: 16px;
+}
+
+.historical_message .sender_avatar {
+ opacity: 0.3;
+}
+
+/* spell out opacity and historical_message class names for Outlook aka Word */
+.historical_message .sender_name {
+ color: #e3e3e3;
+}
+
+.historical_message .message_time {
+ color: #e3e3e3;
+}
+
+.historical_message .message_body {
+ color: #c7c7c7;
+}
+
+.historical_message td,
+.message td {
+ padding-top: 10px;
+}
+
+.sender_avatar {
+ width: 56px;
+ text-align: center;
+ vertical-align: top;
+}
+
+.sender_avatar img {
+ margin-top: -2px;
+ width: 32px;
+ height: 32px;
+ border-radius: 16px;
+}
+
+.sender_name {
+ display: inline;
+ font-size: 13px;
+ color: #a2a2a2;
+}
+
+.message_time {
+ text-align: right;
+ width: 100px;
+ font-size: 11px;
+ color: #a2a2a2;
+}
+
+.message_body {
+}
+
+.notif_link td {
+ padding-top: 10px;
+ padding-bottom: 10px;
+ font-weight: bold;
+}
+
+.notif_link a, .footer a {
+ color: #454545;
+ text-decoration: none;
+}
+
+.debug {
+ font-size: 10px;
+ color: #888;
+}
+
+.footer {
+ margin-top: 20px;
+ text-align: center;
+}
\ No newline at end of file
diff --git a/synapse/res/templates/notif.html b/synapse/res/templates/notif.html
new file mode 100644
index 0000000000..88b921ca9c
--- /dev/null
+++ b/synapse/res/templates/notif.html
@@ -0,0 +1,45 @@
+{% for message in notif.messages %}
+ <tr class="{{ "historical_message" if message.is_historical else "message" }}">
+ <td class="sender_avatar">
+ {% if loop.index0 == 0 or notif.messages[loop.index0 - 1].sender_name != notif.messages[loop.index0].sender_name %}
+ {% if message.sender_avatar_url %}
+ <img alt="" class="sender_avatar" src="{{ message.sender_avatar_url|mxc_to_http(32,32) }}" />
+ {% else %}
+ {% if message.sender_hash % 3 == 0 %}
+ <img class="sender_avatar" src="https://vector.im/beta/img/76cfa6.png" />
+ {% elif message.sender_hash % 3 == 1 %}
+ <img class="sender_avatar" src="https://vector.im/beta/img/50e2c2.png" />
+ {% else %}
+ <img class="sender_avatar" src="https://vector.im/beta/img/f4c371.png" />
+ {% endif %}
+ {% endif %}
+ {% endif %}
+ </td>
+ <td class="message_contents">
+ {% if loop.index0 == 0 or notif.messages[loop.index0 - 1].sender_name != notif.messages[loop.index0].sender_name %}
+ <div class="sender_name">{% if message.msgtype == "m.emote" %}*{% endif %} {{ message.sender_name }}</div>
+ {% endif %}
+ <div class="message_body">
+ {% if message.msgtype == "m.text" %}
+ {{ message.body_text_html }}
+ {% elif message.msgtype == "m.emote" %}
+ {{ message.body_text_html }}
+ {% elif message.msgtype == "m.notice" %}
+ {{ message.body_text_html }}
+ {% elif message.msgtype == "m.image" %}
+ <img src="{{ message.image_url|mxc_to_http(640, 480, scale) }}" />
+ {% elif message.msgtype == "m.file" %}
+ <span class="filename">{{ message.body_text_plain }}</span>
+ {% endif %}
+ </div>
+ </td>
+ <td class="message_time">{{ message.ts|format_ts("%H:%M") }}</td>
+ </tr>
+{% endfor %}
+<tr class="notif_link">
+ <td></td>
+ <td>
+ <a href="{{ notif.link }}">View {{ room.title }}</a>
+ </td>
+ <td></td>
+</tr>
diff --git a/synapse/res/templates/notif.txt b/synapse/res/templates/notif.txt
new file mode 100644
index 0000000000..a37bee9833
--- /dev/null
+++ b/synapse/res/templates/notif.txt
@@ -0,0 +1,16 @@
+{% for message in notif.messages %}
+{% if message.msgtype == "m.emote" %}* {% endif %}{{ message.sender_name }} ({{ message.ts|format_ts("%H:%M") }})
+{% if message.msgtype == "m.text" %}
+{{ message.body_text_plain }}
+{% elif message.msgtype == "m.emote" %}
+{{ message.body_text_plain }}
+{% elif message.msgtype == "m.notice" %}
+{{ message.body_text_plain }}
+{% elif message.msgtype == "m.image" %}
+{{ message.body_text_plain }}
+{% elif message.msgtype == "m.file" %}
+{{ message.body_text_plain }}
+{% endif %}
+{% endfor %}
+
+View {{ room.title }} at {{ notif.link }}
diff --git a/synapse/res/templates/notif_mail.html b/synapse/res/templates/notif_mail.html
new file mode 100644
index 0000000000..fcdb3109fe
--- /dev/null
+++ b/synapse/res/templates/notif_mail.html
@@ -0,0 +1,55 @@
+<!doctype html>
+<html lang="en">
+ <head>
+ <style type="text/css">
+ {% include 'mail.css' without context %}
+ {% include "mail-%s.css" % app_name ignore missing without context %}
+ </style>
+ </head>
+ <body>
+ <table id="page">
+ <tr>
+ <td> </td>
+ <td id="inner">
+ <table class="header">
+ <tr>
+ <td>
+ <div class="salutation">Hi {{ user_display_name }},</div>
+ <div class="summarytext">{{ summary_text }}</div>
+ </td>
+ <td class="logo">
+ {% if app_name == "Riot" %}
+ <img src="http://matrix.org/img/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
+ {% elif app_name == "Vector" %}
+ <img src="http://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
+ {% else %}
+ <img src="http://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
+ {% endif %}
+ </td>
+ </tr>
+ </table>
+ {% for room in rooms %}
+ {% include 'room.html' with context %}
+ {% endfor %}
+ <div class="footer">
+ <a href="{{ unsubscribe_link }}">Unsubscribe</a>
+ <br/>
+ <br/>
+ <div class="debug">
+ Sending email at {{ reason.now|format_ts("%c") }} due to activity in room {{ reason.room_name }} because
+ an event was received at {{ reason.received_at|format_ts("%c") }}
+ which is more than {{ "%.1f"|format(reason.delay_before_mail_ms / (60*1000)) }} ({{ reason.delay_before_mail_ms }}) mins ago,
+ {% if reason.last_sent_ts %}
+ and the last time we sent a mail for this room was {{ reason.last_sent_ts|format_ts("%c") }},
+ which is more than {{ "%.1f"|format(reason.throttle_ms / (60*1000)) }} (current throttle_ms) mins ago.
+ {% else %}
+ and we don't have a last time we sent a mail for this room.
+ {% endif %}
+ </div>
+ </div>
+ </td>
+ <td> </td>
+ </tr>
+ </table>
+ </body>
+</html>
diff --git a/synapse/res/templates/notif_mail.txt b/synapse/res/templates/notif_mail.txt
new file mode 100644
index 0000000000..24843042a5
--- /dev/null
+++ b/synapse/res/templates/notif_mail.txt
@@ -0,0 +1,10 @@
+Hi {{ user_display_name }},
+
+{{ summary_text }}
+
+{% for room in rooms %}
+{% include 'room.txt' with context %}
+{% endfor %}
+
+You can disable these notifications at {{ unsubscribe_link }}
+
diff --git a/synapse/res/templates/room.html b/synapse/res/templates/room.html
new file mode 100644
index 0000000000..723c222d25
--- /dev/null
+++ b/synapse/res/templates/room.html
@@ -0,0 +1,33 @@
+<table class="room">
+ <tr class="room_header">
+ <td class="room_avatar">
+ {% if room.avatar_url %}
+ <img alt="" src="{{ room.avatar_url|mxc_to_http(48,48) }}" />
+ {% else %}
+ {% if room.hash % 3 == 0 %}
+ <img alt="" src="https://vector.im/beta/img/76cfa6.png" />
+ {% elif room.hash % 3 == 1 %}
+ <img alt="" src="https://vector.im/beta/img/50e2c2.png" />
+ {% else %}
+ <img alt="" src="https://vector.im/beta/img/f4c371.png" />
+ {% endif %}
+ {% endif %}
+ </td>
+ <td class="room_name" colspan="2">
+ {{ room.title }}
+ </td>
+ </tr>
+ {% if room.invite %}
+ <tr>
+ <td></td>
+ <td>
+ <a href="{{ room.link }}">Join the conversation.</a>
+ </td>
+ <td></td>
+ </tr>
+ {% else %}
+ {% for notif in room.notifs %}
+ {% include 'notif.html' with context %}
+ {% endfor %}
+ {% endif %}
+</table>
diff --git a/synapse/res/templates/room.txt b/synapse/res/templates/room.txt
new file mode 100644
index 0000000000..84648c710e
--- /dev/null
+++ b/synapse/res/templates/room.txt
@@ -0,0 +1,9 @@
+{{ room.title }}
+
+{% if room.invite %}
+ You've been invited, join at {{ room.link }}
+{% else %}
+ {% for notif in room.notifs %}
+ {% include 'notif.txt' with context %}
+ {% endfor %}
+{% endif %}
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 3418f06fd6..4856822a5d 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -46,6 +46,7 @@ from synapse.rest.client.v2_alpha import (
receipts,
register,
report_event,
+ room_keys,
sendtodevice,
sync,
tags,
@@ -102,6 +103,7 @@ class ClientRestResource(JsonResource):
auth.register_servlets(hs, client_resource)
receipts.register_servlets(hs, client_resource)
read_marker.register_servlets(hs, client_resource)
+ room_keys.register_servlets(hs, client_resource)
keys.register_servlets(hs, client_resource)
tokenrefresh.register_servlets(hs, client_resource)
tags.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py
index 97733f3026..0220acf644 100644
--- a/synapse/rest/client/v1/directory.py
+++ b/synapse/rest/client/v1/directory.py
@@ -74,38 +74,11 @@ class ClientDirectoryServer(ClientV1RestServlet):
if room is None:
raise SynapseError(400, "Room does not exist")
- dir_handler = self.handlers.directory_handler
+ requester = yield self.auth.get_user_by_req(request)
- try:
- # try to auth as a user
- requester = yield self.auth.get_user_by_req(request)
- try:
- user_id = requester.user.to_string()
- yield dir_handler.create_association(
- user_id, room_alias, room_id, servers
- )
- yield dir_handler.send_room_alias_update_event(
- requester,
- user_id,
- room_id
- )
- except SynapseError as e:
- raise e
- except Exception:
- logger.exception("Failed to create association")
- raise
- except AuthError:
- # try to auth as an application service
- service = yield self.auth.get_appservice_by_req(request)
- yield dir_handler.create_appservice_association(
- service, room_alias, room_id, servers
- )
- logger.info(
- "Application service at %s created alias %s pointing to %s",
- service.url,
- room_alias.to_string(),
- room_id
- )
+ yield self.handlers.directory_handler.create_association(
+ requester, room_alias, room_id, servers
+ )
defer.returnValue((200, {}))
@@ -135,7 +108,7 @@ class ClientDirectoryServer(ClientV1RestServlet):
room_alias = RoomAlias.from_string(room_alias)
yield dir_handler.delete_association(
- requester, user.to_string(), room_alias
+ requester, room_alias
)
logger.info(
diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py
index bd8b5f4afa..693b303881 100644
--- a/synapse/rest/client/v2_alpha/auth.py
+++ b/synapse/rest/client/v2_alpha/auth.py
@@ -99,7 +99,7 @@ class AuthRestServlet(RestServlet):
cannot be handled in the normal flow (with requests to the same endpoint).
Current use is for web fallback auth.
"""
- PATTERNS = client_v2_patterns("/auth/(?P<stagetype>[\w\.]*)/fallback/web")
+ PATTERNS = client_v2_patterns(r"/auth/(?P<stagetype>[\w\.]*)/fallback/web")
def __init__(self, hs):
super(AuthRestServlet, self).__init__()
diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py
new file mode 100644
index 0000000000..45b5817d8b
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/room_keys.py
@@ -0,0 +1,372 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017, 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import Codes, SynapseError
+from synapse.http.servlet import (
+ RestServlet,
+ parse_json_object_from_request,
+ parse_string,
+)
+
+from ._base import client_v2_patterns
+
+logger = logging.getLogger(__name__)
+
+
+class RoomKeysServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/room_keys/keys(/(?P<room_id>[^/]+))?(/(?P<session_id>[^/]+))?$"
+ )
+
+ def __init__(self, hs):
+ """
+ Args:
+ hs (synapse.server.HomeServer): server
+ """
+ super(RoomKeysServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler()
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, room_id, session_id):
+ """
+ Uploads one or more encrypted E2E room keys for backup purposes.
+ room_id: the ID of the room the keys are for (optional)
+ session_id: the ID for the E2E room keys for the room (optional)
+ version: the version of the user's backup which this data is for.
+ the version must already have been created via the /room_keys/version API.
+
+ Each session has:
+ * first_message_index: a numeric index indicating the oldest message
+ encrypted by this session.
+ * forwarded_count: how many times the uploading client claims this key
+ has been shared (forwarded)
+ * is_verified: whether the client that uploaded the keys claims they
+ were sent by a device which they've verified
+ * session_data: base64-encrypted data describing the session.
+
+ Returns 200 OK on success with body {}
+ Returns 403 Forbidden if the version in question is not the most recently
+ created version (i.e. if this is an old client trying to write to a stale backup)
+ Returns 404 Not Found if the version in question doesn't exist
+
+ The API is designed to be otherwise agnostic to the room_key encryption
+ algorithm being used. Sessions are merged with existing ones in the
+ backup using the heuristics:
+ * is_verified sessions always win over unverified sessions
+ * older first_message_index always win over newer sessions
+ * lower forwarded_count always wins over higher forwarded_count
+
+ We trust the clients not to lie and corrupt their own backups.
+ It also means that if your access_token is stolen, the attacker could
+ delete your backup.
+
+ POST /room_keys/keys/!abc:matrix.org/c0ff33?version=1 HTTP/1.1
+ Content-Type: application/json
+
+ {
+ "first_message_index": 1,
+ "forwarded_count": 1,
+ "is_verified": false,
+ "session_data": "SSBBTSBBIEZJU0gK"
+ }
+
+ Or...
+
+ POST /room_keys/keys/!abc:matrix.org?version=1 HTTP/1.1
+ Content-Type: application/json
+
+ {
+ "sessions": {
+ "c0ff33": {
+ "first_message_index": 1,
+ "forwarded_count": 1,
+ "is_verified": false,
+ "session_data": "SSBBTSBBIEZJU0gK"
+ }
+ }
+ }
+
+ Or...
+
+ POST /room_keys/keys?version=1 HTTP/1.1
+ Content-Type: application/json
+
+ {
+ "rooms": {
+ "!abc:matrix.org": {
+ "sessions": {
+ "c0ff33": {
+ "first_message_index": 1,
+ "forwarded_count": 1,
+ "is_verified": false,
+ "session_data": "SSBBTSBBIEZJU0gK"
+ }
+ }
+ }
+ }
+ }
+ """
+ requester = yield self.auth.get_user_by_req(request, allow_guest=False)
+ user_id = requester.user.to_string()
+ body = parse_json_object_from_request(request)
+ version = parse_string(request, "version")
+
+ if session_id:
+ body = {
+ "sessions": {
+ session_id: body
+ }
+ }
+
+ if room_id:
+ body = {
+ "rooms": {
+ room_id: body
+ }
+ }
+
+ yield self.e2e_room_keys_handler.upload_room_keys(
+ user_id, version, body
+ )
+ defer.returnValue((200, {}))
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, room_id, session_id):
+ """
+ Retrieves one or more encrypted E2E room keys for backup purposes.
+ Symmetric with the PUT version of the API.
+
+ room_id: the ID of the room to retrieve the keys for (optional)
+ session_id: the ID for the E2E room keys to retrieve the keys for (optional)
+ version: the version of the user's backup which this data is for.
+ the version must already have been created via the /change_secret API.
+
+ Returns as follows:
+
+ GET /room_keys/keys/!abc:matrix.org/c0ff33?version=1 HTTP/1.1
+ {
+ "first_message_index": 1,
+ "forwarded_count": 1,
+ "is_verified": false,
+ "session_data": "SSBBTSBBIEZJU0gK"
+ }
+
+ Or...
+
+ GET /room_keys/keys/!abc:matrix.org?version=1 HTTP/1.1
+ {
+ "sessions": {
+ "c0ff33": {
+ "first_message_index": 1,
+ "forwarded_count": 1,
+ "is_verified": false,
+ "session_data": "SSBBTSBBIEZJU0gK"
+ }
+ }
+ }
+
+ Or...
+
+ GET /room_keys/keys?version=1 HTTP/1.1
+ {
+ "rooms": {
+ "!abc:matrix.org": {
+ "sessions": {
+ "c0ff33": {
+ "first_message_index": 1,
+ "forwarded_count": 1,
+ "is_verified": false,
+ "session_data": "SSBBTSBBIEZJU0gK"
+ }
+ }
+ }
+ }
+ }
+ """
+ requester = yield self.auth.get_user_by_req(request, allow_guest=False)
+ user_id = requester.user.to_string()
+ version = parse_string(request, "version")
+
+ room_keys = yield self.e2e_room_keys_handler.get_room_keys(
+ user_id, version, room_id, session_id
+ )
+
+ if session_id:
+ room_keys = room_keys['rooms'][room_id]['sessions'][session_id]
+ elif room_id:
+ room_keys = room_keys['rooms'][room_id]
+
+ defer.returnValue((200, room_keys))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, request, room_id, session_id):
+ """
+ Deletes one or more encrypted E2E room keys for a user for backup purposes.
+
+ DELETE /room_keys/keys/!abc:matrix.org/c0ff33?version=1
+ HTTP/1.1 200 OK
+ {}
+
+ room_id: the ID of the room whose keys to delete (optional)
+ session_id: the ID for the E2E session to delete (optional)
+ version: the version of the user's backup which this data is for.
+ the version must already have been created via the /change_secret API.
+ """
+
+ requester = yield self.auth.get_user_by_req(request, allow_guest=False)
+ user_id = requester.user.to_string()
+ version = parse_string(request, "version")
+
+ yield self.e2e_room_keys_handler.delete_room_keys(
+ user_id, version, room_id, session_id
+ )
+ defer.returnValue((200, {}))
+
+
+class RoomKeysNewVersionServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/room_keys/version$"
+ )
+
+ def __init__(self, hs):
+ """
+ Args:
+ hs (synapse.server.HomeServer): server
+ """
+ super(RoomKeysNewVersionServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler()
+
+ @defer.inlineCallbacks
+ def on_POST(self, request):
+ """
+ Create a new backup version for this user's room_keys with the given
+ info. The version is allocated by the server and returned to the user
+ in the response. This API is intended to be used whenever the user
+ changes the encryption key for their backups, ensuring that backups
+ encrypted with different keys don't collide.
+
+ It takes out an exclusive lock on this user's room_key backups, to ensure
+ clients only upload to the current backup.
+
+ The algorithm passed in the version info is a reverse-DNS namespaced
+ identifier to describe the format of the encrypted backupped keys.
+
+ The auth_data is { user_id: "user_id", nonce: <random string> }
+ encrypted using the algorithm and current encryption key described above.
+
+ POST /room_keys/version
+ Content-Type: application/json
+ {
+ "algorithm": "m.megolm_backup.v1",
+ "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K"
+ }
+
+ HTTP/1.1 200 OK
+ Content-Type: application/json
+ {
+ "version": 12345
+ }
+ """
+ requester = yield self.auth.get_user_by_req(request, allow_guest=False)
+ user_id = requester.user.to_string()
+ info = parse_json_object_from_request(request)
+
+ new_version = yield self.e2e_room_keys_handler.create_version(
+ user_id, info
+ )
+ defer.returnValue((200, {"version": new_version}))
+
+ # we deliberately don't have a PUT /version, as these things really should
+ # be immutable to avoid people footgunning
+
+
+class RoomKeysVersionServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/room_keys/version(/(?P<version>[^/]+))?$"
+ )
+
+ def __init__(self, hs):
+ """
+ Args:
+ hs (synapse.server.HomeServer): server
+ """
+ super(RoomKeysVersionServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, version):
+ """
+ Retrieve the version information about a given version of the user's
+ room_keys backup. If the version part is missing, returns info about the
+ most current backup version (if any)
+
+ It takes out an exclusive lock on this user's room_key backups, to ensure
+ clients only upload to the current backup.
+
+ Returns 404 if the given version does not exist.
+
+ GET /room_keys/version/12345 HTTP/1.1
+ {
+ "version": "12345",
+ "algorithm": "m.megolm_backup.v1",
+ "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K"
+ }
+ """
+ requester = yield self.auth.get_user_by_req(request, allow_guest=False)
+ user_id = requester.user.to_string()
+
+ try:
+ info = yield self.e2e_room_keys_handler.get_version_info(
+ user_id, version
+ )
+ except SynapseError as e:
+ if e.code == 404:
+ raise SynapseError(404, "No backup found", Codes.NOT_FOUND)
+ defer.returnValue((200, info))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, request, version):
+ """
+ Delete the information about a given version of the user's
+ room_keys backup. If the version part is missing, deletes the most
+ current backup version (if any). Doesn't delete the actual room data.
+
+ DELETE /room_keys/version/12345 HTTP/1.1
+ HTTP/1.1 200 OK
+ {}
+ """
+ if version is None:
+ raise SynapseError(400, "No version specified to delete", Codes.NOT_FOUND)
+
+ requester = yield self.auth.get_user_by_req(request, allow_guest=False)
+ user_id = requester.user.to_string()
+
+ yield self.e2e_room_keys_handler.delete_version(
+ user_id, version
+ )
+ defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+ RoomKeysServlet(hs).register(http_server)
+ RoomKeysVersionServlet(hs).register(http_server)
+ RoomKeysNewVersionServlet(hs).register(http_server)
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index a828ff4438..08b1867fab 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -25,7 +25,7 @@ from six.moves.urllib import parse as urlparse
import twisted.internet.error
import twisted.web.http
-from twisted.internet import defer, threads
+from twisted.internet import defer
from twisted.web.resource import Resource
from synapse.api.errors import (
@@ -36,8 +36,8 @@ from synapse.api.errors import (
)
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import logcontext
from synapse.util.async_helpers import Linearizer
-from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import is_ascii, random_string
@@ -492,10 +492,11 @@ class MediaRepository(object):
))
thumbnailer = Thumbnailer(input_path)
- t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+ t_byte_source = yield logcontext.defer_to_thread(
+ self.hs.get_reactor(),
self._generate_thumbnail,
thumbnailer, t_width, t_height, t_method, t_type
- ))
+ )
if t_byte_source:
try:
@@ -534,10 +535,11 @@ class MediaRepository(object):
))
thumbnailer = Thumbnailer(input_path)
- t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+ t_byte_source = yield logcontext.defer_to_thread(
+ self.hs.get_reactor(),
self._generate_thumbnail,
thumbnailer, t_width, t_height, t_method, t_type
- ))
+ )
if t_byte_source:
try:
@@ -620,15 +622,17 @@ class MediaRepository(object):
for (t_width, t_height, t_type), t_method in iteritems(thumbnails):
# Generate the thumbnail
if t_method == "crop":
- t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+ t_byte_source = yield logcontext.defer_to_thread(
+ self.hs.get_reactor(),
thumbnailer.crop,
t_width, t_height, t_type,
- ))
+ )
elif t_method == "scale":
- t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
+ t_byte_source = yield logcontext.defer_to_thread(
+ self.hs.get_reactor(),
thumbnailer.scale,
t_width, t_height, t_type,
- ))
+ )
else:
logger.error("Unrecognized method: %r", t_method)
continue
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index a6189224ee..896078fe76 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -21,9 +21,10 @@ import sys
import six
-from twisted.internet import defer, threads
+from twisted.internet import defer
from twisted.protocols.basic import FileSender
+from synapse.util import logcontext
from synapse.util.file_consumer import BackgroundFileConsumer
from synapse.util.logcontext import make_deferred_yieldable
@@ -64,9 +65,10 @@ class MediaStorage(object):
with self.store_into_file(file_info) as (f, fname, finish_cb):
# Write to the main repository
- yield make_deferred_yieldable(threads.deferToThread(
+ yield logcontext.defer_to_thread(
+ self.hs.get_reactor(),
_write_file_synchronously, source, f,
- ))
+ )
yield finish_cb()
defer.returnValue(fname)
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index af01040a38..1a7bfd6b56 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -596,10 +596,13 @@ def _iterate_over_text(tree, *tags_to_ignore):
# to be returned.
elements = iter([tree])
while True:
- el = next(elements)
+ el = next(elements, None)
+ if el is None:
+ return
+
if isinstance(el, string_types):
yield el
- elif el is not None and el.tag not in tags_to_ignore:
+ elif el.tag not in tags_to_ignore:
# el.text is the text before the first child, so we can immediately
# return it if the text exists.
if el.text:
@@ -671,7 +674,7 @@ def summarize_paragraphs(text_nodes, min_size=200, max_size=500):
# This splits the paragraph into words, but keeping the
# (preceeding) whitespace intact so we can easily concat
# words back together.
- for match in re.finditer("\s*\S+", description):
+ for match in re.finditer(r"\s*\S+", description):
word = match.group()
# Keep adding words while the total length is less than
diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py
index 7b9f8b4d79..5aa03031f6 100644
--- a/synapse/rest/media/v1/storage_provider.py
+++ b/synapse/rest/media/v1/storage_provider.py
@@ -17,9 +17,10 @@ import logging
import os
import shutil
-from twisted.internet import defer, threads
+from twisted.internet import defer
from synapse.config._base import Config
+from synapse.util import logcontext
from synapse.util.logcontext import run_in_background
from .media_storage import FileResponder
@@ -120,7 +121,8 @@ class FileStorageProviderBackend(StorageProvider):
if not os.path.exists(dirname):
os.makedirs(dirname)
- return threads.deferToThread(
+ return logcontext.defer_to_thread(
+ self.hs.get_reactor(),
shutil.copyfile, primary_fname, backup_fname,
)
diff --git a/synapse/server.py b/synapse/server.py
index 938a05f9dc..3e9d3d8256 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -51,6 +51,7 @@ from synapse.handlers.deactivate_account import DeactivateAccountHandler
from synapse.handlers.device import DeviceHandler
from synapse.handlers.devicemessage import DeviceMessageHandler
from synapse.handlers.e2e_keys import E2eKeysHandler
+from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler
from synapse.handlers.events import EventHandler, EventStreamHandler
from synapse.handlers.groups_local import GroupsLocalHandler
from synapse.handlers.initial_sync import InitialSyncHandler
@@ -130,6 +131,7 @@ class HomeServer(object):
'auth_handler',
'device_handler',
'e2e_keys_handler',
+ 'e2e_room_keys_handler',
'event_handler',
'event_stream_handler',
'initial_sync_handler',
@@ -299,6 +301,9 @@ class HomeServer(object):
def build_e2e_keys_handler(self):
return E2eKeysHandler(self)
+ def build_e2e_room_keys_handler(self):
+ return E2eRoomKeysHandler(self)
+
def build_application_service_api(self):
return ApplicationServiceApi(self)
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index b22495c1f9..9b40b18d5b 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -19,13 +19,14 @@ from collections import namedtuple
from six import iteritems, itervalues
+import attr
from frozendict import frozendict
from twisted.internet import defer
from synapse.api.constants import EventTypes, RoomVersions
from synapse.events.snapshot import EventContext
-from synapse.state import v1
+from synapse.state import v1, v2
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.expiringcache import ExpiringCache
@@ -372,15 +373,10 @@ class StateHandler(object):
result = yield self._state_resolution_handler.resolve_state_groups(
room_id, room_version, state_groups_ids, None,
- self._state_map_factory,
+ state_res_store=StateResolutionStore(self.store),
)
defer.returnValue(result)
- def _state_map_factory(self, ev_ids):
- return self.store.get_events(
- ev_ids, get_prev_content=False, check_redacted=False,
- )
-
@defer.inlineCallbacks
def resolve_events(self, room_version, state_sets, event):
logger.info(
@@ -398,10 +394,10 @@ class StateHandler(object):
}
with Measure(self.clock, "state._resolve_events"):
- new_state = yield resolve_events_with_factory(
+ new_state = yield resolve_events_with_store(
room_version, state_set_ids,
event_map=state_map,
- state_map_factory=self._state_map_factory
+ state_res_store=StateResolutionStore(self.store),
)
new_state = {
@@ -436,7 +432,7 @@ class StateResolutionHandler(object):
@defer.inlineCallbacks
@log_function
def resolve_state_groups(
- self, room_id, room_version, state_groups_ids, event_map, state_map_factory,
+ self, room_id, room_version, state_groups_ids, event_map, state_res_store,
):
"""Resolves conflicts between a set of state groups
@@ -454,9 +450,11 @@ class StateResolutionHandler(object):
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
- events will be requested via state_map_factory.
+ events will be requested via state_res_store.
+
+ If None, all events will be fetched via state_res_store.
- If None, all events will be fetched via state_map_factory.
+ state_res_store (StateResolutionStore)
Returns:
Deferred[_StateCacheEntry]: resolved state
@@ -480,10 +478,10 @@ class StateResolutionHandler(object):
# start by assuming we won't have any conflicted state, and build up the new
# state map by iterating through the state groups. If we discover a conflict,
- # we give up and instead use `resolve_events_with_factory`.
+ # we give up and instead use `resolve_events_with_store`.
#
# XXX: is this actually worthwhile, or should we just let
- # resolve_events_with_factory do it?
+ # resolve_events_with_store do it?
new_state = {}
conflicted_state = False
for st in itervalues(state_groups_ids):
@@ -498,11 +496,11 @@ class StateResolutionHandler(object):
if conflicted_state:
logger.info("Resolving conflicted state for %r", room_id)
with Measure(self.clock, "state._resolve_events"):
- new_state = yield resolve_events_with_factory(
+ new_state = yield resolve_events_with_store(
room_version,
list(itervalues(state_groups_ids)),
event_map=event_map,
- state_map_factory=state_map_factory,
+ state_res_store=state_res_store,
)
# if the new state matches any of the input state groups, we can
@@ -583,7 +581,7 @@ def _make_state_cache_entry(
)
-def resolve_events_with_factory(room_version, state_sets, event_map, state_map_factory):
+def resolve_events_with_store(room_version, state_sets, event_map, state_res_store):
"""
Args:
room_version(str): Version of the room
@@ -599,17 +597,19 @@ def resolve_events_with_factory(room_version, state_sets, event_map, state_map_f
If None, all events will be fetched via state_map_factory.
- state_map_factory(func): will be called
- with a list of event_ids that are needed, and should return with
- a Deferred of dict of event_id to event.
+ state_res_store (StateResolutionStore)
Returns
Deferred[dict[(str, str), str]]:
a map from (type, state_key) to event_id.
"""
- if room_version in (RoomVersions.V1, RoomVersions.VDH_TEST,):
- return v1.resolve_events_with_factory(
- state_sets, event_map, state_map_factory,
+ if room_version == RoomVersions.V1:
+ return v1.resolve_events_with_store(
+ state_sets, event_map, state_res_store.get_events,
+ )
+ elif room_version == RoomVersions.VDH_TEST:
+ return v2.resolve_events_with_store(
+ state_sets, event_map, state_res_store,
)
else:
# This should only happen if we added a version but forgot to add it to
@@ -617,3 +617,54 @@ def resolve_events_with_factory(room_version, state_sets, event_map, state_map_f
raise Exception(
"No state resolution algorithm defined for version %r" % (room_version,)
)
+
+
+@attr.s
+class StateResolutionStore(object):
+ """Interface that allows state resolution algorithms to access the database
+ in well defined way.
+
+ Args:
+ store (DataStore)
+ """
+
+ store = attr.ib()
+
+ def get_events(self, event_ids, allow_rejected=False):
+ """Get events from the database
+
+ Args:
+ event_ids (list): The event_ids of the events to fetch
+ allow_rejected (bool): If True return rejected events.
+
+ Returns:
+ Deferred[dict[str, FrozenEvent]]: Dict from event_id to event.
+ """
+
+ return self.store.get_events(
+ event_ids,
+ check_redacted=False,
+ get_prev_content=False,
+ allow_rejected=allow_rejected,
+ )
+
+ def get_auth_chain(self, event_ids):
+ """Gets the full auth chain for a set of events (including rejected
+ events).
+
+ Includes the given event IDs in the result.
+
+ Note that:
+ 1. All events must be state events.
+ 2. For v1 rooms this may not have the full auth chain in the
+ presence of rejected events
+
+ Args:
+ event_ids (list): The event IDs of the events to fetch the auth
+ chain for. Must be state events.
+
+ Returns:
+ Deferred[list[str]]: List of event IDs of the auth chain.
+ """
+
+ return self.store.get_auth_chain_ids(event_ids, include_given=True)
diff --git a/synapse/state/v1.py b/synapse/state/v1.py
index 7a7157b352..70a981f4a2 100644
--- a/synapse/state/v1.py
+++ b/synapse/state/v1.py
@@ -31,7 +31,7 @@ POWER_KEY = (EventTypes.PowerLevels, "")
@defer.inlineCallbacks
-def resolve_events_with_factory(state_sets, event_map, state_map_factory):
+def resolve_events_with_store(state_sets, event_map, state_map_factory):
"""
Args:
state_sets(list): List of dicts of (type, state_key) -> event_id,
diff --git a/synapse/state/v2.py b/synapse/state/v2.py
new file mode 100644
index 0000000000..5d06f7e928
--- /dev/null
+++ b/synapse/state/v2.py
@@ -0,0 +1,544 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import heapq
+import itertools
+import logging
+
+from six import iteritems, itervalues
+
+from twisted.internet import defer
+
+from synapse import event_auth
+from synapse.api.constants import EventTypes
+from synapse.api.errors import AuthError
+
+logger = logging.getLogger(__name__)
+
+
+@defer.inlineCallbacks
+def resolve_events_with_store(state_sets, event_map, state_res_store):
+ """Resolves the state using the v2 state resolution algorithm
+
+ Args:
+ state_sets(list): List of dicts of (type, state_key) -> event_id,
+ which are the different state groups to resolve.
+
+ event_map(dict[str,FrozenEvent]|None):
+ a dict from event_id to event, for any events that we happen to
+ have in flight (eg, those currently being persisted). This will be
+ used as a starting point fof finding the state we need; any missing
+ events will be requested via state_res_store.
+
+ If None, all events will be fetched via state_res_store.
+
+ state_res_store (StateResolutionStore)
+
+ Returns
+ Deferred[dict[(str, str), str]]:
+ a map from (type, state_key) to event_id.
+ """
+
+ logger.debug("Computing conflicted state")
+
+ # First split up the un/conflicted state
+ unconflicted_state, conflicted_state = _seperate(state_sets)
+
+ if not conflicted_state:
+ defer.returnValue(unconflicted_state)
+
+ logger.debug("%d conflicted state entries", len(conflicted_state))
+ logger.debug("Calculating auth chain difference")
+
+ # Also fetch all auth events that appear in only some of the state sets'
+ # auth chains.
+ auth_diff = yield _get_auth_chain_difference(
+ state_sets, event_map, state_res_store,
+ )
+
+ full_conflicted_set = set(itertools.chain(
+ itertools.chain.from_iterable(itervalues(conflicted_state)),
+ auth_diff,
+ ))
+
+ events = yield state_res_store.get_events([
+ eid for eid in full_conflicted_set
+ if eid not in event_map
+ ], allow_rejected=True)
+ event_map.update(events)
+
+ full_conflicted_set = set(eid for eid in full_conflicted_set if eid in event_map)
+
+ logger.debug("%d full_conflicted_set entries", len(full_conflicted_set))
+
+ # Get and sort all the power events (kicks/bans/etc)
+ power_events = (
+ eid for eid in full_conflicted_set
+ if _is_power_event(event_map[eid])
+ )
+
+ sorted_power_events = yield _reverse_topological_power_sort(
+ power_events,
+ event_map,
+ state_res_store,
+ full_conflicted_set,
+ )
+
+ logger.debug("sorted %d power events", len(sorted_power_events))
+
+ # Now sequentially auth each one
+ resolved_state = yield _iterative_auth_checks(
+ sorted_power_events, unconflicted_state, event_map,
+ state_res_store,
+ )
+
+ logger.debug("resolved power events")
+
+ # OK, so we've now resolved the power events. Now sort the remaining
+ # events using the mainline of the resolved power level.
+
+ leftover_events = [
+ ev_id
+ for ev_id in full_conflicted_set
+ if ev_id not in sorted_power_events
+ ]
+
+ logger.debug("sorting %d remaining events", len(leftover_events))
+
+ pl = resolved_state.get((EventTypes.PowerLevels, ""), None)
+ leftover_events = yield _mainline_sort(
+ leftover_events, pl, event_map, state_res_store,
+ )
+
+ logger.debug("resolving remaining events")
+
+ resolved_state = yield _iterative_auth_checks(
+ leftover_events, resolved_state, event_map,
+ state_res_store,
+ )
+
+ logger.debug("resolved")
+
+ # We make sure that unconflicted state always still applies.
+ resolved_state.update(unconflicted_state)
+
+ logger.debug("done")
+
+ defer.returnValue(resolved_state)
+
+
+@defer.inlineCallbacks
+def _get_power_level_for_sender(event_id, event_map, state_res_store):
+ """Return the power level of the sender of the given event according to
+ their auth events.
+
+ Args:
+ event_id (str)
+ event_map (dict[str,FrozenEvent])
+ state_res_store (StateResolutionStore)
+
+ Returns:
+ Deferred[int]
+ """
+ event = yield _get_event(event_id, event_map, state_res_store)
+
+ pl = None
+ for aid, _ in event.auth_events:
+ aev = yield _get_event(aid, event_map, state_res_store)
+ if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""):
+ pl = aev
+ break
+
+ if pl is None:
+ # Couldn't find power level. Check if they're the creator of the room
+ for aid, _ in event.auth_events:
+ aev = yield _get_event(aid, event_map, state_res_store)
+ if (aev.type, aev.state_key) == (EventTypes.Create, ""):
+ if aev.content.get("creator") == event.sender:
+ defer.returnValue(100)
+ break
+ defer.returnValue(0)
+
+ level = pl.content.get("users", {}).get(event.sender)
+ if level is None:
+ level = pl.content.get("users_default", 0)
+
+ if level is None:
+ defer.returnValue(0)
+ else:
+ defer.returnValue(int(level))
+
+
+@defer.inlineCallbacks
+def _get_auth_chain_difference(state_sets, event_map, state_res_store):
+ """Compare the auth chains of each state set and return the set of events
+ that only appear in some but not all of the auth chains.
+
+ Args:
+ state_sets (list)
+ event_map (dict[str,FrozenEvent])
+ state_res_store (StateResolutionStore)
+
+ Returns:
+ Deferred[set[str]]: Set of event IDs
+ """
+ common = set(itervalues(state_sets[0])).intersection(
+ *(itervalues(s) for s in state_sets[1:])
+ )
+
+ auth_sets = []
+ for state_set in state_sets:
+ auth_ids = set(
+ eid
+ for key, eid in iteritems(state_set)
+ if (key[0] in (
+ EventTypes.Member,
+ EventTypes.ThirdPartyInvite,
+ ) or key in (
+ (EventTypes.PowerLevels, ''),
+ (EventTypes.Create, ''),
+ (EventTypes.JoinRules, ''),
+ )) and eid not in common
+ )
+
+ auth_chain = yield state_res_store.get_auth_chain(auth_ids)
+ auth_ids.update(auth_chain)
+
+ auth_sets.append(auth_ids)
+
+ intersection = set(auth_sets[0]).intersection(*auth_sets[1:])
+ union = set().union(*auth_sets)
+
+ defer.returnValue(union - intersection)
+
+
+def _seperate(state_sets):
+ """Return the unconflicted and conflicted state. This is different than in
+ the original algorithm, as this defines a key to be conflicted if one of
+ the state sets doesn't have that key.
+
+ Args:
+ state_sets (list)
+
+ Returns:
+ tuple[dict, dict]: A tuple of unconflicted and conflicted state. The
+ conflicted state dict is a map from type/state_key to set of event IDs
+ """
+ unconflicted_state = {}
+ conflicted_state = {}
+
+ for key in set(itertools.chain.from_iterable(state_sets)):
+ event_ids = set(state_set.get(key) for state_set in state_sets)
+ if len(event_ids) == 1:
+ unconflicted_state[key] = event_ids.pop()
+ else:
+ event_ids.discard(None)
+ conflicted_state[key] = event_ids
+
+ return unconflicted_state, conflicted_state
+
+
+def _is_power_event(event):
+ """Return whether or not the event is a "power event", as defined by the
+ v2 state resolution algorithm
+
+ Args:
+ event (FrozenEvent)
+
+ Returns:
+ boolean
+ """
+ if (event.type, event.state_key) in (
+ (EventTypes.PowerLevels, ""),
+ (EventTypes.JoinRules, ""),
+ (EventTypes.Create, ""),
+ ):
+ return True
+
+ if event.type == EventTypes.Member:
+ if event.membership in ('leave', 'ban'):
+ return event.sender != event.state_key
+
+ return False
+
+
+@defer.inlineCallbacks
+def _add_event_and_auth_chain_to_graph(graph, event_id, event_map,
+ state_res_store, auth_diff):
+ """Helper function for _reverse_topological_power_sort that add the event
+ and its auth chain (that is in the auth diff) to the graph
+
+ Args:
+ graph (dict[str, set[str]]): A map from event ID to the events auth
+ event IDs
+ event_id (str): Event to add to the graph
+ event_map (dict[str,FrozenEvent])
+ state_res_store (StateResolutionStore)
+ auth_diff (set[str]): Set of event IDs that are in the auth difference.
+ """
+
+ state = [event_id]
+ while state:
+ eid = state.pop()
+ graph.setdefault(eid, set())
+
+ event = yield _get_event(eid, event_map, state_res_store)
+ for aid, _ in event.auth_events:
+ if aid in auth_diff:
+ if aid not in graph:
+ state.append(aid)
+
+ graph.setdefault(eid, set()).add(aid)
+
+
+@defer.inlineCallbacks
+def _reverse_topological_power_sort(event_ids, event_map, state_res_store, auth_diff):
+ """Returns a list of the event_ids sorted by reverse topological ordering,
+ and then by power level and origin_server_ts
+
+ Args:
+ event_ids (list[str]): The events to sort
+ event_map (dict[str,FrozenEvent])
+ state_res_store (StateResolutionStore)
+ auth_diff (set[str]): Set of event IDs that are in the auth difference.
+
+ Returns:
+ Deferred[list[str]]: The sorted list
+ """
+
+ graph = {}
+ for event_id in event_ids:
+ yield _add_event_and_auth_chain_to_graph(
+ graph, event_id, event_map, state_res_store, auth_diff,
+ )
+
+ event_to_pl = {}
+ for event_id in graph:
+ pl = yield _get_power_level_for_sender(event_id, event_map, state_res_store)
+ event_to_pl[event_id] = pl
+
+ def _get_power_order(event_id):
+ ev = event_map[event_id]
+ pl = event_to_pl[event_id]
+
+ return -pl, ev.origin_server_ts, event_id
+
+ # Note: graph is modified during the sort
+ it = lexicographical_topological_sort(
+ graph,
+ key=_get_power_order,
+ )
+ sorted_events = list(it)
+
+ defer.returnValue(sorted_events)
+
+
+@defer.inlineCallbacks
+def _iterative_auth_checks(event_ids, base_state, event_map, state_res_store):
+ """Sequentially apply auth checks to each event in given list, updating the
+ state as it goes along.
+
+ Args:
+ event_ids (list[str]): Ordered list of events to apply auth checks to
+ base_state (dict[tuple[str, str], str]): The set of state to start with
+ event_map (dict[str,FrozenEvent])
+ state_res_store (StateResolutionStore)
+
+ Returns:
+ Deferred[dict[tuple[str, str], str]]: Returns the final updated state
+ """
+ resolved_state = base_state.copy()
+
+ for event_id in event_ids:
+ event = event_map[event_id]
+
+ auth_events = {}
+ for aid, _ in event.auth_events:
+ ev = yield _get_event(aid, event_map, state_res_store)
+
+ if ev.rejected_reason is None:
+ auth_events[(ev.type, ev.state_key)] = ev
+
+ for key in event_auth.auth_types_for_event(event):
+ if key in resolved_state:
+ ev_id = resolved_state[key]
+ ev = yield _get_event(ev_id, event_map, state_res_store)
+
+ if ev.rejected_reason is None:
+ auth_events[key] = event_map[ev_id]
+
+ try:
+ event_auth.check(
+ event, auth_events,
+ do_sig_check=False,
+ do_size_check=False
+ )
+
+ resolved_state[(event.type, event.state_key)] = event_id
+ except AuthError:
+ pass
+
+ defer.returnValue(resolved_state)
+
+
+@defer.inlineCallbacks
+def _mainline_sort(event_ids, resolved_power_event_id, event_map,
+ state_res_store):
+ """Returns a sorted list of event_ids sorted by mainline ordering based on
+ the given event resolved_power_event_id
+
+ Args:
+ event_ids (list[str]): Events to sort
+ resolved_power_event_id (str): The final resolved power level event ID
+ event_map (dict[str,FrozenEvent])
+ state_res_store (StateResolutionStore)
+
+ Returns:
+ Deferred[list[str]]: The sorted list
+ """
+ mainline = []
+ pl = resolved_power_event_id
+ while pl:
+ mainline.append(pl)
+ pl_ev = yield _get_event(pl, event_map, state_res_store)
+ auth_events = pl_ev.auth_events
+ pl = None
+ for aid, _ in auth_events:
+ ev = yield _get_event(aid, event_map, state_res_store)
+ if (ev.type, ev.state_key) == (EventTypes.PowerLevels, ""):
+ pl = aid
+ break
+
+ mainline_map = {ev_id: i + 1 for i, ev_id in enumerate(reversed(mainline))}
+
+ event_ids = list(event_ids)
+
+ order_map = {}
+ for ev_id in event_ids:
+ depth = yield _get_mainline_depth_for_event(
+ event_map[ev_id], mainline_map,
+ event_map, state_res_store,
+ )
+ order_map[ev_id] = (depth, event_map[ev_id].origin_server_ts, ev_id)
+
+ event_ids.sort(key=lambda ev_id: order_map[ev_id])
+
+ defer.returnValue(event_ids)
+
+
+@defer.inlineCallbacks
+def _get_mainline_depth_for_event(event, mainline_map, event_map, state_res_store):
+ """Get the mainline depths for the given event based on the mainline map
+
+ Args:
+ event (FrozenEvent)
+ mainline_map (dict[str, int]): Map from event_id to mainline depth for
+ events in the mainline.
+ event_map (dict[str,FrozenEvent])
+ state_res_store (StateResolutionStore)
+
+ Returns:
+ Deferred[int]
+ """
+
+ # We do an iterative search, replacing `event with the power level in its
+ # auth events (if any)
+ while event:
+ depth = mainline_map.get(event.event_id)
+ if depth is not None:
+ defer.returnValue(depth)
+
+ auth_events = event.auth_events
+ event = None
+
+ for aid, _ in auth_events:
+ aev = yield _get_event(aid, event_map, state_res_store)
+ if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""):
+ event = aev
+ break
+
+ # Didn't find a power level auth event, so we just return 0
+ defer.returnValue(0)
+
+
+@defer.inlineCallbacks
+def _get_event(event_id, event_map, state_res_store):
+ """Helper function to look up event in event_map, falling back to looking
+ it up in the store
+
+ Args:
+ event_id (str)
+ event_map (dict[str,FrozenEvent])
+ state_res_store (StateResolutionStore)
+
+ Returns:
+ Deferred[FrozenEvent]
+ """
+ if event_id not in event_map:
+ events = yield state_res_store.get_events([event_id], allow_rejected=True)
+ event_map.update(events)
+ defer.returnValue(event_map[event_id])
+
+
+def lexicographical_topological_sort(graph, key):
+ """Performs a lexicographic reverse topological sort on the graph.
+
+ This returns a reverse topological sort (i.e. if node A references B then B
+ appears before A in the sort), with ties broken lexicographically based on
+ return value of the `key` function.
+
+ NOTE: `graph` is modified during the sort.
+
+ Args:
+ graph (dict[str, set[str]]): A representation of the graph where each
+ node is a key in the dict and its value are the nodes edges.
+ key (func): A function that takes a node and returns a value that is
+ comparable and used to order nodes
+
+ Yields:
+ str: The next node in the topological sort
+ """
+
+ # Note, this is basically Kahn's algorithm except we look at nodes with no
+ # outgoing edges, c.f.
+ # https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm
+ outdegree_map = graph
+ reverse_graph = {}
+
+ # Lists of nodes with zero out degree. Is actually a tuple of
+ # `(key(node), node)` so that sorting does the right thing
+ zero_outdegree = []
+
+ for node, edges in iteritems(graph):
+ if len(edges) == 0:
+ zero_outdegree.append((key(node), node))
+
+ reverse_graph.setdefault(node, set())
+ for edge in edges:
+ reverse_graph.setdefault(edge, set()).add(node)
+
+ # heapq is a built in implementation of a sorted queue.
+ heapq.heapify(zero_outdegree)
+
+ while zero_outdegree:
+ _, node = heapq.heappop(zero_outdegree)
+
+ for parent in reverse_graph[node]:
+ out = outdegree_map[parent]
+ out.discard(node)
+ if len(out) == 0:
+ heapq.heappush(zero_outdegree, (key(parent), parent))
+
+ yield node
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 23b4a8d76d..53c685c173 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -30,6 +30,7 @@ from .appservice import ApplicationServiceStore, ApplicationServiceTransactionSt
from .client_ips import ClientIpStore
from .deviceinbox import DeviceInboxStore
from .directory import DirectoryStore
+from .e2e_room_keys import EndToEndRoomKeyStore
from .end_to_end_keys import EndToEndKeyStore
from .engines import PostgresEngine
from .event_federation import EventFederationStore
@@ -77,6 +78,7 @@ class DataStore(RoomMemberStore, RoomStore,
ApplicationServiceTransactionStore,
ReceiptsStore,
EndToEndKeyStore,
+ EndToEndRoomKeyStore,
SearchStore,
TagsStore,
AccountDataStore,
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index be61147b9b..d9d0255d0b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -18,7 +18,7 @@ import threading
import time
from six import PY2, iteritems, iterkeys, itervalues
-from six.moves import intern, range
+from six.moves import builtins, intern, range
from canonicaljson import json
from prometheus_client import Histogram
@@ -1233,7 +1233,7 @@ def db_to_json(db_content):
# psycopg2 on Python 2 returns buffer objects, which we need to cast to
# bytes to decode
- if PY2 and isinstance(db_content, buffer):
+ if PY2 and isinstance(db_content, builtins.buffer):
db_content = bytes(db_content)
# Decode it to a Unicode string before feeding it to json.loads, so we
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
new file mode 100644
index 0000000000..f25ded2295
--- /dev/null
+++ b/synapse/storage/e2e_room_keys.py
@@ -0,0 +1,320 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from twisted.internet import defer
+
+from synapse.api.errors import StoreError
+
+from ._base import SQLBaseStore
+
+
+class EndToEndRoomKeyStore(SQLBaseStore):
+
+ @defer.inlineCallbacks
+ def get_e2e_room_key(self, user_id, version, room_id, session_id):
+ """Get the encrypted E2E room key for a given session from a given
+ backup version of room_keys. We only store the 'best' room key for a given
+ session at a given time, as determined by the handler.
+
+ Args:
+ user_id(str): the user whose backup we're querying
+ version(str): the version ID of the backup for the set of keys we're querying
+ room_id(str): the ID of the room whose keys we're querying.
+ This is a bit redundant as it's implied by the session_id, but
+ we include for consistency with the rest of the API.
+ session_id(str): the session whose room_key we're querying.
+
+ Returns:
+ A deferred dict giving the session_data and message metadata for
+ this room key.
+ """
+
+ row = yield self._simple_select_one(
+ table="e2e_room_keys",
+ keyvalues={
+ "user_id": user_id,
+ "version": version,
+ "room_id": room_id,
+ "session_id": session_id,
+ },
+ retcols=(
+ "first_message_index",
+ "forwarded_count",
+ "is_verified",
+ "session_data",
+ ),
+ desc="get_e2e_room_key",
+ )
+
+ row["session_data"] = json.loads(row["session_data"])
+
+ defer.returnValue(row)
+
+ @defer.inlineCallbacks
+ def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
+ """Replaces or inserts the encrypted E2E room key for a given session in
+ a given backup
+
+ Args:
+ user_id(str): the user whose backup we're setting
+ version(str): the version ID of the backup we're updating
+ room_id(str): the ID of the room whose keys we're setting
+ session_id(str): the session whose room_key we're setting
+ room_key(dict): the room_key being set
+ Raises:
+ StoreError
+ """
+
+ yield self._simple_upsert(
+ table="e2e_room_keys",
+ keyvalues={
+ "user_id": user_id,
+ "room_id": room_id,
+ "session_id": session_id,
+ },
+ values={
+ "version": version,
+ "first_message_index": room_key['first_message_index'],
+ "forwarded_count": room_key['forwarded_count'],
+ "is_verified": room_key['is_verified'],
+ "session_data": json.dumps(room_key['session_data']),
+ },
+ lock=False,
+ )
+
+ @defer.inlineCallbacks
+ def get_e2e_room_keys(
+ self, user_id, version, room_id=None, session_id=None
+ ):
+ """Bulk get the E2E room keys for a given backup, optionally filtered to a given
+ room, or a given session.
+
+ Args:
+ user_id(str): the user whose backup we're querying
+ version(str): the version ID of the backup for the set of keys we're querying
+ room_id(str): Optional. the ID of the room whose keys we're querying, if any.
+ If not specified, we return the keys for all the rooms in the backup.
+ session_id(str): Optional. the session whose room_key we're querying, if any.
+ If specified, we also require the room_id to be specified.
+ If not specified, we return all the keys in this version of
+ the backup (or for the specified room)
+
+ Returns:
+ A deferred list of dicts giving the session_data and message metadata for
+ these room keys.
+ """
+
+ keyvalues = {
+ "user_id": user_id,
+ "version": version,
+ }
+ if room_id:
+ keyvalues['room_id'] = room_id
+ if session_id:
+ keyvalues['session_id'] = session_id
+
+ rows = yield self._simple_select_list(
+ table="e2e_room_keys",
+ keyvalues=keyvalues,
+ retcols=(
+ "user_id",
+ "room_id",
+ "session_id",
+ "first_message_index",
+ "forwarded_count",
+ "is_verified",
+ "session_data",
+ ),
+ desc="get_e2e_room_keys",
+ )
+
+ sessions = {'rooms': {}}
+ for row in rows:
+ room_entry = sessions['rooms'].setdefault(row['room_id'], {"sessions": {}})
+ room_entry['sessions'][row['session_id']] = {
+ "first_message_index": row["first_message_index"],
+ "forwarded_count": row["forwarded_count"],
+ "is_verified": row["is_verified"],
+ "session_data": json.loads(row["session_data"]),
+ }
+
+ defer.returnValue(sessions)
+
+ @defer.inlineCallbacks
+ def delete_e2e_room_keys(
+ self, user_id, version, room_id=None, session_id=None
+ ):
+ """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
+ room or a given session.
+
+ Args:
+ user_id(str): the user whose backup we're deleting from
+ version(str): the version ID of the backup for the set of keys we're deleting
+ room_id(str): Optional. the ID of the room whose keys we're deleting, if any.
+ If not specified, we delete the keys for all the rooms in the backup.
+ session_id(str): Optional. the session whose room_key we're querying, if any.
+ If specified, we also require the room_id to be specified.
+ If not specified, we delete all the keys in this version of
+ the backup (or for the specified room)
+
+ Returns:
+ A deferred of the deletion transaction
+ """
+
+ keyvalues = {
+ "user_id": user_id,
+ "version": version,
+ }
+ if room_id:
+ keyvalues['room_id'] = room_id
+ if session_id:
+ keyvalues['session_id'] = session_id
+
+ yield self._simple_delete(
+ table="e2e_room_keys",
+ keyvalues=keyvalues,
+ desc="delete_e2e_room_keys",
+ )
+
+ @staticmethod
+ def _get_current_version(txn, user_id):
+ txn.execute(
+ "SELECT MAX(version) FROM e2e_room_keys_versions "
+ "WHERE user_id=? AND deleted=0",
+ (user_id,)
+ )
+ row = txn.fetchone()
+ if not row:
+ raise StoreError(404, 'No current backup version')
+ return row[0]
+
+ def get_e2e_room_keys_version_info(self, user_id, version=None):
+ """Get info metadata about a version of our room_keys backup.
+
+ Args:
+ user_id(str): the user whose backup we're querying
+ version(str): Optional. the version ID of the backup we're querying about
+ If missing, we return the information about the current version.
+ Raises:
+ StoreError: with code 404 if there are no e2e_room_keys_versions present
+ Returns:
+ A deferred dict giving the info metadata for this backup version
+ """
+
+ def _get_e2e_room_keys_version_info_txn(txn):
+ if version is None:
+ this_version = self._get_current_version(txn, user_id)
+ else:
+ this_version = version
+
+ result = self._simple_select_one_txn(
+ txn,
+ table="e2e_room_keys_versions",
+ keyvalues={
+ "user_id": user_id,
+ "version": this_version,
+ "deleted": 0,
+ },
+ retcols=(
+ "version",
+ "algorithm",
+ "auth_data",
+ ),
+ )
+ result["auth_data"] = json.loads(result["auth_data"])
+ return result
+
+ return self.runInteraction(
+ "get_e2e_room_keys_version_info",
+ _get_e2e_room_keys_version_info_txn
+ )
+
+ def create_e2e_room_keys_version(self, user_id, info):
+ """Atomically creates a new version of this user's e2e_room_keys store
+ with the given version info.
+
+ Args:
+ user_id(str): the user whose backup we're creating a version
+ info(dict): the info about the backup version to be created
+
+ Returns:
+ A deferred string for the newly created version ID
+ """
+
+ def _create_e2e_room_keys_version_txn(txn):
+ txn.execute(
+ "SELECT MAX(version) FROM e2e_room_keys_versions WHERE user_id=?",
+ (user_id,)
+ )
+ current_version = txn.fetchone()[0]
+ if current_version is None:
+ current_version = '0'
+
+ new_version = str(int(current_version) + 1)
+
+ self._simple_insert_txn(
+ txn,
+ table="e2e_room_keys_versions",
+ values={
+ "user_id": user_id,
+ "version": new_version,
+ "algorithm": info["algorithm"],
+ "auth_data": json.dumps(info["auth_data"]),
+ },
+ )
+
+ return new_version
+
+ return self.runInteraction(
+ "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
+ )
+
+ def delete_e2e_room_keys_version(self, user_id, version=None):
+ """Delete a given backup version of the user's room keys.
+ Doesn't delete their actual key data.
+
+ Args:
+ user_id(str): the user whose backup version we're deleting
+ version(str): Optional. the version ID of the backup version we're deleting
+ If missing, we delete the current backup version info.
+ Raises:
+ StoreError: with code 404 if there are no e2e_room_keys_versions present,
+ or if the version requested doesn't exist.
+ """
+
+ def _delete_e2e_room_keys_version_txn(txn):
+ if version is None:
+ this_version = self._get_current_version(txn, user_id)
+ else:
+ this_version = version
+
+ return self._simple_update_one_txn(
+ txn,
+ table="e2e_room_keys_versions",
+ keyvalues={
+ "user_id": user_id,
+ "version": this_version,
+ },
+ updatevalues={
+ "deleted": 1,
+ }
+ )
+
+ return self.runInteraction(
+ "delete_e2e_room_keys_version",
+ _delete_e2e_room_keys_version_txn
+ )
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 24345b20a6..3faca2a042 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -376,33 +376,25 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
@defer.inlineCallbacks
def get_missing_events(self, room_id, earliest_events, latest_events,
- limit, min_depth):
+ limit):
ids = yield self.runInteraction(
"get_missing_events",
self._get_missing_events,
- room_id, earliest_events, latest_events, limit, min_depth
+ room_id, earliest_events, latest_events, limit,
)
-
events = yield self._get_events(ids)
-
- events = sorted(
- [ev for ev in events if ev.depth >= min_depth],
- key=lambda e: e.depth,
- )
-
- defer.returnValue(events[:limit])
+ defer.returnValue(events)
def _get_missing_events(self, txn, room_id, earliest_events, latest_events,
- limit, min_depth):
-
- earliest_events = set(earliest_events)
- front = set(latest_events) - earliest_events
+ limit):
- event_results = set()
+ seen_events = set(earliest_events)
+ front = set(latest_events) - seen_events
+ event_results = []
query = (
"SELECT prev_event_id FROM event_edges "
- "WHERE event_id = ? AND is_state = ? "
+ "WHERE room_id = ? AND event_id = ? AND is_state = ? "
"LIMIT ?"
)
@@ -411,18 +403,20 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
for event_id in front:
txn.execute(
query,
- (event_id, False, limit - len(event_results))
+ (room_id, event_id, False, limit - len(event_results))
)
- for e_id, in txn:
- new_front.add(e_id)
+ new_results = set(t[0] for t in txn) - seen_events
- new_front -= earliest_events
- new_front -= event_results
+ new_front |= new_results
+ seen_events |= new_results
+ event_results.extend(new_results)
front = new_front
- event_results |= new_front
+ # we built the list working backwards from latest_events; we now need to
+ # reverse it so that the events are approximately chronological.
+ event_results.reverse()
return event_results
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 03cedf3a75..c780f55277 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -34,6 +34,7 @@ from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.state import StateResolutionStore
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.events_worker import EventsWorkerStore
@@ -731,11 +732,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
# Ok, we need to defer to the state handler to resolve our state sets.
- def get_events(ev_ids):
- return self.get_events(
- ev_ids, get_prev_content=False, check_redacted=False,
- )
-
state_groups = {
sg: state_groups_map[sg] for sg in new_state_groups
}
@@ -745,7 +741,8 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
logger.debug("calling resolve_state_groups from preserve_events")
res = yield self._state_resolution_handler.resolve_state_groups(
- room_id, room_version, state_groups, events_map, get_events
+ room_id, room_version, state_groups, events_map,
+ state_res_store=StateResolutionStore(self)
)
defer.returnValue((res.state, None))
@@ -854,6 +851,27 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
# Insert into event_to_state_groups.
self._store_event_state_mappings_txn(txn, events_and_contexts)
+ # We want to store event_auth mappings for rejected events, as they're
+ # used in state res v2.
+ # This is only necessary if the rejected event appears in an accepted
+ # event's auth chain, but its easier for now just to store them (and
+ # it doesn't take much storage compared to storing the entire event
+ # anyway).
+ self._simple_insert_many_txn(
+ txn,
+ table="event_auth",
+ values=[
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "auth_id": auth_id,
+ }
+ for event, _ in events_and_contexts
+ for auth_id, _ in event.auth_events
+ if event.is_state()
+ ],
+ )
+
# _store_rejected_events_txn filters out any events which were
# rejected, and returns the filtered list.
events_and_contexts = self._store_rejected_events_txn(
@@ -1329,21 +1347,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
txn, event.room_id, event.redacts
)
- self._simple_insert_many_txn(
- txn,
- table="event_auth",
- values=[
- {
- "event_id": event.event_id,
- "room_id": event.room_id,
- "auth_id": auth_id,
- }
- for event, _ in events_and_contexts
- for auth_id, _ in event.auth_events
- if event.is_state()
- ],
- )
-
# Update the event_forward_extremities, event_backward_extremities and
# event_edges tables.
self._handle_mult_prev_events(
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index a1331c1a61..8af17921e3 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
- db_binary_type = buffer
+ db_binary_type = six.moves.builtins.buffer
else:
db_binary_type = memoryview
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index c7987bfcdd..2743b52bad 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -29,7 +29,7 @@ from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
if six.PY2:
- db_binary_type = buffer
+ db_binary_type = six.moves.builtins.buffer
else:
db_binary_type = memoryview
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 26b429e307..2dd14aba1c 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -567,7 +567,7 @@ class RegistrationStore(RegistrationWorkerStore,
def _find_next_generated_user_id(txn):
txn.execute("SELECT name FROM users")
- regex = re.compile("^@(\d+):")
+ regex = re.compile(r"^@(\d+):")
found = set()
diff --git a/synapse/storage/schema/delta/51/e2e_room_keys.sql b/synapse/storage/schema/delta/51/e2e_room_keys.sql
new file mode 100644
index 0000000000..c0e66a697d
--- /dev/null
+++ b/synapse/storage/schema/delta/51/e2e_room_keys.sql
@@ -0,0 +1,39 @@
+/* Copyright 2017 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- users' optionally backed up encrypted e2e sessions
+CREATE TABLE e2e_room_keys (
+ user_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ session_id TEXT NOT NULL,
+ version TEXT NOT NULL,
+ first_message_index INT,
+ forwarded_count INT,
+ is_verified BOOLEAN,
+ session_data TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX e2e_room_keys_idx ON e2e_room_keys(user_id, room_id, session_id);
+
+-- the metadata for each generation of encrypted e2e session backups
+CREATE TABLE e2e_room_keys_versions (
+ user_id TEXT NOT NULL,
+ version TEXT NOT NULL,
+ algorithm TEXT NOT NULL,
+ auth_data TEXT NOT NULL,
+ deleted SMALLINT DEFAULT 0 NOT NULL
+);
+
+CREATE UNIQUE INDEX e2e_room_keys_versions_idx ON e2e_room_keys_versions(user_id, version);
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 5623391f6e..158e9dbe7b 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -27,7 +27,7 @@ from ._base import SQLBaseStore
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
- db_binary_type = buffer
+ db_binary_type = six.moves.builtins.buffer
else:
db_binary_type = memoryview
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 4c296d72c0..d6cfdba519 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -630,7 +630,21 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def get_all_new_events_stream(self, from_id, current_id, limit):
- """Get all new events"""
+ """Get all new events
+
+ Returns all events with from_id < stream_ordering <= current_id.
+
+ Args:
+ from_id (int): the stream_ordering of the last event we processed
+ current_id (int): the stream_ordering of the most recently processed event
+ limit (int): the maximum number of events to return
+
+ Returns:
+ Deferred[Tuple[int, list[FrozenEvent]]]: A tuple of (next_id, events), where
+ `next_id` is the next value to pass as `from_id` (it will either be the
+ stream_ordering of the last returned event, or, if fewer than `limit` events
+ were found, `current_id`.
+ """
def get_all_new_events_stream_txn(txn):
sql = (
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index a3032cdce9..d8bf953ec0 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -30,7 +30,7 @@ from ._base import SQLBaseStore, db_to_json
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
- db_binary_type = buffer
+ db_binary_type = six.moves.builtins.buffer
else:
db_binary_type = memoryview
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 680ea928c7..9a8fae0497 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -68,7 +68,10 @@ class Clock(object):
"""
call = task.LoopingCall(f)
call.clock = self._reactor
- call.start(msec / 1000.0, now=False)
+ d = call.start(msec / 1000.0, now=False)
+ d.addErrback(
+ log_failure, "Looping call died", consumeErrors=False,
+ )
return call
def call_later(self, delay, callback, *args, **kwargs):
@@ -109,3 +112,29 @@ def batch_iter(iterable, size):
sourceiter = iter(iterable)
# call islice until it returns an empty tuple
return iter(lambda: tuple(islice(sourceiter, size)), ())
+
+
+def log_failure(failure, msg, consumeErrors=True):
+ """Creates a function suitable for passing to `Deferred.addErrback` that
+ logs any failures that occur.
+
+ Args:
+ msg (str): Message to log
+ consumeErrors (bool): If true consumes the failure, otherwise passes
+ on down the callback chain
+
+ Returns:
+ func(Failure)
+ """
+
+ logger.error(
+ msg,
+ exc_info=(
+ failure.type,
+ failure.value,
+ failure.getTracebackObject()
+ )
+ )
+
+ if not consumeErrors:
+ return failure
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index f2bde74dc5..625aedc940 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -15,6 +15,8 @@
import logging
+from six import integer_types
+
from sortedcontainers import SortedDict
from synapse.util import caches
@@ -47,7 +49,7 @@ class StreamChangeCache(object):
def has_entity_changed(self, entity, stream_pos):
"""Returns True if the entity may have been updated since stream_pos
"""
- assert type(stream_pos) is int or type(stream_pos) is long
+ assert type(stream_pos) in integer_types
if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 89224b26cc..4c6e92beb8 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -25,7 +25,7 @@ See doc/log_contexts.rst for details on how this works.
import logging
import threading
-from twisted.internet import defer
+from twisted.internet import defer, threads
logger = logging.getLogger(__name__)
@@ -562,58 +562,76 @@ def _set_context_cb(result, context):
return result
-# modules to ignore in `logcontext_tracer`
-_to_ignore = [
- "synapse.util.logcontext",
- "synapse.http.server",
- "synapse.storage._base",
- "synapse.util.async_helpers",
-]
+def defer_to_thread(reactor, f, *args, **kwargs):
+ """
+ Calls the function `f` using a thread from the reactor's default threadpool and
+ returns the result as a Deferred.
+
+ Creates a new logcontext for `f`, which is created as a child of the current
+ logcontext (so its CPU usage metrics will get attributed to the current
+ logcontext). `f` should preserve the logcontext it is given.
+
+ The result deferred follows the Synapse logcontext rules: you should `yield`
+ on it.
+
+ Args:
+ reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+ the Deferred will be invoked, and whose threadpool we should use for the
+ function.
+
+ Normally this will be hs.get_reactor().
+
+ f (callable): The function to call.
+ args: positional arguments to pass to f.
-def logcontext_tracer(frame, event, arg):
- """A tracer that logs whenever a logcontext "unexpectedly" changes within
- a function. Probably inaccurate.
+ kwargs: keyword arguments to pass to f.
- Use by calling `sys.settrace(logcontext_tracer)` in the main thread.
+ Returns:
+ Deferred: A Deferred which fires a callback with the result of `f`, or an
+ errback if `f` throws an exception.
"""
- if event == 'call':
- name = frame.f_globals["__name__"]
- if name.startswith("synapse"):
- if name == "synapse.util.logcontext":
- if frame.f_code.co_name in ["__enter__", "__exit__"]:
- tracer = frame.f_back.f_trace
- if tracer:
- tracer.just_changed = True
-
- tracer = frame.f_trace
- if tracer:
- return tracer
-
- if not any(name.startswith(ig) for ig in _to_ignore):
- return LineTracer()
-
-
-class LineTracer(object):
- __slots__ = ["context", "just_changed"]
-
- def __init__(self):
- self.context = LoggingContext.current_context()
- self.just_changed = False
-
- def __call__(self, frame, event, arg):
- if event in 'line':
- if self.just_changed:
- self.context = LoggingContext.current_context()
- self.just_changed = False
- else:
- c = LoggingContext.current_context()
- if c != self.context:
- logger.info(
- "Context changed! %s -> %s, %s, %s",
- self.context, c,
- frame.f_code.co_filename, frame.f_lineno
- )
- self.context = c
+ return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
- return self
+
+def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
+ """
+ A wrapper for twisted.internet.threads.deferToThreadpool, which handles
+ logcontexts correctly.
+
+ Calls the function `f` using a thread from the given threadpool and returns
+ the result as a Deferred.
+
+ Creates a new logcontext for `f`, which is created as a child of the current
+ logcontext (so its CPU usage metrics will get attributed to the current
+ logcontext). `f` should preserve the logcontext it is given.
+
+ The result deferred follows the Synapse logcontext rules: you should `yield`
+ on it.
+
+ Args:
+ reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+ the Deferred will be invoked. Normally this will be hs.get_reactor().
+
+ threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for
+ running `f`. Normally this will be hs.get_reactor().getThreadPool().
+
+ f (callable): The function to call.
+
+ args: positional arguments to pass to f.
+
+ kwargs: keyword arguments to pass to f.
+
+ Returns:
+ Deferred: A Deferred which fires a callback with the result of `f`, or an
+ errback if `f` throws an exception.
+ """
+ logcontext = LoggingContext.current_context()
+
+ def g():
+ with LoggingContext(parent_context=logcontext):
+ return f(*args, **kwargs)
+
+ return make_deferred_yieldable(
+ threads.deferToThreadPool(reactor, threadpool, g)
+ )
diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py
index 8d0f2a8918..9cb7e9c9ab 100644
--- a/synapse/util/manhole.py
+++ b/synapse/util/manhole.py
@@ -70,6 +70,8 @@ def manhole(username, password, globals):
Returns:
twisted.internet.protocol.Factory: A factory to pass to ``listenTCP``
"""
+ if not isinstance(password, bytes):
+ password = password.encode('ascii')
checker = checkers.InMemoryUsernamePasswordDatabaseDontUse(
**{username: password}
@@ -82,7 +84,7 @@ def manhole(username, password, globals):
)
factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker]))
- factory.publicKeys['ssh-rsa'] = Key.fromString(PUBLIC_KEY)
- factory.privateKeys['ssh-rsa'] = Key.fromString(PRIVATE_KEY)
+ factory.publicKeys[b'ssh-rsa'] = Key.fromString(PUBLIC_KEY)
+ factory.privateKeys[b'ssh-rsa'] = Key.fromString(PRIVATE_KEY)
return factory
diff --git a/synapse/visibility.py b/synapse/visibility.py
index d4680863d3..43f48196be 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -219,7 +219,7 @@ def filter_events_for_server(store, server_name, events):
# Whatever else we do, we need to check for senders which have requested
# erasure of their data.
erased_senders = yield store.are_users_erased(
- e.sender for e in events,
+ (e.sender for e in events),
)
def redact_disallowed(event, state):
@@ -324,14 +324,13 @@ def filter_events_for_server(store, server_name, events):
# server's domain.
#
# event_to_state_ids contains lots of duplicates, so it turns out to be
- # cheaper to build a complete set of unique
- # ((type, state_key), event_id) tuples, and then filter out the ones we
- # don't want.
+ # cheaper to build a complete event_id => (type, state_key) dict, and then
+ # filter out the ones we don't want
#
- state_key_to_event_id_set = {
- e
+ event_id_to_state_key = {
+ event_id: key
for key_to_eid in itervalues(event_to_state_ids)
- for e in key_to_eid.items()
+ for key, event_id in iteritems(key_to_eid)
}
def include(typ, state_key):
@@ -346,7 +345,7 @@ def filter_events_for_server(store, server_name, events):
event_map = yield store.get_events([
e_id
- for key, e_id in state_key_to_event_id_set
+ for e_id, key in iteritems(event_id_to_state_key)
if include(key[0], key[1])
])
|