diff --git a/synapse/__init__.py b/synapse/__init__.py
index 43c5821ade..1ddbbbebfb 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -27,4 +27,4 @@ try:
except ImportError:
pass
-__version__ = "0.33.6"
+__version__ = "0.33.7"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 2e7f98404d..48b903374d 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -59,6 +59,7 @@ class Codes(object):
RESOURCE_LIMIT_EXCEEDED = "M_RESOURCE_LIMIT_EXCEEDED"
UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION"
INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
+ WRONG_ROOM_KEYS_VERSION = "M_WRONG_ROOM_KEYS_VERSION"
class CodeMessageException(RuntimeError):
@@ -312,6 +313,20 @@ class LimitExceededError(SynapseError):
)
+class RoomKeysVersionError(SynapseError):
+ """A client has tried to upload to a non-current version of the room_keys store
+ """
+ def __init__(self, current_version):
+ """
+ Args:
+ current_version (str): the current version of the store they should have used
+ """
+ super(RoomKeysVersionError, self).__init__(
+ 403, "Wrong room_keys version", Codes.WRONG_ROOM_KEYS_VERSION
+ )
+ self.current_version = current_version
+
+
class IncompatibleRoomVersionError(SynapseError):
"""A server is trying to join a room whose version it does not support."""
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 7c866e246a..18584226e9 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -17,6 +17,7 @@ import gc
import logging
import sys
+import psutil
from daemonize import Daemonize
from twisted.internet import error, reactor
@@ -24,12 +25,6 @@ from twisted.internet import error, reactor
from synapse.util import PreserveLoggingContext
from synapse.util.rlimit import change_resource_limit
-try:
- import affinity
-except Exception:
- affinity = None
-
-
logger = logging.getLogger(__name__)
@@ -89,15 +84,20 @@ def start_reactor(
with PreserveLoggingContext():
logger.info("Running")
if cpu_affinity is not None:
- if not affinity:
- quit_with_error(
- "Missing package 'affinity' required for cpu_affinity\n"
- "option\n\n"
- "Install by running:\n\n"
- " pip install affinity\n\n"
- )
- logger.info("Setting CPU affinity to %s" % cpu_affinity)
- affinity.set_process_affinity_mask(0, cpu_affinity)
+                # Turn the bitmask into a string of bits, reversed so that
+                # string index 0 corresponds to CPU 0
+ mask_to_bits = bin(cpu_affinity)[2:][::-1]
+
+ cpus = []
+ cpu_num = 0
+
+ for i in mask_to_bits:
+ if i == "1":
+ cpus.append(cpu_num)
+ cpu_num += 1
+
+ p = psutil.Process()
+ p.cpu_affinity(cpus)
+
change_resource_limit(soft_file_limit)
if gc_thresholds:
gc.set_threshold(*gc_thresholds)
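As a standalone sketch of the decoding above: bit N of cpu_affinity selects CPU N, so the bit string is reversed to make the string index equal the CPU number:

    def mask_to_cpus(cpu_affinity):
        # e.g. 0b1010 -> "1010" -> reversed "0101" -> CPUs 1 and 3
        bits = bin(cpu_affinity)[2:][::-1]
        return [cpu_num for cpu_num, bit in enumerate(bits) if bit == "1"]

    assert mask_to_cpus(0b1010) == [1, 3]
    assert mask_to_cpus(0xF) == [0, 1, 2, 3]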
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 9060ab14f6..e4a68715aa 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -178,6 +178,9 @@ def start(config_options):
setup_logging(config, use_worker_options=True)
+ # This should only be done on the user directory worker or the master
+ config.update_user_directory = False
+
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index fc4b25de1c..f5c61dec5b 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -68,7 +68,7 @@ class PresenceStatusStubServlet(ClientV1RestServlet):
"Authorization": auth_headers,
}
result = yield self.http_client.get_json(
- self.main_uri + request.uri,
+ self.main_uri + request.uri.decode('ascii'),
headers=headers,
)
defer.returnValue((200, result))
@@ -125,7 +125,7 @@ class KeyUploadServlet(RestServlet):
"Authorization": auth_headers,
}
result = yield self.http_client.post_json_get_json(
- self.main_uri + request.uri,
+ self.main_uri + request.uri.decode('ascii'),
body,
headers=headers,
)
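The .decode('ascii') calls are needed because, under Python 3, Twisted supplies request.uri as bytes, and concatenating bytes onto the str main_uri raises a TypeError. A minimal illustration (the values are hypothetical):

    main_uri = "https://127.0.0.1:8008"              # str
    uri = b"/_matrix/client/r0/presence/status"      # bytes, as Twisted gives it
    # main_uri + uri                                 # TypeError on Python 3
    url = main_uri + uri.decode("ascii")             # works on Python 2 and 3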
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 630dcda478..0f9f8e19f6 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage._base import __func__
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
@@ -49,31 +50,31 @@ class PusherSlaveStore(
SlavedAccountDataStore
):
update_pusher_last_stream_ordering_and_success = (
- DataStore.update_pusher_last_stream_ordering_and_success.__func__
+ __func__(DataStore.update_pusher_last_stream_ordering_and_success)
)
update_pusher_failing_since = (
- DataStore.update_pusher_failing_since.__func__
+ __func__(DataStore.update_pusher_failing_since)
)
update_pusher_last_stream_ordering = (
- DataStore.update_pusher_last_stream_ordering.__func__
+ __func__(DataStore.update_pusher_last_stream_ordering)
)
get_throttle_params_by_room = (
- DataStore.get_throttle_params_by_room.__func__
+ __func__(DataStore.get_throttle_params_by_room)
)
set_throttle_params = (
- DataStore.set_throttle_params.__func__
+ __func__(DataStore.set_throttle_params)
)
get_time_of_last_push_action_before = (
- DataStore.get_time_of_last_push_action_before.__func__
+ __func__(DataStore.get_time_of_last_push_action_before)
)
get_profile_displayname = (
- DataStore.get_profile_displayname.__func__
+ __func__(DataStore.get_profile_displayname)
)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 9a7fc6ee9d..3926c7f263 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -33,7 +33,7 @@ from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
-from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -147,7 +147,7 @@ class SynchrotronPresence(object):
         and haven't come back yet. If there are, poke the master about them.
"""
now = self.clock.time_msec()
- for user_id, last_sync_ms in self.users_going_offline.items():
+ for user_id, last_sync_ms in list(self.users_going_offline.items()):
if now - last_sync_ms > 10 * 1000:
self.users_going_offline.pop(user_id, None)
self.send_user_sync(user_id, False, last_sync_ms)
@@ -156,9 +156,9 @@ class SynchrotronPresence(object):
         # TODO How's this supposed to work?
pass
- get_states = PresenceHandler.get_states.__func__
- get_state = PresenceHandler.get_state.__func__
- current_state_for_users = PresenceHandler.current_state_for_users.__func__
+ get_states = __func__(PresenceHandler.get_states)
+ get_state = __func__(PresenceHandler.get_state)
+ current_state_for_users = __func__(PresenceHandler.current_state_for_users)
def user_syncing(self, user_id, affect_presence):
if affect_presence:
@@ -208,7 +208,7 @@ class SynchrotronPresence(object):
) for row in rows]
for state in states:
- self.user_to_current_state[row.user_id] = state
+ self.user_to_current_state[state.user_id] = state
stream_id = token
yield self.notify_from_replication(states, stream_id)
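The list() copy on users_going_offline.items() matters on Python 3, where dict.items() is a live view: popping entries while iterating the view raises RuntimeError. Iterating a snapshot sidesteps that; a minimal illustration with made-up data:

    users_going_offline = {"@a:hs": 1000, "@b:hs": 2000}
    for user_id, last_sync_ms in list(users_going_offline.items()):
        # safe: we mutate the dict while walking a copy of its items
        users_going_offline.pop(user_id, None)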
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index fe156b6930..e2582cfecc 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -13,11 +13,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import print_function
+
# This file can't be called email.py because if it is, we cannot:
import email.utils
+import logging
+import os
+import sys
+import textwrap
from ._base import Config
+logger = logging.getLogger(__name__)
+
+TEMPLATE_DIR_WARNING = """\
+WARNING: The email notifier is configured to look for templates in '%(template_dir)s',
+but no templates could be found there. We will fall back to using the example templates;
+to get rid of this warning, leave 'email.template_dir' unset.
+"""
+
class EmailConfig(Config):
def read_config(self, config):
@@ -38,7 +52,6 @@ class EmailConfig(Config):
"smtp_host",
"smtp_port",
"notif_from",
- "template_dir",
"notif_template_html",
"notif_template_text",
]
@@ -62,9 +75,24 @@ class EmailConfig(Config):
self.email_smtp_host = email_config["smtp_host"]
self.email_smtp_port = email_config["smtp_port"]
self.email_notif_from = email_config["notif_from"]
- self.email_template_dir = email_config["template_dir"]
self.email_notif_template_html = email_config["notif_template_html"]
self.email_notif_template_text = email_config["notif_template_text"]
+
+ self.email_template_dir = email_config.get("template_dir")
+
+ # backwards-compatibility hack
+ if (
+ self.email_template_dir == "res/templates"
+ and not os.path.isfile(
+ os.path.join(self.email_template_dir, self.email_notif_template_text)
+ )
+ ):
+ t = TEMPLATE_DIR_WARNING % {
+ "template_dir": self.email_template_dir,
+ }
+ print(textwrap.fill(t, width=80) + "\n", file=sys.stderr)
+ self.email_template_dir = None
+
self.email_notif_for_new_users = email_config.get(
"notif_for_new_users", True
)
@@ -113,7 +141,9 @@ class EmailConfig(Config):
# require_transport_security: False
# notif_from: "Your Friendly %(app)s Home Server <noreply@example.com>"
# app_name: Matrix
- # template_dir: res/templates
+ # # if template_dir is unset, uses the example templates that are part of
+ # # the Synapse distribution.
+ # #template_dir: res/templates
# notif_template_html: notif_mail.html
# notif_template_text: notif_mail.txt
# notif_for_new_users: True
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 819e8f7331..4efe95faa4 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -507,19 +507,19 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
@log_function
def on_get_missing_events(self, origin, room_id, earliest_events,
- latest_events, limit, min_depth):
+ latest_events, limit):
with (yield self._server_linearizer.queue((origin, room_id))):
origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id)
logger.info(
"on_get_missing_events: earliest_events: %r, latest_events: %r,"
- " limit: %d, min_depth: %d",
- earliest_events, latest_events, limit, min_depth
+ " limit: %d",
+ earliest_events, latest_events, limit,
)
missing_events = yield self.handler.on_get_missing_events(
- origin, room_id, earliest_events, latest_events, limit, min_depth
+ origin, room_id, earliest_events, latest_events, limit,
)
if len(missing_events) < 5:
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 98b5950800..3fdd63be95 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -633,14 +633,6 @@ class TransactionQueue(object):
transaction, json_data_cb
)
code = 200
-
- if response:
- for e_id, r in response.get("pdus", {}).items():
- if "error" in r:
- logger.warn(
- "Transaction returned error for %s: %s",
- e_id, r,
- )
except HttpResponseException as e:
code = e.code
response = e.response
@@ -657,19 +649,24 @@ class TransactionQueue(object):
destination, txn_id, code
)
- logger.debug("TX [%s] Sent transaction", destination)
- logger.debug("TX [%s] Marking as delivered...", destination)
-
yield self.transaction_actions.delivered(
transaction, code, response
)
- logger.debug("TX [%s] Marked as delivered", destination)
+ logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
- if code != 200:
+ if code == 200:
+ for e_id, r in response.get("pdus", {}).items():
+ if "error" in r:
+ logger.warn(
+ "TX [%s] {%s} Remote returned error for %s: %s",
+ destination, txn_id, e_id, r,
+ )
+ else:
for p in pdus:
- logger.info(
- "Failed to send event %s to %s", p.event_id, destination
+ logger.warn(
+ "TX [%s] {%s} Failed to send event %s",
+ destination, txn_id, p.event_id,
)
success = False
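For reference, the per-PDU results scanned above come back keyed by event ID, with an "error" entry for rejected events. A sketch of the same scan as a helper, assuming the response shape used in the code above:

    def failed_pdus(response):
        # event IDs the remote reported an error for, given a response of
        # the form {"pdus": {"$event_id": {"error": ...}, ...}}
        return [
            e_id
            for e_id, result in response.get("pdus", {}).items()
            if "error" in result
        ]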
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 2ab973d6c8..edba5a9808 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -143,9 +143,17 @@ class TransportLayerClient(object):
transaction (Transaction)
Returns:
- Deferred: Results of the deferred is a tuple in the form of
- (response_code, response_body) where the response_body is a
- python dict decoded from json
+ Deferred: Succeeds when we get a 2xx HTTP response. The result
+ will be the decoded JSON body.
+
+            Fails with ``HttpResponseException`` if we get an HTTP response
+            code >= 300.
+
+ Fails with ``NotRetryingDestination`` if we are not yet ready
+ to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+            is not on our federation whitelist.
"""
logger.debug(
"send_data dest=%s, txid=%s",
@@ -170,11 +178,6 @@ class TransportLayerClient(object):
backoff_on_404=True, # If we get a 404 the other side has gone
)
- logger.debug(
- "send_data dest=%s, txid=%s, got response: 200",
- transaction.destination, transaction.transaction_id,
- )
-
defer.returnValue(response)
@defer.inlineCallbacks
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 2f874b4838..7288d49074 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -560,7 +560,6 @@ class FederationGetMissingEventsServlet(BaseFederationServlet):
@defer.inlineCallbacks
def on_POST(self, origin, content, query, room_id):
limit = int(content.get("limit", 10))
- min_depth = int(content.get("min_depth", 0))
earliest_events = content.get("earliest_events", [])
latest_events = content.get("latest_events", [])
@@ -569,7 +568,6 @@ class FederationGetMissingEventsServlet(BaseFederationServlet):
room_id=room_id,
earliest_events=earliest_events,
latest_events=latest_events,
- min_depth=min_depth,
limit=limit,
)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index f0f89af7dc..17eedf4dbf 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -28,6 +28,7 @@ from synapse.metrics import (
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import log_failure
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.metrics import Measure
@@ -36,17 +37,6 @@ logger = logging.getLogger(__name__)
events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
-def log_failure(failure):
- logger.error(
- "Application Services Failure",
- exc_info=(
- failure.type,
- failure.value,
- failure.getTracebackObject()
- )
- )
-
-
class ApplicationServicesHandler(object):
def __init__(self, hs):
@@ -112,7 +102,10 @@ class ApplicationServicesHandler(object):
if not self.started_scheduler:
def start_scheduler():
- return self.scheduler.start().addErrback(log_failure)
+ return self.scheduler.start().addErrback(
+ log_failure, "Application Services Failure",
+ )
+
run_as_background_process("as_scheduler", start_scheduler)
self.started_scheduler = True
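log_failure now lives in synapse.util and takes the log message as a parameter, which is why the errback passes it positionally: addErrback hands the Failure to the callback first, followed by any extra arguments. A sketch of such a helper (the exact signature in synapse.util is assumed here):

    import logging

    logger = logging.getLogger(__name__)

    def log_failure(failure, msg):
        # errbacks receive the Failure first; the message is supplied by the
        # caller via addErrback(log_failure, msg)
        logger.error(
            msg,
            exc_info=(failure.type, failure.value, failure.getTracebackObject()),
        )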
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
new file mode 100644
index 0000000000..5edb3cfe04
--- /dev/null
+++ b/synapse/handlers/e2e_room_keys.py
@@ -0,0 +1,289 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017, 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from six import iteritems
+
+from twisted.internet import defer
+
+from synapse.api.errors import RoomKeysVersionError, StoreError, SynapseError
+from synapse.util.async_helpers import Linearizer
+
+logger = logging.getLogger(__name__)
+
+
+class E2eRoomKeysHandler(object):
+ """
+ Implements an optional realtime backup mechanism for encrypted E2E megolm room keys.
+ This gives a way for users to store and recover their megolm keys if they lose all
+ their clients. It should also extend easily to future room key mechanisms.
+ The actual payload of the encrypted keys is completely opaque to the handler.
+ """
+
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+
+ # Used to lock whenever a client is uploading key data. This prevents collisions
+ # between clients trying to upload the details of a new session, given all
+ # clients belonging to a user will receive and try to upload a new session at
+ # roughly the same time. Also used to lock out uploads when the key is being
+ # changed.
+ self._upload_linearizer = Linearizer("upload_room_keys_lock")
+
+ @defer.inlineCallbacks
+ def get_room_keys(self, user_id, version, room_id=None, session_id=None):
+ """Bulk get the E2E room keys for a given backup, optionally filtered to a given
+ room, or a given session.
+ See EndToEndRoomKeyStore.get_e2e_room_keys for full details.
+
+ Args:
+ user_id(str): the user whose keys we're getting
+ version(str): the version ID of the backup we're getting keys from
+            room_id(string): room ID to get keys for, or None to get keys for all rooms
+            session_id(string): session ID to get keys for, or None to get keys for all
+                sessions
+ Returns:
+ A deferred list of dicts giving the session_data and message metadata for
+ these room keys.
+ """
+
+ # we deliberately take the lock to get keys so that changing the version
+ # works atomically
+ with (yield self._upload_linearizer.queue(user_id)):
+ results = yield self.store.get_e2e_room_keys(
+ user_id, version, room_id, session_id
+ )
+
+ if results['rooms'] == {}:
+ raise SynapseError(404, "No room_keys found")
+
+ defer.returnValue(results)
+
+ @defer.inlineCallbacks
+ def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
+ """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
+ room or a given session.
+ See EndToEndRoomKeyStore.delete_e2e_room_keys for full details.
+
+ Args:
+ user_id(str): the user whose backup we're deleting
+ version(str): the version ID of the backup we're deleting
+            room_id(string): room ID to delete keys for, or None to delete keys for all
+                rooms
+            session_id(string): session ID to delete keys for, or None to delete keys
+                for all sessions
+ Returns:
+ A deferred of the deletion transaction
+ """
+
+ # lock for consistency with uploading
+ with (yield self._upload_linearizer.queue(user_id)):
+ yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
+
+ @defer.inlineCallbacks
+ def upload_room_keys(self, user_id, version, room_keys):
+ """Bulk upload a list of room keys into a given backup version, asserting
+ that the given version is the current backup version. room_keys are merged
+ into the current backup as described in RoomKeysServlet.on_PUT().
+
+ Args:
+ user_id(str): the user whose backup we're setting
+ version(str): the version ID of the backup we're updating
+ room_keys(dict): a nested dict describing the room_keys we're setting:
+
+ {
+ "rooms": {
+ "!abc:matrix.org": {
+ "sessions": {
+ "c0ff33": {
+ "first_message_index": 1,
+ "forwarded_count": 1,
+ "is_verified": false,
+ "session_data": "SSBBTSBBIEZJU0gK"
+ }
+ }
+ }
+ }
+ }
+
+ Raises:
+ SynapseError: with code 404 if there are no versions defined
+ RoomKeysVersionError: if the uploaded version is not the current version
+ """
+
+ # TODO: Validate the JSON to make sure it has the right keys.
+
+ # XXX: perhaps we should use a finer grained lock here?
+ with (yield self._upload_linearizer.queue(user_id)):
+
+ # Check that the version we're trying to upload is the current version
+ try:
+ version_info = yield self.store.get_e2e_room_keys_version_info(user_id)
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Version '%s' not found" % (version,))
+ else:
+ raise
+
+            if version_info['version'] != version:
+                # Check that the version we're trying to upload actually exists
+                try:
+                    yield self.store.get_e2e_room_keys_version_info(
+                        user_id, version,
+                    )
+                    # if we get this far, the version exists but is stale; tell
+                    # the client which version is actually current
+                    raise RoomKeysVersionError(current_version=version_info['version'])
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Version '%s' not found" % (version,))
+ else:
+ raise
+
+ # go through the room_keys.
+ # XXX: this should/could be done concurrently, given we're in a lock.
+ for room_id, room in iteritems(room_keys['rooms']):
+ for session_id, session in iteritems(room['sessions']):
+ yield self._upload_room_key(
+ user_id, version, room_id, session_id, session
+ )
+
+ @defer.inlineCallbacks
+ def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
+ """Upload a given room_key for a given room and session into a given
+ version of the backup. Merges the key with any which might already exist.
+
+ Args:
+ user_id(str): the user whose backup we're setting
+ version(str): the version ID of the backup we're updating
+ room_id(str): the ID of the room whose keys we're setting
+ session_id(str): the session whose room_key we're setting
+ room_key(dict): the room_key being set
+ """
+
+ # get the room_key for this particular row
+ current_room_key = None
+ try:
+ current_room_key = yield self.store.get_e2e_room_key(
+ user_id, version, room_id, session_id
+ )
+ except StoreError as e:
+ if e.code == 404:
+ pass
+ else:
+ raise
+
+ if self._should_replace_room_key(current_room_key, room_key):
+ yield self.store.set_e2e_room_key(
+ user_id, version, room_id, session_id, room_key
+ )
+
+ @staticmethod
+ def _should_replace_room_key(current_room_key, room_key):
+ """
+ Determine whether to replace a given current_room_key (if any)
+ with a newly uploaded room_key backup
+
+ Args:
+ current_room_key (dict): Optional, the current room_key dict if any
+ room_key (dict): The new room_key dict which may or may not be fit to
+ replace the current_room_key
+
+ Returns:
+ True if current_room_key should be replaced by room_key in the backup
+ """
+
+ if current_room_key:
+ # spelt out with if/elifs rather than nested boolean expressions
+ # purely for legibility.
+
+ if room_key['is_verified'] and not current_room_key['is_verified']:
+ return True
+ elif (
+ room_key['first_message_index'] <
+ current_room_key['first_message_index']
+ ):
+ return True
+ elif room_key['forwarded_count'] < current_room_key['forwarded_count']:
+ return True
+ else:
+ return False
+ return True
+
+ @defer.inlineCallbacks
+ def create_version(self, user_id, version_info):
+ """Create a new backup version. This automatically becomes the new
+        backup version for the user's keys; previous backups can no longer be
+        written to.
+
+ Args:
+ user_id(str): the user whose backup version we're creating
+ version_info(dict): metadata about the new version being created
+
+ {
+ "algorithm": "m.megolm_backup.v1",
+ "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K"
+ }
+
+ Returns:
+ A deferred of a string that gives the new version number.
+ """
+
+ # TODO: Validate the JSON to make sure it has the right keys.
+
+ # lock everyone out until we've switched version
+ with (yield self._upload_linearizer.queue(user_id)):
+ new_version = yield self.store.create_e2e_room_keys_version(
+ user_id, version_info
+ )
+ defer.returnValue(new_version)
+
+ @defer.inlineCallbacks
+ def get_version_info(self, user_id, version=None):
+ """Get the info about a given version of the user's backup
+
+ Args:
+ user_id(str): the user whose current backup version we're querying
+ version(str): Optional; if None gives the most recent version
+ otherwise a historical one.
+ Raises:
+ StoreError: code 404 if the requested backup version doesn't exist
+ Returns:
+            A deferred of an info dict that gives the info about the requested version.
+
+ {
+ "version": "1234",
+ "algorithm": "m.megolm_backup.v1",
+ "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K"
+ }
+ """
+
+ with (yield self._upload_linearizer.queue(user_id)):
+ res = yield self.store.get_e2e_room_keys_version_info(user_id, version)
+ defer.returnValue(res)
+
+ @defer.inlineCallbacks
+ def delete_version(self, user_id, version=None):
+ """Deletes a given version of the user's e2e_room_keys backup
+
+ Args:
+ user_id(str): the user whose current backup version we're deleting
+ version(str): the version id of the backup being deleted
+ Raises:
+ StoreError: code 404 if this backup version doesn't exist
+ """
+
+ with (yield self._upload_linearizer.queue(user_id)):
+ yield self.store.delete_e2e_room_keys_version(user_id, version)
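A usage sketch for the merge heuristic above: a verified key always replaces an unverified one; otherwise an older first_message_index or a lower forwarded_count wins. The dicts here are illustrative:

    from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler

    current = {"is_verified": False, "first_message_index": 5, "forwarded_count": 2}
    better = {"is_verified": True, "first_message_index": 5, "forwarded_count": 2}
    worse = {"is_verified": False, "first_message_index": 6, "forwarded_count": 3}

    assert E2eRoomKeysHandler._should_replace_room_key(current, better)
    assert not E2eRoomKeysHandler._should_replace_room_key(current, worse)
    # with no existing key, any upload is accepted
    assert E2eRoomKeysHandler._should_replace_room_key(None, worse)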
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 45d955e6f5..cab57a8849 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -309,8 +309,8 @@ class FederationHandler(BaseHandler):
if sent_to_us_directly:
logger.warn(
- "[%s %s] Failed to fetch %d prev events: rejecting",
- room_id, event_id, len(prevs - seen),
+ "[%s %s] Rejecting: failed to fetch %d prev events: %s",
+ room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
)
raise FederationError(
"ERROR",
@@ -452,8 +452,8 @@ class FederationHandler(BaseHandler):
latest |= seen
logger.info(
- "[%s %s]: Requesting %d prev_events: %s",
- room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
+ "[%s %s]: Requesting missing events between %s and %s",
+ room_id, event_id, shortstr(latest), event_id,
)
# XXX: we set timeout to 10s to help workaround
@@ -1852,7 +1852,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def on_get_missing_events(self, origin, room_id, earliest_events,
- latest_events, limit, min_depth):
+ latest_events, limit):
in_room = yield self.auth.check_host_in_room(
room_id,
origin
@@ -1861,14 +1861,12 @@ class FederationHandler(BaseHandler):
raise AuthError(403, "Host not in room.")
limit = min(limit, 20)
- min_depth = max(min_depth, 0)
missing_events = yield self.store.get_missing_events(
room_id=room_id,
earliest_events=earliest_events,
latest_events=latest_events,
limit=limit,
- min_depth=min_depth,
)
missing_events = yield filter_events_for_server(
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 38e1737ec9..dc88620885 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -16,7 +16,7 @@
import logging
from collections import namedtuple
-from six import iteritems
+from six import PY3, iteritems
from six.moves import range
import msgpack
@@ -444,9 +444,16 @@ class RoomListNextBatch(namedtuple("RoomListNextBatch", (
@classmethod
def from_token(cls, token):
+ if PY3:
+ # The argument raw=False is only available on new versions of
+ # msgpack, and only really needed on Python 3. Gate it behind
+ # a PY3 check to avoid causing issues on Debian-packaged versions.
+ decoded = msgpack.loads(decode_base64(token), raw=False)
+ else:
+ decoded = msgpack.loads(decode_base64(token))
return RoomListNextBatch(**{
cls.REVERSE_KEY_DICT[key]: val
- for key, val in msgpack.loads(decode_base64(token)).items()
+ for key, val in decoded.items()
})
def to_token(self):
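The raw=False flag matters because, with the defaults of msgpack-python releases of this era, strings decode as bytes on Python 3, so the **-expansion above would not match the namedtuple's str field names. Roughly (behaviour of the old msgpack defaults assumed):

    import msgpack

    packed = msgpack.dumps({"max_rooms": 10})
    msgpack.loads(packed)             # {b'max_rooms': 10}: bytes keys on Python 3
    msgpack.loads(packed, raw=False)  # {'max_rooms': 10}: str keys, as needed here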
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 67b8ca28c7..351892a94f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,6 +20,8 @@ import logging
from six import iteritems, itervalues
+from prometheus_client import Counter
+
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
@@ -36,6 +38,19 @@ from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
+
+# Counts the number of times we returned a non-empty sync. `type` is one of
+# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
+# "true" or "false" depending on if the request asked for lazy loaded members or
+# not.
+non_empty_sync_counter = Counter(
+ "synapse_handlers_sync_nonempty_total",
+ "Count of non empty sync responses. type is initial_sync/full_state_sync"
+ "/incremental_sync. lazy_loaded indicates if lazy loaded members were "
+ "enabled for that request.",
+ ["type", "lazy_loaded"],
+)
+
# Store the cache that tracks which lazy-loaded members have been sent to a given
# client for no more than 30 minutes.
LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
@@ -227,14 +242,16 @@ class SyncHandler(object):
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
full_state):
+ if since_token is None:
+ sync_type = "initial_sync"
+ elif full_state:
+ sync_type = "full_state_sync"
+ else:
+ sync_type = "incremental_sync"
+
context = LoggingContext.current_context()
if context:
- if since_token is None:
- context.tag = "initial_sync"
- elif full_state:
- context.tag = "full_state_sync"
- else:
- context.tag = "incremental_sync"
+ context.tag = sync_type
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
@@ -242,7 +259,6 @@ class SyncHandler(object):
result = yield self.current_sync_for_user(
sync_config, since_token, full_state=full_state,
)
- defer.returnValue(result)
else:
def current_sync_callback(before_token, after_token):
return self.current_sync_for_user(sync_config, since_token)
@@ -251,7 +267,15 @@ class SyncHandler(object):
sync_config.user.to_string(), timeout, current_sync_callback,
from_token=since_token,
)
- defer.returnValue(result)
+
+ if result:
+ if sync_config.filter_collection.lazy_load_members():
+ lazy_loaded = "true"
+ else:
+ lazy_loaded = "false"
+ non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
+
+ defer.returnValue(result)
def current_sync_for_user(self, sync_config, since_token=None,
full_state=False):
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 14b12cd1c4..fcc02fc77d 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -195,7 +195,7 @@ class MatrixFederationHttpClient(object):
)
self.clock = hs.get_clock()
self._store = hs.get_datastore()
- self.version_string = hs.version_string.encode('ascii')
+ self.version_string_bytes = hs.version_string.encode('ascii')
self.default_timeout = 60
def schedule(x):
@@ -261,8 +261,8 @@ class MatrixFederationHttpClient(object):
ignore_backoff=ignore_backoff,
)
- method = request.method
- destination = request.destination
+ method_bytes = request.method.encode("ascii")
+ destination_bytes = request.destination.encode("ascii")
path_bytes = request.path.encode("ascii")
if request.query:
query_bytes = encode_query_args(request.query)
@@ -270,8 +270,8 @@ class MatrixFederationHttpClient(object):
query_bytes = b""
headers_dict = {
- "User-Agent": [self.version_string],
- "Host": [request.destination],
+ b"User-Agent": [self.version_string_bytes],
+ b"Host": [destination_bytes],
}
with limiter:
@@ -282,50 +282,51 @@ class MatrixFederationHttpClient(object):
else:
retries_left = MAX_SHORT_RETRIES
- url = urllib.parse.urlunparse((
- b"matrix", destination.encode("ascii"),
+ url_bytes = urllib.parse.urlunparse((
+ b"matrix", destination_bytes,
path_bytes, None, query_bytes, b"",
- )).decode('ascii')
+ ))
+ url_str = url_bytes.decode('ascii')
- http_url = urllib.parse.urlunparse((
+ url_to_sign_bytes = urllib.parse.urlunparse((
b"", b"",
path_bytes, None, query_bytes, b"",
- )).decode('ascii')
+ ))
while True:
try:
json = request.get_json()
if json:
- data = encode_canonical_json(json)
- headers_dict["Content-Type"] = ["application/json"]
+ headers_dict[b"Content-Type"] = [b"application/json"]
self.sign_request(
- destination, method, http_url, headers_dict, json
+ destination_bytes, method_bytes, url_to_sign_bytes,
+ headers_dict, json,
)
- else:
- data = None
- self.sign_request(destination, method, http_url, headers_dict)
-
- logger.info(
- "{%s} [%s] Sending request: %s %s",
- request.txn_id, destination, method, url
- )
-
- if data:
+ data = encode_canonical_json(json)
producer = FileBodyProducer(
BytesIO(data),
- cooperator=self._cooperator
+ cooperator=self._cooperator,
)
else:
producer = None
+ self.sign_request(
+ destination_bytes, method_bytes, url_to_sign_bytes,
+ headers_dict,
+ )
- request_deferred = treq.request(
- method,
- url,
+ logger.info(
+ "{%s} [%s] Sending request: %s %s",
+ request.txn_id, request.destination, request.method,
+ url_str,
+ )
+
+ # we don't want all the fancy cookie and redirect handling that
+ # treq.request gives: just use the raw Agent.
+ request_deferred = self.agent.request(
+ method_bytes,
+ url_bytes,
headers=Headers(headers_dict),
- data=producer,
- agent=self.agent,
- reactor=self.hs.get_reactor(),
- unbuffered=True
+ bodyProducer=producer,
)
request_deferred = timeout_deferred(
@@ -344,9 +345,9 @@ class MatrixFederationHttpClient(object):
logger.warn(
"{%s} [%s] Request failed: %s %s: %s",
request.txn_id,
- destination,
- method,
- url,
+ request.destination,
+ request.method,
+ url_str,
_flatten_response_never_received(e),
)
@@ -366,7 +367,7 @@ class MatrixFederationHttpClient(object):
logger.debug(
"{%s} [%s] Waiting %ss before re-sending...",
request.txn_id,
- destination,
+ request.destination,
delay,
)
@@ -378,7 +379,7 @@ class MatrixFederationHttpClient(object):
logger.info(
"{%s} [%s] Got response headers: %d %s",
request.txn_id,
- destination,
+ request.destination,
response.code,
response.phrase.decode('ascii', errors='replace'),
)
@@ -411,8 +412,9 @@ class MatrixFederationHttpClient(object):
destination_is must be non-None.
method (bytes): The HTTP method of the request
url_bytes (bytes): The URI path of the request
- headers_dict (dict): Dictionary of request headers to append to
- content (bytes): The body of the request
+ headers_dict (dict[bytes, list[bytes]]): Dictionary of request headers to
+ append to
+ content (object): The body of the request
destination_is (bytes): As 'destination', but if the destination is an
identity server
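The request URL is now assembled from bytes end to end: urlunparse returns bytes when given bytes components, and the str form is kept purely for log lines. For example (hostname and path are hypothetical):

    from urllib.parse import urlunparse

    url_bytes = urlunparse((
        b"matrix", b"remote.example.com",
        b"/_matrix/federation/v1/send/123", None, b"", b"",
    ))
    # b'matrix://remote.example.com/_matrix/federation/v1/send/123'
    url_str = url_bytes.decode("ascii")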
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index fedb4e6b18..62045a918b 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -39,7 +39,8 @@ outgoing_responses_counter = Counter(
)
response_timer = Histogram(
- "synapse_http_server_response_time_seconds", "sec",
+ "synapse_http_server_response_time_seconds",
+ "sec",
["method", "servlet", "tag", "code"],
)
@@ -79,15 +80,11 @@ response_size = Counter(
# than when the response was written.
in_flight_requests_ru_utime = Counter(
- "synapse_http_server_in_flight_requests_ru_utime_seconds",
- "",
- ["method", "servlet"],
+ "synapse_http_server_in_flight_requests_ru_utime_seconds", "", ["method", "servlet"]
)
in_flight_requests_ru_stime = Counter(
- "synapse_http_server_in_flight_requests_ru_stime_seconds",
- "",
- ["method", "servlet"],
+ "synapse_http_server_in_flight_requests_ru_stime_seconds", "", ["method", "servlet"]
)
in_flight_requests_db_txn_count = Counter(
@@ -134,7 +131,7 @@ def _get_in_flight_counts():
# type
counts = {}
for rm in reqs:
- key = (rm.method, rm.name,)
+ key = (rm.method, rm.name)
counts[key] = counts.get(key, 0) + 1
return counts
@@ -175,7 +172,8 @@ class RequestMetrics(object):
if context != self.start_context:
logger.warn(
"Context have unexpectedly changed %r, %r",
- context, self.start_context
+ context,
+ self.start_context,
)
return
@@ -192,10 +190,10 @@ class RequestMetrics(object):
resource_usage = context.get_resource_usage()
response_ru_utime.labels(self.method, self.name, tag).inc(
- resource_usage.ru_utime,
+ resource_usage.ru_utime
)
response_ru_stime.labels(self.method, self.name, tag).inc(
- resource_usage.ru_stime,
+ resource_usage.ru_stime
)
response_db_txn_count.labels(self.method, self.name, tag).inc(
resource_usage.db_txn_count
@@ -222,8 +220,15 @@ class RequestMetrics(object):
diff = new_stats - self._request_stats
self._request_stats = new_stats
- in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime)
- in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime)
+ # max() is used since rapid use of ru_stime/ru_utime can end up with the
+ # count going backwards due to NTP, time smearing, fine-grained
+ # correction, or floating points. Who knows, really?
+ in_flight_requests_ru_utime.labels(self.method, self.name).inc(
+ max(diff.ru_utime, 0)
+ )
+ in_flight_requests_ru_stime.labels(self.method, self.name).inc(
+ max(diff.ru_stime, 0)
+ )
in_flight_requests_db_txn_count.labels(self.method, self.name).inc(
diff.db_txn_count
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 340b16ce25..de02b1017e 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -186,9 +186,9 @@ class Notifier(object):
def count_listeners():
all_user_streams = set()
- for x in self.room_to_user_streams.values():
+ for x in list(self.room_to_user_streams.values()):
all_user_streams |= x
- for x in self.user_to_user_stream.values():
+ for x in list(self.user_to_user_stream.values()):
all_user_streams.add(x)
return sum(stream.count_listeners() for stream in all_user_streams)
@@ -196,7 +196,7 @@ class Notifier(object):
LaterGauge(
"synapse_notifier_rooms", "", [],
- lambda: count(bool, self.room_to_user_streams.values()),
+ lambda: count(bool, list(self.room_to_user_streams.values())),
)
LaterGauge(
"synapse_notifier_users", "", [],
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 1a5a10d974..b9dcfee740 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -528,7 +528,10 @@ def load_jinja2_templates(config):
"""
logger.info("loading jinja2")
- loader = jinja2.FileSystemLoader(config.email_template_dir)
+ if config.email_template_dir:
+ loader = jinja2.FileSystemLoader(config.email_template_dir)
+ else:
+ loader = jinja2.PackageLoader('synapse', 'res/templates')
env = jinja2.Environment(loader=loader)
env.filters["format_ts"] = format_ts_filter
env.filters["mxc_to_http"] = _create_mxc_to_http_filter(config)
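With email.template_dir unset, templates are loaded from the res/templates directory that now ships inside the synapse package (the files added elsewhere in this diff), so the example templates work with no configuration. A minimal sketch of the fallback path:

    import jinja2

    # reads templates bundled with the installed synapse package
    loader = jinja2.PackageLoader('synapse', 'res/templates')
    env = jinja2.Environment(loader=loader)
    template = env.get_template('notif_mail.html')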
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index d4d983b00a..f51184b50d 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -55,7 +55,7 @@ REQUIREMENTS = {
"sortedcontainers>=1.4.4": ["sortedcontainers"],
"pysaml2>=3.0.0": ["saml2"],
"pymacaroons-pynacl>=0.9.3": ["pymacaroons"],
- "msgpack-python>=0.3.0": ["msgpack"],
+ "msgpack-python>=0.4.2": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
"six>=1.10": ["six"],
@@ -82,9 +82,6 @@ CONDITIONAL_REQUIREMENTS = {
"psutil": {
"psutil>=2.0.0": ["psutil>=2.0.0"],
},
- "affinity": {
- "affinity": ["affinity"],
- },
"postgres": {
"psycopg2>=2.6": ["psycopg2"]
}
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 3f7be74e02..2d81d49e9a 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -15,6 +15,8 @@
import logging
+import six
+
from synapse.storage._base import SQLBaseStore
from synapse.storage.engines import PostgresEngine
@@ -23,6 +25,13 @@ from ._slaved_id_tracker import SlavedIdTracker
logger = logging.getLogger(__name__)
+def __func__(inp):
+ if six.PY3:
+ return inp
+ else:
+ return inp.__func__
+
+
class BaseSlavedStore(SQLBaseStore):
def __init__(self, db_conn, hs):
super(BaseSlavedStore, self).__init__(db_conn, hs)
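The helper papers over a Python 2/3 difference: on Python 2, looking up a method on a class yields an unbound method, which must be unwrapped via .__func__ before it can be re-bound as an attribute of a slave store class; on Python 3 the lookup already yields a plain function. For instance (Foo is illustrative):

    from synapse.replication.slave.storage._base import __func__

    class Foo(object):
        def bar(self):
            return 42

    print(type(Foo.bar))            # Py2: <type 'instancemethod'>; Py3: <class 'function'>
    print(type(__func__(Foo.bar)))  # a plain function on both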
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 87eaa53004..4f19fd35aa 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -17,7 +17,7 @@ from synapse.storage import DataStore
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.stream_change_cache import StreamChangeCache
-from ._base import BaseSlavedStore
+from ._base import BaseSlavedStore, __func__
from ._slaved_id_tracker import SlavedIdTracker
@@ -43,11 +43,11 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
expiry_ms=30 * 60 * 1000,
)
- get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
- get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
- get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__
- delete_messages_for_device = DataStore.delete_messages_for_device.__func__
- delete_device_msgs_for_remote = DataStore.delete_device_msgs_for_remote.__func__
+ get_to_device_stream_token = __func__(DataStore.get_to_device_stream_token)
+ get_new_messages_for_device = __func__(DataStore.get_new_messages_for_device)
+ get_new_device_msgs_for_remote = __func__(DataStore.get_new_device_msgs_for_remote)
+ delete_messages_for_device = __func__(DataStore.delete_messages_for_device)
+ delete_device_msgs_for_remote = __func__(DataStore.delete_device_msgs_for_remote)
def stream_positions(self):
result = super(SlavedDeviceInboxStore, self).stream_positions()
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 21b8c468fa..ec2fd561cc 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -13,23 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import six
-
from synapse.storage import DataStore
from synapse.storage.end_to_end_keys import EndToEndKeyStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
-from ._base import BaseSlavedStore
+from ._base import BaseSlavedStore, __func__
from ._slaved_id_tracker import SlavedIdTracker
-def __func__(inp):
- if six.PY3:
- return inp
- else:
- return inp.__func__
-
-
class SlavedDeviceStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedDeviceStore, self).__init__(db_conn, hs)
diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
index 5777f07c8d..e933b170bb 100644
--- a/synapse/replication/slave/storage/groups.py
+++ b/synapse/replication/slave/storage/groups.py
@@ -16,7 +16,7 @@
from synapse.storage import DataStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
-from ._base import BaseSlavedStore
+from ._base import BaseSlavedStore, __func__
from ._slaved_id_tracker import SlavedIdTracker
@@ -33,9 +33,9 @@ class SlavedGroupServerStore(BaseSlavedStore):
"_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(),
)
- get_groups_changes_for_user = DataStore.get_groups_changes_for_user.__func__
- get_group_stream_token = DataStore.get_group_stream_token.__func__
- get_all_groups_for_user = DataStore.get_all_groups_for_user.__func__
+ get_groups_changes_for_user = __func__(DataStore.get_groups_changes_for_user)
+ get_group_stream_token = __func__(DataStore.get_group_stream_token)
+ get_all_groups_for_user = __func__(DataStore.get_all_groups_for_user)
def stream_positions(self):
result = super(SlavedGroupServerStore, self).stream_positions()
diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py
index 05ed168463..8032f53fec 100644
--- a/synapse/replication/slave/storage/keys.py
+++ b/synapse/replication/slave/storage/keys.py
@@ -16,7 +16,7 @@
from synapse.storage import DataStore
from synapse.storage.keys import KeyStore
-from ._base import BaseSlavedStore
+from ._base import BaseSlavedStore, __func__
class SlavedKeyStore(BaseSlavedStore):
@@ -24,11 +24,11 @@ class SlavedKeyStore(BaseSlavedStore):
"_get_server_verify_key"
]
- get_server_verify_keys = DataStore.get_server_verify_keys.__func__
- store_server_verify_key = DataStore.store_server_verify_key.__func__
+ get_server_verify_keys = __func__(DataStore.get_server_verify_keys)
+ store_server_verify_key = __func__(DataStore.store_server_verify_key)
- get_server_certificate = DataStore.get_server_certificate.__func__
- store_server_certificate = DataStore.store_server_certificate.__func__
+ get_server_certificate = __func__(DataStore.get_server_certificate)
+ store_server_certificate = __func__(DataStore.store_server_certificate)
- get_server_keys_json = DataStore.get_server_keys_json.__func__
- store_server_keys_json = DataStore.store_server_keys_json.__func__
+ get_server_keys_json = __func__(DataStore.get_server_keys_json)
+ store_server_keys_json = __func__(DataStore.store_server_keys_json)
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index 80b744082a..92447b00d4 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -17,7 +17,7 @@ from synapse.storage import DataStore
from synapse.storage.presence import PresenceStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
-from ._base import BaseSlavedStore
+from ._base import BaseSlavedStore, __func__
from ._slaved_id_tracker import SlavedIdTracker
@@ -34,8 +34,8 @@ class SlavedPresenceStore(BaseSlavedStore):
"PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
)
- _get_active_presence = DataStore._get_active_presence.__func__
- take_presence_startup_info = DataStore.take_presence_startup_info.__func__
+ _get_active_presence = __func__(DataStore._get_active_presence)
+ take_presence_startup_info = __func__(DataStore.take_presence_startup_info)
_get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
diff --git a/synapse/res/templates/mail-Vector.css b/synapse/res/templates/mail-Vector.css
new file mode 100644
index 0000000000..6a3e36eda1
--- /dev/null
+++ b/synapse/res/templates/mail-Vector.css
@@ -0,0 +1,7 @@
+.header {
+ border-bottom: 4px solid #e4f7ed ! important;
+}
+
+.notif_link a, .footer a {
+ color: #76CFA6 ! important;
+}
diff --git a/synapse/res/templates/mail.css b/synapse/res/templates/mail.css
new file mode 100644
index 0000000000..5ab3e1b06d
--- /dev/null
+++ b/synapse/res/templates/mail.css
@@ -0,0 +1,156 @@
+body {
+ margin: 0px;
+}
+
+pre, code {
+ word-break: break-word;
+ white-space: pre-wrap;
+}
+
+#page {
+ font-family: 'Open Sans', Helvetica, Arial, Sans-Serif;
+    color: #454545;
+ font-size: 12pt;
+ width: 100%;
+ padding: 20px;
+}
+
+#inner {
+ width: 640px;
+}
+
+.header {
+ width: 100%;
+ height: 87px;
+ color: #454545;
+ border-bottom: 4px solid #e5e5e5;
+}
+
+.logo {
+ text-align: right;
+ margin-left: 20px;
+}
+
+.salutation {
+ padding-top: 10px;
+ font-weight: bold;
+}
+
+.summarytext {
+}
+
+.room {
+ width: 100%;
+ color: #454545;
+ border-bottom: 1px solid #e5e5e5;
+}
+
+.room_header td {
+ padding-top: 38px;
+ padding-bottom: 10px;
+ border-bottom: 1px solid #e5e5e5;
+}
+
+.room_name {
+ vertical-align: middle;
+ font-size: 18px;
+ font-weight: bold;
+}
+
+.room_header h2 {
+ margin-top: 0px;
+ margin-left: 75px;
+ font-size: 20px;
+}
+
+.room_avatar {
+ width: 56px;
+ line-height: 0px;
+ text-align: center;
+ vertical-align: middle;
+}
+
+.room_avatar img {
+ width: 48px;
+ height: 48px;
+ object-fit: cover;
+ border-radius: 24px;
+}
+
+.notif {
+ border-bottom: 1px solid #e5e5e5;
+ margin-top: 16px;
+ padding-bottom: 16px;
+}
+
+.historical_message .sender_avatar {
+ opacity: 0.3;
+}
+
+/* spell out opacity and historical_message class names for Outlook aka Word */
+.historical_message .sender_name {
+ color: #e3e3e3;
+}
+
+.historical_message .message_time {
+ color: #e3e3e3;
+}
+
+.historical_message .message_body {
+ color: #c7c7c7;
+}
+
+.historical_message td,
+.message td {
+ padding-top: 10px;
+}
+
+.sender_avatar {
+ width: 56px;
+ text-align: center;
+ vertical-align: top;
+}
+
+.sender_avatar img {
+ margin-top: -2px;
+ width: 32px;
+ height: 32px;
+ border-radius: 16px;
+}
+
+.sender_name {
+ display: inline;
+ font-size: 13px;
+ color: #a2a2a2;
+}
+
+.message_time {
+ text-align: right;
+ width: 100px;
+ font-size: 11px;
+ color: #a2a2a2;
+}
+
+.message_body {
+}
+
+.notif_link td {
+ padding-top: 10px;
+ padding-bottom: 10px;
+ font-weight: bold;
+}
+
+.notif_link a, .footer a {
+ color: #454545;
+ text-decoration: none;
+}
+
+.debug {
+ font-size: 10px;
+ color: #888;
+}
+
+.footer {
+ margin-top: 20px;
+ text-align: center;
+}
\ No newline at end of file
diff --git a/synapse/res/templates/notif.html b/synapse/res/templates/notif.html
new file mode 100644
index 0000000000..88b921ca9c
--- /dev/null
+++ b/synapse/res/templates/notif.html
@@ -0,0 +1,45 @@
+{% for message in notif.messages %}
+ <tr class="{{ "historical_message" if message.is_historical else "message" }}">
+ <td class="sender_avatar">
+ {% if loop.index0 == 0 or notif.messages[loop.index0 - 1].sender_name != notif.messages[loop.index0].sender_name %}
+ {% if message.sender_avatar_url %}
+ <img alt="" class="sender_avatar" src="{{ message.sender_avatar_url|mxc_to_http(32,32) }}" />
+ {% else %}
+ {% if message.sender_hash % 3 == 0 %}
+ <img class="sender_avatar" src="https://vector.im/beta/img/76cfa6.png" />
+ {% elif message.sender_hash % 3 == 1 %}
+ <img class="sender_avatar" src="https://vector.im/beta/img/50e2c2.png" />
+ {% else %}
+ <img class="sender_avatar" src="https://vector.im/beta/img/f4c371.png" />
+ {% endif %}
+ {% endif %}
+ {% endif %}
+ </td>
+ <td class="message_contents">
+ {% if loop.index0 == 0 or notif.messages[loop.index0 - 1].sender_name != notif.messages[loop.index0].sender_name %}
+ <div class="sender_name">{% if message.msgtype == "m.emote" %}*{% endif %} {{ message.sender_name }}</div>
+ {% endif %}
+ <div class="message_body">
+ {% if message.msgtype == "m.text" %}
+ {{ message.body_text_html }}
+ {% elif message.msgtype == "m.emote" %}
+ {{ message.body_text_html }}
+ {% elif message.msgtype == "m.notice" %}
+ {{ message.body_text_html }}
+ {% elif message.msgtype == "m.image" %}
+ <img src="{{ message.image_url|mxc_to_http(640, 480, scale) }}" />
+ {% elif message.msgtype == "m.file" %}
+ <span class="filename">{{ message.body_text_plain }}</span>
+ {% endif %}
+ </div>
+ </td>
+ <td class="message_time">{{ message.ts|format_ts("%H:%M") }}</td>
+ </tr>
+{% endfor %}
+<tr class="notif_link">
+ <td></td>
+ <td>
+ <a href="{{ notif.link }}">View {{ room.title }}</a>
+ </td>
+ <td></td>
+</tr>
diff --git a/synapse/res/templates/notif.txt b/synapse/res/templates/notif.txt
new file mode 100644
index 0000000000..a37bee9833
--- /dev/null
+++ b/synapse/res/templates/notif.txt
@@ -0,0 +1,16 @@
+{% for message in notif.messages %}
+{% if message.msgtype == "m.emote" %}* {% endif %}{{ message.sender_name }} ({{ message.ts|format_ts("%H:%M") }})
+{% if message.msgtype == "m.text" %}
+{{ message.body_text_plain }}
+{% elif message.msgtype == "m.emote" %}
+{{ message.body_text_plain }}
+{% elif message.msgtype == "m.notice" %}
+{{ message.body_text_plain }}
+{% elif message.msgtype == "m.image" %}
+{{ message.body_text_plain }}
+{% elif message.msgtype == "m.file" %}
+{{ message.body_text_plain }}
+{% endif %}
+{% endfor %}
+
+View {{ room.title }} at {{ notif.link }}
diff --git a/synapse/res/templates/notif_mail.html b/synapse/res/templates/notif_mail.html
new file mode 100644
index 0000000000..fcdb3109fe
--- /dev/null
+++ b/synapse/res/templates/notif_mail.html
@@ -0,0 +1,55 @@
+<!doctype html>
+<html lang="en">
+ <head>
+ <style type="text/css">
+ {% include 'mail.css' without context %}
+ {% include "mail-%s.css" % app_name ignore missing without context %}
+ </style>
+ </head>
+ <body>
+ <table id="page">
+ <tr>
+ <td> </td>
+ <td id="inner">
+ <table class="header">
+ <tr>
+ <td>
+ <div class="salutation">Hi {{ user_display_name }},</div>
+ <div class="summarytext">{{ summary_text }}</div>
+ </td>
+ <td class="logo">
+ {% if app_name == "Riot" %}
+ <img src="http://matrix.org/img/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
+ {% elif app_name == "Vector" %}
+ <img src="http://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
+ {% else %}
+ <img src="http://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
+ {% endif %}
+ </td>
+ </tr>
+ </table>
+ {% for room in rooms %}
+ {% include 'room.html' with context %}
+ {% endfor %}
+ <div class="footer">
+ <a href="{{ unsubscribe_link }}">Unsubscribe</a>
+ <br/>
+ <br/>
+ <div class="debug">
+ Sending email at {{ reason.now|format_ts("%c") }} due to activity in room {{ reason.room_name }} because
+ an event was received at {{ reason.received_at|format_ts("%c") }}
+ which is more than {{ "%.1f"|format(reason.delay_before_mail_ms / (60*1000)) }} ({{ reason.delay_before_mail_ms }}) mins ago,
+ {% if reason.last_sent_ts %}
+ and the last time we sent a mail for this room was {{ reason.last_sent_ts|format_ts("%c") }},
+ which is more than {{ "%.1f"|format(reason.throttle_ms / (60*1000)) }} (current throttle_ms) mins ago.
+ {% else %}
+ and we don't have a last time we sent a mail for this room.
+ {% endif %}
+ </div>
+ </div>
+ </td>
+ <td> </td>
+ </tr>
+ </table>
+ </body>
+</html>
diff --git a/synapse/res/templates/notif_mail.txt b/synapse/res/templates/notif_mail.txt
new file mode 100644
index 0000000000..24843042a5
--- /dev/null
+++ b/synapse/res/templates/notif_mail.txt
@@ -0,0 +1,10 @@
+Hi {{ user_display_name }},
+
+{{ summary_text }}
+
+{% for room in rooms %}
+{% include 'room.txt' with context %}
+{% endfor %}
+
+You can disable these notifications at {{ unsubscribe_link }}
+
diff --git a/synapse/res/templates/room.html b/synapse/res/templates/room.html
new file mode 100644
index 0000000000..723c222d25
--- /dev/null
+++ b/synapse/res/templates/room.html
@@ -0,0 +1,33 @@
+<table class="room">
+ <tr class="room_header">
+ <td class="room_avatar">
+ {% if room.avatar_url %}
+ <img alt="" src="{{ room.avatar_url|mxc_to_http(48,48) }}" />
+ {% else %}
+ {% if room.hash % 3 == 0 %}
+ <img alt="" src="https://vector.im/beta/img/76cfa6.png" />
+ {% elif room.hash % 3 == 1 %}
+ <img alt="" src="https://vector.im/beta/img/50e2c2.png" />
+ {% else %}
+ <img alt="" src="https://vector.im/beta/img/f4c371.png" />
+ {% endif %}
+ {% endif %}
+ </td>
+ <td class="room_name" colspan="2">
+ {{ room.title }}
+ </td>
+ </tr>
+ {% if room.invite %}
+ <tr>
+ <td></td>
+ <td>
+ <a href="{{ room.link }}">Join the conversation.</a>
+ </td>
+ <td></td>
+ </tr>
+ {% else %}
+ {% for notif in room.notifs %}
+ {% include 'notif.html' with context %}
+ {% endfor %}
+ {% endif %}
+</table>
diff --git a/synapse/res/templates/room.txt b/synapse/res/templates/room.txt
new file mode 100644
index 0000000000..84648c710e
--- /dev/null
+++ b/synapse/res/templates/room.txt
@@ -0,0 +1,9 @@
+{{ room.title }}
+
+{% if room.invite %}
+ You've been invited, join at {{ room.link }}
+{% else %}
+ {% for notif in room.notifs %}
+ {% include 'notif.txt' with context %}
+ {% endfor %}
+{% endif %}
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 3418f06fd6..4856822a5d 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -46,6 +46,7 @@ from synapse.rest.client.v2_alpha import (
receipts,
register,
report_event,
+ room_keys,
sendtodevice,
sync,
tags,
@@ -102,6 +103,7 @@ class ClientRestResource(JsonResource):
auth.register_servlets(hs, client_resource)
receipts.register_servlets(hs, client_resource)
read_marker.register_servlets(hs, client_resource)
+ room_keys.register_servlets(hs, client_resource)
keys.register_servlets(hs, client_resource)
tokenrefresh.register_servlets(hs, client_resource)
tags.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py
new file mode 100644
index 0000000000..45b5817d8b
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/room_keys.py
@@ -0,0 +1,372 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017, 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import Codes, SynapseError
+from synapse.http.servlet import (
+ RestServlet,
+ parse_json_object_from_request,
+ parse_string,
+)
+
+from ._base import client_v2_patterns
+
+logger = logging.getLogger(__name__)
+
+
+class RoomKeysServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/room_keys/keys(/(?P<room_id>[^/]+))?(/(?P<session_id>[^/]+))?$"
+ )
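+ # matches any of:
+ #   /room_keys/keys
+ #   /room_keys/keys/!abc:matrix.org
+ #   /room_keys/keys/!abc:matrix.org/c0ff33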
+
+ def __init__(self, hs):
+ """
+ Args:
+ hs (synapse.server.HomeServer): server
+ """
+ super(RoomKeysServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler()
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, room_id, session_id):
+ """
+ Uploads one or more encrypted E2E room keys for backup purposes.
+ room_id: the ID of the room the keys are for (optional)
+ session_id: the ID for the E2E room keys for the room (optional)
+ version: the version of the user's backup which this data is for.
+ the version must already have been created via the /room_keys/version API.
+
+ Each session has:
+ * first_message_index: a numeric index indicating the oldest message
+ encrypted by this session.
+ * forwarded_count: how many times the uploading client claims this key
+ has been shared (forwarded)
+ * is_verified: whether the client that uploaded the keys claims they
+ were sent by a device which they've verified
+ * session_data: base64-encoded encrypted data describing the session.
+
+ Returns 200 OK on success with body {}
+ Returns 403 Forbidden if the version in question is not the most recently
+ created version (i.e. if this is an old client trying to write to a stale backup)
+ Returns 404 Not Found if the version in question doesn't exist
+
+ The API is designed to be otherwise agnostic to the room_key encryption
+ algorithm being used. Sessions are merged with existing ones in the
+ backup using the heuristics:
+ * verified (is_verified) sessions always win over unverified sessions
+ * an older first_message_index always wins over a newer one
+ * a lower forwarded_count always wins over a higher one
+ (see the illustrative sketch at the end of this module)
+
+ We trust the clients not to lie and corrupt their own backups.
+ A consequence of this trust is that if your access_token is stolen,
+ the attacker could delete your backup.
+
+ PUT /room_keys/keys/!abc:matrix.org/c0ff33?version=1 HTTP/1.1
+ Content-Type: application/json
+
+ {
+ "first_message_index": 1,
+ "forwarded_count": 1,
+ "is_verified": false,
+ "session_data": "SSBBTSBBIEZJU0gK"
+ }
+
+ Or...
+
+ PUT /room_keys/keys/!abc:matrix.org?version=1 HTTP/1.1
+ Content-Type: application/json
+
+ {
+ "sessions": {
+ "c0ff33": {
+ "first_message_index": 1,
+ "forwarded_count": 1,
+ "is_verified": false,
+ "session_data": "SSBBTSBBIEZJU0gK"
+ }
+ }
+ }
+
+ Or...
+
+ PUT /room_keys/keys?version=1 HTTP/1.1
+ Content-Type: application/json
+
+ {
+ "rooms": {
+ "!abc:matrix.org": {
+ "sessions": {
+ "c0ff33": {
+ "first_message_index": 1,
+ "forwarded_count": 1,
+ "is_verified": false,
+ "session_data": "SSBBTSBBIEZJU0gK"
+ }
+ }
+ }
+ }
+ }
+ """
+ requester = yield self.auth.get_user_by_req(request, allow_guest=False)
+ user_id = requester.user.to_string()
+ body = parse_json_object_from_request(request)
+ version = parse_string(request, "version")
+
+ if session_id:
+ body = {
+ "sessions": {
+ session_id: body
+ }
+ }
+
+ if room_id:
+ body = {
+ "rooms": {
+ room_id: body
+ }
+ }
+
+ yield self.e2e_room_keys_handler.upload_room_keys(
+ user_id, version, body
+ )
+ defer.returnValue((200, {}))
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, room_id, session_id):
+ """
+ Retrieves one or more encrypted E2E room keys for backup purposes.
+ Symmetric with the PUT version of the API.
+
+ room_id: the ID of the room to retrieve the keys for (optional)
+ session_id: the ID of the session whose keys to retrieve (optional)
+ version: the version of the user's backup which this data is for.
+ the version must already have been created via the /room_keys/version API.
+
+ Returns as follows:
+
+ GET /room_keys/keys/!abc:matrix.org/c0ff33?version=1 HTTP/1.1
+ {
+ "first_message_index": 1,
+ "forwarded_count": 1,
+ "is_verified": false,
+ "session_data": "SSBBTSBBIEZJU0gK"
+ }
+
+ Or...
+
+ GET /room_keys/keys/!abc:matrix.org?version=1 HTTP/1.1
+ {
+ "sessions": {
+ "c0ff33": {
+ "first_message_index": 1,
+ "forwarded_count": 1,
+ "is_verified": false,
+ "session_data": "SSBBTSBBIEZJU0gK"
+ }
+ }
+ }
+
+ Or...
+
+ GET /room_keys/keys?version=1 HTTP/1.1
+ {
+ "rooms": {
+ "!abc:matrix.org": {
+ "sessions": {
+ "c0ff33": {
+ "first_message_index": 1,
+ "forwarded_count": 1,
+ "is_verified": false,
+ "session_data": "SSBBTSBBIEZJU0gK"
+ }
+ }
+ }
+ }
+ }
+ """
+ requester = yield self.auth.get_user_by_req(request, allow_guest=False)
+ user_id = requester.user.to_string()
+ version = parse_string(request, "version")
+
+ room_keys = yield self.e2e_room_keys_handler.get_room_keys(
+ user_id, version, room_id, session_id
+ )
+
+ if session_id:
+ room_keys = room_keys['rooms'][room_id]['sessions'][session_id]
+ elif room_id:
+ room_keys = room_keys['rooms'][room_id]
+
+ defer.returnValue((200, room_keys))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, request, room_id, session_id):
+ """
+ Deletes one or more encrypted E2E room keys for a user for backup purposes.
+
+ DELETE /room_keys/keys/!abc:matrix.org/c0ff33?version=1
+ HTTP/1.1 200 OK
+ {}
+
+ room_id: the ID of the room whose keys to delete (optional)
+ session_id: the ID for the E2E session to delete (optional)
+ version: the version of the user's backup which this data is for.
+ the version must already have been created via the /room_keys/version API.
+ """
+
+ requester = yield self.auth.get_user_by_req(request, allow_guest=False)
+ user_id = requester.user.to_string()
+ version = parse_string(request, "version")
+
+ yield self.e2e_room_keys_handler.delete_room_keys(
+ user_id, version, room_id, session_id
+ )
+ defer.returnValue((200, {}))
+
+
+class RoomKeysNewVersionServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/room_keys/version$"
+ )
+
+ def __init__(self, hs):
+ """
+ Args:
+ hs (synapse.server.HomeServer): server
+ """
+ super(RoomKeysNewVersionServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler()
+
+ @defer.inlineCallbacks
+ def on_POST(self, request):
+ """
+ Create a new backup version for this user's room_keys with the given
+ info. The version is allocated by the server and returned to the user
+ in the response. This API is intended to be used whenever the user
+ changes the encryption key for their backups, ensuring that backups
+ encrypted with different keys don't collide.
+
+ It takes out an exclusive lock on this user's room_key backups, to ensure
+ clients only upload to the current backup.
+
+ The algorithm passed in the version info is a reverse-DNS namespaced
+ identifier describing the format of the encrypted backed-up keys.
+
+ The auth_data is { user_id: "user_id", nonce: <random string> }
+ encrypted using the algorithm and current encryption key described above.
+
+ POST /room_keys/version
+ Content-Type: application/json
+ {
+ "algorithm": "m.megolm_backup.v1",
+ "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K"
+ }
+
+ HTTP/1.1 200 OK
+ Content-Type: application/json
+ {
+ "version": 12345
+ }
+ """
+ requester = yield self.auth.get_user_by_req(request, allow_guest=False)
+ user_id = requester.user.to_string()
+ info = parse_json_object_from_request(request)
+
+ new_version = yield self.e2e_room_keys_handler.create_version(
+ user_id, info
+ )
+ defer.returnValue((200, {"version": new_version}))
+
+ # we deliberately don't have a PUT /version, as these things really should
+ # be immutable to avoid people footgunning
+
+
+class RoomKeysVersionServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/room_keys/version(/(?P<version>[^/]+))?$"
+ )
+
+ def __init__(self, hs):
+ """
+ Args:
+ hs (synapse.server.HomeServer): server
+ """
+ super(RoomKeysVersionServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, version):
+ """
+ Retrieve the version information about a given version of the user's
+ room_keys backup. If the version part is missing, returns info about the
+ most current backup version (if any).
+
+ Returns 404 if the given version does not exist.
+
+ GET /room_keys/version/12345 HTTP/1.1
+ {
+ "version": "12345",
+ "algorithm": "m.megolm_backup.v1",
+ "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K"
+ }
+ """
+ requester = yield self.auth.get_user_by_req(request, allow_guest=False)
+ user_id = requester.user.to_string()
+
+ try:
+ info = yield self.e2e_room_keys_handler.get_version_info(
+ user_id, version
+ )
+ except SynapseError as e:
+ if e.code == 404:
+ raise SynapseError(404, "No backup found", Codes.NOT_FOUND)
+ # anything other than "not found" is unexpected: let it propagate
+ raise
+ defer.returnValue((200, info))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, request, version):
+ """
+ Delete the information about a given version of the user's
+ room_keys backup. The version must be specified explicitly: requests
+ without one are rejected with a 400. Doesn't delete the actual room data.
+
+ DELETE /room_keys/version/12345 HTTP/1.1
+ HTTP/1.1 200 OK
+ {}
+ """
+ if version is None:
+ raise SynapseError(400, "No version specified to delete", Codes.MISSING_PARAM)
+
+ requester = yield self.auth.get_user_by_req(request, allow_guest=False)
+ user_id = requester.user.to_string()
+
+ yield self.e2e_room_keys_handler.delete_version(
+ user_id, version
+ )
+ defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+ RoomKeysServlet(hs).register(http_server)
+ RoomKeysVersionServlet(hs).register(http_server)
+ RoomKeysNewVersionServlet(hs).register(http_server)
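+
+
+# A minimal illustrative sketch, not used by the servlets above: the merge
+# heuristics documented in RoomKeysServlet.on_PUT, applied to two candidate
+# copies of the same session. The actual merging is assumed to live in the
+# e2e_room_keys handler; the dicts use the session fields described there.
+def _is_replacement_better(current, replacement):
+    # a verified session always wins over an unverified one
+    if current["is_verified"] != replacement["is_verified"]:
+        return replacement["is_verified"]
+    # an older first_message_index wins, as it covers more messages
+    if current["first_message_index"] != replacement["first_message_index"]:
+        return replacement["first_message_index"] < current["first_message_index"]
+    # a lower forwarded_count wins, as the key has been shared less
+    return replacement["forwarded_count"] < current["forwarded_count"]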
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index af01040a38..8c892ff187 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -596,10 +596,13 @@ def _iterate_over_text(tree, *tags_to_ignore):
# to be returned.
elements = iter([tree])
while True:
- el = next(elements)
+ el = next(elements, None)
+ if el is None:
+ return
+
if isinstance(el, string_types):
yield el
- elif el is not None and el.tag not in tags_to_ignore:
+ elif el.tag not in tags_to_ignore:
# el.text is the text before the first child, so we can immediately
# return it if the text exists.
if el.text:
diff --git a/synapse/server.py b/synapse/server.py
index 938a05f9dc..3e9d3d8256 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -51,6 +51,7 @@ from synapse.handlers.deactivate_account import DeactivateAccountHandler
from synapse.handlers.device import DeviceHandler
from synapse.handlers.devicemessage import DeviceMessageHandler
from synapse.handlers.e2e_keys import E2eKeysHandler
+from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler
from synapse.handlers.events import EventHandler, EventStreamHandler
from synapse.handlers.groups_local import GroupsLocalHandler
from synapse.handlers.initial_sync import InitialSyncHandler
@@ -130,6 +131,7 @@ class HomeServer(object):
'auth_handler',
'device_handler',
'e2e_keys_handler',
+ 'e2e_room_keys_handler',
'event_handler',
'event_stream_handler',
'initial_sync_handler',
@@ -299,6 +301,9 @@ class HomeServer(object):
def build_e2e_keys_handler(self):
return E2eKeysHandler(self)
+ def build_e2e_room_keys_handler(self):
+ return E2eRoomKeysHandler(self)
+
def build_application_service_api(self):
return ApplicationServiceApi(self)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 23b4a8d76d..53c685c173 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -30,6 +30,7 @@ from .appservice import ApplicationServiceStore, ApplicationServiceTransactionSt
from .client_ips import ClientIpStore
from .deviceinbox import DeviceInboxStore
from .directory import DirectoryStore
+from .e2e_room_keys import EndToEndRoomKeyStore
from .end_to_end_keys import EndToEndKeyStore
from .engines import PostgresEngine
from .event_federation import EventFederationStore
@@ -77,6 +78,7 @@ class DataStore(RoomMemberStore, RoomStore,
ApplicationServiceTransactionStore,
ReceiptsStore,
EndToEndKeyStore,
+ EndToEndRoomKeyStore,
SearchStore,
TagsStore,
AccountDataStore,
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
new file mode 100644
index 0000000000..f25ded2295
--- /dev/null
+++ b/synapse/storage/e2e_room_keys.py
@@ -0,0 +1,320 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from twisted.internet import defer
+
+from synapse.api.errors import StoreError
+
+from ._base import SQLBaseStore
+
+
+class EndToEndRoomKeyStore(SQLBaseStore):
+
+ @defer.inlineCallbacks
+ def get_e2e_room_key(self, user_id, version, room_id, session_id):
+ """Get the encrypted E2E room key for a given session from a given
+ backup version of room_keys. We only store the 'best' room key for a given
+ session at a given time, as determined by the handler.
+
+ Args:
+ user_id(str): the user whose backup we're querying
+ version(str): the version ID of the backup for the set of keys we're querying
+ room_id(str): the ID of the room whose keys we're querying.
+ This is a bit redundant as it's implied by the session_id, but
+ we include it for consistency with the rest of the API.
+ session_id(str): the session whose room_key we're querying.
+
+ Returns:
+ A deferred dict giving the session_data and message metadata for
+ this room key.
+ """
+
+ row = yield self._simple_select_one(
+ table="e2e_room_keys",
+ keyvalues={
+ "user_id": user_id,
+ "version": version,
+ "room_id": room_id,
+ "session_id": session_id,
+ },
+ retcols=(
+ "first_message_index",
+ "forwarded_count",
+ "is_verified",
+ "session_data",
+ ),
+ desc="get_e2e_room_key",
+ )
+
+ row["session_data"] = json.loads(row["session_data"])
+
+ defer.returnValue(row)
+
+ @defer.inlineCallbacks
+ def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
+ """Replaces or inserts the encrypted E2E room key for a given session in
+ a given backup
+
+ Args:
+ user_id(str): the user whose backup we're setting
+ version(str): the version ID of the backup we're updating
+ room_id(str): the ID of the room whose keys we're setting
+ session_id(str): the session whose room_key we're setting
+ room_key(dict): the room_key being set
+ Raises:
+ StoreError
+ """
+
+ yield self._simple_upsert(
+ table="e2e_room_keys",
+ keyvalues={
+ "user_id": user_id,
+ "room_id": room_id,
+ "session_id": session_id,
+ },
+ values={
+ "version": version,
+ "first_message_index": room_key['first_message_index'],
+ "forwarded_count": room_key['forwarded_count'],
+ "is_verified": room_key['is_verified'],
+ "session_data": json.dumps(room_key['session_data']),
+ },
+ lock=False,
+ )
+
+ @defer.inlineCallbacks
+ def get_e2e_room_keys(
+ self, user_id, version, room_id=None, session_id=None
+ ):
+ """Bulk get the E2E room keys for a given backup, optionally filtered to a given
+ room, or a given session.
+
+ Args:
+ user_id(str): the user whose backup we're querying
+ version(str): the version ID of the backup for the set of keys we're querying
+ room_id(str): Optional. the ID of the room whose keys we're querying, if any.
+ If not specified, we return the keys for all the rooms in the backup.
+ session_id(str): Optional. the session whose room_key we're querying, if any.
+ If specified, we also require the room_id to be specified.
+ If not specified, we return all the keys in this version of
+ the backup (or for the specified room)
+
+ Returns:
+ A deferred dict giving the session_data and message metadata for
+ these room keys, in the nested {"rooms": {room_id: {"sessions": {...}}}} form.
+ """
+
+ keyvalues = {
+ "user_id": user_id,
+ "version": version,
+ }
+ if room_id:
+ keyvalues['room_id'] = room_id
+ if session_id:
+ keyvalues['session_id'] = session_id
+
+ rows = yield self._simple_select_list(
+ table="e2e_room_keys",
+ keyvalues=keyvalues,
+ retcols=(
+ "user_id",
+ "room_id",
+ "session_id",
+ "first_message_index",
+ "forwarded_count",
+ "is_verified",
+ "session_data",
+ ),
+ desc="get_e2e_room_keys",
+ )
+
+ sessions = {'rooms': {}}
+ for row in rows:
+ room_entry = sessions['rooms'].setdefault(row['room_id'], {"sessions": {}})
+ room_entry['sessions'][row['session_id']] = {
+ "first_message_index": row["first_message_index"],
+ "forwarded_count": row["forwarded_count"],
+ "is_verified": row["is_verified"],
+ "session_data": json.loads(row["session_data"]),
+ }
+
+ defer.returnValue(sessions)
+
+ @defer.inlineCallbacks
+ def delete_e2e_room_keys(
+ self, user_id, version, room_id=None, session_id=None
+ ):
+ """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
+ room or a given session.
+
+ Args:
+ user_id(str): the user whose backup we're deleting from
+ version(str): the version ID of the backup for the set of keys we're deleting
+ room_id(str): Optional. the ID of the room whose keys we're deleting, if any.
+ If not specified, we delete the keys for all the rooms in the backup.
+ session_id(str): Optional. the session whose room_key we're deleting, if any.
+ If specified, we also require the room_id to be specified.
+ If not specified, we delete all the keys in this version of
+ the backup (or for the specified room)
+
+ Returns:
+ A deferred that resolves once the deletion has completed
+ """
+
+ keyvalues = {
+ "user_id": user_id,
+ "version": version,
+ }
+ if room_id:
+ keyvalues['room_id'] = room_id
+ if session_id:
+ keyvalues['session_id'] = session_id
+
+ yield self._simple_delete(
+ table="e2e_room_keys",
+ keyvalues=keyvalues,
+ desc="delete_e2e_room_keys",
+ )
+
+ @staticmethod
+ def _get_current_version(txn, user_id):
+ txn.execute(
+ # `version` is stored as TEXT, so cast before taking MAX to get
+ # numeric rather than lexicographic ordering (otherwise "9" > "10")
+ "SELECT MAX(CAST(version AS INTEGER)) FROM e2e_room_keys_versions "
+ "WHERE user_id=? AND deleted=0",
+ (user_id,)
+ )
+ row = txn.fetchone()
+ # MAX() returns a single row of (None,) when there are no versions
+ if row is None or row[0] is None:
+ raise StoreError(404, 'No current backup version')
+ return str(row[0])
+
+ def get_e2e_room_keys_version_info(self, user_id, version=None):
+ """Get info metadata about a version of our room_keys backup.
+
+ Args:
+ user_id(str): the user whose backup we're querying
+ version(str): Optional. the version ID of the backup we're querying about
+ If missing, we return the information about the current version.
+ Raises:
+ StoreError: with code 404 if there are no e2e_room_keys_versions present
+ Returns:
+ A deferred dict giving the info metadata for this backup version
+ """
+
+ def _get_e2e_room_keys_version_info_txn(txn):
+ if version is None:
+ this_version = self._get_current_version(txn, user_id)
+ else:
+ this_version = version
+
+ result = self._simple_select_one_txn(
+ txn,
+ table="e2e_room_keys_versions",
+ keyvalues={
+ "user_id": user_id,
+ "version": this_version,
+ "deleted": 0,
+ },
+ retcols=(
+ "version",
+ "algorithm",
+ "auth_data",
+ ),
+ )
+ result["auth_data"] = json.loads(result["auth_data"])
+ return result
+
+ return self.runInteraction(
+ "get_e2e_room_keys_version_info",
+ _get_e2e_room_keys_version_info_txn
+ )
+
+ def create_e2e_room_keys_version(self, user_id, info):
+ """Atomically creates a new version of this user's e2e_room_keys store
+ with the given version info.
+
+ Args:
+ user_id(str): the user whose backup we're creating a version for
+ info(dict): the info about the backup version to be created
+
+ Returns:
+ A deferred string for the newly created version ID
+ """
+
+ def _create_e2e_room_keys_version_txn(txn):
+ txn.execute(
+ # cast TEXT versions to integers so MAX compares numerically
+ "SELECT MAX(CAST(version AS INTEGER)) FROM e2e_room_keys_versions "
+ "WHERE user_id=?",
+ (user_id,)
+ )
+ current_version = txn.fetchone()[0]
+ if current_version is None:
+ current_version = 0
+
+ new_version = str(current_version + 1)
+
+ self._simple_insert_txn(
+ txn,
+ table="e2e_room_keys_versions",
+ values={
+ "user_id": user_id,
+ "version": new_version,
+ "algorithm": info["algorithm"],
+ "auth_data": json.dumps(info["auth_data"]),
+ },
+ )
+
+ return new_version
+
+ return self.runInteraction(
+ "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
+ )
+
+ def delete_e2e_room_keys_version(self, user_id, version=None):
+ """Delete a given backup version of the user's room keys.
+ Doesn't delete their actual key data.
+
+ Args:
+ user_id(str): the user whose backup version we're deleting
+ version(str): Optional. the version ID of the backup version we're deleting
+ If missing, we delete the current backup version info.
+ Raises:
+ StoreError: with code 404 if there are no e2e_room_keys_versions present,
+ or if the version requested doesn't exist.
+ """
+
+ def _delete_e2e_room_keys_version_txn(txn):
+ if version is None:
+ this_version = self._get_current_version(txn, user_id)
+ else:
+ this_version = version
+
+ return self._simple_update_one_txn(
+ txn,
+ table="e2e_room_keys_versions",
+ keyvalues={
+ "user_id": user_id,
+ "version": this_version,
+ },
+ updatevalues={
+ "deleted": 1,
+ }
+ )
+
+ return self.runInteraction(
+ "delete_e2e_room_keys_version",
+ _delete_e2e_room_keys_version_txn
+ )
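+
+
+# A hedged usage sketch (hypothetical helper, not part of the store): the
+# round trip the e2e_room_keys handler is expected to drive through this
+# store. `store` is assumed to be a DataStore mixing in EndToEndRoomKeyStore.
+@defer.inlineCallbacks
+def _example_round_trip(store, user_id):
+    # allocate a new backup version; versions count up as stringified integers
+    version = yield store.create_e2e_room_keys_version(
+        user_id, {"algorithm": "m.megolm_backup.v1", "auth_data": {}},
+    )
+    # upsert a single room key into the new version
+    yield store.set_e2e_room_key(
+        user_id, version, "!room:example.com", "session1",
+        {
+            "first_message_index": 1,
+            "forwarded_count": 0,
+            "is_verified": False,
+            "session_data": {"ciphertext": "..."},
+        },
+    )
+    # read everything back in the nested {"rooms": {...}} shape
+    keys = yield store.get_e2e_room_keys(user_id, version)
+    defer.returnValue(keys)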
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 24345b20a6..3faca2a042 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -376,33 +376,25 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
@defer.inlineCallbacks
def get_missing_events(self, room_id, earliest_events, latest_events,
- limit, min_depth):
+ limit):
ids = yield self.runInteraction(
"get_missing_events",
self._get_missing_events,
- room_id, earliest_events, latest_events, limit, min_depth
+ room_id, earliest_events, latest_events, limit,
)
-
events = yield self._get_events(ids)
-
- events = sorted(
- [ev for ev in events if ev.depth >= min_depth],
- key=lambda e: e.depth,
- )
-
- defer.returnValue(events[:limit])
+ defer.returnValue(events)
def _get_missing_events(self, txn, room_id, earliest_events, latest_events,
- limit, min_depth):
-
- earliest_events = set(earliest_events)
- front = set(latest_events) - earliest_events
+ limit):
- event_results = set()
+ seen_events = set(earliest_events)
+ front = set(latest_events) - seen_events
+ event_results = []
query = (
"SELECT prev_event_id FROM event_edges "
- "WHERE event_id = ? AND is_state = ? "
+ "WHERE room_id = ? AND event_id = ? AND is_state = ? "
"LIMIT ?"
)
@@ -411,18 +403,20 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
for event_id in front:
txn.execute(
query,
- (event_id, False, limit - len(event_results))
+ (room_id, event_id, False, limit - len(event_results))
)
- for e_id, in txn:
- new_front.add(e_id)
+ new_results = set(t[0] for t in txn) - seen_events
- new_front -= earliest_events
- new_front -= event_results
+ new_front |= new_results
+ seen_events |= new_results
+ event_results.extend(new_results)
front = new_front
- event_results |= new_front
+ # we built the list working backwards from latest_events; we now need to
+ # reverse it so that the events are approximately chronological.
+ event_results.reverse()
return event_results
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index e4d0f8b1a9..af822fb69d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -39,6 +39,7 @@ from synapse.storage.event_federation import EventFederationStore
from synapse.storage.events_worker import EventsWorkerStore
from synapse.storage.state import StateGroupWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
+from synapse.util import batch_iter
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
@@ -388,12 +389,10 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
)
for room_id, ev_ctx_rm in iteritems(events_by_room):
- # Work out new extremities by recursively adding and removing
- # the new events.
latest_event_ids = yield self.get_latest_event_ids_in_room(
room_id
)
- new_latest_event_ids = yield self._calculate_new_extremeties(
+ new_latest_event_ids = yield self._calculate_new_extremities(
room_id, ev_ctx_rm, latest_event_ids
)
@@ -402,6 +401,12 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
# No change in extremities, so no change in state
continue
+ # there should always be at least one forward extremity.
+ # (except during the initial persistence of the send_join
+ # results, in which case there will be no existing
+ # extremities, so we'll `continue` above and skip this bit.)
+ assert new_latest_event_ids, "No forward extremities left!"
+
new_forward_extremeties[room_id] = new_latest_event_ids
len_1 = (
@@ -519,44 +524,79 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
)
@defer.inlineCallbacks
- def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
- """Calculates the new forward extremeties for a room given events to
+ def _calculate_new_extremities(self, room_id, event_contexts, latest_event_ids):
+ """Calculates the new forward extremities for a room given events to
persist.
Assumes that we are only persisting events for one room at a time.
"""
- new_latest_event_ids = set(latest_event_ids)
- # First, add all the new events to the list
- new_latest_event_ids.update(
- event.event_id for event, ctx in event_contexts
+
+ # we're only interested in new events which aren't outliers and which aren't
+ # being rejected.
+ new_events = [
+ event for event, ctx in event_contexts
if not event.internal_metadata.is_outlier() and not ctx.rejected
+ ]
+
+ # start with the existing forward extremities
+ result = set(latest_event_ids)
+
+ # add all the new events to the list
+ result.update(
+ event.event_id for event in new_events
)
- # Now remove all events that are referenced by the to-be-added events
- new_latest_event_ids.difference_update(
+
+ # Now remove all events which are prev_events of any of the new events
+ result.difference_update(
e_id
- for event, ctx in event_contexts
+ for event in new_events
for e_id, _ in event.prev_events
- if not event.internal_metadata.is_outlier() and not ctx.rejected
)
- # And finally remove any events that are referenced by previously added
- # events.
- rows = yield self._simple_select_many_batch(
- table="event_edges",
- column="prev_event_id",
- iterable=list(new_latest_event_ids),
- retcols=["prev_event_id"],
- keyvalues={
- "is_state": False,
- },
- desc="_calculate_new_extremeties",
- )
+ # Finally, remove any events which are prev_events of any existing events.
+ existing_prevs = yield self._get_events_which_are_prevs(result)
+ result.difference_update(existing_prevs)
- new_latest_event_ids.difference_update(
- row["prev_event_id"] for row in rows
- )
+ defer.returnValue(result)
- defer.returnValue(new_latest_event_ids)
+ @defer.inlineCallbacks
+ def _get_events_which_are_prevs(self, event_ids):
+ """Filter the supplied list of event_ids to get those which are prev_events of
+ existing (non-outlier/rejected) events.
+
+ Args:
+ event_ids (Iterable[str]): event ids to filter
+
+ Returns:
+ Deferred[List[str]]: filtered event ids
+ """
+ results = []
+
+ def _get_events(txn, batch):
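+ # expand one "?" placeholder per event id in this batch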
+ sql = """
+ SELECT prev_event_id
+ FROM event_edges
+ INNER JOIN events USING (event_id)
+ LEFT JOIN rejections USING (event_id)
+ WHERE
+ prev_event_id IN (%s)
+ AND NOT events.outlier
+ AND rejections.event_id IS NULL
+ """ % (
+ ",".join("?" for _ in batch),
+ )
+
+ txn.execute(sql, batch)
+ results.extend(r[0] for r in txn)
+
+ for chunk in batch_iter(event_ids, 100):
+ yield self.runInteraction(
+ "_get_events_which_are_prevs",
+ _get_events,
+ chunk,
+ )
+
+ defer.returnValue(results)
@defer.inlineCallbacks
def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids,
@@ -588,10 +628,6 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
the new current state is only returned if we've already calculated
it.
"""
-
- if not new_latest_event_ids:
- return
-
# map from state_group to ((type, key) -> event_id) state map
state_groups_map = {}
diff --git a/synapse/storage/schema/delta/51/e2e_room_keys.sql b/synapse/storage/schema/delta/51/e2e_room_keys.sql
new file mode 100644
index 0000000000..c0e66a697d
--- /dev/null
+++ b/synapse/storage/schema/delta/51/e2e_room_keys.sql
@@ -0,0 +1,39 @@
+/* Copyright 2017 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- users' optionally backed up encrypted e2e sessions
+CREATE TABLE e2e_room_keys (
+ user_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ session_id TEXT NOT NULL,
+ version TEXT NOT NULL,
+ first_message_index INT,
+ forwarded_count INT,
+ is_verified BOOLEAN,
+ session_data TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX e2e_room_keys_idx ON e2e_room_keys(user_id, room_id, session_id);
+
+-- the metadata for each generation of encrypted e2e session backups
+CREATE TABLE e2e_room_keys_versions (
+ user_id TEXT NOT NULL,
+ version TEXT NOT NULL,
+ algorithm TEXT NOT NULL,
+ auth_data TEXT NOT NULL,
+ deleted SMALLINT DEFAULT 0 NOT NULL
+);
+
+CREATE UNIQUE INDEX e2e_room_keys_versions_idx ON e2e_room_keys_versions(user_id, version);
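+
+-- Illustrative query, not part of the schema: the "current" backup version
+-- for a user is the highest non-deleted version. `version` is TEXT, so it
+-- is cast to get numeric rather than lexicographic ordering:
+--
+--   SELECT MAX(CAST(version AS INTEGER)) FROM e2e_room_keys_versions
+--       WHERE user_id = ? AND deleted = 0;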
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 4c296d72c0..d6cfdba519 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -630,7 +630,21 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def get_all_new_events_stream(self, from_id, current_id, limit):
- """Get all new events"""
+ """Get all new events
+
+ Returns all events with from_id < stream_ordering <= current_id.
+
+ Args:
+ from_id (int): the stream_ordering of the last event we processed
+ current_id (int): the current maximum stream_ordering, i.e. an upper bound on the events to return
+ limit (int): the maximum number of events to return
+
+ Returns:
+ Deferred[Tuple[int, list[FrozenEvent]]]: A tuple of (next_id, events), where
+ `next_id` is the next value to pass as `from_id` (it will either be the
+ stream_ordering of the last returned event, or, if fewer than `limit` events
+ were found, `current_id`).
+ """
def get_all_new_events_stream_txn(txn):
sql = (
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 680ea928c7..9a8fae0497 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -68,7 +68,10 @@ class Clock(object):
"""
call = task.LoopingCall(f)
call.clock = self._reactor
- call.start(msec / 1000.0, now=False)
+ d = call.start(msec / 1000.0, now=False)
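+ # log failures from the looping call; consumeErrors=False passes the
+ # failure on down the errback chain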
+ d.addErrback(
+ log_failure, "Looping call died", consumeErrors=False,
+ )
return call
def call_later(self, delay, callback, *args, **kwargs):
@@ -109,3 +112,29 @@ def batch_iter(iterable, size):
sourceiter = iter(iterable)
# call islice until it returns an empty tuple
return iter(lambda: tuple(islice(sourceiter, size)), ())
+
+
+def log_failure(failure, msg, consumeErrors=True):
+ """Creates a function suitable for passing to `Deferred.addErrback` that
+ logs any failures that occur.
+
+ Args:
+ msg (str): Message to log
+ consumeErrors (bool): If true consumes the failure, otherwise passes
+ on down the callback chain
+
+ Returns:
+ func(Failure)
+ """
+
+ logger.error(
+ msg,
+ exc_info=(
+ failure.type,
+ failure.value,
+ failure.getTracebackObject()
+ )
+ )
+
+ if not consumeErrors:
+ return failure
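+
+
+# Example (hypothetical) of attaching log_failure directly as an errback, as
+# looping_call does above; Twisted passes addErrback's extra positional and
+# keyword arguments through to the errback:
+#
+#   d = defer.maybeDeferred(some_operation)
+#   d.addErrback(log_failure, "some_operation failed", consumeErrors=True)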
diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py
index 8d0f2a8918..9cb7e9c9ab 100644
--- a/synapse/util/manhole.py
+++ b/synapse/util/manhole.py
@@ -70,6 +70,8 @@ def manhole(username, password, globals):
Returns:
twisted.internet.protocol.Factory: A factory to pass to ``listenTCP``
"""
+ if not isinstance(password, bytes):
+ password = password.encode('ascii')
checker = checkers.InMemoryUsernamePasswordDatabaseDontUse(
**{username: password}
@@ -82,7 +84,7 @@ def manhole(username, password, globals):
)
factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker]))
- factory.publicKeys['ssh-rsa'] = Key.fromString(PUBLIC_KEY)
- factory.privateKeys['ssh-rsa'] = Key.fromString(PRIVATE_KEY)
+ factory.publicKeys[b'ssh-rsa'] = Key.fromString(PUBLIC_KEY)
+ factory.privateKeys[b'ssh-rsa'] = Key.fromString(PRIVATE_KEY)
return factory
diff --git a/synapse/visibility.py b/synapse/visibility.py
index d4680863d3..43f48196be 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -219,7 +219,7 @@ def filter_events_for_server(store, server_name, events):
# Whatever else we do, we need to check for senders which have requested
# erasure of their data.
erased_senders = yield store.are_users_erased(
- e.sender for e in events,
+ (e.sender for e in events),
)
def redact_disallowed(event, state):
@@ -324,14 +324,13 @@ def filter_events_for_server(store, server_name, events):
# server's domain.
#
# event_to_state_ids contains lots of duplicates, so it turns out to be
- # cheaper to build a complete set of unique
- # ((type, state_key), event_id) tuples, and then filter out the ones we
- # don't want.
+ # cheaper to build a complete event_id => (type, state_key) dict, and then
+ # filter out the ones we don't want
#
- state_key_to_event_id_set = {
- e
+ event_id_to_state_key = {
+ event_id: key
for key_to_eid in itervalues(event_to_state_ids)
- for e in key_to_eid.items()
+ for key, event_id in iteritems(key_to_eid)
}
def include(typ, state_key):
@@ -346,7 +345,7 @@ def filter_events_for_server(store, server_name, events):
event_map = yield store.get_events([
e_id
- for key, e_id in state_key_to_event_id_set
+ for e_id, key in iteritems(event_id_to_state_key)
if include(key[0], key[1])
])
|