diff --git a/synapse/__init__.py b/synapse/__init__.py
index 1ddbbbebfb..89ea9a9775 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -27,4 +27,4 @@ try:
except ImportError:
pass
-__version__ = "0.33.7"
+__version__ = "0.33.8"
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 5565e516d6..f20e0fcf0b 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -51,6 +51,7 @@ class LoginType(object):
EMAIL_IDENTITY = u"m.login.email.identity"
MSISDN = u"m.login.msisdn"
RECAPTCHA = u"m.login.recaptcha"
+ TERMS = u"m.login.terms"
DUMMY = u"m.login.dummy"
# Only for C/S API v1
@@ -102,6 +103,7 @@ class ThirdPartyEntityKind(object):
class RoomVersions(object):
V1 = "1"
VDH_TEST = "vdh-test-version"
+ STATE_V2_TEST = "state-v2-test"
# the version we will give rooms which are created on this server
@@ -109,7 +111,11 @@ DEFAULT_ROOM_VERSION = RoomVersions.V1
# vdh-test-version is a placeholder to get room versioning support working and tested
# until we have a working v2.
-KNOWN_ROOM_VERSIONS = {RoomVersions.V1, RoomVersions.VDH_TEST}
+KNOWN_ROOM_VERSIONS = {
+ RoomVersions.V1,
+ RoomVersions.VDH_TEST,
+ RoomVersions.STATE_V2_TEST,
+}
ServerNoticeMsgType = "m.server_notice"
ServerNoticeLimitReached = "m.server_notice.usage_limit_reached"
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index 6d9f1ca0ef..f78695b657 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -28,7 +28,6 @@ FEDERATION_PREFIX = "/_matrix/federation/v1"
STATIC_PREFIX = "/_matrix/static"
WEB_CLIENT_PREFIX = "/_matrix/client"
CONTENT_REPO_PREFIX = "/_matrix/content"
-SERVER_KEY_PREFIX = "/_matrix/key/v1"
SERVER_KEY_V2_PREFIX = "/_matrix/key/v2"
MEDIA_PREFIX = "/_matrix/media/r0"
LEGACY_MEDIA_PREFIX = "/_matrix/media/v1"
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 593e1e75db..415374a2ce 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -37,7 +37,6 @@ from synapse.api.urls import (
FEDERATION_PREFIX,
LEGACY_MEDIA_PREFIX,
MEDIA_PREFIX,
- SERVER_KEY_PREFIX,
SERVER_KEY_V2_PREFIX,
STATIC_PREFIX,
WEB_CLIENT_PREFIX,
@@ -59,7 +58,6 @@ from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirem
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest import ClientRestResource
-from synapse.rest.key.v1.server_key_resource import LocalKey
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
@@ -236,10 +234,7 @@ class SynapseHomeServer(HomeServer):
)
if name in ["keys", "federation"]:
- resources.update({
- SERVER_KEY_PREFIX: LocalKey(self),
- SERVER_KEY_V2_PREFIX: KeyApiV2Resource(self),
- })
+ resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
if name == "webclient":
resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 3926c7f263..0354e82bf8 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -226,7 +226,15 @@ class SynchrotronPresence(object):
class SynchrotronTyping(object):
def __init__(self, hs):
self._latest_room_serial = 0
+ self._reset()
+
+ def _reset(self):
+ """
+ Reset the typing handler's data caches.
+ """
+ # map room IDs to serial numbers
self._room_serials = {}
+ # map room IDs to sets of users currently typing
self._room_typing = {}
def stream_positions(self):
@@ -236,6 +244,12 @@ class SynchrotronTyping(object):
return {"typing": self._latest_room_serial}
def process_replication_rows(self, token, rows):
+ if self._latest_room_serial > token:
+ # The master has gone backwards. To prevent inconsistent data, just
+ # clear everything.
+ self._reset()
+
+ # Set the latest serial token to whatever the server gave us.
self._latest_room_serial = token
for row in rows:
diff --git a/synapse/config/consent_config.py b/synapse/config/consent_config.py
index e22c731aad..f193a090ae 100644
--- a/synapse/config/consent_config.py
+++ b/synapse/config/consent_config.py
@@ -42,6 +42,14 @@ DEFAULT_CONFIG = """\
# until the user consents to the privacy policy. The value of the setting is
# used as the text of the error.
#
+# 'require_at_registration', if enabled, will add a step to the registration
+# process, similar to how captcha works. Users will be required to accept the
+# policy before their account is created.
+#
+# 'policy_name' is the display name of the policy users will see when registering
+# for an account. Has no effect unless `require_at_registration` is enabled.
+# Defaults to "Privacy Policy".
+#
# user_consent:
# template_dir: res/templates/privacy
# version: 1.0
@@ -54,6 +62,8 @@ DEFAULT_CONFIG = """\
# block_events_error: >-
# To continue using this homeserver you must review and agree to the
# terms and conditions at %(consent_uri)s
+# require_at_registration: False
+# policy_name: Privacy Policy
#
"""
@@ -67,6 +77,8 @@ class ConsentConfig(Config):
self.user_consent_server_notice_content = None
self.user_consent_server_notice_to_guests = False
self.block_events_without_consent_error = None
+ self.user_consent_at_registration = False
+ self.user_consent_policy_name = "Privacy Policy"
def read_config(self, config):
consent_config = config.get("user_consent")
@@ -83,6 +95,12 @@ class ConsentConfig(Config):
self.user_consent_server_notice_to_guests = bool(consent_config.get(
"send_server_notice_to_guests", False,
))
+ self.user_consent_at_registration = bool(consent_config.get(
+ "require_at_registration", False,
+ ))
+ self.user_consent_policy_name = consent_config.get(
+ "policy_name", "Privacy Policy",
+ )
def default_config(self, **kwargs):
return DEFAULT_CONFIG
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index e9a936118d..7081868963 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -50,6 +50,7 @@ handlers:
maxBytes: 104857600
backupCount: 10
filters: [context]
+ encoding: utf8
console:
class: logging.StreamHandler
formatter: precise
diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py
index 080c81f14b..d40e4b8591 100644
--- a/synapse/crypto/keyclient.py
+++ b/synapse/crypto/keyclient.py
@@ -15,6 +15,8 @@
import logging
+from six.moves import urllib
+
from canonicaljson import json
from twisted.internet import defer, reactor
@@ -28,15 +30,15 @@ from synapse.util import logcontext
logger = logging.getLogger(__name__)
-KEY_API_V1 = b"/_matrix/key/v1/"
+KEY_API_V2 = "/_matrix/key/v2/server/%s"
@defer.inlineCallbacks
-def fetch_server_key(server_name, tls_client_options_factory, path=KEY_API_V1):
+def fetch_server_key(server_name, tls_client_options_factory, key_id):
"""Fetch the keys for a remote server."""
factory = SynapseKeyClientFactory()
- factory.path = path
+ factory.path = KEY_API_V2 % (urllib.parse.quote(key_id), )
factory.host = server_name
endpoint = matrix_federation_endpoint(
reactor, server_name, tls_client_options_factory, timeout=30
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index d89f94c219..515ebbc148 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2017 New Vector Ltd.
+# Copyright 2017, 2018 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,8 +18,6 @@ import hashlib
import logging
from collections import namedtuple
-from six.moves import urllib
-
from signedjson.key import (
decode_verify_key_bytes,
encode_verify_key_base64,
@@ -395,32 +393,13 @@ class Keyring(object):
@defer.inlineCallbacks
def get_keys_from_server(self, server_name_and_key_ids):
- @defer.inlineCallbacks
- def get_key(server_name, key_ids):
- keys = None
- try:
- keys = yield self.get_server_verify_key_v2_direct(
- server_name, key_ids
- )
- except Exception as e:
- logger.info(
- "Unable to get key %r for %r directly: %s %s",
- key_ids, server_name,
- type(e).__name__, str(e),
- )
-
- if not keys:
- keys = yield self.get_server_verify_key_v1_direct(
- server_name, key_ids
- )
-
- keys = {server_name: keys}
-
- defer.returnValue(keys)
-
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
- run_in_background(get_key, server_name, key_ids)
+ run_in_background(
+ self.get_server_verify_key_v2_direct,
+ server_name,
+ key_ids,
+ )
for server_name, key_ids in server_name_and_key_ids
],
consumeErrors=True,
@@ -525,10 +504,7 @@ class Keyring(object):
continue
(response, tls_certificate) = yield fetch_server_key(
- server_name, self.hs.tls_client_options_factory,
- path=("/_matrix/key/v2/server/%s" % (
- urllib.parse.quote(requested_key_id),
- )).encode("ascii"),
+ server_name, self.hs.tls_client_options_factory, requested_key_id
)
if (u"signatures" not in response
@@ -657,78 +633,6 @@ class Keyring(object):
defer.returnValue(results)
- @defer.inlineCallbacks
- def get_server_verify_key_v1_direct(self, server_name, key_ids):
- """Finds a verification key for the server with one of the key ids.
- Args:
- server_name (str): The name of the server to fetch a key for.
- keys_ids (list of str): The key_ids to check for.
- """
-
- # Try to fetch the key from the remote server.
-
- (response, tls_certificate) = yield fetch_server_key(
- server_name, self.hs.tls_client_options_factory
- )
-
- # Check the response.
-
- x509_certificate_bytes = crypto.dump_certificate(
- crypto.FILETYPE_ASN1, tls_certificate
- )
-
- if ("signatures" not in response
- or server_name not in response["signatures"]):
- raise KeyLookupError("Key response not signed by remote server")
-
- if "tls_certificate" not in response:
- raise KeyLookupError("Key response missing TLS certificate")
-
- tls_certificate_b64 = response["tls_certificate"]
-
- if encode_base64(x509_certificate_bytes) != tls_certificate_b64:
- raise KeyLookupError("TLS certificate doesn't match")
-
- # Cache the result in the datastore.
-
- time_now_ms = self.clock.time_msec()
-
- verify_keys = {}
- for key_id, key_base64 in response["verify_keys"].items():
- if is_signing_algorithm_supported(key_id):
- key_bytes = decode_base64(key_base64)
- verify_key = decode_verify_key_bytes(key_id, key_bytes)
- verify_key.time_added = time_now_ms
- verify_keys[key_id] = verify_key
-
- for key_id in response["signatures"][server_name]:
- if key_id not in response["verify_keys"]:
- raise KeyLookupError(
- "Key response must include verification keys for all"
- " signatures"
- )
- if key_id in verify_keys:
- verify_signed_json(
- response,
- server_name,
- verify_keys[key_id]
- )
-
- yield self.store.store_server_certificate(
- server_name,
- server_name,
- time_now_ms,
- tls_certificate,
- )
-
- yield self.store_keys(
- server_name=server_name,
- from_server=server_name,
- verify_keys=verify_keys,
- )
-
- defer.returnValue(verify_keys)
-
def store_keys(self, server_name, from_server, verify_keys):
"""Store a collection of verify keys for a given server
Args:
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index d4d4474847..c81d8e6729 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -200,11 +200,11 @@ def _is_membership_change_allowed(event, auth_events):
membership = event.content["membership"]
# Check if this is the room creator joining:
- if len(event.prev_events) == 1 and Membership.JOIN == membership:
+ if len(event.prev_event_ids()) == 1 and Membership.JOIN == membership:
# Get room creation event:
key = (EventTypes.Create, "", )
create = auth_events.get(key)
- if create and event.prev_events[0][0] == create.event_id:
+ if create and event.prev_event_ids()[0] == create.event_id:
if create.content["creator"] == event.state_key:
return
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 12f1eb0a3e..84c75495d5 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -159,6 +159,24 @@ class EventBase(object):
def keys(self):
return six.iterkeys(self._event_dict)
+ def prev_event_ids(self):
+ """Returns the list of prev event IDs. The order matches the order
+ specified in the event, though there is no meaning to it.
+
+ Returns:
+ list[str]: The list of event IDs of this event's prev_events
+ """
+ return [e for e, _ in self.prev_events]
+
+ def auth_event_ids(self):
+ """Returns the list of auth event IDs. The order matches the order
+ specified in the event, though there is no meaning to it.
+
+ Returns:
+ list[str]: The list of event IDs of this event's auth_events
+ """
+ return [e for e, _ in self.auth_events]
+
class FrozenEvent(EventBase):
def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None):
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 0f9302a6a8..fa2cc550e2 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -324,11 +324,6 @@ class FederationServer(FederationBase):
defer.returnValue((404, ""))
@defer.inlineCallbacks
- @log_function
- def on_pull_request(self, origin, versions):
- raise NotImplementedError("Pull transactions not implemented")
-
- @defer.inlineCallbacks
def on_query_request(self, query_type, args):
received_queries_counter.labels(query_type).inc()
resp = yield self.registry.on_query(query_type, args)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 3fdd63be95..099ace28c1 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -183,9 +183,7 @@ class TransactionQueue(object):
# banned then it won't receive the event because it won't
# be in the room after the ban.
destinations = yield self.state.get_current_hosts_in_room(
- event.room_id, latest_event_ids=[
- prev_id for prev_id, _ in event.prev_events
- ],
+ event.room_id, latest_event_ids=event.prev_event_ids(),
)
except Exception:
logger.exception(
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 7288d49074..3553f418f1 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -362,14 +362,6 @@ class FederationSendServlet(BaseFederationServlet):
defer.returnValue((code, response))
-class FederationPullServlet(BaseFederationServlet):
- PATH = "/pull/"
-
- # This is for when someone asks us for everything since version X
- def on_GET(self, origin, content, query):
- return self.handler.on_pull_request(query["origin"][0], query["v"])
-
-
class FederationEventServlet(BaseFederationServlet):
PATH = "/event/(?P<event_id>[^/]*)/"
@@ -1261,7 +1253,6 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
FEDERATION_SERVLET_CLASSES = (
FederationSendServlet,
- FederationPullServlet,
FederationEventServlet,
FederationStateServlet,
FederationStateIdsServlet,
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index c5ab14314e..025a79c022 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -117,9 +117,6 @@ class Transaction(JsonEncodedObject):
"Require 'transaction_id' to construct a Transaction"
)
- for p in pdus:
- p.transaction_id = kwargs["transaction_id"]
-
kwargs["pdus"] = [p.get_pdu_json() for p in pdus]
return Transaction(**kwargs)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 329e3c7d71..a958c45271 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -59,6 +59,7 @@ class AuthHandler(BaseHandler):
LoginType.EMAIL_IDENTITY: self._check_email_identity,
LoginType.MSISDN: self._check_msisdn,
LoginType.DUMMY: self._check_dummy_auth,
+ LoginType.TERMS: self._check_terms_auth,
}
self.bcrypt_rounds = hs.config.bcrypt_rounds
@@ -431,6 +432,9 @@ class AuthHandler(BaseHandler):
def _check_dummy_auth(self, authdict, _):
return defer.succeed(True)
+ def _check_terms_auth(self, authdict, _):
+ return defer.succeed(True)
+
@defer.inlineCallbacks
def _check_threepid(self, medium, authdict):
if 'threepid_creds' not in authdict:
@@ -462,6 +466,22 @@ class AuthHandler(BaseHandler):
def _get_params_recaptcha(self):
return {"public_key": self.hs.config.recaptcha_public_key}
+ def _get_params_terms(self):
+ return {
+ "policies": {
+ "privacy_policy": {
+ "version": self.hs.config.user_consent_version,
+ "en": {
+ "name": self.hs.config.user_consent_policy_name,
+ "url": "%s/_matrix/consent?v=%s" % (
+ self.hs.config.public_baseurl,
+ self.hs.config.user_consent_version,
+ ),
+ },
+ },
+ },
+ }
+
def _auth_dict_for_flows(self, flows, session):
public_flows = []
for f in flows:
@@ -469,6 +489,7 @@ class AuthHandler(BaseHandler):
get_params = {
LoginType.RECAPTCHA: self._get_params_recaptcha,
+ LoginType.TERMS: self._get_params_terms,
}
params = {}
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 7d67bf803a..0699731c13 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -138,9 +138,30 @@ class DirectoryHandler(BaseHandler):
)
@defer.inlineCallbacks
- def delete_association(self, requester, room_alias):
- # association deletion for human users
+ def delete_association(self, requester, room_alias, send_event=True):
+ """Remove an alias from the directory
+ (this is only meant for human users; AS users should call
+ delete_appservice_association)
+
+ Args:
+ requester (Requester):
+ room_alias (RoomAlias):
+ send_event (bool): Whether to send an updated m.room.aliases event.
+ Note that, if we delete the canonical alias, we will always attempt
+ to send an m.room.canonical_alias event
+
+ Returns:
+ Deferred[unicode]: room id that the alias used to point to
+
+ Raises:
+ NotFoundError: if the alias doesn't exist
+
+ AuthError: if the user doesn't have perms to delete the alias (ie, the user
+ is neither the creator of the alias, nor a server admin.
+
+ SynapseError: if the alias belongs to an AS
+ """
user_id = requester.user.to_string()
try:
@@ -168,10 +189,11 @@ class DirectoryHandler(BaseHandler):
room_id = yield self._delete_association(room_alias)
try:
- yield self.send_room_alias_update_event(
- requester,
- room_id
- )
+ if send_event:
+ yield self.send_room_alias_update_event(
+ requester,
+ room_id
+ )
yield self._update_canonical_alias(
requester,
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 5edb3cfe04..42b040375f 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -19,7 +19,7 @@ from six import iteritems
from twisted.internet import defer
-from synapse.api.errors import RoomKeysVersionError, StoreError, SynapseError
+from synapse.api.errors import NotFoundError, RoomKeysVersionError, StoreError
from synapse.util.async_helpers import Linearizer
logger = logging.getLogger(__name__)
@@ -55,6 +55,8 @@ class E2eRoomKeysHandler(object):
room_id(string): room ID to get keys for, for None to get keys for all rooms
session_id(string): session ID to get keys for, for None to get keys for all
sessions
+ Raises:
+ NotFoundError: if the backup version does not exist
Returns:
A deferred list of dicts giving the session_data and message metadata for
these room keys.
@@ -63,13 +65,19 @@ class E2eRoomKeysHandler(object):
# we deliberately take the lock to get keys so that changing the version
# works atomically
with (yield self._upload_linearizer.queue(user_id)):
+ # make sure the backup version exists
+ try:
+ yield self.store.get_e2e_room_keys_version_info(user_id, version)
+ except StoreError as e:
+ if e.code == 404:
+ raise NotFoundError("Unknown backup version")
+ else:
+ raise
+
results = yield self.store.get_e2e_room_keys(
user_id, version, room_id, session_id
)
- if results['rooms'] == {}:
- raise SynapseError(404, "No room_keys found")
-
defer.returnValue(results)
@defer.inlineCallbacks
@@ -120,7 +128,7 @@ class E2eRoomKeysHandler(object):
}
Raises:
- SynapseError: with code 404 if there are no versions defined
+ NotFoundError: if there are no versions defined
RoomKeysVersionError: if the uploaded version is not the current version
"""
@@ -134,7 +142,7 @@ class E2eRoomKeysHandler(object):
version_info = yield self.store.get_e2e_room_keys_version_info(user_id)
except StoreError as e:
if e.code == 404:
- raise SynapseError(404, "Version '%s' not found" % (version,))
+ raise NotFoundError("Version '%s' not found" % (version,))
else:
raise
@@ -148,7 +156,7 @@ class E2eRoomKeysHandler(object):
raise RoomKeysVersionError(current_version=version_info['version'])
except StoreError as e:
if e.code == 404:
- raise SynapseError(404, "Version '%s' not found" % (version,))
+ raise NotFoundError("Version '%s' not found" % (version,))
else:
raise
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index cd5b9bbb19..9ca5fd8724 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -239,7 +239,7 @@ class FederationHandler(BaseHandler):
room_id, event_id, min_depth,
)
- prevs = {e_id for e_id, _ in pdu.prev_events}
+ prevs = set(pdu.prev_event_ids())
seen = yield self.store.have_seen_events(prevs)
if min_depth and pdu.depth < min_depth:
@@ -607,7 +607,7 @@ class FederationHandler(BaseHandler):
if e.event_id in seen_ids:
continue
e.internal_metadata.outlier = True
- auth_ids = [e_id for e_id, _ in e.auth_events]
+ auth_ids = e.auth_event_ids()
auth = {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids or e.type == EventTypes.Create
@@ -726,7 +726,7 @@ class FederationHandler(BaseHandler):
edges = [
ev.event_id
for ev in events
- if set(e_id for e_id, _ in ev.prev_events) - event_ids
+ if set(ev.prev_event_ids()) - event_ids
]
logger.info(
@@ -753,7 +753,7 @@ class FederationHandler(BaseHandler):
required_auth = set(
a_id
for event in events + list(state_events.values()) + list(auth_events.values())
- for a_id, _ in event.auth_events
+ for a_id in event.auth_event_ids()
)
auth_events.update({
e_id: event_map[e_id] for e_id in required_auth if e_id in event_map
@@ -769,7 +769,7 @@ class FederationHandler(BaseHandler):
auth_events.update(ret_events)
required_auth.update(
- a_id for event in ret_events.values() for a_id, _ in event.auth_events
+ a_id for event in ret_events.values() for a_id in event.auth_event_ids()
)
missing_auth = required_auth - set(auth_events)
@@ -796,7 +796,7 @@ class FederationHandler(BaseHandler):
required_auth.update(
a_id
for event in results if event
- for a_id, _ in event.auth_events
+ for a_id in event.auth_event_ids()
)
missing_auth = required_auth - set(auth_events)
@@ -816,7 +816,7 @@ class FederationHandler(BaseHandler):
"auth_events": {
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
- for a_id, _ in a.auth_events
+ for a_id in a.auth_event_ids()
if a_id in auth_events
}
})
@@ -828,7 +828,7 @@ class FederationHandler(BaseHandler):
"auth_events": {
(auth_events[a_id].type, auth_events[a_id].state_key):
auth_events[a_id]
- for a_id, _ in event_map[e_id].auth_events
+ for a_id in event_map[e_id].auth_event_ids()
if a_id in auth_events
}
})
@@ -1041,17 +1041,17 @@ class FederationHandler(BaseHandler):
Raises:
SynapseError if the event does not pass muster
"""
- if len(ev.prev_events) > 20:
+ if len(ev.prev_event_ids()) > 20:
logger.warn("Rejecting event %s which has %i prev_events",
- ev.event_id, len(ev.prev_events))
+ ev.event_id, len(ev.prev_event_ids()))
raise SynapseError(
http_client.BAD_REQUEST,
"Too many prev_events",
)
- if len(ev.auth_events) > 10:
+ if len(ev.auth_event_ids()) > 10:
logger.warn("Rejecting event %s which has %i auth_events",
- ev.event_id, len(ev.auth_events))
+ ev.event_id, len(ev.auth_event_ids()))
raise SynapseError(
http_client.BAD_REQUEST,
"Too many auth_events",
@@ -1076,7 +1076,7 @@ class FederationHandler(BaseHandler):
def on_event_auth(self, event_id):
event = yield self.store.get_event(event_id)
auth = yield self.store.get_auth_chain(
- [auth_id for auth_id, _ in event.auth_events],
+ [auth_id for auth_id in event.auth_event_ids()],
include_given=True
)
defer.returnValue([e for e in auth])
@@ -1698,7 +1698,7 @@ class FederationHandler(BaseHandler):
missing_auth_events = set()
for e in itertools.chain(auth_events, state, [event]):
- for e_id, _ in e.auth_events:
+ for e_id in e.auth_event_ids():
if e_id not in event_map:
missing_auth_events.add(e_id)
@@ -1717,7 +1717,7 @@ class FederationHandler(BaseHandler):
for e in itertools.chain(auth_events, state, [event]):
auth_for_e = {
(event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
- for e_id, _ in e.auth_events
+ for e_id in e.auth_event_ids()
if e_id in event_map
}
if create_event:
@@ -1785,10 +1785,10 @@ class FederationHandler(BaseHandler):
# This is a hack to fix some old rooms where the initial join event
# didn't reference the create event in its auth events.
- if event.type == EventTypes.Member and not event.auth_events:
- if len(event.prev_events) == 1 and event.depth < 5:
+ if event.type == EventTypes.Member and not event.auth_event_ids():
+ if len(event.prev_event_ids()) == 1 and event.depth < 5:
c = yield self.store.get_event(
- event.prev_events[0][0],
+ event.prev_event_ids()[0],
allow_none=True,
)
if c and c.type == EventTypes.Create:
@@ -1835,7 +1835,7 @@ class FederationHandler(BaseHandler):
# Now get the current auth_chain for the event.
local_auth_chain = yield self.store.get_auth_chain(
- [auth_id for auth_id, _ in event.auth_events],
+ [auth_id for auth_id in event.auth_event_ids()],
include_given=True
)
@@ -1891,7 +1891,7 @@ class FederationHandler(BaseHandler):
"""
# Check if we have all the auth events.
current_state = set(e.event_id for e in auth_events.values())
- event_auth_events = set(e_id for e_id, _ in event.auth_events)
+ event_auth_events = set(event.auth_event_ids())
if event.is_state():
event_key = (event.type, event.state_key)
@@ -1935,7 +1935,7 @@ class FederationHandler(BaseHandler):
continue
try:
- auth_ids = [e_id for e_id, _ in e.auth_events]
+ auth_ids = e.auth_event_ids()
auth = {
(e.type, e.state_key): e for e in remote_auth_chain
if e.event_id in auth_ids or e.type == EventTypes.Create
@@ -1956,7 +1956,7 @@ class FederationHandler(BaseHandler):
pass
have_events = yield self.store.get_seen_events_with_rejections(
- [e_id for e_id, _ in event.auth_events]
+ event.auth_event_ids()
)
seen_events = set(have_events.keys())
except Exception:
@@ -2058,7 +2058,7 @@ class FederationHandler(BaseHandler):
continue
try:
- auth_ids = [e_id for e_id, _ in ev.auth_events]
+ auth_ids = ev.auth_event_ids()
auth = {
(e.type, e.state_key): e
for e in result["auth_chain"]
@@ -2250,7 +2250,7 @@ class FederationHandler(BaseHandler):
missing_remote_ids = [e.event_id for e in missing_remotes]
base_remote_rejected = list(missing_remotes)
for e in missing_remotes:
- for e_id, _ in e.auth_events:
+ for e_id in e.auth_event_ids():
if e_id in missing_remote_ids:
try:
base_remote_rejected.remove(e)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 969e588e73..a7cd779b02 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -427,6 +427,9 @@ class EventCreationHandler(object):
if event.is_state():
prev_state = yield self.deduplicate_state_event(event, context)
+ logger.info(
+ "Not bothering to persist duplicate state event %s", event.event_id,
+ )
if prev_state is not None:
defer.returnValue(prev_state)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 1d9417ff1a..3928faa6e7 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -104,6 +104,8 @@ class RoomCreationHandler(BaseHandler):
creator_id=user_id, is_public=r["is_public"],
)
+ logger.info("Creating new room %s to replace %s", new_room_id, old_room_id)
+
# we create and auth the tombstone event before properly creating the new
# room, to check our user has perms in the old room.
tombstone_event, tombstone_context = (
@@ -136,10 +138,15 @@ class RoomCreationHandler(BaseHandler):
requester, tombstone_event, tombstone_context,
)
- # and finally, shut down the PLs in the old room, and update them in the new
- # room.
old_room_state = yield tombstone_context.get_current_state_ids(self.store)
+ # update any aliases
+ yield self._move_aliases_to_new_room(
+ requester, old_room_id, new_room_id, old_room_state,
+ )
+
+ # and finally, shut down the PLs in the old room, and update them in the new
+ # room.
yield self._update_upgraded_room_pls(
requester, old_room_id, new_room_id, old_room_state,
)
@@ -245,11 +252,6 @@ class RoomCreationHandler(BaseHandler):
if not self.spam_checker.user_may_create_room(user_id):
raise SynapseError(403, "You are not permitted to create rooms")
- # XXX check alias is free
- # canonical_alias = None
-
- # XXX create association in directory handler
-
creation_content = {
"room_version": new_room_version,
"predecessor": {
@@ -295,7 +297,111 @@ class RoomCreationHandler(BaseHandler):
# XXX invites/joins
# XXX 3pid invites
- # XXX directory_handler.send_room_alias_update_event
+
+ @defer.inlineCallbacks
+ def _move_aliases_to_new_room(
+ self, requester, old_room_id, new_room_id, old_room_state,
+ ):
+ directory_handler = self.hs.get_handlers().directory_handler
+
+ aliases = yield self.store.get_aliases_for_room(old_room_id)
+
+ # check to see if we have a canonical alias.
+ canonical_alias = None
+ canonical_alias_event_id = old_room_state.get((EventTypes.CanonicalAlias, ""))
+ if canonical_alias_event_id:
+ canonical_alias_event = yield self.store.get_event(canonical_alias_event_id)
+ if canonical_alias_event:
+ canonical_alias = canonical_alias_event.content.get("alias", "")
+
+ # first we try to remove the aliases from the old room (we suppress sending
+ # the room_aliases event until the end).
+ #
+ # Note that we'll only be able to remove aliases that (a) aren't owned by an AS,
+ # and (b) unless the user is a server admin, which the user created.
+ #
+ # This is probably correct - given we don't allow such aliases to be deleted
+ # normally, it would be odd to allow it in the case of doing a room upgrade -
+ # but it makes the upgrade less effective, and you have to wonder why a room
+ # admin can't remove aliases that point to that room anyway.
+ # (cf https://github.com/matrix-org/synapse/issues/2360)
+ #
+ removed_aliases = []
+ for alias_str in aliases:
+ alias = RoomAlias.from_string(alias_str)
+ try:
+ yield directory_handler.delete_association(
+ requester, alias, send_event=False,
+ )
+ removed_aliases.append(alias_str)
+ except SynapseError as e:
+ logger.warning(
+ "Unable to remove alias %s from old room: %s",
+ alias, e,
+ )
+
+ # if we didn't find any aliases, or couldn't remove anyway, we can skip the rest
+ # of this.
+ if not removed_aliases:
+ return
+
+ try:
+ # this can fail if, for some reason, our user doesn't have perms to send
+ # m.room.aliases events in the old room (note that we've already checked that
+ # they have perms to send a tombstone event, so that's not terribly likely).
+ #
+ # If that happens, it's regrettable, but we should carry on: it's the same
+ # as when you remove an alias from the directory normally - it just means that
+ # the aliases event gets out of sync with the directory
+ # (cf https://github.com/vector-im/riot-web/issues/2369)
+ yield directory_handler.send_room_alias_update_event(
+ requester, old_room_id,
+ )
+ except AuthError as e:
+ logger.warning(
+ "Failed to send updated alias event on old room: %s", e,
+ )
+
+ # we can now add any aliases we successfully removed to the new room.
+ for alias in removed_aliases:
+ try:
+ yield directory_handler.create_association(
+ requester, RoomAlias.from_string(alias),
+ new_room_id, servers=(self.hs.hostname, ),
+ send_event=False,
+ )
+ logger.info("Moved alias %s to new room", alias)
+ except SynapseError as e:
+ # I'm not really expecting this to happen, but it could if the spam
+ # checking module decides it shouldn't, or similar.
+ logger.error(
+ "Error adding alias %s to new room: %s",
+ alias, e,
+ )
+
+ try:
+ if canonical_alias and (canonical_alias in removed_aliases):
+ yield self.event_creation_handler.create_and_send_nonmember_event(
+ requester,
+ {
+ "type": EventTypes.CanonicalAlias,
+ "state_key": "",
+ "room_id": new_room_id,
+ "sender": requester.user.to_string(),
+ "content": {"alias": canonical_alias, },
+ },
+ ratelimit=False
+ )
+
+ yield directory_handler.send_room_alias_update_event(
+ requester, new_room_id,
+ )
+ except SynapseError as e:
+ # again I'm not really expecting this to fail, but if it does, I'd rather
+ # we returned the new room to the client at this point.
+ logger.error(
+ "Unable to send updated alias events in new room: %s", e,
+ )
@defer.inlineCallbacks
def create_room(self, requester, config, ratelimit=True,
@@ -522,6 +628,7 @@ class RoomCreationHandler(BaseHandler):
@defer.inlineCallbacks
def send(etype, content, **kwargs):
event = create(etype, content, **kwargs)
+ logger.info("Sending %s in new room", etype)
yield self.event_creation_handler.create_and_send_nonmember_event(
creator,
event,
@@ -544,6 +651,7 @@ class RoomCreationHandler(BaseHandler):
content=creation_content,
)
+ logger.info("Sending %s in new room", EventTypes.Member)
yield self.room_member_handler.update_membership(
creator,
creator.user,
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 0c1d52fd11..80e7b15de8 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -24,6 +24,7 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import serialize_event
+from synapse.storage.state import StateFilter
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -324,9 +325,12 @@ class SearchHandler(BaseHandler):
else:
last_event_id = event.event_id
+ state_filter = StateFilter.from_types(
+ [(EventTypes.Member, sender) for sender in senders]
+ )
+
state = yield self.store.get_state_for_event(
- last_event_id,
- types=[(EventTypes.Member, sender) for sender in senders]
+ last_event_id, state_filter
)
res["profile_info"] = {
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c610933dd4..a61bbf9392 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -63,11 +63,8 @@ class TypingHandler(object):
self._member_typing_until = {} # clock time we expect to stop
self._member_last_federation_poke = {}
- # map room IDs to serial numbers
- self._room_serials = {}
self._latest_room_serial = 0
- # map room IDs to sets of users currently typing
- self._room_typing = {}
+ self._reset()
# caches which room_ids changed at which serials
self._typing_stream_change_cache = StreamChangeCache(
@@ -79,6 +76,15 @@ class TypingHandler(object):
5000,
)
+ def _reset(self):
+ """
+ Reset the typing handler's data caches.
+ """
+ # map room IDs to serial numbers
+ self._room_serials = {}
+ # map room IDs to sets of users currently typing
+ self._room_typing = {}
+
def _handle_timeouts(self):
logger.info("Checking for typing timeouts")
diff --git a/synapse/http/server.py b/synapse/http/server.py
index b4b25cab19..6a427d96a6 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -468,13 +468,13 @@ def set_cors_headers(request):
Args:
request (twisted.web.http.Request): The http request to add CORs to.
"""
- request.setHeader("Access-Control-Allow-Origin", "*")
+ request.setHeader(b"Access-Control-Allow-Origin", b"*")
request.setHeader(
- "Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS"
+ b"Access-Control-Allow-Methods", b"GET, POST, PUT, DELETE, OPTIONS"
)
request.setHeader(
- "Access-Control-Allow-Headers",
- "Origin, X-Requested-With, Content-Type, Accept, Authorization"
+ b"Access-Control-Allow-Headers",
+ b"Origin, X-Requested-With, Content-Type, Accept, Authorization"
)
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index a1e4b88e6d..528125e737 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -121,16 +121,15 @@ def parse_string(request, name, default=None, required=False,
Args:
request: the twisted HTTP request.
- name (bytes/unicode): the name of the query parameter.
- default (bytes/unicode|None): value to use if the parameter is absent,
+ name (bytes|unicode): the name of the query parameter.
+ default (bytes|unicode|None): value to use if the parameter is absent,
defaults to None. Must be bytes if encoding is None.
required (bool): whether to raise a 400 SynapseError if the
parameter is absent, defaults to False.
- allowed_values (list[bytes/unicode]): List of allowed values for the
+ allowed_values (list[bytes|unicode]): List of allowed values for the
string, or None if any value is allowed, defaults to None. Must be
the same type as name, if given.
- encoding: The encoding to decode the name to, and decode the string
- content with.
+ encoding (str|None): The encoding to decode the string content with.
Returns:
bytes/unicode|None: A string value or the default. Unicode if encoding
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index f369124258..50e1007d84 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -85,7 +85,10 @@ class EmailPusher(object):
self.timed_call = None
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
- self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+ if self.max_stream_ordering:
+ self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+ else:
+ self.max_stream_ordering = max_stream_ordering
self._start_processing()
def on_new_receipts(self, min_stream_id, max_stream_id):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 6bd703632d..87fa7f006a 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -311,10 +311,10 @@ class HttpPusher(object):
]
}
}
- if event.type == 'm.room.member':
+ if event.type == 'm.room.member' and event.is_state():
d['notification']['membership'] = event.content['membership']
d['notification']['user_is_target'] = event.state_key == self.user_id
- if self.hs.config.push_include_content and 'content' in event:
+ if self.hs.config.push_include_content and event.content:
d['notification']['content'] = event.content
# We no longer send aliases separately, instead, we send the human
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 16fb5e8471..ebcb93bfc7 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -26,7 +26,6 @@ import bleach
import jinja2
from twisted.internet import defer
-from twisted.mail.smtp import sendmail
from synapse.api.constants import EventTypes
from synapse.api.errors import StoreError
@@ -85,6 +84,7 @@ class Mailer(object):
self.notif_template_html = notif_template_html
self.notif_template_text = notif_template_text
+ self.sendmail = self.hs.get_sendmail()
self.store = self.hs.get_datastore()
self.macaroon_gen = self.hs.get_macaroon_generator()
self.state_handler = self.hs.get_state_handler()
@@ -191,11 +191,11 @@ class Mailer(object):
multipart_msg.attach(html_part)
logger.info("Sending email push notification to %s" % email_address)
- # logger.debug(html_text)
- yield sendmail(
+ yield self.sendmail(
self.hs.config.email_smtp_host,
- raw_from, raw_to, multipart_msg.as_string(),
+ raw_from, raw_to, multipart_msg.as_string().encode('utf8'),
+ reactor=self.hs.get_reactor(),
port=self.hs.config.email_smtp_port,
requireAuthentication=self.hs.config.email_smtp_user is not None,
username=self.hs.config.email_smtp_user,
@@ -333,7 +333,7 @@ class Mailer(object):
notif_events, user_id, reason):
if len(notifs_by_room) == 1:
# Only one room has new stuff
- room_id = notifs_by_room.keys()[0]
+ room_id = list(notifs_by_room.keys())[0]
# If the room has some kind of name, use it, but we don't
# want the generated-from-names one here otherwise we'll
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 2bd321d530..cf6c8b875e 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -124,7 +124,7 @@ class PushRuleEvaluatorForEvent(object):
# XXX: optimisation: cache our pattern regexps
if condition['key'] == 'content.body':
- body = self._event["content"].get("body", None)
+ body = self._event.content.get("body", None)
if not body:
return False
@@ -140,7 +140,7 @@ class PushRuleEvaluatorForEvent(object):
if not display_name:
return False
- body = self._event["content"].get("body", None)
+ body = self._event.content.get("body", None)
if not body:
return False
diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py
index 693b303881..a8d8ed6590 100644
--- a/synapse/rest/client/v2_alpha/auth.py
+++ b/synapse/rest/client/v2_alpha/auth.py
@@ -68,6 +68,29 @@ function captchaDone() {
</html>
"""
+TERMS_TEMPLATE = """
+<html>
+<head>
+<title>Authentication</title>
+<meta name='viewport' content='width=device-width, initial-scale=1,
+ user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
+<link rel="stylesheet" href="/_matrix/static/client/register/style.css">
+</head>
+<body>
+<form id="registrationForm" method="post" action="%(myurl)s">
+ <div>
+ <p>
+ Please click the button below if you agree to the
+ <a href="%(terms_url)s">privacy policy of this homeserver.</a>
+ </p>
+ <input type="hidden" name="session" value="%(session)s" />
+ <input type="submit" value="Agree" />
+ </div>
+</form>
+</body>
+</html>
+"""
+
SUCCESS_TEMPLATE = """
<html>
<head>
@@ -133,13 +156,34 @@ class AuthRestServlet(RestServlet):
request.write(html_bytes)
finish_request(request)
defer.returnValue(None)
+ elif stagetype == LoginType.TERMS:
+ session = request.args['session'][0]
+
+ html = TERMS_TEMPLATE % {
+ 'session': session,
+ 'terms_url': "%s/_matrix/consent?v=%s" % (
+ self.hs.config.public_baseurl,
+ self.hs.config.user_consent_version,
+ ),
+ 'myurl': "%s/auth/%s/fallback/web" % (
+ CLIENT_V2_ALPHA_PREFIX, LoginType.TERMS
+ ),
+ }
+ html_bytes = html.encode("utf8")
+ request.setResponseCode(200)
+ request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
+ request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
+
+ request.write(html_bytes)
+ finish_request(request)
+ defer.returnValue(None)
else:
raise SynapseError(404, "Unknown auth stage type")
@defer.inlineCallbacks
def on_POST(self, request, stagetype):
yield
- if stagetype == "m.login.recaptcha":
+ if stagetype == LoginType.RECAPTCHA:
if ('g-recaptcha-response' not in request.args or
len(request.args['g-recaptcha-response'])) == 0:
raise SynapseError(400, "No captcha response supplied")
@@ -179,6 +223,41 @@ class AuthRestServlet(RestServlet):
finish_request(request)
defer.returnValue(None)
+ elif stagetype == LoginType.TERMS:
+ if ('session' not in request.args or
+ len(request.args['session'])) == 0:
+ raise SynapseError(400, "No session supplied")
+
+ session = request.args['session'][0]
+ authdict = {'session': session}
+
+ success = yield self.auth_handler.add_oob_auth(
+ LoginType.TERMS,
+ authdict,
+ self.hs.get_ip_from_request(request)
+ )
+
+ if success:
+ html = SUCCESS_TEMPLATE
+ else:
+ html = TERMS_TEMPLATE % {
+ 'session': session,
+ 'terms_url': "%s/_matrix/consent?v=%s" % (
+ self.hs.config.public_baseurl,
+ self.hs.config.user_consent_version,
+ ),
+ 'myurl': "%s/auth/%s/fallback/web" % (
+ CLIENT_V2_ALPHA_PREFIX, LoginType.TERMS
+ ),
+ }
+ html_bytes = html.encode("utf8")
+ request.setResponseCode(200)
+ request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
+ request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
+
+ request.write(html_bytes)
+ finish_request(request)
+ defer.returnValue(None)
else:
raise SynapseError(404, "Unknown auth stage type")
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 192f52e462..0515715f7c 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -359,6 +359,13 @@ class RegisterRestServlet(RestServlet):
[LoginType.MSISDN, LoginType.EMAIL_IDENTITY]
])
+ # Append m.login.terms to all flows if we're requiring consent
+ if self.hs.config.user_consent_at_registration:
+ new_flows = []
+ for flow in flows:
+ flow.append(LoginType.TERMS)
+ flows.extend(new_flows)
+
auth_result, params, session_id = yield self.auth_handler.check_auth(
flows, body, self.hs.get_ip_from_request(request)
)
@@ -445,6 +452,12 @@ class RegisterRestServlet(RestServlet):
params.get("bind_msisdn")
)
+ if auth_result and LoginType.TERMS in auth_result:
+ logger.info("%s has consented to the privacy policy" % registered_user_id)
+ yield self.store.user_set_consent_version(
+ registered_user_id, self.hs.config.user_consent_version,
+ )
+
defer.returnValue((200, return_dict))
def on_OPTIONS(self, _):
diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py
index 45b5817d8b..ab3f1bd21a 100644
--- a/synapse/rest/client/v2_alpha/room_keys.py
+++ b/synapse/rest/client/v2_alpha/room_keys.py
@@ -17,7 +17,7 @@ import logging
from twisted.internet import defer
-from synapse.api.errors import Codes, SynapseError
+from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.http.servlet import (
RestServlet,
parse_json_object_from_request,
@@ -208,10 +208,25 @@ class RoomKeysServlet(RestServlet):
user_id, version, room_id, session_id
)
+ # Convert room_keys to the right format to return.
if session_id:
- room_keys = room_keys['rooms'][room_id]['sessions'][session_id]
+ # If the client requests a specific session, but that session was
+ # not backed up, then return an M_NOT_FOUND.
+ if room_keys['rooms'] == {}:
+ raise NotFoundError("No room_keys found")
+ else:
+ room_keys = room_keys['rooms'][room_id]['sessions'][session_id]
elif room_id:
- room_keys = room_keys['rooms'][room_id]
+ # If the client requests all sessions from a room, but no sessions
+ # are found, then return an empty result rather than an error, so
+ # that clients don't have to handle an error condition, and an
+ # empty result is valid. (Similarly if the client requests all
+ # sessions from the backup, but in that case, room_keys is already
+ # in the right format, so we don't need to do anything about it.)
+ if room_keys['rooms'] == {}:
+ room_keys = {'sessions': {}}
+ else:
+ room_keys = room_keys['rooms'][room_id]
defer.returnValue((200, room_keys))
diff --git a/synapse/rest/consent/consent_resource.py b/synapse/rest/consent/consent_resource.py
index 7362e1858d..8009b7ff1c 100644
--- a/synapse/rest/consent/consent_resource.py
+++ b/synapse/rest/consent/consent_resource.py
@@ -137,27 +137,33 @@ class ConsentResource(Resource):
request (twisted.web.http.Request):
"""
- version = parse_string(request, "v",
- default=self._default_consent_version)
- username = parse_string(request, "u", required=True)
- userhmac = parse_string(request, "h", required=True, encoding=None)
-
- self._check_hash(username, userhmac)
-
- if username.startswith('@'):
- qualified_user_id = username
- else:
- qualified_user_id = UserID(username, self.hs.hostname).to_string()
-
- u = yield self.store.get_user_by_id(qualified_user_id)
- if u is None:
- raise NotFoundError("Unknown user")
+ version = parse_string(request, "v", default=self._default_consent_version)
+ username = parse_string(request, "u", required=False, default="")
+ userhmac = None
+ has_consented = False
+ public_version = username == ""
+ if not public_version or not self.hs.config.user_consent_at_registration:
+ userhmac = parse_string(request, "h", required=True, encoding=None)
+
+ self._check_hash(username, userhmac)
+
+ if username.startswith('@'):
+ qualified_user_id = username
+ else:
+ qualified_user_id = UserID(username, self.hs.hostname).to_string()
+
+ u = yield self.store.get_user_by_id(qualified_user_id)
+ if u is None:
+ raise NotFoundError("Unknown user")
+ has_consented = u["consent_version"] == version
try:
self._render_template(
request, "%s.html" % (version,),
- user=username, userhmac=userhmac, version=version,
- has_consented=(u["consent_version"] == version),
+ user=username,
+ userhmac=userhmac.decode('ascii'),
+ version=version,
+ has_consented=has_consented, public_version=public_version,
)
except TemplateNotFound:
raise NotFoundError("Unknown policy version")
@@ -223,7 +229,7 @@ class ConsentResource(Resource):
key=self._hmac_secret,
msg=userid.encode('utf-8'),
digestmod=sha256,
- ).hexdigest()
+ ).hexdigest().encode('ascii')
if not compare_digest(want_mac, userhmac):
raise SynapseError(http_client.FORBIDDEN, "HMAC incorrect")
diff --git a/synapse/rest/key/v1/__init__.py b/synapse/rest/key/v1/__init__.py
deleted file mode 100644
index fe0ac3f8e9..0000000000
--- a/synapse/rest/key/v1/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/synapse/rest/key/v1/server_key_resource.py b/synapse/rest/key/v1/server_key_resource.py
deleted file mode 100644
index 38eb2ee23f..0000000000
--- a/synapse/rest/key/v1/server_key_resource.py
+++ /dev/null
@@ -1,92 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import logging
-
-from canonicaljson import encode_canonical_json
-from signedjson.sign import sign_json
-from unpaddedbase64 import encode_base64
-
-from OpenSSL import crypto
-from twisted.web.resource import Resource
-
-from synapse.http.server import respond_with_json_bytes
-
-logger = logging.getLogger(__name__)
-
-
-class LocalKey(Resource):
- """HTTP resource containing encoding the TLS X.509 certificate and NACL
- signature verification keys for this server::
-
- GET /key HTTP/1.1
-
- HTTP/1.1 200 OK
- Content-Type: application/json
- {
- "server_name": "this.server.example.com"
- "verify_keys": {
- "algorithm:version": # base64 encoded NACL verification key.
- },
- "tls_certificate": # base64 ASN.1 DER encoded X.509 tls cert.
- "signatures": {
- "this.server.example.com": {
- "algorithm:version": # NACL signature for this server.
- }
- }
- }
- """
-
- def __init__(self, hs):
- self.response_body = encode_canonical_json(
- self.response_json_object(hs.config)
- )
- Resource.__init__(self)
-
- @staticmethod
- def response_json_object(server_config):
- verify_keys = {}
- for key in server_config.signing_key:
- verify_key_bytes = key.verify_key.encode()
- key_id = "%s:%s" % (key.alg, key.version)
- verify_keys[key_id] = encode_base64(verify_key_bytes)
-
- x509_certificate_bytes = crypto.dump_certificate(
- crypto.FILETYPE_ASN1,
- server_config.tls_certificate
- )
- json_object = {
- u"server_name": server_config.server_name,
- u"verify_keys": verify_keys,
- u"tls_certificate": encode_base64(x509_certificate_bytes)
- }
- for key in server_config.signing_key:
- json_object = sign_json(
- json_object,
- server_config.server_name,
- key,
- )
-
- return json_object
-
- def render_GET(self, request):
- return respond_with_json_bytes(
- request, 200, self.response_body,
- )
-
- def getChild(self, name, request):
- if name == b'':
- return self
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 1a7bfd6b56..91d1dafe64 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
import cgi
import datetime
import errno
@@ -24,6 +25,7 @@ import shutil
import sys
import traceback
+import six
from six import string_types
from six.moves import urllib_parse as urlparse
@@ -98,7 +100,7 @@ class PreviewUrlResource(Resource):
# XXX: if get_user_by_req fails, what should we do in an async render?
requester = yield self.auth.get_user_by_req(request)
url = parse_string(request, "url")
- if "ts" in request.args:
+ if b"ts" in request.args:
ts = parse_integer(request, "ts")
else:
ts = self.clock.time_msec()
@@ -180,7 +182,12 @@ class PreviewUrlResource(Resource):
cache_result["expires_ts"] > ts and
cache_result["response_code"] / 100 == 2
):
- defer.returnValue(cache_result["og"])
+ # It may be stored as text in the database, not as bytes (such as
+ # PostgreSQL). If so, encode it back before handing it on.
+ og = cache_result["og"]
+ if isinstance(og, six.text_type):
+ og = og.encode('utf8')
+ defer.returnValue(og)
return
media_info = yield self._download_url(url, user)
@@ -213,14 +220,17 @@ class PreviewUrlResource(Resource):
elif _is_html(media_info['media_type']):
# TODO: somehow stop a big HTML tree from exploding synapse's RAM
- file = open(media_info['filename'])
- body = file.read()
- file.close()
+ with open(media_info['filename'], 'rb') as file:
+ body = file.read()
# clobber the encoding from the content-type, or default to utf-8
# XXX: this overrides any <meta/> or XML charset headers in the body
# which may pose problems, but so far seems to work okay.
- match = re.match(r'.*; *charset=(.*?)(;|$)', media_info['media_type'], re.I)
+ match = re.match(
+ r'.*; *charset="?(.*?)"?(;|$)',
+ media_info['media_type'],
+ re.I
+ )
encoding = match.group(1) if match else "utf-8"
og = decode_and_calc_og(body, media_info['uri'], encoding)
diff --git a/synapse/server.py b/synapse/server.py
index cf6b872cbd..9985687b95 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -23,6 +23,7 @@ import abc
import logging
from twisted.enterprise import adbapi
+from twisted.mail.smtp import sendmail
from twisted.web.client import BrowserLikePolicyForHTTPS
from synapse.api.auth import Auth
@@ -174,6 +175,7 @@ class HomeServer(object):
'message_handler',
'pagination_handler',
'room_context_handler',
+ 'sendmail',
]
# This is overridden in derived application classes
@@ -269,6 +271,9 @@ class HomeServer(object):
def build_room_creation_handler(self):
return RoomCreationHandler(self)
+ def build_sendmail(self):
+ return sendmail
+
def build_state_handler(self):
return StateHandler(self)
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 9b40b18d5b..70048b0c09 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -261,7 +261,7 @@ class StateHandler(object):
logger.debug("calling resolve_state_groups from compute_event_context")
entry = yield self.resolve_state_groups_for_events(
- event.room_id, [e for e, _ in event.prev_events],
+ event.room_id, event.prev_event_ids(),
)
prev_state_ids = entry.state
@@ -607,7 +607,7 @@ def resolve_events_with_store(room_version, state_sets, event_map, state_res_sto
return v1.resolve_events_with_store(
state_sets, event_map, state_res_store.get_events,
)
- elif room_version == RoomVersions.VDH_TEST:
+ elif room_version in (RoomVersions.VDH_TEST, RoomVersions.STATE_V2_TEST):
return v2.resolve_events_with_store(
state_sets, event_map, state_res_store,
)
diff --git a/synapse/state/v2.py b/synapse/state/v2.py
index 5d06f7e928..3573bb0028 100644
--- a/synapse/state/v2.py
+++ b/synapse/state/v2.py
@@ -53,6 +53,10 @@ def resolve_events_with_store(state_sets, event_map, state_res_store):
logger.debug("Computing conflicted state")
+ # We use event_map as a cache, so if its None we need to initialize it
+ if event_map is None:
+ event_map = {}
+
# First split up the un/conflicted state
unconflicted_state, conflicted_state = _seperate(state_sets)
@@ -155,7 +159,7 @@ def _get_power_level_for_sender(event_id, event_map, state_res_store):
event = yield _get_event(event_id, event_map, state_res_store)
pl = None
- for aid, _ in event.auth_events:
+ for aid in event.auth_event_ids():
aev = yield _get_event(aid, event_map, state_res_store)
if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""):
pl = aev
@@ -163,7 +167,7 @@ def _get_power_level_for_sender(event_id, event_map, state_res_store):
if pl is None:
# Couldn't find power level. Check if they're the creator of the room
- for aid, _ in event.auth_events:
+ for aid in event.auth_event_ids():
aev = yield _get_event(aid, event_map, state_res_store)
if (aev.type, aev.state_key) == (EventTypes.Create, ""):
if aev.content.get("creator") == event.sender:
@@ -295,7 +299,7 @@ def _add_event_and_auth_chain_to_graph(graph, event_id, event_map,
graph.setdefault(eid, set())
event = yield _get_event(eid, event_map, state_res_store)
- for aid, _ in event.auth_events:
+ for aid in event.auth_event_ids():
if aid in auth_diff:
if aid not in graph:
state.append(aid)
@@ -365,7 +369,7 @@ def _iterative_auth_checks(event_ids, base_state, event_map, state_res_store):
event = event_map[event_id]
auth_events = {}
- for aid, _ in event.auth_events:
+ for aid in event.auth_event_ids():
ev = yield _get_event(aid, event_map, state_res_store)
if ev.rejected_reason is None:
@@ -413,9 +417,9 @@ def _mainline_sort(event_ids, resolved_power_event_id, event_map,
while pl:
mainline.append(pl)
pl_ev = yield _get_event(pl, event_map, state_res_store)
- auth_events = pl_ev.auth_events
+ auth_events = pl_ev.auth_event_ids()
pl = None
- for aid, _ in auth_events:
+ for aid in auth_events:
ev = yield _get_event(aid, event_map, state_res_store)
if (ev.type, ev.state_key) == (EventTypes.PowerLevels, ""):
pl = aid
@@ -460,10 +464,10 @@ def _get_mainline_depth_for_event(event, mainline_map, event_map, state_res_stor
if depth is not None:
defer.returnValue(depth)
- auth_events = event.auth_events
+ auth_events = event.auth_event_ids()
event = None
- for aid, _ in auth_events:
+ for aid in auth_events:
aev = yield _get_event(aid, event_map, state_res_store)
if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""):
event = aev
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 62497ab63f..ecdab34e7d 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -22,14 +22,19 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
-from ._base import Cache, SQLBaseStore, db_to_json
+from ._base import Cache, db_to_json
logger = logging.getLogger(__name__)
+DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
+ "drop_device_list_streams_non_unique_indexes"
+)
-class DeviceStore(SQLBaseStore):
+
+class DeviceStore(BackgroundUpdateStore):
def __init__(self, db_conn, hs):
super(DeviceStore, self).__init__(db_conn, hs)
@@ -52,6 +57,30 @@ class DeviceStore(SQLBaseStore):
columns=["user_id", "device_id"],
)
+ # create a unique index on device_lists_remote_cache
+ self.register_background_index_update(
+ "device_lists_remote_cache_unique_idx",
+ index_name="device_lists_remote_cache_unique_id",
+ table="device_lists_remote_cache",
+ columns=["user_id", "device_id"],
+ unique=True,
+ )
+
+ # And one on device_lists_remote_extremeties
+ self.register_background_index_update(
+ "device_lists_remote_extremeties_unique_idx",
+ index_name="device_lists_remote_extremeties_unique_idx",
+ table="device_lists_remote_extremeties",
+ columns=["user_id"],
+ unique=True,
+ )
+
+ # once they complete, we can remove the old non-unique indexes.
+ self.register_background_update_handler(
+ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES,
+ self._drop_device_list_streams_non_unique_indexes,
+ )
+
@defer.inlineCallbacks
def store_device(self, user_id, device_id,
initial_device_display_name):
@@ -239,7 +268,19 @@ class DeviceStore(SQLBaseStore):
def update_remote_device_list_cache_entry(self, user_id, device_id, content,
stream_id):
- """Updates a single user's device in the cache.
+ """Updates a single device in the cache of a remote user's devicelist.
+
+ Note: assumes that we are the only thread that can be updating this user's
+ device list.
+
+ Args:
+ user_id (str): User to update device list for
+ device_id (str): ID of decivice being updated
+ content (dict): new data on this device
+ stream_id (int): the version of the device list
+
+ Returns:
+ Deferred[None]
"""
return self.runInteraction(
"update_remote_device_list_cache_entry",
@@ -272,7 +313,11 @@ class DeviceStore(SQLBaseStore):
},
values={
"content": json.dumps(content),
- }
+ },
+
+ # we don't need to lock, because we assume we are the only thread
+ # updating this user's devices.
+ lock=False,
)
txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,))
@@ -289,11 +334,26 @@ class DeviceStore(SQLBaseStore):
},
values={
"stream_id": stream_id,
- }
+ },
+
+ # again, we can assume we are the only thread updating this user's
+ # extremity.
+ lock=False,
)
def update_remote_device_list_cache(self, user_id, devices, stream_id):
- """Replace the cache of the remote user's devices.
+ """Replace the entire cache of the remote user's devices.
+
+ Note: assumes that we are the only thread that can be updating this user's
+ device list.
+
+ Args:
+ user_id (str): User to update device list for
+ devices (list[dict]): list of device objects supplied over federation
+ stream_id (int): the version of the device list
+
+ Returns:
+ Deferred[None]
"""
return self.runInteraction(
"update_remote_device_list_cache",
@@ -338,7 +398,11 @@ class DeviceStore(SQLBaseStore):
},
values={
"stream_id": stream_id,
- }
+ },
+
+ # we don't need to lock, because we can assume we are the only thread
+ # updating this user's extremity.
+ lock=False,
)
def get_devices_by_remote(self, destination, from_stream_id):
@@ -722,3 +786,19 @@ class DeviceStore(SQLBaseStore):
"_prune_old_outbound_device_pokes",
_prune_txn,
)
+
+ @defer.inlineCallbacks
+ def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
+ def f(conn):
+ txn = conn.cursor()
+ txn.execute(
+ "DROP INDEX IF EXISTS device_lists_remote_cache_id"
+ )
+ txn.execute(
+ "DROP INDEX IF EXISTS device_lists_remote_extremeties_id"
+ )
+ txn.close()
+
+ yield self.runWithConnection(f)
+ yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
+ defer.returnValue(1)
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 1f1721e820..2a0f6cfca9 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -40,7 +40,10 @@ class EndToEndKeyStore(SQLBaseStore):
allow_none=True,
)
- new_key_json = encode_canonical_json(device_keys)
+ # In py3 we need old_key_json to match new_key_json type. The DB
+ # returns unicode while encode_canonical_json returns bytes.
+ new_key_json = encode_canonical_json(device_keys).decode("utf-8")
+
if old_key_json == new_key_json:
return False
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 3faca2a042..d3b9dea1d6 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -477,7 +477,7 @@ class EventFederationStore(EventFederationWorkerStore):
"is_state": False,
}
for ev in events
- for e_id, _ in ev.prev_events
+ for e_id in ev.prev_event_ids()
],
)
@@ -510,7 +510,7 @@ class EventFederationStore(EventFederationWorkerStore):
txn.executemany(query, [
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
- for ev in events for e_id, _ in ev.prev_events
+ for ev in events for e_id in ev.prev_event_ids()
if not ev.internal_metadata.is_outlier()
])
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 8881b009df..2047110b1d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -38,6 +38,7 @@ from synapse.state import StateResolutionStore
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.events_worker import EventsWorkerStore
+from synapse.storage.state import StateGroupWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import batch_iter
from synapse.util.async_helpers import ObservableDeferred
@@ -205,7 +206,8 @@ def _retry_on_integrity_error(func):
# inherits from EventFederationStore so that we can call _update_backward_extremities
# and _handle_mult_prev_events (though arguably those could both be moved in here)
-class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore):
+class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore,
+ BackgroundUpdateStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
@@ -414,7 +416,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
)
if len_1:
all_single_prev_not_state = all(
- len(event.prev_events) == 1
+ len(event.prev_event_ids()) == 1
and not event.is_state()
for event, ctx in ev_ctx_rm
)
@@ -438,7 +440,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
# guess this by looking at the prev_events and checking
# if they match the current forward extremities.
for ev, _ in ev_ctx_rm:
- prev_event_ids = set(e for e, _ in ev.prev_events)
+ prev_event_ids = set(ev.prev_event_ids())
if latest_event_ids == prev_event_ids:
state_delta_reuse_delta_counter.inc()
break
@@ -549,7 +551,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
result.difference_update(
e_id
for event in new_events
- for e_id, _ in event.prev_events
+ for e_id in event.prev_event_ids()
)
# Finally, remove any events which are prev_events of any existing events.
@@ -867,7 +869,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
"auth_id": auth_id,
}
for event, _ in events_and_contexts
- for auth_id, _ in event.auth_events
+ for auth_id in event.auth_event_ids()
if event.is_state()
],
)
@@ -2034,55 +2036,37 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
logger.info("[purge] finding redundant state groups")
- # Get all state groups that are only referenced by events that are
- # to be deleted.
- # This works by first getting state groups that we may want to delete,
- # joining against event_to_state_groups to get events that use that
- # state group, then left joining against events_to_purge again. Any
- # state group where the left join produce *no nulls* are referenced
- # only by events that are going to be purged.
+ # Get all state groups that are referenced by events that are to be
+ # deleted. We then go and check if they are referenced by other events
+ # or state groups, and if not we delete them.
txn.execute("""
- SELECT state_group FROM
- (
- SELECT DISTINCT state_group FROM events_to_purge
- INNER JOIN event_to_state_groups USING (event_id)
- ) AS sp
- INNER JOIN event_to_state_groups USING (state_group)
- LEFT JOIN events_to_purge AS ep USING (event_id)
- GROUP BY state_group
- HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0
+ SELECT DISTINCT state_group FROM events_to_purge
+ INNER JOIN event_to_state_groups USING (event_id)
""")
- state_rows = txn.fetchall()
- logger.info("[purge] found %i redundant state groups", len(state_rows))
-
- # make a set of the redundant state groups, so that we can look them up
- # efficiently
- state_groups_to_delete = set([sg for sg, in state_rows])
-
- # Now we get all the state groups that rely on these state groups
- logger.info("[purge] finding state groups which depend on redundant"
- " state groups")
- remaining_state_groups = []
- for i in range(0, len(state_rows), 100):
- chunk = [sg for sg, in state_rows[i:i + 100]]
- # look for state groups whose prev_state_group is one we are about
- # to delete
- rows = self._simple_select_many_txn(
- txn,
- table="state_group_edges",
- column="prev_state_group",
- iterable=chunk,
- retcols=["state_group"],
- keyvalues={},
- )
- remaining_state_groups.extend(
- row["state_group"] for row in rows
+ referenced_state_groups = set(sg for sg, in txn)
+ logger.info(
+ "[purge] found %i referenced state groups",
+ len(referenced_state_groups),
+ )
- # exclude state groups we are about to delete: no point in
- # updating them
- if row["state_group"] not in state_groups_to_delete
+ logger.info("[purge] finding state groups that can be deleted")
+
+ state_groups_to_delete, remaining_state_groups = (
+ self._find_unreferenced_groups_during_purge(
+ txn, referenced_state_groups,
)
+ )
+
+ logger.info(
+ "[purge] found %i state groups to delete",
+ len(state_groups_to_delete),
+ )
+
+ logger.info(
+ "[purge] de-delta-ing %i remaining state groups",
+ len(remaining_state_groups),
+ )
# Now we turn the state groups that reference to-be-deleted state
# groups to non delta versions.
@@ -2127,11 +2111,11 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
logger.info("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
- state_rows
+ ((sg,) for sg in state_groups_to_delete),
)
txn.executemany(
"DELETE FROM state_groups WHERE id = ?",
- state_rows
+ ((sg,) for sg in state_groups_to_delete),
)
logger.info("[purge] removing events from event_to_state_groups")
@@ -2227,6 +2211,85 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
logger.info("[purge] done")
+ def _find_unreferenced_groups_during_purge(self, txn, state_groups):
+ """Used when purging history to figure out which state groups can be
+ deleted and which need to be de-delta'ed (due to one of its prev groups
+ being scheduled for deletion).
+
+ Args:
+ txn
+ state_groups (set[int]): Set of state groups referenced by events
+ that are going to be deleted.
+
+ Returns:
+ tuple[set[int], set[int]]: The set of state groups that can be
+ deleted and the set of state groups that need to be de-delta'ed
+ """
+ # Graph of state group -> previous group
+ graph = {}
+
+ # Set of events that we have found to be referenced by events
+ referenced_groups = set()
+
+ # Set of state groups we've already seen
+ state_groups_seen = set(state_groups)
+
+ # Set of state groups to handle next.
+ next_to_search = set(state_groups)
+ while next_to_search:
+ # We bound size of groups we're looking up at once, to stop the
+ # SQL query getting too big
+ if len(next_to_search) < 100:
+ current_search = next_to_search
+ next_to_search = set()
+ else:
+ current_search = set(itertools.islice(next_to_search, 100))
+ next_to_search -= current_search
+
+ # Check if state groups are referenced
+ sql = """
+ SELECT DISTINCT state_group FROM event_to_state_groups
+ LEFT JOIN events_to_purge AS ep USING (event_id)
+ WHERE state_group IN (%s) AND ep.event_id IS NULL
+ """ % (",".join("?" for _ in current_search),)
+ txn.execute(sql, list(current_search))
+
+ referenced = set(sg for sg, in txn)
+ referenced_groups |= referenced
+
+ # We don't continue iterating up the state group graphs for state
+ # groups that are referenced.
+ current_search -= referenced
+
+ rows = self._simple_select_many_txn(
+ txn,
+ table="state_group_edges",
+ column="prev_state_group",
+ iterable=current_search,
+ keyvalues={},
+ retcols=("prev_state_group", "state_group",),
+ )
+
+ prevs = set(row["state_group"] for row in rows)
+ # We don't bother re-handling groups we've already seen
+ prevs -= state_groups_seen
+ next_to_search |= prevs
+ state_groups_seen |= prevs
+
+ for row in rows:
+ # Note: Each state group can have at most one prev group
+ graph[row["state_group"]] = row["prev_state_group"]
+
+ to_delete = state_groups_seen - referenced_groups
+
+ to_dedelta = set()
+ for sg in referenced_groups:
+ prev_sg = graph.get(sg)
+ if prev_sg and prev_sg in to_delete:
+ to_dedelta.add(sg)
+
+ return to_delete, to_dedelta
+
@defer.inlineCallbacks
def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream
diff --git a/synapse/storage/schema/delta/40/device_list_streams.sql b/synapse/storage/schema/delta/40/device_list_streams.sql
index 54841b3843..dd6dcb65f1 100644
--- a/synapse/storage/schema/delta/40/device_list_streams.sql
+++ b/synapse/storage/schema/delta/40/device_list_streams.sql
@@ -20,9 +20,6 @@ CREATE TABLE device_lists_remote_cache (
content TEXT NOT NULL
);
-CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id);
-
-
-- The last update we got for a user. Empty if we're not receiving updates for
-- that user.
CREATE TABLE device_lists_remote_extremeties (
@@ -30,7 +27,11 @@ CREATE TABLE device_lists_remote_extremeties (
stream_id TEXT NOT NULL
);
-CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id);
+-- we used to create non-unique indexes on these tables, but as of update 52 we create
+-- unique indexes concurrently:
+--
+-- CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id);
+-- CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id);
-- Stream of device lists updates. Includes both local and remotes
diff --git a/synapse/storage/schema/delta/52/add_event_to_state_group_index.sql b/synapse/storage/schema/delta/52/add_event_to_state_group_index.sql
new file mode 100644
index 0000000000..91e03d13e1
--- /dev/null
+++ b/synapse/storage/schema/delta/52/add_event_to_state_group_index.sql
@@ -0,0 +1,19 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This is needed to efficiently check for unreferenced state groups during
+-- purge. Added events_to_state_group(state_group) index
+INSERT into background_updates (update_name, progress_json)
+ VALUES ('event_to_state_groups_sg_index', '{}');
diff --git a/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql b/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql
new file mode 100644
index 0000000000..bfa49e6f92
--- /dev/null
+++ b/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql
@@ -0,0 +1,36 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- register a background update which will create a unique index on
+-- device_lists_remote_cache
+INSERT into background_updates (update_name, progress_json)
+ VALUES ('device_lists_remote_cache_unique_idx', '{}');
+
+-- and one on device_lists_remote_extremeties
+INSERT into background_updates (update_name, progress_json, depends_on)
+ VALUES (
+ 'device_lists_remote_extremeties_unique_idx', '{}',
+
+ -- doesn't really depend on this, but we need to make sure both happen
+ -- before we drop the old indexes.
+ 'device_lists_remote_cache_unique_idx'
+ );
+
+-- once they complete, we can drop the old indexes.
+INSERT into background_updates (update_name, progress_json, depends_on)
+ VALUES (
+ 'drop_device_list_streams_non_unique_indexes', '{}',
+ 'device_lists_remote_extremeties_unique_idx'
+ );
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index ef65929bb2..d737bd6778 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -1257,6 +1257,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
+ EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
def __init__(self, db_conn, hs):
super(StateStore, self).__init__(db_conn, hs)
@@ -1275,6 +1276,12 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
columns=["state_key"],
where_clause="type='m.room.member'",
)
+ self.register_background_index_update(
+ self.EVENT_STATE_GROUP_INDEX_UPDATE_NAME,
+ index_name="event_to_state_groups_sg_index",
+ table="event_to_state_groups",
+ columns=["state_group"],
+ )
def _store_event_state_mappings_txn(self, txn, events_and_contexts):
state_groups = {}
|