diff options
Diffstat (limited to 'synapse')
40 files changed, 1892 insertions, 589 deletions
diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 6b347b1749..ee129c8689 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -79,6 +79,7 @@ class EventTypes(object): RoomHistoryVisibility = "m.room.history_visibility" CanonicalAlias = "m.room.canonical_alias" + Encryption = "m.room.encryption" RoomAvatar = "m.room.avatar" RoomEncryption = "m.room.encryption" GuestAccess = "m.room.guest_access" diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py index b2895355a8..4085bd10b9 100644 --- a/synapse/api/room_versions.py +++ b/synapse/api/room_versions.py @@ -85,10 +85,6 @@ class RoomVersions(object): ) -# the version we will give rooms which are created on this server -DEFAULT_ROOM_VERSION = RoomVersions.V1 - - KNOWN_ROOM_VERSIONS = { v.identifier: v for v in ( RoomVersions.V1, diff --git a/synapse/api/urls.py b/synapse/api/urls.py index 3c6bddff7a..e16c386a14 100644 --- a/synapse/api/urls.py +++ b/synapse/api/urls.py @@ -26,6 +26,7 @@ CLIENT_API_PREFIX = "/_matrix/client" FEDERATION_PREFIX = "/_matrix/federation" FEDERATION_V1_PREFIX = FEDERATION_PREFIX + "/v1" FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2" +FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable" STATIC_PREFIX = "/_matrix/static" WEB_CLIENT_PREFIX = "/_matrix/client" CONTENT_REPO_PREFIX = "/_matrix/content" diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 08199a5e8d..8cc990399f 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -344,15 +344,21 @@ class _LimitedHostnameResolver(object): def resolveHostName(self, resolutionReceiver, hostName, portNumber=0, addressTypes=None, transportSemantics='TCP'): - # Note this is happening deep within the reactor, so we don't need to - # worry about log contexts. - # We need this function to return `resolutionReceiver` so we do all the # actual logic involving deferreds in a separate function. - self._resolve( - resolutionReceiver, hostName, portNumber, - addressTypes, transportSemantics, - ) + + # even though this is happening within the depths of twisted, we need to drop + # our logcontext before starting _resolve, otherwise: (a) _resolve will drop + # the logcontext if it returns an incomplete deferred; (b) _resolve will + # call the resolutionReceiver *with* a logcontext, which it won't be expecting. + with PreserveLoggingContext(): + self._resolve( + resolutionReceiver, + hostName, + portNumber, + addressTypes, + transportSemantics, + ) return resolutionReceiver diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 864f1eac48..a16e037f32 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -38,6 +38,7 @@ from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.replication.slave.storage.profile import SlavedProfileStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore @@ -81,6 +82,7 @@ class ClientReaderSlavedStore( SlavedApplicationServiceStore, SlavedRegistrationStore, SlavedTransactionStore, + SlavedProfileStore, SlavedClientIpStore, BaseSlavedStore, ): diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 727fdc54d8..5c4fc8ff21 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from .api import ApiConfig from .appservice import AppServiceConfig from .captcha import CaptchaConfig @@ -36,20 +37,41 @@ from .saml2_config import SAML2Config from .server import ServerConfig from .server_notices_config import ServerNoticesConfig from .spam_checker import SpamCheckerConfig +from .stats import StatsConfig from .tls import TlsConfig from .user_directory import UserDirectoryConfig from .voip import VoipConfig from .workers import WorkerConfig -class HomeServerConfig(ServerConfig, TlsConfig, DatabaseConfig, LoggingConfig, - RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, - VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig, - AppServiceConfig, KeyConfig, SAML2Config, CasConfig, - JWTConfig, PasswordConfig, EmailConfig, - WorkerConfig, PasswordAuthProviderConfig, PushConfig, - SpamCheckerConfig, GroupsConfig, UserDirectoryConfig, - ConsentConfig, - ServerNoticesConfig, RoomDirectoryConfig, - ): +class HomeServerConfig( + ServerConfig, + TlsConfig, + DatabaseConfig, + LoggingConfig, + RatelimitConfig, + ContentRepositoryConfig, + CaptchaConfig, + VoipConfig, + RegistrationConfig, + MetricsConfig, + ApiConfig, + AppServiceConfig, + KeyConfig, + SAML2Config, + CasConfig, + JWTConfig, + PasswordConfig, + EmailConfig, + WorkerConfig, + PasswordAuthProviderConfig, + PushConfig, + SpamCheckerConfig, + GroupsConfig, + UserDirectoryConfig, + ConsentConfig, + StatsConfig, + ServerNoticesConfig, + RoomDirectoryConfig, +): pass diff --git a/synapse/config/server.py b/synapse/config/server.py index f34aa42afa..e763e19e15 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -20,6 +20,7 @@ import os.path from netaddr import IPSet +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.http.endpoint import parse_and_validate_server_name from synapse.python_dependencies import DependencyException, check_requirements @@ -35,6 +36,8 @@ logger = logging.Logger(__name__) # in the list. DEFAULT_BIND_ADDRESSES = ['::', '0.0.0.0'] +DEFAULT_ROOM_VERSION = "1" + class ServerConfig(Config): @@ -88,6 +91,22 @@ class ServerConfig(Config): "restrict_public_rooms_to_local_users", False, ) + default_room_version = config.get( + "default_room_version", DEFAULT_ROOM_VERSION, + ) + + # Ensure room version is a str + default_room_version = str(default_room_version) + + if default_room_version not in KNOWN_ROOM_VERSIONS: + raise ConfigError( + "Unknown default_room_version: %s, known room versions: %s" % + (default_room_version, list(KNOWN_ROOM_VERSIONS.keys())) + ) + + # Get the actual room version object rather than just the identifier + self.default_room_version = KNOWN_ROOM_VERSIONS[default_room_version] + # whether to enable search. If disabled, new entries will not be inserted # into the search tables and they will not be indexed. Users will receive # errors when attempting to search for messages. @@ -310,6 +329,10 @@ class ServerConfig(Config): unsecure_port = 8008 pid_file = os.path.join(data_dir_path, "homeserver.pid") + + # Bring DEFAULT_ROOM_VERSION into the local-scope for use in the + # default config string + default_room_version = DEFAULT_ROOM_VERSION return """\ ## Server ## @@ -384,6 +407,16 @@ class ServerConfig(Config): # #restrict_public_rooms_to_local_users: true + # The default room version for newly created rooms. + # + # Known room versions are listed here: + # https://matrix.org/docs/spec/#complete-list-of-room-versions + # + # For example, for room version 1, default_room_version should be set + # to "1". + # + #default_room_version: "%(default_room_version)s" + # The GC threshold parameters to pass to `gc.set_threshold`, if defined # #gc_thresholds: [700, 10, 10] diff --git a/synapse/config/stats.py b/synapse/config/stats.py new file mode 100644 index 0000000000..80fc1b9dd0 --- /dev/null +++ b/synapse/config/stats.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import division + +import sys + +from ._base import Config + + +class StatsConfig(Config): + """Stats Configuration + Configuration for the behaviour of synapse's stats engine + """ + + def read_config(self, config): + self.stats_enabled = True + self.stats_bucket_size = 86400 + self.stats_retention = sys.maxsize + stats_config = config.get("stats", None) + if stats_config: + self.stats_enabled = stats_config.get("enabled", self.stats_enabled) + self.stats_bucket_size = ( + self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000 + ) + self.stats_retention = ( + self.parse_duration( + stats_config.get("retention", "%ds" % (sys.maxsize,)) + ) + / 1000 + ) + + def default_config(self, config_dir_path, server_name, **kwargs): + return """ + # Local statistics collection. Used in populating the room directory. + # + # 'bucket_size' controls how large each statistics timeslice is. It can + # be defined in a human readable short form -- e.g. "1d", "1y". + # + # 'retention' controls how long historical statistics will be kept for. + # It can be defined in a human readable short form -- e.g. "1d", "1y". + # + # + #stats: + # enabled: true + # bucket_size: 1d + # retention: 1y + """ diff --git a/synapse/config/user_directory.py b/synapse/config/user_directory.py index 142754a7dc..023997ccde 100644 --- a/synapse/config/user_directory.py +++ b/synapse/config/user_directory.py @@ -43,9 +43,9 @@ class UserDirectoryConfig(Config): # # 'search_all_users' defines whether to search all users visible to your HS # when searching the user directory, rather than limiting to users visible - # in public rooms. Defaults to false. If you set it True, you'll have to run - # UPDATE user_directory_stream_pos SET stream_id = NULL; - # on your database to tell it to rebuild the user_directory search indexes. + # in public rooms. Defaults to false. If you set it True, you'll have to + # rebuild the user_directory search indexes, see + # https://github.com/matrix-org/synapse/blob/master/docs/user_directory.md # #user_directory: # enabled: true diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index d8ba870cca..c63f106cf3 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -17,10 +17,10 @@ import logging from collections import namedtuple +import six from six import raise_from from six.moves import urllib -import nacl.signing from signedjson.key import ( decode_verify_key_bytes, encode_verify_key_base64, @@ -43,6 +43,7 @@ from synapse.api.errors import ( RequestSendFailed, SynapseError, ) +from synapse.storage.keys import FetchKeyResult from synapse.util import logcontext, unwrapFirstError from synapse.util.logcontext import ( LoggingContext, @@ -56,9 +57,9 @@ from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) -VerifyKeyRequest = namedtuple("VerifyRequest", ( - "server_name", "key_ids", "json_object", "deferred" -)) +VerifyKeyRequest = namedtuple( + "VerifyRequest", ("server_name", "key_ids", "json_object", "deferred") +) """ A request for a verify key to verify a JSON object. @@ -80,12 +81,13 @@ class KeyLookupError(ValueError): class Keyring(object): def __init__(self, hs): - self.store = hs.get_datastore() self.clock = hs.get_clock() - self.client = hs.get_http_client() - self.config = hs.get_config() - self.perspective_servers = self.config.perspectives - self.hs = hs + + self._key_fetchers = ( + StoreKeyFetcher(hs), + PerspectivesKeyFetcher(hs), + ServerKeyFetcher(hs), + ) # map from server name to Deferred. Has an entry for each server with # an ongoing key download; the Deferred completes once the download @@ -96,9 +98,7 @@ class Keyring(object): def verify_json_for_server(self, server_name, json_object): return logcontext.make_deferred_yieldable( - self.verify_json_objects_for_server( - [(server_name, json_object)] - )[0] + self.verify_json_objects_for_server([(server_name, json_object)])[0] ) def verify_json_objects_for_server(self, server_and_json): @@ -130,18 +130,15 @@ class Keyring(object): if not key_ids: return defer.fail( SynapseError( - 400, - "Not signed by %s" % (server_name,), - Codes.UNAUTHORIZED, + 400, "Not signed by %s" % (server_name,), Codes.UNAUTHORIZED ) ) - logger.debug("Verifying for %s with key_ids %s", - server_name, key_ids) + logger.debug("Verifying for %s with key_ids %s", server_name, key_ids) # add the key request to the queue, but don't start it off yet. verify_request = VerifyKeyRequest( - server_name, key_ids, json_object, defer.Deferred(), + server_name, key_ids, json_object, defer.Deferred() ) verify_requests.append(verify_request) @@ -179,16 +176,12 @@ class Keyring(object): # any other lookups until we have finished. # The deferreds are called with no logcontext. server_to_deferred = { - rq.server_name: defer.Deferred() - for rq in verify_requests + rq.server_name: defer.Deferred() for rq in verify_requests } # We want to wait for any previous lookups to complete before # proceeding. - yield self.wait_for_previous_lookups( - [rq.server_name for rq in verify_requests], - server_to_deferred, - ) + yield self.wait_for_previous_lookups(server_to_deferred) # Actually start fetching keys. self._get_server_verify_keys(verify_requests) @@ -216,19 +209,16 @@ class Keyring(object): return res for verify_request in verify_requests: - verify_request.deferred.addBoth( - remove_deferreds, verify_request, - ) + verify_request.deferred.addBoth(remove_deferreds, verify_request) except Exception: logger.exception("Error starting key lookups") @defer.inlineCallbacks - def wait_for_previous_lookups(self, server_names, server_to_deferred): + def wait_for_previous_lookups(self, server_to_deferred): """Waits for any previous key lookups for the given servers to finish. Args: - server_names (list): list of server_names we want to lookup - server_to_deferred (dict): server_name to deferred which gets + server_to_deferred (dict[str, Deferred]): server_name to deferred which gets resolved once we've finished looking up keys for that server. The Deferreds should be regular twisted ones which call their callbacks with no logcontext. @@ -241,14 +231,15 @@ class Keyring(object): while True: wait_on = [ (server_name, self.key_downloads[server_name]) - for server_name in server_names + for server_name in server_to_deferred.keys() if server_name in self.key_downloads ] if not wait_on: break logger.info( "Waiting for existing lookups for %s to complete [loop %i]", - [w[0] for w in wait_on], loop_count, + [w[0] for w in wait_on], + loop_count, ) with PreserveLoggingContext(): yield defer.DeferredList((w[1] for w in wait_on)) @@ -279,13 +270,6 @@ class Keyring(object): verify_requests (list[VerifyKeyRequest]): list of verify requests """ - # These are functions that produce keys given a list of key ids - key_fetch_fns = ( - self.get_keys_from_store, # First try the local store - self.get_keys_from_perspectives, # Then try via perspectives - self.get_keys_from_server, # Then try directly - ) - @defer.inlineCallbacks def do_iterations(): with Measure(self.clock, "get_server_verify_keys"): @@ -296,8 +280,8 @@ class Keyring(object): verify_request.key_ids ) - for fn in key_fetch_fns: - results = yield fn(missing_keys.items()) + for f in self._key_fetchers: + results = yield f.get_keys(missing_keys.items()) # We now need to figure out which verify requests we have keys # for and which we don't @@ -315,11 +299,15 @@ class Keyring(object): # complete this VerifyKeyRequest. result_keys = results.get(server_name, {}) for key_id in verify_request.key_ids: - key = result_keys.get(key_id) - if key: + fetch_key_result = result_keys.get(key_id) + if fetch_key_result: with PreserveLoggingContext(): verify_request.deferred.callback( - (server_name, key_id, key) + ( + server_name, + key_id, + fetch_key_result.verify_key, + ) ) break else: @@ -335,13 +323,14 @@ class Keyring(object): with PreserveLoggingContext(): for verify_request in requests_missing_keys: - verify_request.deferred.errback(SynapseError( - 401, - "No key for %s with id %s" % ( - verify_request.server_name, verify_request.key_ids, - ), - Codes.UNAUTHORIZED, - )) + verify_request.deferred.errback( + SynapseError( + 401, + "No key for %s with id %s" + % (verify_request.server_name, verify_request.key_ids), + Codes.UNAUTHORIZED, + ) + ) def on_err(err): with PreserveLoggingContext(): @@ -351,17 +340,31 @@ class Keyring(object): run_in_background(do_iterations).addErrback(on_err) - @defer.inlineCallbacks - def get_keys_from_store(self, server_name_and_key_ids): + +class KeyFetcher(object): + def get_keys(self, server_name_and_key_ids): """ Args: - server_name_and_key_ids (iterable(Tuple[str, iterable[str]]): + server_name_and_key_ids (iterable[Tuple[str, iterable[str]]]): list of (server_name, iterable[key_id]) tuples to fetch keys for + Note that the iterables may be iterated more than once. Returns: - Deferred: resolves to dict[str, dict[str, VerifyKey|None]]: map from - server_name -> key_id -> VerifyKey + Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult|None]]]: + map from server_name -> key_id -> FetchKeyResult """ + raise NotImplementedError + + +class StoreKeyFetcher(KeyFetcher): + """KeyFetcher impl which fetches keys from our data store""" + + def __init__(self, hs): + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def get_keys(self, server_name_and_key_ids): + """see KeyFetcher.get_keys""" keys_to_fetch = ( (server_name, key_id) for server_name, key_ids in server_name_and_key_ids @@ -373,8 +376,135 @@ class Keyring(object): keys.setdefault(server_name, {})[key_id] = key defer.returnValue(keys) + +class BaseV2KeyFetcher(object): + def __init__(self, hs): + self.store = hs.get_datastore() + self.config = hs.get_config() + + @defer.inlineCallbacks + def process_v2_response( + self, from_server, response_json, time_added_ms, requested_ids=[] + ): + """Parse a 'Server Keys' structure from the result of a /key request + + This is used to parse either the entirety of the response from + GET /_matrix/key/v2/server, or a single entry from the list returned by + POST /_matrix/key/v2/query. + + Checks that each signature in the response that claims to come from the origin + server is valid, and that there is at least one such signature. + + Stores the json in server_keys_json so that it can be used for future responses + to /_matrix/key/v2/query. + + Args: + from_server (str): the name of the server producing this result: either + the origin server for a /_matrix/key/v2/server request, or the notary + for a /_matrix/key/v2/query. + + response_json (dict): the json-decoded Server Keys response object + + time_added_ms (int): the timestamp to record in server_keys_json + + requested_ids (iterable[str]): a list of the key IDs that were requested. + We will store the json for these key ids as well as any that are + actually in the response + + Returns: + Deferred[dict[str, FetchKeyResult]]: map from key_id to result object + """ + ts_valid_until_ms = response_json[u"valid_until_ts"] + + # start by extracting the keys from the response, since they may be required + # to validate the signature on the response. + verify_keys = {} + for key_id, key_data in response_json["verify_keys"].items(): + if is_signing_algorithm_supported(key_id): + key_base64 = key_data["key"] + key_bytes = decode_base64(key_base64) + verify_key = decode_verify_key_bytes(key_id, key_bytes) + verify_keys[key_id] = FetchKeyResult( + verify_key=verify_key, valid_until_ts=ts_valid_until_ms + ) + + server_name = response_json["server_name"] + verified = False + for key_id in response_json["signatures"].get(server_name, {}): + # each of the keys used for the signature must be present in the response + # json. + key = verify_keys.get(key_id) + if not key: + raise KeyLookupError( + "Key response is signed by key id %s:%s but that key is not " + "present in the response" % (server_name, key_id) + ) + + verify_signed_json(response_json, server_name, key.verify_key) + verified = True + + if not verified: + raise KeyLookupError( + "Key response for %s is not signed by the origin server" + % (server_name,) + ) + + for key_id, key_data in response_json["old_verify_keys"].items(): + if is_signing_algorithm_supported(key_id): + key_base64 = key_data["key"] + key_bytes = decode_base64(key_base64) + verify_key = decode_verify_key_bytes(key_id, key_bytes) + verify_keys[key_id] = FetchKeyResult( + verify_key=verify_key, valid_until_ts=key_data["expired_ts"] + ) + + # re-sign the json with our own key, so that it is ready if we are asked to + # give it out as a notary server + signed_key_json = sign_json( + response_json, self.config.server_name, self.config.signing_key[0] + ) + + signed_key_json_bytes = encode_canonical_json(signed_key_json) + + # for reasons I don't quite understand, we store this json for the key ids we + # requested, as well as those we got. + updated_key_ids = set(requested_ids) + updated_key_ids.update(verify_keys) + + yield logcontext.make_deferred_yieldable( + defer.gatherResults( + [ + run_in_background( + self.store.store_server_keys_json, + server_name=server_name, + key_id=key_id, + from_server=from_server, + ts_now_ms=time_added_ms, + ts_expires_ms=ts_valid_until_ms, + key_json_bytes=signed_key_json_bytes, + ) + for key_id in updated_key_ids + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) + ) + + defer.returnValue(verify_keys) + + +class PerspectivesKeyFetcher(BaseV2KeyFetcher): + """KeyFetcher impl which fetches keys from the "perspectives" servers""" + + def __init__(self, hs): + super(PerspectivesKeyFetcher, self).__init__(hs) + self.clock = hs.get_clock() + self.client = hs.get_http_client() + self.perspective_servers = self.config.perspectives + @defer.inlineCallbacks - def get_keys_from_perspectives(self, server_name_and_key_ids): + def get_keys(self, server_name_and_key_ids): + """see KeyFetcher.get_keys""" + @defer.inlineCallbacks def get_key(perspective_name, perspective_keys): try: @@ -383,25 +513,26 @@ class Keyring(object): ) defer.returnValue(result) except KeyLookupError as e: - logger.warning( - "Key lookup failed from %r: %s", perspective_name, e, - ) + logger.warning("Key lookup failed from %r: %s", perspective_name, e) except Exception as e: logger.exception( "Unable to get key from %r: %s %s", perspective_name, - type(e).__name__, str(e), + type(e).__name__, + str(e), ) defer.returnValue({}) - results = yield logcontext.make_deferred_yieldable(defer.gatherResults( - [ - run_in_background(get_key, p_name, p_keys) - for p_name, p_keys in self.perspective_servers.items() - ], - consumeErrors=True, - ).addErrback(unwrapFirstError)) + results = yield logcontext.make_deferred_yieldable( + defer.gatherResults( + [ + run_in_background(get_key, p_name, p_keys) + for p_name, p_keys in self.perspective_servers.items() + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) + ) union_of_keys = {} for result in results: @@ -411,33 +542,30 @@ class Keyring(object): defer.returnValue(union_of_keys) @defer.inlineCallbacks - def get_keys_from_server(self, server_name_and_key_ids): - results = yield logcontext.make_deferred_yieldable(defer.gatherResults( - [ - run_in_background( - self.get_server_verify_key_v2_direct, - server_name, - key_ids, - ) - for server_name, key_ids in server_name_and_key_ids - ], - consumeErrors=True, - ).addErrback(unwrapFirstError)) - - merged = {} - for result in results: - merged.update(result) + def get_server_verify_key_v2_indirect( + self, server_names_and_key_ids, perspective_name, perspective_keys + ): + """ + Args: + server_names_and_key_ids (iterable[Tuple[str, iterable[str]]]): + list of (server_name, iterable[key_id]) tuples to fetch keys for + perspective_name (str): name of the notary server to query for the keys + perspective_keys (dict[str, VerifyKey]): map of key_id->key for the + notary server - defer.returnValue({ - server_name: keys - for server_name, keys in merged.items() - if keys - }) + Returns: + Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult]]]: map + from server_name -> key_id -> FetchKeyResult - @defer.inlineCallbacks - def get_server_verify_key_v2_indirect(self, server_names_and_key_ids, - perspective_name, - perspective_keys): + Raises: + KeyLookupError if there was an error processing the entire response from + the server + """ + logger.info( + "Requesting keys %s from notary server %s", + server_names_and_key_ids, + perspective_name, + ) # TODO(mark): Set the minimum_valid_until_ts to that needed by # the events being validated or the current time if validating # an incoming request. @@ -448,9 +576,7 @@ class Keyring(object): data={ u"server_keys": { server_name: { - key_id: { - u"minimum_valid_until_ts": 0 - } for key_id in key_ids + key_id: {u"minimum_valid_until_ts": 0} for key_id in key_ids } for server_name, key_ids in server_names_and_key_ids } @@ -458,240 +584,174 @@ class Keyring(object): long_retries=True, ) except (NotRetryingDestination, RequestSendFailed) as e: - raise_from( - KeyLookupError("Failed to connect to remote server"), e, - ) + raise_from(KeyLookupError("Failed to connect to remote server"), e) except HttpResponseException as e: - raise_from( - KeyLookupError("Remote server returned an error"), e, - ) + raise_from(KeyLookupError("Remote server returned an error"), e) keys = {} + added_keys = [] - responses = query_response["server_keys"] + time_now_ms = self.clock.time_msec() - for response in responses: - if (u"signatures" not in response - or perspective_name not in response[u"signatures"]): + for response in query_response["server_keys"]: + # do this first, so that we can give useful errors thereafter + server_name = response.get("server_name") + if not isinstance(server_name, six.string_types): raise KeyLookupError( - "Key response not signed by perspective server" - " %r" % (perspective_name,) + "Malformed response from key notary server %s: invalid server_name" + % (perspective_name,) ) - verified = False - for key_id in response[u"signatures"][perspective_name]: - if key_id in perspective_keys: - verify_signed_json( - response, - perspective_name, - perspective_keys[key_id] - ) - verified = True - - if not verified: - logging.info( - "Response from perspective server %r not signed with a" - " known key, signed with: %r, known keys: %r", + try: + processed_response = yield self._process_perspectives_response( perspective_name, - list(response[u"signatures"][perspective_name]), - list(perspective_keys) + perspective_keys, + response, + time_added_ms=time_now_ms, ) - raise KeyLookupError( - "Response not signed with a known key for perspective" - " server %r" % (perspective_name,) + except KeyLookupError as e: + logger.warning( + "Error processing response from key notary server %s for origin " + "server %s: %s", + perspective_name, + server_name, + e, ) + # we continue to process the rest of the response + continue - processed_response = yield self.process_v2_response( - perspective_name, response + added_keys.extend( + (server_name, key_id, key) for key_id, key in processed_response.items() ) - server_name = response["server_name"] - keys.setdefault(server_name, {}).update(processed_response) - yield logcontext.make_deferred_yieldable(defer.gatherResults( - [ - run_in_background( - self.store_keys, - server_name=server_name, - from_server=perspective_name, - verify_keys=response_keys, - ) - for server_name, response_keys in keys.items() - ], - consumeErrors=True - ).addErrback(unwrapFirstError)) + yield self.store.store_server_verify_keys( + perspective_name, time_now_ms, added_keys + ) defer.returnValue(keys) + def _process_perspectives_response( + self, perspective_name, perspective_keys, response, time_added_ms + ): + """Parse a 'Server Keys' structure from the result of a /key/query request + + Checks that the entry is correctly signed by the perspectives server, and then + passes over to process_v2_response + + Args: + perspective_name (str): the name of the notary server that produced this + result + + perspective_keys (dict[str, VerifyKey]): map of key_id->key for the + notary server + + response (dict): the json-decoded Server Keys response object + + time_added_ms (int): the timestamp to record in server_keys_json + + Returns: + Deferred[dict[str, FetchKeyResult]]: map from key_id to result object + """ + if ( + u"signatures" not in response + or perspective_name not in response[u"signatures"] + ): + raise KeyLookupError("Response not signed by the notary server") + + verified = False + for key_id in response[u"signatures"][perspective_name]: + if key_id in perspective_keys: + verify_signed_json(response, perspective_name, perspective_keys[key_id]) + verified = True + + if not verified: + raise KeyLookupError( + "Response not signed with a known key: signed with: %r, known keys: %r" + % ( + list(response[u"signatures"][perspective_name].keys()), + list(perspective_keys.keys()), + ) + ) + + return self.process_v2_response( + perspective_name, response, time_added_ms=time_added_ms + ) + + +class ServerKeyFetcher(BaseV2KeyFetcher): + """KeyFetcher impl which fetches keys from the origin servers""" + + def __init__(self, hs): + super(ServerKeyFetcher, self).__init__(hs) + self.clock = hs.get_clock() + self.client = hs.get_http_client() + + @defer.inlineCallbacks + def get_keys(self, server_name_and_key_ids): + """see KeyFetcher.get_keys""" + results = yield logcontext.make_deferred_yieldable( + defer.gatherResults( + [ + run_in_background( + self.get_server_verify_key_v2_direct, server_name, key_ids + ) + for server_name, key_ids in server_name_and_key_ids + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) + ) + + merged = {} + for result in results: + merged.update(result) + + defer.returnValue( + {server_name: keys for server_name, keys in merged.items() if keys} + ) + @defer.inlineCallbacks def get_server_verify_key_v2_direct(self, server_name, key_ids): - keys = {} # type: dict[str, nacl.signing.VerifyKey] + keys = {} # type: dict[str, FetchKeyResult] for requested_key_id in key_ids: if requested_key_id in keys: continue + time_now_ms = self.clock.time_msec() try: response = yield self.client.get_json( destination=server_name, - path="/_matrix/key/v2/server/" + urllib.parse.quote(requested_key_id), + path="/_matrix/key/v2/server/" + + urllib.parse.quote(requested_key_id), ignore_backoff=True, ) except (NotRetryingDestination, RequestSendFailed) as e: - raise_from( - KeyLookupError("Failed to connect to remote server"), e, - ) + raise_from(KeyLookupError("Failed to connect to remote server"), e) except HttpResponseException as e: - raise_from( - KeyLookupError("Remote server returned an error"), e, - ) - - if (u"signatures" not in response - or server_name not in response[u"signatures"]): - raise KeyLookupError("Key response not signed by remote server") + raise_from(KeyLookupError("Remote server returned an error"), e) if response["server_name"] != server_name: - raise KeyLookupError("Expected a response for server %r not %r" % ( - server_name, response["server_name"] - )) + raise KeyLookupError( + "Expected a response for server %r not %r" + % (server_name, response["server_name"]) + ) response_keys = yield self.process_v2_response( from_server=server_name, requested_ids=[requested_key_id], response_json=response, + time_added_ms=time_now_ms, + ) + yield self.store.store_server_verify_keys( + server_name, + time_now_ms, + ((server_name, key_id, key) for key_id, key in response_keys.items()), ) - keys.update(response_keys) - yield self.store_keys( - server_name=server_name, - from_server=server_name, - verify_keys=keys, - ) defer.returnValue({server_name: keys}) - @defer.inlineCallbacks - def process_v2_response( - self, from_server, response_json, requested_ids=[], - ): - """Parse a 'Server Keys' structure from the result of a /key request - - This is used to parse either the entirety of the response from - GET /_matrix/key/v2/server, or a single entry from the list returned by - POST /_matrix/key/v2/query. - - Checks that each signature in the response that claims to come from the origin - server is valid. (Does not check that there actually is such a signature, for - some reason.) - - Stores the json in server_keys_json so that it can be used for future responses - to /_matrix/key/v2/query. - - Args: - from_server (str): the name of the server producing this result: either - the origin server for a /_matrix/key/v2/server request, or the notary - for a /_matrix/key/v2/query. - - response_json (dict): the json-decoded Server Keys response object - - requested_ids (iterable[str]): a list of the key IDs that were requested. - We will store the json for these key ids as well as any that are - actually in the response - - Returns: - Deferred[dict[str, nacl.signing.VerifyKey]]: - map from key_id to key object - """ - time_now_ms = self.clock.time_msec() - response_keys = {} - verify_keys = {} - for key_id, key_data in response_json["verify_keys"].items(): - if is_signing_algorithm_supported(key_id): - key_base64 = key_data["key"] - key_bytes = decode_base64(key_base64) - verify_key = decode_verify_key_bytes(key_id, key_bytes) - verify_key.time_added = time_now_ms - verify_keys[key_id] = verify_key - - old_verify_keys = {} - for key_id, key_data in response_json["old_verify_keys"].items(): - if is_signing_algorithm_supported(key_id): - key_base64 = key_data["key"] - key_bytes = decode_base64(key_base64) - verify_key = decode_verify_key_bytes(key_id, key_bytes) - verify_key.expired = key_data["expired_ts"] - verify_key.time_added = time_now_ms - old_verify_keys[key_id] = verify_key - - server_name = response_json["server_name"] - for key_id in response_json["signatures"].get(server_name, {}): - if key_id not in response_json["verify_keys"]: - raise KeyLookupError( - "Key response must include verification keys for all" - " signatures" - ) - if key_id in verify_keys: - verify_signed_json( - response_json, - server_name, - verify_keys[key_id] - ) - - signed_key_json = sign_json( - response_json, - self.config.server_name, - self.config.signing_key[0], - ) - - signed_key_json_bytes = encode_canonical_json(signed_key_json) - ts_valid_until_ms = signed_key_json[u"valid_until_ts"] - - updated_key_ids = set(requested_ids) - updated_key_ids.update(verify_keys) - updated_key_ids.update(old_verify_keys) - - response_keys.update(verify_keys) - response_keys.update(old_verify_keys) - - yield logcontext.make_deferred_yieldable(defer.gatherResults( - [ - run_in_background( - self.store.store_server_keys_json, - server_name=server_name, - key_id=key_id, - from_server=from_server, - ts_now_ms=time_now_ms, - ts_expires_ms=ts_valid_until_ms, - key_json_bytes=signed_key_json_bytes, - ) - for key_id in updated_key_ids - ], - consumeErrors=True, - ).addErrback(unwrapFirstError)) - - defer.returnValue(response_keys) - - def store_keys(self, server_name, from_server, verify_keys): - """Store a collection of verify keys for a given server - Args: - server_name(str): The name of the server the keys are for. - from_server(str): The server the keys were downloaded from. - verify_keys(dict): A mapping of key_id to VerifyKey. - Returns: - A deferred that completes when the keys are stored. - """ - # TODO(markjh): Store whether the keys have expired. - return logcontext.make_deferred_yieldable(defer.gatherResults( - [ - run_in_background( - self.store.store_server_verify_key, - server_name, server_name, key.time_added, key - ) - for key_id, key in verify_keys.items() - ], - consumeErrors=True, - ).addErrback(unwrapFirstError)) - @defer.inlineCallbacks def _handle_key_deferred(verify_request): @@ -713,17 +773,19 @@ def _handle_key_deferred(verify_request): except KeyLookupError as e: logger.warn( "Failed to download keys for %s: %s %s", - server_name, type(e).__name__, str(e), + server_name, + type(e).__name__, + str(e), ) raise SynapseError( - 502, - "Error downloading keys for %s" % (server_name,), - Codes.UNAUTHORIZED, + 502, "Error downloading keys for %s" % (server_name,), Codes.UNAUTHORIZED ) except Exception as e: logger.exception( "Got Exception when downloading keys for %s: %s %s", - server_name, type(e).__name__, str(e), + server_name, + type(e).__name__, + str(e), ) raise SynapseError( 401, @@ -733,22 +795,24 @@ def _handle_key_deferred(verify_request): json_object = verify_request.json_object - logger.debug("Got key %s %s:%s for server %s, verifying" % ( - key_id, verify_key.alg, verify_key.version, server_name, - )) + logger.debug( + "Got key %s %s:%s for server %s, verifying" + % (key_id, verify_key.alg, verify_key.version, server_name) + ) try: verify_signed_json(json_object, server_name, verify_key) except SignatureVerifyException as e: logger.debug( "Error verifying signature for %s:%s:%s with key %s: %s", - server_name, verify_key.alg, verify_key.version, + server_name, + verify_key.alg, + verify_key.version, encode_verify_key_base64(verify_key), str(e), ) raise SynapseError( 401, - "Invalid signature for server %s with key %s:%s: %s" % ( - server_name, verify_key.alg, verify_key.version, str(e), - ), + "Invalid signature for server %s with key %s:%s: %s" + % (server_name, verify_key.alg, verify_key.version, str(e)), Codes.UNAUTHORIZED, ) diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 1fe995f212..546b6f4982 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -76,6 +76,7 @@ class EventBuilder(object): # someone tries to get them when they don't exist. _state_key = attr.ib(default=None) _redacts = attr.ib(default=None) + _origin_server_ts = attr.ib(default=None) internal_metadata = attr.ib(default=attr.Factory(lambda: _EventInternalMetadata({}))) @@ -142,6 +143,9 @@ class EventBuilder(object): if self._redacts is not None: event_dict["redacts"] = self._redacts + if self._origin_server_ts is not None: + event_dict["origin_server_ts"] = self._origin_server_ts + defer.returnValue( create_local_event_from_event_dict( clock=self._clock, @@ -209,6 +213,7 @@ class EventBuilderFactory(object): content=key_values.get("content", {}), unsigned=key_values.get("unsigned", {}), redacts=key_values.get("redacts", None), + origin_server_ts=key_values.get("origin_server_ts", None), ) @@ -245,7 +250,7 @@ def create_local_event_from_event_dict(clock, hostname, signing_key, event_dict["event_id"] = _create_event_id(clock, hostname) event_dict["origin"] = hostname - event_dict["origin_server_ts"] = time_now + event_dict.setdefault("origin_server_ts", time_now) event_dict.setdefault("unsigned", {}) age = event_dict["unsigned"].pop("age", 0) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 27a2a9ef98..e2d4384de1 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -330,12 +330,13 @@ class EventClientSerializer(object): ) @defer.inlineCallbacks - def serialize_event(self, event, time_now, **kwargs): + def serialize_event(self, event, time_now, bundle_aggregations=True, **kwargs): """Serializes a single event. Args: event (EventBase) time_now (int): The current time in milliseconds + bundle_aggregations (bool): Whether to bundle in related events **kwargs: Arguments to pass to `serialize_event` Returns: @@ -350,7 +351,7 @@ class EventClientSerializer(object): # If MSC1849 is enabled then we need to look if thre are any relations # we need to bundle in with the event - if self.experimental_msc1849_support_enabled: + if self.experimental_msc1849_support_enabled and bundle_aggregations: annotations = yield self.store.get_aggregation_groups_for_event( event_id, ) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 385eda2dca..d0efc4e0d3 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -23,7 +23,11 @@ from twisted.internet import defer import synapse from synapse.api.errors import Codes, FederationDeniedError, SynapseError from synapse.api.room_versions import RoomVersions -from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX +from synapse.api.urls import ( + FEDERATION_UNSTABLE_PREFIX, + FEDERATION_V1_PREFIX, + FEDERATION_V2_PREFIX, +) from synapse.http.endpoint import parse_and_validate_server_name from synapse.http.server import JsonResource from synapse.http.servlet import ( @@ -1304,6 +1308,30 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet): defer.returnValue((200, new_content)) +class RoomComplexityServlet(BaseFederationServlet): + """ + Indicates to other servers how complex (and therefore likely + resource-intensive) a public room this server knows about is. + """ + PATH = "/rooms/(?P<room_id>[^/]*)/complexity" + PREFIX = FEDERATION_UNSTABLE_PREFIX + + @defer.inlineCallbacks + def on_GET(self, origin, content, query, room_id): + + store = self.handler.hs.get_datastore() + + is_public = yield store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if not is_public: + raise SynapseError(404, "Room not found", errcode=Codes.INVALID_PARAM) + + complexity = yield store.get_room_complexity(room_id) + defer.returnValue((200, complexity)) + + FEDERATION_SERVLET_CLASSES = ( FederationSendServlet, FederationEventServlet, @@ -1327,6 +1355,7 @@ FEDERATION_SERVLET_CLASSES = ( FederationThirdPartyInviteExchangeServlet, On3pidBindServlet, FederationVersionServlet, + RoomComplexityServlet, ) OPENID_SERVLET_CLASSES = ( diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 6003ad9cca..eb525070cf 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -122,6 +122,9 @@ class EventStreamHandler(BaseHandler): chunks = yield self._event_serializer.serialize_events( events, time_now, as_client_event=as_client_event, + # We don't bundle "live" events, as otherwise clients + # will end up double counting annotations. + bundle_aggregations=False, ) chunk = { diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2202ed699a..cf4fad7de0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2013,15 +2013,44 @@ class FederationHandler(BaseHandler): Args: origin (str): - event (synapse.events.FrozenEvent): + event (synapse.events.EventBase): context (synapse.events.snapshot.EventContext): - auth_events (dict[(str, str)->str]): + auth_events (dict[(str, str)->synapse.events.EventBase]): + Map from (event_type, state_key) to event + + What we expect the event's auth_events to be, based on the event's + position in the dag. I think? maybe?? + + Also NB that this function adds entries to it. + Returns: + defer.Deferred[None] + """ + room_version = yield self.store.get_room_version(event.room_id) + + yield self._update_auth_events_and_context_for_auth( + origin, event, context, auth_events + ) + try: + self.auth.check(room_version, event, auth_events=auth_events) + except AuthError as e: + logger.warn("Failed auth resolution for %r because %s", event, e) + raise e + + @defer.inlineCallbacks + def _update_auth_events_and_context_for_auth( + self, origin, event, context, auth_events + ): + """Helper for do_auth. See there for docs. + + Args: + origin (str): + event (synapse.events.EventBase): + context (synapse.events.snapshot.EventContext): + auth_events (dict[(str, str)->synapse.events.EventBase]): Returns: defer.Deferred[None] """ - # Check if we have all the auth events. - current_state = set(e.event_id for e in auth_events.values()) event_auth_events = set(event.auth_event_ids()) if event.is_state(): @@ -2029,11 +2058,21 @@ class FederationHandler(BaseHandler): else: event_key = None - if event_auth_events - current_state: + # if the event's auth_events refers to events which are not in our + # calculated auth_events, we need to fetch those events from somewhere. + # + # we start by fetching them from the store, and then try calling /event_auth/. + missing_auth = event_auth_events.difference( + e.event_id for e in auth_events.values() + ) + + if missing_auth: # TODO: can we use store.have_seen_events here instead? have_events = yield self.store.get_seen_events_with_rejections( - event_auth_events - current_state + missing_auth ) + logger.debug("Got events %s from store", have_events) + missing_auth.difference_update(have_events.keys()) else: have_events = {} @@ -2042,13 +2081,12 @@ class FederationHandler(BaseHandler): for e in auth_events.values() }) - seen_events = set(have_events.keys()) - - missing_auth = event_auth_events - seen_events - current_state - if missing_auth: - logger.info("Missing auth: %s", missing_auth) # If we don't have all the auth events, we need to get them. + logger.info( + "auth_events contains unknown events: %s", + missing_auth, + ) try: remote_auth_chain = yield self.federation_client.get_event_auth( origin, event.room_id, event.event_id @@ -2089,145 +2127,168 @@ class FederationHandler(BaseHandler): have_events = yield self.store.get_seen_events_with_rejections( event.auth_event_ids() ) - seen_events = set(have_events.keys()) except Exception: # FIXME: logger.exception("Failed to get auth chain") + if event.internal_metadata.is_outlier(): + logger.info("Skipping auth_event fetch for outlier") + return + # FIXME: Assumes we have and stored all the state for all the # prev_events - current_state = set(e.event_id for e in auth_events.values()) - different_auth = event_auth_events - current_state + different_auth = event_auth_events.difference( + e.event_id for e in auth_events.values() + ) - room_version = yield self.store.get_room_version(event.room_id) + if not different_auth: + return - if different_auth and not event.internal_metadata.is_outlier(): - # Do auth conflict res. - logger.info("Different auth: %s", different_auth) - - different_events = yield logcontext.make_deferred_yieldable( - defer.gatherResults([ - logcontext.run_in_background( - self.store.get_event, - d, - allow_none=True, - allow_rejected=False, - ) - for d in different_auth - if d in have_events and not have_events[d] - ], consumeErrors=True) - ).addErrback(unwrapFirstError) - - if different_events: - local_view = dict(auth_events) - remote_view = dict(auth_events) - remote_view.update({ - (d.type, d.state_key): d for d in different_events if d - }) + logger.info( + "auth_events refers to events which are not in our calculated auth " + "chain: %s", + different_auth, + ) + + room_version = yield self.store.get_room_version(event.room_id) - new_state = yield self.state_handler.resolve_events( - room_version, - [list(local_view.values()), list(remote_view.values())], - event + different_events = yield logcontext.make_deferred_yieldable( + defer.gatherResults([ + logcontext.run_in_background( + self.store.get_event, + d, + allow_none=True, + allow_rejected=False, ) + for d in different_auth + if d in have_events and not have_events[d] + ], consumeErrors=True) + ).addErrback(unwrapFirstError) + + if different_events: + local_view = dict(auth_events) + remote_view = dict(auth_events) + remote_view.update({ + (d.type, d.state_key): d for d in different_events if d + }) - auth_events.update(new_state) + new_state = yield self.state_handler.resolve_events( + room_version, + [list(local_view.values()), list(remote_view.values())], + event + ) - current_state = set(e.event_id for e in auth_events.values()) - different_auth = event_auth_events - current_state + logger.info( + "After state res: updating auth_events with new state %s", + { + (d.type, d.state_key): d.event_id for d in new_state.values() + if auth_events.get((d.type, d.state_key)) != d + }, + ) - yield self._update_context_for_auth_events( - event, context, auth_events, event_key, - ) + auth_events.update(new_state) + + different_auth = event_auth_events.difference( + e.event_id for e in auth_events.values() + ) - if different_auth and not event.internal_metadata.is_outlier(): - logger.info("Different auth after resolution: %s", different_auth) + yield self._update_context_for_auth_events( + event, context, auth_events, event_key, + ) - # Only do auth resolution if we have something new to say. - # We can't rove an auth failure. - do_resolution = False + if not different_auth: + # we're done + return - provable = [ - RejectedReason.NOT_ANCESTOR, RejectedReason.NOT_ANCESTOR, - ] + logger.info( + "auth_events still refers to events which are not in the calculated auth " + "chain after state resolution: %s", + different_auth, + ) - for e_id in different_auth: - if e_id in have_events: - if have_events[e_id] in provable: - do_resolution = True - break + # Only do auth resolution if we have something new to say. + # We can't prove an auth failure. + do_resolution = False - if do_resolution: - prev_state_ids = yield context.get_prev_state_ids(self.store) - # 1. Get what we think is the auth chain. - auth_ids = yield self.auth.compute_auth_events( - event, prev_state_ids - ) - local_auth_chain = yield self.store.get_auth_chain( - auth_ids, include_given=True - ) + for e_id in different_auth: + if e_id in have_events: + if have_events[e_id] == RejectedReason.NOT_ANCESTOR: + do_resolution = True + break - try: - # 2. Get remote difference. - result = yield self.federation_client.query_auth( - origin, - event.room_id, - event.event_id, - local_auth_chain, - ) + if not do_resolution: + logger.info( + "Skipping auth resolution due to lack of provable rejection reasons" + ) + return - seen_remotes = yield self.store.have_seen_events( - [e.event_id for e in result["auth_chain"]] - ) + logger.info("Doing auth resolution") - # 3. Process any remote auth chain events we haven't seen. - for ev in result["auth_chain"]: - if ev.event_id in seen_remotes: - continue + prev_state_ids = yield context.get_prev_state_ids(self.store) - if ev.event_id == event.event_id: - continue + # 1. Get what we think is the auth chain. + auth_ids = yield self.auth.compute_auth_events( + event, prev_state_ids + ) + local_auth_chain = yield self.store.get_auth_chain( + auth_ids, include_given=True + ) - try: - auth_ids = ev.auth_event_ids() - auth = { - (e.type, e.state_key): e - for e in result["auth_chain"] - if e.event_id in auth_ids - or event.type == EventTypes.Create - } - ev.internal_metadata.outlier = True + try: + # 2. Get remote difference. + result = yield self.federation_client.query_auth( + origin, + event.room_id, + event.event_id, + local_auth_chain, + ) - logger.debug( - "do_auth %s different_auth: %s", - event.event_id, e.event_id - ) + seen_remotes = yield self.store.have_seen_events( + [e.event_id for e in result["auth_chain"]] + ) - yield self._handle_new_event( - origin, ev, auth_events=auth - ) + # 3. Process any remote auth chain events we haven't seen. + for ev in result["auth_chain"]: + if ev.event_id in seen_remotes: + continue - if ev.event_id in event_auth_events: - auth_events[(ev.type, ev.state_key)] = ev - except AuthError: - pass + if ev.event_id == event.event_id: + continue - except Exception: - # FIXME: - logger.exception("Failed to query auth chain") + try: + auth_ids = ev.auth_event_ids() + auth = { + (e.type, e.state_key): e + for e in result["auth_chain"] + if e.event_id in auth_ids + or event.type == EventTypes.Create + } + ev.internal_metadata.outlier = True + + logger.debug( + "do_auth %s different_auth: %s", + event.event_id, e.event_id + ) - # 4. Look at rejects and their proofs. - # TODO. + yield self._handle_new_event( + origin, ev, auth_events=auth + ) - yield self._update_context_for_auth_events( - event, context, auth_events, event_key, - ) + if ev.event_id in event_auth_events: + auth_events[(ev.type, ev.state_key)] = ev + except AuthError: + pass - try: - self.auth.check(room_version, event, auth_events=auth_events) - except AuthError as e: - logger.warn("Failed auth resolution for %r because %s", event, e) - raise e + except Exception: + # FIXME: + logger.exception("Failed to query auth chain") + + # 4. Look at rejects and their proofs. + # TODO. + + yield self._update_context_for_auth_events( + event, context, auth_events, event_key, + ) @defer.inlineCallbacks def _update_context_for_auth_events(self, event, context, auth_events, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 792edc7579..0b02469ceb 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -166,6 +166,9 @@ class MessageHandler(object): now = self.clock.time_msec() events = yield self._event_serializer.serialize_events( room_state.values(), now, + # We don't bother bundling aggregations in when asked for state + # events, as clients won't use them. + bundle_aggregations=False, ) defer.returnValue(events) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 59d53f1050..6209858bbb 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -182,17 +182,27 @@ class PresenceHandler(object): # Start a LoopingCall in 30s that fires every 5s. # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. + def run_timeout_handler(): + return run_as_background_process( + "handle_presence_timeouts", self._handle_timeouts + ) + self.clock.call_later( 30, self.clock.looping_call, - self._handle_timeouts, + run_timeout_handler, 5000, ) + def run_persister(): + return run_as_background_process( + "persist_presence_changes", self._persist_unpersisted_changes + ) + self.clock.call_later( 60, self.clock.looping_call, - self._persist_unpersisted_changes, + run_persister, 60 * 1000, ) @@ -229,6 +239,7 @@ class PresenceHandler(object): ) if self.unpersisted_users_changes: + yield self.store.update_presence([ self.user_to_current_state[user_id] for user_id in self.unpersisted_users_changes @@ -240,30 +251,18 @@ class PresenceHandler(object): """We periodically persist the unpersisted changes, as otherwise they may stack up and slow down shutdown times. """ - logger.info( - "Performing _persist_unpersisted_changes. Persisting %d unpersisted changes", - len(self.unpersisted_users_changes) - ) - unpersisted = self.unpersisted_users_changes self.unpersisted_users_changes = set() if unpersisted: + logger.info( + "Persisting %d upersisted presence updates", len(unpersisted) + ) yield self.store.update_presence([ self.user_to_current_state[user_id] for user_id in unpersisted ]) - logger.info("Finished _persist_unpersisted_changes") - - @defer.inlineCallbacks - def _update_states_and_catch_exception(self, new_states): - try: - res = yield self._update_states(new_states) - defer.returnValue(res) - except Exception: - logger.exception("Error updating presence") - @defer.inlineCallbacks def _update_states(self, new_states): """Updates presence of users. Sets the appropriate timeouts. Pokes @@ -338,45 +337,41 @@ class PresenceHandler(object): logger.info("Handling presence timeouts") now = self.clock.time_msec() - try: - with Measure(self.clock, "presence_handle_timeouts"): - # Fetch the list of users that *may* have timed out. Things may have - # changed since the timeout was set, so we won't necessarily have to - # take any action. - users_to_check = set(self.wheel_timer.fetch(now)) - - # Check whether the lists of syncing processes from an external - # process have expired. - expired_process_ids = [ - process_id for process_id, last_update - in self.external_process_last_updated_ms.items() - if now - last_update > EXTERNAL_PROCESS_EXPIRY - ] - for process_id in expired_process_ids: - users_to_check.update( - self.external_process_last_updated_ms.pop(process_id, ()) - ) - self.external_process_last_update.pop(process_id) + # Fetch the list of users that *may* have timed out. Things may have + # changed since the timeout was set, so we won't necessarily have to + # take any action. + users_to_check = set(self.wheel_timer.fetch(now)) + + # Check whether the lists of syncing processes from an external + # process have expired. + expired_process_ids = [ + process_id for process_id, last_update + in self.external_process_last_updated_ms.items() + if now - last_update > EXTERNAL_PROCESS_EXPIRY + ] + for process_id in expired_process_ids: + users_to_check.update( + self.external_process_last_updated_ms.pop(process_id, ()) + ) + self.external_process_last_update.pop(process_id) - states = [ - self.user_to_current_state.get( - user_id, UserPresenceState.default(user_id) - ) - for user_id in users_to_check - ] + states = [ + self.user_to_current_state.get( + user_id, UserPresenceState.default(user_id) + ) + for user_id in users_to_check + ] - timers_fired_counter.inc(len(states)) + timers_fired_counter.inc(len(states)) - changes = handle_timeouts( - states, - is_mine_fn=self.is_mine_id, - syncing_user_ids=self.get_currently_syncing_users(), - now=now, - ) + changes = handle_timeouts( + states, + is_mine_fn=self.is_mine_id, + syncing_user_ids=self.get_currently_syncing_users(), + now=now, + ) - run_in_background(self._update_states_and_catch_exception, changes) - except Exception: - logger.exception("Exception in _handle_timeouts loop") + return self._update_states(changes) @defer.inlineCallbacks def bump_presence_active_time(self, user): diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index e37ae96899..4a17911a87 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -27,7 +27,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError -from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.storage.state import StateFilter from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID from synapse.util import stringutils @@ -70,6 +70,7 @@ class RoomCreationHandler(BaseHandler): self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() self.room_member_handler = hs.get_room_member_handler() + self.config = hs.config # linearizer to stop two upgrades happening at once self._upgrade_linearizer = Linearizer("room_upgrade_linearizer") @@ -475,7 +476,11 @@ class RoomCreationHandler(BaseHandler): if ratelimit: yield self.ratelimit(requester) - room_version = config.get("room_version", DEFAULT_ROOM_VERSION.identifier) + room_version = config.get( + "room_version", + self.config.default_room_version.identifier, + ) + if not isinstance(room_version, string_types): raise SynapseError( 400, diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py new file mode 100644 index 0000000000..0e92b405ba --- /dev/null +++ b/synapse/handlers/stats.py @@ -0,0 +1,325 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.handlers.state_deltas import StateDeltasHandler +from synapse.metrics import event_processing_positions +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import UserID +from synapse.util.metrics import Measure + +logger = logging.getLogger(__name__) + + +class StatsHandler(StateDeltasHandler): + """Handles keeping the *_stats tables updated with a simple time-series of + information about the users, rooms and media on the server, such that admins + have some idea of who is consuming their resources. + + Heavily derived from UserDirectoryHandler + """ + + def __init__(self, hs): + super(StatsHandler, self).__init__(hs) + self.hs = hs + self.store = hs.get_datastore() + self.state = hs.get_state_handler() + self.server_name = hs.hostname + self.clock = hs.get_clock() + self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id + self.stats_bucket_size = hs.config.stats_bucket_size + + # The current position in the current_state_delta stream + self.pos = None + + # Guard to ensure we only process deltas one at a time + self._is_processing = False + + if hs.config.stats_enabled: + self.notifier.add_replication_callback(self.notify_new_event) + + # We kick this off so that we don't have to wait for a change before + # we start populating stats + self.clock.call_later(0, self.notify_new_event) + + def notify_new_event(self): + """Called when there may be more deltas to process + """ + if not self.hs.config.stats_enabled: + return + + if self._is_processing: + return + + @defer.inlineCallbacks + def process(): + try: + yield self._unsafe_process() + finally: + self._is_processing = False + + self._is_processing = True + run_as_background_process("stats.notify_new_event", process) + + @defer.inlineCallbacks + def _unsafe_process(self): + # If self.pos is None then means we haven't fetched it from DB + if self.pos is None: + self.pos = yield self.store.get_stats_stream_pos() + + # If still None then the initial background update hasn't happened yet + if self.pos is None: + defer.returnValue(None) + + # Loop round handling deltas until we're up to date + while True: + with Measure(self.clock, "stats_delta"): + deltas = yield self.store.get_current_state_deltas(self.pos) + if not deltas: + return + + logger.info("Handling %d state deltas", len(deltas)) + yield self._handle_deltas(deltas) + + self.pos = deltas[-1]["stream_id"] + yield self.store.update_stats_stream_pos(self.pos) + + event_processing_positions.labels("stats").set(self.pos) + + @defer.inlineCallbacks + def _handle_deltas(self, deltas): + """ + Called with the state deltas to process + """ + for delta in deltas: + typ = delta["type"] + state_key = delta["state_key"] + room_id = delta["room_id"] + event_id = delta["event_id"] + stream_id = delta["stream_id"] + prev_event_id = delta["prev_event_id"] + + logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + + token = yield self.store.get_earliest_token_for_room_stats(room_id) + + # If the earliest token to begin from is larger than our current + # stream ID, skip processing this delta. + if token is not None and token >= stream_id: + logger.debug( + "Ignoring: %s as earlier than this room's initial ingestion event", + event_id, + ) + continue + + if event_id is None and prev_event_id is None: + # Errr... + continue + + event_content = {} + + if event_id is not None: + event_content = (yield self.store.get_event(event_id)).content or {} + + # quantise time to the nearest bucket + now = yield self.store.get_received_ts(event_id) + now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size + + if typ == EventTypes.Member: + # we could use _get_key_change here but it's a bit inefficient + # given we're not testing for a specific result; might as well + # just grab the prev_membership and membership strings and + # compare them. + prev_event_content = {} + if prev_event_id is not None: + prev_event_content = ( + yield self.store.get_event(prev_event_id) + ).content + + membership = event_content.get("membership", Membership.LEAVE) + prev_membership = prev_event_content.get("membership", Membership.LEAVE) + + if prev_membership == membership: + continue + + if prev_membership == Membership.JOIN: + yield self.store.update_stats_delta( + now, "room", room_id, "joined_members", -1 + ) + elif prev_membership == Membership.INVITE: + yield self.store.update_stats_delta( + now, "room", room_id, "invited_members", -1 + ) + elif prev_membership == Membership.LEAVE: + yield self.store.update_stats_delta( + now, "room", room_id, "left_members", -1 + ) + elif prev_membership == Membership.BAN: + yield self.store.update_stats_delta( + now, "room", room_id, "banned_members", -1 + ) + else: + err = "%s is not a valid prev_membership" % (repr(prev_membership),) + logger.error(err) + raise ValueError(err) + + if membership == Membership.JOIN: + yield self.store.update_stats_delta( + now, "room", room_id, "joined_members", +1 + ) + elif membership == Membership.INVITE: + yield self.store.update_stats_delta( + now, "room", room_id, "invited_members", +1 + ) + elif membership == Membership.LEAVE: + yield self.store.update_stats_delta( + now, "room", room_id, "left_members", +1 + ) + elif membership == Membership.BAN: + yield self.store.update_stats_delta( + now, "room", room_id, "banned_members", +1 + ) + else: + err = "%s is not a valid membership" % (repr(membership),) + logger.error(err) + raise ValueError(err) + + user_id = state_key + if self.is_mine_id(user_id): + # update user_stats as it's one of our users + public = yield self._is_public_room(room_id) + + if membership == Membership.LEAVE: + yield self.store.update_stats_delta( + now, + "user", + user_id, + "public_rooms" if public else "private_rooms", + -1, + ) + elif membership == Membership.JOIN: + yield self.store.update_stats_delta( + now, + "user", + user_id, + "public_rooms" if public else "private_rooms", + +1, + ) + + elif typ == EventTypes.Create: + # Newly created room. Add it with all blank portions. + yield self.store.update_room_state( + room_id, + { + "join_rules": None, + "history_visibility": None, + "encryption": None, + "name": None, + "topic": None, + "avatar": None, + "canonical_alias": None, + }, + ) + + elif typ == EventTypes.JoinRules: + yield self.store.update_room_state( + room_id, {"join_rules": event_content.get("join_rule")} + ) + + is_public = yield self._get_key_change( + prev_event_id, event_id, "join_rule", JoinRules.PUBLIC + ) + if is_public is not None: + yield self.update_public_room_stats(now, room_id, is_public) + + elif typ == EventTypes.RoomHistoryVisibility: + yield self.store.update_room_state( + room_id, + {"history_visibility": event_content.get("history_visibility")}, + ) + + is_public = yield self._get_key_change( + prev_event_id, event_id, "history_visibility", "world_readable" + ) + if is_public is not None: + yield self.update_public_room_stats(now, room_id, is_public) + + elif typ == EventTypes.Encryption: + yield self.store.update_room_state( + room_id, {"encryption": event_content.get("algorithm")} + ) + elif typ == EventTypes.Name: + yield self.store.update_room_state( + room_id, {"name": event_content.get("name")} + ) + elif typ == EventTypes.Topic: + yield self.store.update_room_state( + room_id, {"topic": event_content.get("topic")} + ) + elif typ == EventTypes.RoomAvatar: + yield self.store.update_room_state( + room_id, {"avatar": event_content.get("url")} + ) + elif typ == EventTypes.CanonicalAlias: + yield self.store.update_room_state( + room_id, {"canonical_alias": event_content.get("alias")} + ) + + @defer.inlineCallbacks + def update_public_room_stats(self, ts, room_id, is_public): + """ + Increment/decrement a user's number of public rooms when a room they are + in changes to/from public visibility. + + Args: + ts (int): Timestamp in seconds + room_id (str) + is_public (bool) + """ + # For now, blindly iterate over all local users in the room so that + # we can handle the whole problem of copying buckets over as needed + user_ids = yield self.store.get_users_in_room(room_id) + + for user_id in user_ids: + if self.hs.is_mine(UserID.from_string(user_id)): + yield self.store.update_stats_delta( + ts, "user", user_id, "public_rooms", +1 if is_public else -1 + ) + yield self.store.update_stats_delta( + ts, "user", user_id, "private_rooms", -1 if is_public else +1 + ) + + @defer.inlineCallbacks + def _is_public_room(self, room_id): + join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules) + history_visibility = yield self.state.get_current_state( + room_id, EventTypes.RoomHistoryVisibility + ) + + if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or ( + ( + history_visibility + and history_visibility.content.get("history_visibility") + == "world_readable" + ) + ): + defer.returnValue(True) + else: + defer.returnValue(False) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 7eefc7b1fc..8197619a78 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -711,10 +711,6 @@ class MatrixFederationHttpClient(object): RequestSendFailed: If there were problems connecting to the remote, due to e.g. DNS failures, connection timeouts etc. """ - logger.debug("get_json args: %s", args) - - logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail) - request = MatrixFederationRequest( method="GET", destination=destination, diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 528125e737..197c652850 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -55,7 +55,7 @@ def parse_integer_from_args(args, name, default=None, required=False): return int(args[name][0]) except Exception: message = "Query parameter %r must be an integer" % (name,) - raise SynapseError(400, message) + raise SynapseError(400, message, errcode=Codes.INVALID_PARAM) else: if required: message = "Missing integer query parameter %r" % (name,) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index e3f828c4bb..f64baa4d58 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -74,14 +74,6 @@ REQUIREMENTS = [ "attrs>=17.4.0", "netaddr>=0.7.18", - - # requests is a transitive dep of treq, and urlib3 is a transitive dep - # of requests, as well as of sentry-sdk. - # - # As of requests 2.21, requests does not yet support urllib3 1.25. - # (If we do not pin it here, pip will give us the latest urllib3 - # due to the dep via sentry-sdk.) - "urllib3<1.25", ] CONDITIONAL_REQUIREMENTS = { diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index 744d85594f..d6c4dcdb18 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -822,10 +822,16 @@ class AdminRestResource(JsonResource): def __init__(self, hs): JsonResource.__init__(self, hs, canonical_json=False) + register_servlets(hs, self) - register_servlets_for_client_rest_resource(hs, self) - SendServerNoticeServlet(hs).register(self) - VersionServlet(hs).register(self) + +def register_servlets(hs, http_server): + """ + Register all the admin servlets. + """ + register_servlets_for_client_rest_resource(hs, http_server) + SendServerNoticeServlet(hs).register(http_server) + VersionServlet(hs).register(http_server) def register_servlets_for_client_rest_resource(hs, http_server): diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 5180e9eaf1..029039c162 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -386,7 +386,7 @@ class CasRedirectServlet(RestServlet): b"redirectUrl": args[b"redirectUrl"][0] }).encode('ascii') hs_redirect_url = (self.cas_service_url + - b"/_matrix/client/api/v1/login/cas/ticket") + b"/_matrix/client/r0/login/cas/ticket") service_param = urllib.parse.urlencode({ b"service": b"%s?%s" % (hs_redirect_url, client_redirect_url_param) }).encode('ascii') @@ -395,7 +395,7 @@ class CasRedirectServlet(RestServlet): class CasTicketServlet(ClientV1RestServlet): - PATTERNS = client_path_patterns("/login/cas/ticket", releases=()) + PATTERNS = client_path_patterns("/login/cas/ticket") def __init__(self, hs): super(CasTicketServlet, self).__init__(hs) diff --git a/synapse/rest/client/v1/logout.py b/synapse/rest/client/v1/logout.py index 430c692336..ba20e75033 100644 --- a/synapse/rest/client/v1/logout.py +++ b/synapse/rest/client/v1/logout.py @@ -17,8 +17,6 @@ import logging from twisted.internet import defer -from synapse.api.errors import AuthError - from .base import ClientV1RestServlet, client_path_patterns logger = logging.getLogger(__name__) @@ -38,23 +36,16 @@ class LogoutRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request): - try: - requester = yield self.auth.get_user_by_req(request) - except AuthError: - # this implies the access token has already been deleted. - defer.returnValue((401, { - "errcode": "M_UNKNOWN_TOKEN", - "error": "Access Token unknown or expired" - })) + requester = yield self.auth.get_user_by_req(request) + + if requester.device_id is None: + # the acccess token wasn't associated with a device. + # Just delete the access token + access_token = self._auth.get_access_token_from_request(request) + yield self._auth_handler.delete_access_token(access_token) else: - if requester.device_id is None: - # the acccess token wasn't associated with a device. - # Just delete the access token - access_token = self._auth.get_access_token_from_request(request) - yield self._auth_handler.delete_access_token(access_token) - else: - yield self._device_handler.delete_device( - requester.user.to_string(), requester.device_id) + yield self._device_handler.delete_device( + requester.user.to_string(), requester.device_id) defer.returnValue((200, {})) diff --git a/synapse/rest/client/v2_alpha/capabilities.py b/synapse/rest/client/v2_alpha/capabilities.py index a868d06098..2b4892330c 100644 --- a/synapse/rest/client/v2_alpha/capabilities.py +++ b/synapse/rest/client/v2_alpha/capabilities.py @@ -16,7 +16,7 @@ import logging from twisted.internet import defer -from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.http.servlet import RestServlet from ._base import client_v2_patterns @@ -36,6 +36,7 @@ class CapabilitiesRestServlet(RestServlet): """ super(CapabilitiesRestServlet, self).__init__() self.hs = hs + self.config = hs.config self.auth = hs.get_auth() self.store = hs.get_datastore() @@ -48,7 +49,7 @@ class CapabilitiesRestServlet(RestServlet): response = { "capabilities": { "m.room_versions": { - "default": DEFAULT_ROOM_VERSION.identifier, + "default": self.config.default_room_version.identifier, "available": { v.identifier: v.disposition for v in KNOWN_ROOM_VERSIONS.values() diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index c701e534e7..d3025025e3 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -358,6 +358,9 @@ class SyncRestServlet(RestServlet): def serialize(events): return self._event_serializer.serialize_events( events, time_now=time_now, + # We don't bundle "live" events, as otherwise clients + # will end up double counting annotations. + bundle_aggregations=False, token_id=token_id, event_format=event_formatter, only_event_fields=only_fields, diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index eb8782aa6e..21c3c807b9 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -20,7 +20,7 @@ from twisted.web.resource import Resource from twisted.web.server import NOT_DONE_YET from synapse.api.errors import Codes, SynapseError -from synapse.crypto.keyring import KeyLookupError +from synapse.crypto.keyring import KeyLookupError, ServerKeyFetcher from synapse.http.server import respond_with_json_bytes, wrap_json_request_handler from synapse.http.servlet import parse_integer, parse_json_object_from_request @@ -89,7 +89,7 @@ class RemoteKey(Resource): isLeaf = True def __init__(self, hs): - self.keyring = hs.get_keyring() + self.fetcher = ServerKeyFetcher(hs) self.store = hs.get_datastore() self.clock = hs.get_clock() self.federation_domain_whitelist = hs.config.federation_domain_whitelist @@ -217,7 +217,7 @@ class RemoteKey(Resource): if cache_misses and query_remote_on_cache_miss: for server_name, key_ids in cache_misses.items(): try: - yield self.keyring.get_server_verify_key_v2_direct( + yield self.fetcher.get_server_verify_key_v2_direct( server_name, key_ids ) except KeyLookupError as e: diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py index 5305e9175f..35a750923b 100644 --- a/synapse/rest/media/v1/thumbnail_resource.py +++ b/synapse/rest/media/v1/thumbnail_resource.py @@ -56,8 +56,8 @@ class ThumbnailResource(Resource): def _async_render_GET(self, request): set_cors_headers(request) server_name, media_id, _ = parse_media_id(request) - width = parse_integer(request, "width") - height = parse_integer(request, "height") + width = parse_integer(request, "width", required=True) + height = parse_integer(request, "height", required=True) method = parse_string(request, "method", "scale") m_type = parse_string(request, "type", "image/png") diff --git a/synapse/server.py b/synapse/server.py index 80d40b9272..9229a68a8d 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -72,6 +72,7 @@ from synapse.handlers.room_list import RoomListHandler from synapse.handlers.room_member import RoomMemberMasterHandler from synapse.handlers.room_member_worker import RoomMemberWorkerHandler from synapse.handlers.set_password import SetPasswordHandler +from synapse.handlers.stats import StatsHandler from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import TypingHandler from synapse.handlers.user_directory import UserDirectoryHandler @@ -139,6 +140,7 @@ class HomeServer(object): 'acme_handler', 'auth_handler', 'device_handler', + 'stats_handler', 'e2e_keys_handler', 'e2e_room_keys_handler', 'event_handler', @@ -191,6 +193,7 @@ class HomeServer(object): REQUIRED_ON_MASTER_STARTUP = [ "user_directory_handler", + "stats_handler" ] # This is overridden in derived application classes @@ -474,6 +477,9 @@ class HomeServer(object): def build_secrets(self): return Secrets() + def build_stats_handler(self): + return StatsHandler(self) + def build_spam_checker(self): return SpamChecker(self) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 56c434d4e8..71316f7d09 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -56,6 +56,7 @@ from .roommember import RoomMemberStore from .search import SearchStore from .signatures import SignatureStore from .state import StateStore +from .stats import StatsStore from .stream import StreamStore from .tags import TagsStore from .transactions import TransactionStore @@ -102,6 +103,7 @@ class DataStore( GroupServerStore, UserErasureStore, MonthlyActiveUsersStore, + StatsStore, RelationsStore, ): def __init__(self, db_conn, hs): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index adc6cf26b5..b56c83e460 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import division + import itertools import logging from collections import namedtuple @@ -610,4 +612,74 @@ class EventsWorkerStore(SQLBaseStore): return res - return self.runInteraction("get_rejection_reasons", f) + return self.runInteraction("get_seen_events_with_rejections", f) + + def _get_total_state_event_counts_txn(self, txn, room_id): + """ + See get_total_state_event_counts. + """ + sql = "SELECT COUNT(*) FROM state_events WHERE room_id=?" + txn.execute(sql, (room_id,)) + row = txn.fetchone() + return row[0] if row else 0 + + def get_total_state_event_counts(self, room_id): + """ + Gets the total number of state events in a room. + + Args: + room_id (str) + + Returns: + Deferred[int] + """ + return self.runInteraction( + "get_total_state_event_counts", + self._get_total_state_event_counts_txn, room_id + ) + + def _get_current_state_event_counts_txn(self, txn, room_id): + """ + See get_current_state_event_counts. + """ + sql = "SELECT COUNT(*) FROM current_state_events WHERE room_id=?" + txn.execute(sql, (room_id,)) + row = txn.fetchone() + return row[0] if row else 0 + + def get_current_state_event_counts(self, room_id): + """ + Gets the current number of state events in a room. + + Args: + room_id (str) + + Returns: + Deferred[int] + """ + return self.runInteraction( + "get_current_state_event_counts", + self._get_current_state_event_counts_txn, room_id + ) + + @defer.inlineCallbacks + def get_room_complexity(self, room_id): + """ + Get a rough approximation of the complexity of the room. This is used by + remote servers to decide whether they wish to join the room or not. + Higher complexity value indicates that being in the room will consume + more resources. + + Args: + room_id (str) + + Returns: + Deferred[dict[str:int]] of complexity version to complexity. + """ + state_events = yield self.get_current_state_event_counts(room_id) + + # Call this one "v1", so we can introduce new ones as we want to develop + # it. + complexity_v1 = round(state_events / 500, 2) + + defer.returnValue({"v1": complexity_v1}) diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 7036541792..5300720dbb 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -19,6 +19,7 @@ import logging import six +import attr from signedjson.key import decode_verify_key_bytes from synapse.util import batch_iter @@ -36,6 +37,12 @@ else: db_binary_type = memoryview +@attr.s(slots=True, frozen=True) +class FetchKeyResult(object): + verify_key = attr.ib() # VerifyKey: the key itself + valid_until_ts = attr.ib() # int: how long we can use this key for + + class KeyStore(SQLBaseStore): """Persistence for signature verification keys """ @@ -54,8 +61,8 @@ class KeyStore(SQLBaseStore): iterable of (server_name, key-id) tuples to fetch keys for Returns: - Deferred: resolves to dict[Tuple[str, str], VerifyKey|None]: - map from (server_name, key_id) -> VerifyKey, or None if the key is + Deferred: resolves to dict[Tuple[str, str], FetchKeyResult|None]: + map from (server_name, key_id) -> FetchKeyResult, or None if the key is unknown """ keys = {} @@ -65,17 +72,19 @@ class KeyStore(SQLBaseStore): # batch_iter always returns tuples so it's safe to do len(batch) sql = ( - "SELECT server_name, key_id, verify_key FROM server_signature_keys " - "WHERE 1=0" + "SELECT server_name, key_id, verify_key, ts_valid_until_ms " + "FROM server_signature_keys WHERE 1=0" ) + " OR (server_name=? AND key_id=?)" * len(batch) txn.execute(sql, tuple(itertools.chain.from_iterable(batch))) for row in txn: - server_name, key_id, key_bytes = row - keys[(server_name, key_id)] = decode_verify_key_bytes( - key_id, bytes(key_bytes) + server_name, key_id, key_bytes, ts_valid_until_ms = row + res = FetchKeyResult( + verify_key=decode_verify_key_bytes(key_id, bytes(key_bytes)), + valid_until_ts=ts_valid_until_ms, ) + keys[(server_name, key_id)] = res def _txn(txn): for batch in batch_iter(server_name_and_key_ids, 50): @@ -84,38 +93,53 @@ class KeyStore(SQLBaseStore): return self.runInteraction("get_server_verify_keys", _txn) - def store_server_verify_key( - self, server_name, from_server, time_now_ms, verify_key - ): - """Stores a NACL verification key for the given server. + def store_server_verify_keys(self, from_server, ts_added_ms, verify_keys): + """Stores NACL verification keys for remote servers. Args: - server_name (str): The name of the server. - from_server (str): Where the verification key was looked up - time_now_ms (int): The time now in milliseconds - verify_key (nacl.signing.VerifyKey): The NACL verify key. + from_server (str): Where the verification keys were looked up + ts_added_ms (int): The time to record that the key was added + verify_keys (iterable[tuple[str, str, FetchKeyResult]]): + keys to be stored. Each entry is a triplet of + (server_name, key_id, key). """ - key_id = "%s:%s" % (verify_key.alg, verify_key.version) - - # XXX fix this to not need a lock (#3819) - def _txn(txn): - self._simple_upsert_txn( - txn, - table="server_signature_keys", - keyvalues={"server_name": server_name, "key_id": key_id}, - values={ - "from_server": from_server, - "ts_added_ms": time_now_ms, - "verify_key": db_binary_type(verify_key.encode()), - }, + key_values = [] + value_values = [] + invalidations = [] + for server_name, key_id, fetch_result in verify_keys: + key_values.append((server_name, key_id)) + value_values.append( + ( + from_server, + ts_added_ms, + fetch_result.valid_until_ts, + db_binary_type(fetch_result.verify_key.encode()), + ) ) # invalidate takes a tuple corresponding to the params of # _get_server_verify_key. _get_server_verify_key only takes one # param, which is itself the 2-tuple (server_name, key_id). - txn.call_after( - self._get_server_verify_key.invalidate, ((server_name, key_id),) - ) - - return self.runInteraction("store_server_verify_key", _txn) + invalidations.append((server_name, key_id)) + + def _invalidate(res): + f = self._get_server_verify_key.invalidate + for i in invalidations: + f((i, )) + return res + + return self.runInteraction( + "store_server_verify_keys", + self._simple_upsert_many_txn, + table="server_signature_keys", + key_names=("server_name", "key_id"), + key_values=key_values, + value_names=( + "from_server", + "ts_added_ms", + "ts_valid_until_ms", + "verify_key", + ), + value_values=value_values, + ).addCallback(_invalidate) def store_server_keys_json( self, server_name, key_id, from_server, ts_now_ms, ts_expires_ms, key_json_bytes diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 57df17bcc2..4bd1669458 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -142,6 +142,38 @@ class RoomMemberWorkerStore(EventsWorkerStore): return self.runInteraction("get_room_summary", _get_room_summary_txn) + def _get_user_count_in_room_txn(self, txn, room_id, membership): + """ + See get_user_count_in_room. + """ + sql = ( + "SELECT count(*) FROM room_memberships as m" + " INNER JOIN current_state_events as c" + " ON m.event_id = c.event_id " + " AND m.room_id = c.room_id " + " AND m.user_id = c.state_key" + " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?" + ) + + txn.execute(sql, (room_id, membership)) + row = txn.fetchone() + return row[0] + + def get_user_count_in_room(self, room_id, membership): + """ + Get the user count in a room with a particular membership. + + Args: + room_id (str) + membership (Membership) + + Returns: + Deferred[int] + """ + return self.runInteraction( + "get_users_in_room", self._get_user_count_in_room_txn, room_id, membership + ) + @cached() def get_invited_rooms_for_user(self, user_id): """ Get all the rooms the user is invited to diff --git a/synapse/storage/schema/delta/54/account_validity.sql b/synapse/storage/schema/delta/54/account_validity_with_renewal.sql index 2357626000..0adb2ad55e 100644 --- a/synapse/storage/schema/delta/54/account_validity.sql +++ b/synapse/storage/schema/delta/54/account_validity_with_renewal.sql @@ -13,6 +13,9 @@ * limitations under the License. */ +-- We previously changed the schema for this table without renaming the file, which means +-- that some databases might still be using the old schema. This ensures Synapse uses the +-- right schema for the table. DROP TABLE IF EXISTS account_validity; -- Track what users are in public rooms. diff --git a/synapse/storage/schema/delta/54/add_validity_to_server_keys.sql b/synapse/storage/schema/delta/54/add_validity_to_server_keys.sql new file mode 100644 index 0000000000..c01aa9d2d9 --- /dev/null +++ b/synapse/storage/schema/delta/54/add_validity_to_server_keys.sql @@ -0,0 +1,23 @@ +/* Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* When we can use this key until, before we have to refresh it. */ +ALTER TABLE server_signature_keys ADD COLUMN ts_valid_until_ms BIGINT; + +UPDATE server_signature_keys SET ts_valid_until_ms = ( + SELECT MAX(ts_valid_until_ms) FROM server_keys_json skj WHERE + skj.server_name = server_signature_keys.server_name AND + skj.key_id = server_signature_keys.key_id +); diff --git a/synapse/storage/schema/delta/54/stats.sql b/synapse/storage/schema/delta/54/stats.sql new file mode 100644 index 0000000000..652e58308e --- /dev/null +++ b/synapse/storage/schema/delta/54/stats.sql @@ -0,0 +1,80 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE stats_stream_pos ( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_id BIGINT, + CHECK (Lock='X') +); + +INSERT INTO stats_stream_pos (stream_id) VALUES (null); + +CREATE TABLE user_stats ( + user_id TEXT NOT NULL, + ts BIGINT NOT NULL, + bucket_size INT NOT NULL, + public_rooms INT NOT NULL, + private_rooms INT NOT NULL +); + +CREATE UNIQUE INDEX user_stats_user_ts ON user_stats(user_id, ts); + +CREATE TABLE room_stats ( + room_id TEXT NOT NULL, + ts BIGINT NOT NULL, + bucket_size INT NOT NULL, + current_state_events INT NOT NULL, + joined_members INT NOT NULL, + invited_members INT NOT NULL, + left_members INT NOT NULL, + banned_members INT NOT NULL, + state_events INT NOT NULL +); + +CREATE UNIQUE INDEX room_stats_room_ts ON room_stats(room_id, ts); + +-- cache of current room state; useful for the publicRooms list +CREATE TABLE room_state ( + room_id TEXT NOT NULL, + join_rules TEXT, + history_visibility TEXT, + encryption TEXT, + name TEXT, + topic TEXT, + avatar TEXT, + canonical_alias TEXT + -- get aliases straight from the right table +); + +CREATE UNIQUE INDEX room_state_room ON room_state(room_id); + +CREATE TABLE room_stats_earliest_token ( + room_id TEXT NOT NULL, + token BIGINT NOT NULL +); + +CREATE UNIQUE INDEX room_stats_earliest_token_idx ON room_stats_earliest_token(room_id); + +-- Set up staging tables +INSERT INTO background_updates (update_name, progress_json) VALUES + ('populate_stats_createtables', '{}'); + +-- Run through each room and update stats +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_stats_process_rooms', '{}', 'populate_stats_createtables'); + +-- Clean up staging tables +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_stats_cleanup', '{}', 'populate_stats_process_rooms'); diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py index 31a0279b18..5fdb442104 100644 --- a/synapse/storage/state_deltas.py +++ b/synapse/storage/state_deltas.py @@ -84,10 +84,16 @@ class StateDeltasStore(SQLBaseStore): "get_current_state_deltas", get_current_state_deltas_txn ) - def get_max_stream_id_in_current_state_deltas(self): - return self._simple_select_one_onecol( + def _get_max_stream_id_in_current_state_deltas_txn(self, txn): + return self._simple_select_one_onecol_txn( + txn, table="current_state_delta_stream", keyvalues={}, retcol="COALESCE(MAX(stream_id), -1)", - desc="get_max_stream_id_in_current_state_deltas", + ) + + def get_max_stream_id_in_current_state_deltas(self): + return self.runInteraction( + "get_max_stream_id_in_current_state_deltas", + self._get_max_stream_id_in_current_state_deltas_txn, ) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py new file mode 100644 index 0000000000..eb0ced5b5e --- /dev/null +++ b/synapse/storage/stats.py @@ -0,0 +1,450 @@ +# -*- coding: utf-8 -*- +# Copyright 2018, 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, Membership +from synapse.storage.state_deltas import StateDeltasStore +from synapse.util.caches.descriptors import cached + +logger = logging.getLogger(__name__) + +# these fields track absolutes (e.g. total number of rooms on the server) +ABSOLUTE_STATS_FIELDS = { + "room": ( + "current_state_events", + "joined_members", + "invited_members", + "left_members", + "banned_members", + "state_events", + ), + "user": ("public_rooms", "private_rooms"), +} + +TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} + +TEMP_TABLE = "_temp_populate_stats" + + +class StatsStore(StateDeltasStore): + def __init__(self, db_conn, hs): + super(StatsStore, self).__init__(db_conn, hs) + + self.server_name = hs.hostname + self.clock = self.hs.get_clock() + self.stats_enabled = hs.config.stats_enabled + self.stats_bucket_size = hs.config.stats_bucket_size + + self.register_background_update_handler( + "populate_stats_createtables", self._populate_stats_createtables + ) + self.register_background_update_handler( + "populate_stats_process_rooms", self._populate_stats_process_rooms + ) + self.register_background_update_handler( + "populate_stats_cleanup", self._populate_stats_cleanup + ) + + @defer.inlineCallbacks + def _populate_stats_createtables(self, progress, batch_size): + + if not self.stats_enabled: + yield self._end_background_update("populate_stats_createtables") + defer.returnValue(1) + + # Get all the rooms that we want to process. + def _make_staging_area(txn): + sql = ( + "CREATE TABLE IF NOT EXISTS " + + TEMP_TABLE + + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)" + ) + txn.execute(sql) + + sql = ( + "CREATE TABLE IF NOT EXISTS " + + TEMP_TABLE + + "_position(position TEXT NOT NULL)" + ) + txn.execute(sql) + + # Get rooms we want to process from the database + sql = """ + SELECT room_id, count(*) FROM current_state_events + GROUP BY room_id + """ + txn.execute(sql) + rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()] + self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms) + del rooms + + new_pos = yield self.get_max_stream_id_in_current_state_deltas() + yield self.runInteraction("populate_stats_temp_build", _make_staging_area) + yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos}) + self.get_earliest_token_for_room_stats.invalidate_all() + + yield self._end_background_update("populate_stats_createtables") + defer.returnValue(1) + + @defer.inlineCallbacks + def _populate_stats_cleanup(self, progress, batch_size): + """ + Update the user directory stream position, then clean up the old tables. + """ + if not self.stats_enabled: + yield self._end_background_update("populate_stats_cleanup") + defer.returnValue(1) + + position = yield self._simple_select_one_onecol( + TEMP_TABLE + "_position", None, "position" + ) + yield self.update_stats_stream_pos(position) + + def _delete_staging_area(txn): + txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms") + txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position") + + yield self.runInteraction("populate_stats_cleanup", _delete_staging_area) + + yield self._end_background_update("populate_stats_cleanup") + defer.returnValue(1) + + @defer.inlineCallbacks + def _populate_stats_process_rooms(self, progress, batch_size): + + if not self.stats_enabled: + yield self._end_background_update("populate_stats_process_rooms") + defer.returnValue(1) + + # If we don't have progress filed, delete everything. + if not progress: + yield self.delete_all_stats() + + def _get_next_batch(txn): + # Only fetch 250 rooms, so we don't fetch too many at once, even + # if those 250 rooms have less than batch_size state events. + sql = """ + SELECT room_id, events FROM %s_rooms + ORDER BY events DESC + LIMIT 250 + """ % ( + TEMP_TABLE, + ) + txn.execute(sql) + rooms_to_work_on = txn.fetchall() + + if not rooms_to_work_on: + return None + + # Get how many are left to process, so we can give status on how + # far we are in processing + txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") + progress["remaining"] = txn.fetchone()[0] + + return rooms_to_work_on + + rooms_to_work_on = yield self.runInteraction( + "populate_stats_temp_read", _get_next_batch + ) + + # No more rooms -- complete the transaction. + if not rooms_to_work_on: + yield self._end_background_update("populate_stats_process_rooms") + defer.returnValue(1) + + logger.info( + "Processing the next %d rooms of %d remaining", + len(rooms_to_work_on), progress["remaining"], + ) + + # Number of state events we've processed by going through each room + processed_event_count = 0 + + for room_id, event_count in rooms_to_work_on: + + current_state_ids = yield self.get_current_state_ids(room_id) + + join_rules = yield self.get_event( + current_state_ids.get((EventTypes.JoinRules, "")), allow_none=True + ) + history_visibility = yield self.get_event( + current_state_ids.get((EventTypes.RoomHistoryVisibility, "")), + allow_none=True, + ) + encryption = yield self.get_event( + current_state_ids.get((EventTypes.RoomEncryption, "")), allow_none=True + ) + name = yield self.get_event( + current_state_ids.get((EventTypes.Name, "")), allow_none=True + ) + topic = yield self.get_event( + current_state_ids.get((EventTypes.Topic, "")), allow_none=True + ) + avatar = yield self.get_event( + current_state_ids.get((EventTypes.RoomAvatar, "")), allow_none=True + ) + canonical_alias = yield self.get_event( + current_state_ids.get((EventTypes.CanonicalAlias, "")), allow_none=True + ) + + def _or_none(x, arg): + if x: + return x.content.get(arg) + return None + + yield self.update_room_state( + room_id, + { + "join_rules": _or_none(join_rules, "join_rule"), + "history_visibility": _or_none( + history_visibility, "history_visibility" + ), + "encryption": _or_none(encryption, "algorithm"), + "name": _or_none(name, "name"), + "topic": _or_none(topic, "topic"), + "avatar": _or_none(avatar, "url"), + "canonical_alias": _or_none(canonical_alias, "alias"), + }, + ) + + now = self.hs.get_reactor().seconds() + + # quantise time to the nearest bucket + now = (now // self.stats_bucket_size) * self.stats_bucket_size + + def _fetch_data(txn): + + # Get the current token of the room + current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) + + current_state_events = len(current_state_ids) + joined_members = self._get_user_count_in_room_txn( + txn, room_id, Membership.JOIN + ) + invited_members = self._get_user_count_in_room_txn( + txn, room_id, Membership.INVITE + ) + left_members = self._get_user_count_in_room_txn( + txn, room_id, Membership.LEAVE + ) + banned_members = self._get_user_count_in_room_txn( + txn, room_id, Membership.BAN + ) + total_state_events = self._get_total_state_event_counts_txn( + txn, room_id + ) + + self._update_stats_txn( + txn, + "room", + room_id, + now, + { + "bucket_size": self.stats_bucket_size, + "current_state_events": current_state_events, + "joined_members": joined_members, + "invited_members": invited_members, + "left_members": left_members, + "banned_members": banned_members, + "state_events": total_state_events, + }, + ) + self._simple_insert_txn( + txn, + "room_stats_earliest_token", + {"room_id": room_id, "token": current_token}, + ) + + yield self.runInteraction("update_room_stats", _fetch_data) + + # We've finished a room. Delete it from the table. + yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id}) + # Update the remaining counter. + progress["remaining"] -= 1 + yield self.runInteraction( + "populate_stats", + self._background_update_progress_txn, + "populate_stats_process_rooms", + progress, + ) + + processed_event_count += event_count + + if processed_event_count > batch_size: + # Don't process any more rooms, we've hit our batch size. + defer.returnValue(processed_event_count) + + defer.returnValue(processed_event_count) + + def delete_all_stats(self): + """ + Delete all statistics records. + """ + + def _delete_all_stats_txn(txn): + txn.execute("DELETE FROM room_state") + txn.execute("DELETE FROM room_stats") + txn.execute("DELETE FROM room_stats_earliest_token") + txn.execute("DELETE FROM user_stats") + + return self.runInteraction("delete_all_stats", _delete_all_stats_txn) + + def get_stats_stream_pos(self): + return self._simple_select_one_onecol( + table="stats_stream_pos", + keyvalues={}, + retcol="stream_id", + desc="stats_stream_pos", + ) + + def update_stats_stream_pos(self, stream_id): + return self._simple_update_one( + table="stats_stream_pos", + keyvalues={}, + updatevalues={"stream_id": stream_id}, + desc="update_stats_stream_pos", + ) + + def update_room_state(self, room_id, fields): + """ + Args: + room_id (str) + fields (dict[str:Any]) + """ + return self._simple_upsert( + table="room_state", + keyvalues={"room_id": room_id}, + values=fields, + desc="update_room_state", + ) + + def get_deltas_for_room(self, room_id, start, size=100): + """ + Get statistics deltas for a given room. + + Args: + room_id (str) + start (int): Pagination start. Number of entries, not timestamp. + size (int): How many entries to return. + + Returns: + Deferred[list[dict]], where the dict has the keys of + ABSOLUTE_STATS_FIELDS["room"] and "ts". + """ + return self._simple_select_list_paginate( + "room_stats", + {"room_id": room_id}, + "ts", + start, + size, + retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]), + order_direction="DESC", + ) + + def get_all_room_state(self): + return self._simple_select_list( + "room_state", None, retcols=("name", "topic", "canonical_alias") + ) + + @cached() + def get_earliest_token_for_room_stats(self, room_id): + """ + Fetch the "earliest token". This is used by the room stats delta + processor to ignore deltas that have been processed between the + start of the background task and any particular room's stats + being calculated. + + Returns: + Deferred[int] + """ + return self._simple_select_one_onecol( + "room_stats_earliest_token", + {"room_id": room_id}, + retcol="token", + allow_none=True, + ) + + def update_stats(self, stats_type, stats_id, ts, fields): + table, id_col = TYPE_TO_ROOM[stats_type] + return self._simple_upsert( + table=table, + keyvalues={id_col: stats_id, "ts": ts}, + values=fields, + desc="update_stats", + ) + + def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields): + table, id_col = TYPE_TO_ROOM[stats_type] + return self._simple_upsert_txn( + txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields + ) + + def update_stats_delta(self, ts, stats_type, stats_id, field, value): + def _update_stats_delta(txn): + table, id_col = TYPE_TO_ROOM[stats_type] + + sql = ( + "SELECT * FROM %s" + " WHERE %s=? and ts=(" + " SELECT MAX(ts) FROM %s" + " WHERE %s=?" + ")" + ) % (table, id_col, table, id_col) + txn.execute(sql, (stats_id, stats_id)) + rows = self.cursor_to_dict(txn) + if len(rows) == 0: + # silently skip as we don't have anything to apply a delta to yet. + # this tries to minimise any race between the initial sync and + # subsequent deltas arriving. + return + + current_ts = ts + latest_ts = rows[0]["ts"] + if current_ts < latest_ts: + # This one is in the past, but we're just encountering it now. + # Mark it as part of the current bucket. + current_ts = latest_ts + elif ts != latest_ts: + # we have to copy our absolute counters over to the new entry. + values = { + key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type] + } + values[id_col] = stats_id + values["ts"] = ts + values["bucket_size"] = self.stats_bucket_size + + self._simple_insert_txn(txn, table=table, values=values) + + # actually update the new value + if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]: + self._simple_update_txn( + txn, + table=table, + keyvalues={id_col: stats_id, "ts": current_ts}, + updatevalues={field: value}, + ) + else: + sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % ( + table, + field, + field, + id_col, + ) + txn.execute(sql, (value, stats_id, current_ts)) + + return self.runInteraction("update_stats_delta", _update_stats_delta) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 311b49e18a..fe412355d8 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -226,6 +226,8 @@ class LoggingContext(object): self.request = request def __str__(self): + if self.request: + return str(self.request) return "%s@%x" % (self.name, id(self)) @classmethod @@ -274,12 +276,10 @@ class LoggingContext(object): current = self.set_current_context(self.previous_context) if current is not self: if current is self.sentinel: - logger.warn("Expected logging context %s has been lost", self) + logger.warning("Expected logging context %s was lost", self) else: - logger.warn( - "Current logging context %s is not expected context %s", - current, - self + logger.warning( + "Expected logging context %s but found %s", self, current ) self.previous_context = None self.alive = False @@ -433,10 +433,14 @@ class PreserveLoggingContext(object): context = LoggingContext.set_current_context(self.current_context) if context != self.new_context: - logger.warn( - "Unexpected logging context: %s is not %s", - context, self.new_context, - ) + if context is LoggingContext.sentinel: + logger.warning("Expected logging context %s was lost", self.new_context) + else: + logger.warning( + "Expected logging context %s but found %s", + self.new_context, + context, + ) if self.current_context is not LoggingContext.sentinel: if not self.current_context.alive: |