diff options
59 files changed, 730 insertions, 450 deletions
diff --git a/changelog.d/6580.feature b/changelog.d/6580.feature new file mode 100644 index 0000000000..233c589c66 --- /dev/null +++ b/changelog.d/6580.feature @@ -0,0 +1 @@ +Add experimental config option to specify multiple databases. diff --git a/changelog.d/6617.misc b/changelog.d/6617.misc new file mode 100644 index 0000000000..94aa271d38 --- /dev/null +++ b/changelog.d/6617.misc @@ -0,0 +1 @@ +Reduce the reconnect time when worker replication fails, to make it easier to catch up. diff --git a/changelog.d/6619.misc b/changelog.d/6619.misc new file mode 100644 index 0000000000..b608133219 --- /dev/null +++ b/changelog.d/6619.misc @@ -0,0 +1 @@ +Simplify http handling by removing redundant SynapseRequestFactory. diff --git a/changelog.d/6620.misc b/changelog.d/6620.misc new file mode 100644 index 0000000000..8bfb78fb20 --- /dev/null +++ b/changelog.d/6620.misc @@ -0,0 +1 @@ +Add a workaround for synapse raising exceptions when fetching the notary's own key from the notary. diff --git a/changelog.d/6625.bugfix b/changelog.d/6625.bugfix new file mode 100644 index 0000000000..a8dc5587dc --- /dev/null +++ b/changelog.d/6625.bugfix @@ -0,0 +1 @@ +Fix exception when fetching the `matrix.org:ed25519:auto` key. diff --git a/changelog.d/6626.feature b/changelog.d/6626.feature new file mode 100644 index 0000000000..15798fa59b --- /dev/null +++ b/changelog.d/6626.feature @@ -0,0 +1 @@ +Raise an error if someone tries to use the log_file config option. diff --git a/changelog.d/6627.misc b/changelog.d/6627.misc new file mode 100644 index 0000000000..702f067070 --- /dev/null +++ b/changelog.d/6627.misc @@ -0,0 +1 @@ +Automate generation of the sample log config. diff --git a/changelog.d/6628.removal b/changelog.d/6628.removal new file mode 100644 index 0000000000..66cd6aeca4 --- /dev/null +++ b/changelog.d/6628.removal @@ -0,0 +1 @@ +Remove unused, undocumented /_matrix/content API. diff --git a/changelog.d/6629.misc b/changelog.d/6629.misc new file mode 100644 index 0000000000..68f77af05b --- /dev/null +++ b/changelog.d/6629.misc @@ -0,0 +1 @@ +Simplify event creation code by removing redundant queries on the event_reference_hashes table. \ No newline at end of file diff --git a/changelog.d/6633.bugfix b/changelog.d/6633.bugfix new file mode 100644 index 0000000000..4bacf26021 --- /dev/null +++ b/changelog.d/6633.bugfix @@ -0,0 +1 @@ +Fix bug where a moderator upgraded a room and became an admin in the new room. \ No newline at end of file diff --git a/changelog.d/6640.bugfix b/changelog.d/6640.bugfix new file mode 100644 index 0000000000..8c2a129933 --- /dev/null +++ b/changelog.d/6640.bugfix @@ -0,0 +1 @@ +Fix an error which was thrown by the PresenceHandler _on_shutdown handler. diff --git a/changelog.d/6642.misc b/changelog.d/6642.misc new file mode 100644 index 0000000000..a480bbd134 --- /dev/null +++ b/changelog.d/6642.misc @@ -0,0 +1 @@ +Fix errors when frozen_dicts are enabled. diff --git a/changelog.d/6645.bugfix b/changelog.d/6645.bugfix new file mode 100644 index 0000000000..f648df3fc0 --- /dev/null +++ b/changelog.d/6645.bugfix @@ -0,0 +1 @@ +Fix exceptions in the synchrotron worker log when events are rejected. diff --git a/changelog.d/6647.misc b/changelog.d/6647.misc new file mode 100644 index 0000000000..fbe7c0e7db --- /dev/null +++ b/changelog.d/6647.misc @@ -0,0 +1 @@ +Port core background update routines to async/await. diff --git a/changelog.d/6648.bugfix b/changelog.d/6648.bugfix new file mode 100644 index 0000000000..39916de437 --- /dev/null +++ b/changelog.d/6648.bugfix @@ -0,0 +1 @@ +Ensure that upgraded rooms are removed from the directory. diff --git a/changelog.d/6653.misc b/changelog.d/6653.misc new file mode 100644 index 0000000000..fbe7c0e7db --- /dev/null +++ b/changelog.d/6653.misc @@ -0,0 +1 @@ +Port core background update routines to async/await. diff --git a/debian/build_virtualenv b/debian/build_virtualenv index 2791896052..d892fd5c9d 100755 --- a/debian/build_virtualenv +++ b/debian/build_virtualenv @@ -85,6 +85,9 @@ PYTHONPATH="$tmpdir" \ ' > "${PACKAGE_BUILD_DIR}/etc/matrix-synapse/homeserver.yaml" +# build the log config file +"${TARGET_PYTHON}" -B "${VIRTUALENV_DIR}/bin/generate_log_config" \ + --output-file="${PACKAGE_BUILD_DIR}/etc/matrix-synapse/log.yaml" # add a dependency on the right version of python to substvars. PYPKG=`basename $SNAKE` diff --git a/debian/changelog b/debian/changelog index 31791c127c..75fe89fa97 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +matrix-synapse-py3 (1.7.3ubuntu1) UNRELEASED; urgency=medium + + * Automate generation of the default log configuration file. + + -- Richard van der Hoff <richard@matrix.org> Fri, 03 Jan 2020 13:55:38 +0000 + matrix-synapse-py3 (1.7.3) stable; urgency=medium * New synapse release 1.7.3. diff --git a/debian/install b/debian/install index 43dc8c6904..da8b726a2b 100644 --- a/debian/install +++ b/debian/install @@ -1,2 +1 @@ -debian/log.yaml etc/matrix-synapse debian/manage_debconf.pl /opt/venvs/matrix-synapse/lib/ diff --git a/debian/log.yaml b/debian/log.yaml deleted file mode 100644 index 95b655dd35..0000000000 --- a/debian/log.yaml +++ /dev/null @@ -1,36 +0,0 @@ - -version: 1 - -formatters: - precise: - format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s- %(message)s' - -filters: - context: - (): synapse.logging.context.LoggingContextFilter - request: "" - -handlers: - file: - class: logging.handlers.RotatingFileHandler - formatter: precise - filename: /var/log/matrix-synapse/homeserver.log - maxBytes: 104857600 - backupCount: 10 - filters: [context] - encoding: utf8 - console: - class: logging.StreamHandler - formatter: precise - level: WARN - -loggers: - synapse: - level: INFO - - synapse.storage.SQL: - level: INFO - -root: - level: INFO - handlers: [file, console] diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index e3b05423b8..fad5f968b5 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -692,10 +692,6 @@ media_store_path: "DATADIR/media_store" # config: # directory: /mnt/some/other/directory -# Directory where in-progress uploads are stored. -# -uploads_path: "DATADIR/uploads" - # The largest allowed upload size in bytes # #max_upload_size: 10M diff --git a/docs/sample_log_config.yaml b/docs/sample_log_config.yaml index 11e8f35f41..1a2739455e 100644 --- a/docs/sample_log_config.yaml +++ b/docs/sample_log_config.yaml @@ -1,4 +1,4 @@ -# Example log config file for synapse. +# Log configuration for Synapse. # # This is a YAML file containing a standard Python logging configuration # dictionary. See [1] for details on the valid settings. @@ -20,7 +20,7 @@ handlers: file: class: logging.handlers.RotatingFileHandler formatter: precise - filename: /home/rav/work/synapse/homeserver.log + filename: /var/log/matrix-synapse/homeserver.log maxBytes: 104857600 backupCount: 10 filters: [context] diff --git a/scripts-dev/generate_sample_config b/scripts-dev/generate_sample_config index 5e33b9b549..9cb4630a5c 100755 --- a/scripts-dev/generate_sample_config +++ b/scripts-dev/generate_sample_config @@ -7,12 +7,22 @@ set -e cd `dirname $0`/.. SAMPLE_CONFIG="docs/sample_config.yaml" +SAMPLE_LOG_CONFIG="docs/sample_log_config.yaml" + +check() { + diff -u "$SAMPLE_LOG_CONFIG" <(./scripts/generate_log_config) >/dev/null || return 1 +} if [ "$1" == "--check" ]; then diff -u "$SAMPLE_CONFIG" <(./scripts/generate_config --header-file docs/.sample_config_header.yaml) >/dev/null || { echo -e "\e[1m\e[31m$SAMPLE_CONFIG is not up-to-date. Regenerate it with \`scripts-dev/generate_sample_config\`.\e[0m" >&2 exit 1 } + diff -u "$SAMPLE_LOG_CONFIG" <(./scripts/generate_log_config) >/dev/null || { + echo -e "\e[1m\e[31m$SAMPLE_LOG_CONFIG is not up-to-date. Regenerate it with \`scripts-dev/generate_sample_config\`.\e[0m" >&2 + exit 1 + } else ./scripts/generate_config --header-file docs/.sample_config_header.yaml -o "$SAMPLE_CONFIG" + ./scripts/generate_log_config -o "$SAMPLE_LOG_CONFIG" fi diff --git a/scripts/generate_log_config b/scripts/generate_log_config new file mode 100755 index 0000000000..b6957f48a3 --- /dev/null +++ b/scripts/generate_log_config @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 + +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import sys + +from synapse.config.logger import DEFAULT_LOG_CONFIG + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + + parser.add_argument( + "-o", + "--output-file", + type=argparse.FileType("w"), + default=sys.stdout, + help="File to write the configuration to. Default: stdout", + ) + + parser.add_argument( + "-f", + "--log-file", + type=str, + default="/var/log/matrix-synapse/homeserver.log", + help="name of the log file", + ) + + args = parser.parse_args() + args.output_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=args.log_file)) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index eb927f2094..cb77314f1e 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -166,6 +166,11 @@ class Store( logger.exception("Failed to insert: %s", table) raise + def set_room_is_public(self, room_id, is_public): + raise Exception( + "Attempt to set room_is_public during port_db: database not empty?" + ) + class MockHomeserver: def __init__(self, config): diff --git a/synapse/api/urls.py b/synapse/api/urls.py index ff1f39e86c..f34434bd67 100644 --- a/synapse/api/urls.py +++ b/synapse/api/urls.py @@ -29,7 +29,6 @@ FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2" FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable" STATIC_PREFIX = "/_matrix/static" WEB_CLIENT_PREFIX = "/_matrix/client" -CONTENT_REPO_PREFIX = "/_matrix/content" SERVER_KEY_V2_PREFIX = "/_matrix/key/v2" MEDIA_PREFIX = "/_matrix/media/r0" LEGACY_MEDIA_PREFIX = "/_matrix/media/v1" diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 0e9bf7f53a..e5b44a5eed 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -39,7 +39,6 @@ import synapse import synapse.config.logger from synapse import events from synapse.api.urls import ( - CONTENT_REPO_PREFIX, FEDERATION_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, @@ -65,7 +64,6 @@ from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.rest import ClientRestResource from synapse.rest.admin import AdminRestResource from synapse.rest.key.v2 import KeyApiV2Resource -from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.well_known import WellKnownResource from synapse.server import HomeServer from synapse.storage import DataStore @@ -223,13 +221,7 @@ class SynapseHomeServer(HomeServer): if self.get_config().enable_media_repo: media_repo = self.get_media_repository_resource() resources.update( - { - MEDIA_PREFIX: media_repo, - LEGACY_MEDIA_PREFIX: media_repo, - CONTENT_REPO_PREFIX: ContentRepoResource( - self, self.config.uploads_path - ), - } + {MEDIA_PREFIX: media_repo, LEGACY_MEDIA_PREFIX: media_repo} ) elif name == "media": raise ConfigError( @@ -318,7 +310,7 @@ def setup(config_options): "Synapse Homeserver", config_options ) except ConfigError as e: - sys.stderr.write("\n" + str(e) + "\n") + sys.stderr.write("\nERROR: %s\n" % (e,)) sys.exit(1) if not config: diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 4c80f257e2..a63c53dc44 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -21,7 +21,7 @@ from twisted.web.resource import NoResource import synapse from synapse import events -from synapse.api.urls import CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX +from synapse.api.urls import LEGACY_MEDIA_PREFIX, MEDIA_PREFIX from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig @@ -37,7 +37,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.admin import register_servlets_for_media_repo -from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.server import HomeServer from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore from synapse.util.httpresourcetree import create_resource_tree @@ -82,9 +81,6 @@ class MediaRepositoryServer(HomeServer): { MEDIA_PREFIX: media_repo, LEGACY_MEDIA_PREFIX: media_repo, - CONTENT_REPO_PREFIX: ContentRepoResource( - self, self.config.uploads_path - ), "/_synapse/admin": admin_resource, } ) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index dd2132e608..03031ee34d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -48,7 +48,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.replication.tcp.streams.events import EventsStreamEventRow +from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow from synapse.rest.client.v1 import events from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet from synapse.rest.client.v1.room import RoomInitialSyncRestServlet @@ -371,8 +371,7 @@ class SyncReplicationHandler(ReplicationClientHandler): def get_currently_syncing_users(self): return self.presence_handler.get_currently_syncing_users() - @defer.inlineCallbacks - def process_and_notify(self, stream_name, token, rows): + async def process_and_notify(self, stream_name, token, rows): try: if stream_name == "events": # We shouldn't get multiple rows per token for events stream, so @@ -380,7 +379,14 @@ class SyncReplicationHandler(ReplicationClientHandler): for row in rows: if row.type != EventsStreamEventRow.TypeId: continue - event = yield self.store.get_event(row.data.event_id) + assert isinstance(row, EventsStreamRow) + + event = await self.store.get_event( + row.data.event_id, allow_rejected=True + ) + if event.rejected_reason: + continue + extra_users = () if event.type == EventTypes.Member: extra_users = (event.state_key,) @@ -412,11 +418,11 @@ class SyncReplicationHandler(ReplicationClientHandler): elif stream_name == "device_lists": all_room_ids = set() for row in rows: - room_ids = yield self.store.get_rooms_for_user(row.user_id) + room_ids = await self.store.get_rooms_for_user(row.user_id) all_room_ids.update(room_ids) self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) elif stream_name == "presence": - yield self.presence_handler.process_replication_rows(token, rows) + await self.presence_handler.process_replication_rows(token, rows) elif stream_name == "receipts": self.notifier.on_new_event( "groups_key", token, users=[row.user_id for row in rows] diff --git a/synapse/config/database.py b/synapse/config/database.py index 134824789c..219b32f670 100644 --- a/synapse/config/database.py +++ b/synapse/config/database.py @@ -15,7 +15,6 @@ import logging import os from textwrap import indent -from typing import List import yaml @@ -30,16 +29,13 @@ class DatabaseConnectionConfig: Args: name: A label for the database, used for logging. db_config: The config for a particular database, as per `database` - section of main config. Has two fields: `name` for database - module name, and `args` for the args to give to the database - connector. - data_stores: The list of data stores that should be provisioned on the - database. Defaults to all data stores. + section of main config. Has three fields: `name` for database + module name, `args` for the args to give to the database + connector, and optional `data_stores` that is a list of stores to + provision on this database (defaulting to all). """ - def __init__( - self, name: str, db_config: dict, data_stores: List[str] = ["main", "state"] - ): + def __init__(self, name: str, db_config: dict): if db_config["name"] not in ("sqlite3", "psycopg2"): raise ConfigError("Unsupported database type %r" % (db_config["name"],)) @@ -48,6 +44,10 @@ class DatabaseConnectionConfig: {"cp_min": 1, "cp_max": 1, "check_same_thread": False} ) + data_stores = db_config.get("data_stores") + if data_stores is None: + data_stores = ["main", "state"] + self.name = name self.config = db_config self.data_stores = data_stores @@ -59,14 +59,43 @@ class DatabaseConfig(Config): def read_config(self, config, **kwargs): self.event_cache_size = self.parse_size(config.get("event_cache_size", "10K")) + # We *experimentally* support specifying multiple databases via the + # `databases` key. This is a map from a label to database config in the + # same format as the `database` config option, plus an extra + # `data_stores` key to specify which data store goes where. For example: + # + # databases: + # master: + # name: psycopg2 + # data_stores: ["main"] + # args: {} + # state: + # name: psycopg2 + # data_stores: ["state"] + # args: {} + + multi_database_config = config.get("databases") database_config = config.get("database") - if database_config is None: - database_config = {"name": "sqlite3", "args": {}} + if multi_database_config and database_config: + raise ConfigError("Can't specify both 'database' and 'datbases' in config") + + if multi_database_config: + if config.get("database_path"): + raise ConfigError("Can't specify 'database_path' with 'databases'") + + self.databases = [ + DatabaseConnectionConfig(name, db_conf) + for name, db_conf in multi_database_config.items() + ] + + else: + if database_config is None: + database_config = {"name": "sqlite3", "args": {}} - self.databases = [DatabaseConnectionConfig("master", database_config)] + self.databases = [DatabaseConnectionConfig("master", database_config)] - self.set_databasepath(config.get("database_path")) + self.set_databasepath(config.get("database_path")) def generate_config_section(self, data_dir_path, database_conf, **kwargs): if not database_conf: diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 75bb904718..a25c70e928 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import argparse import logging import logging.config import os @@ -37,10 +37,17 @@ from synapse.logging._structured import ( from synapse.logging.context import LoggingContextFilter from synapse.util.versionstring import get_version_string -from ._base import Config +from ._base import Config, ConfigError DEFAULT_LOG_CONFIG = Template( - """ + """\ +# Log configuration for Synapse. +# +# This is a YAML file containing a standard Python logging configuration +# dictionary. See [1] for details on the valid settings. +# +# [1]: https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema + version: 1 formatters: @@ -81,11 +88,18 @@ disable_existing_loggers: false """ ) +LOG_FILE_ERROR = """\ +Support for the log_file configuration option and --log-file command-line option was +removed in Synapse 1.3.0. You should instead set up a separate log configuration file. +""" + class LoggingConfig(Config): section = "logging" def read_config(self, config, **kwargs): + if config.get("log_file"): + raise ConfigError(LOG_FILE_ERROR) self.log_config = self.abspath(config.get("log_config")) self.no_redirect_stdio = config.get("no_redirect_stdio", False) @@ -106,6 +120,8 @@ class LoggingConfig(Config): def read_arguments(self, args): if args.no_redirect_stdio is not None: self.no_redirect_stdio = args.no_redirect_stdio + if args.log_file is not None: + raise ConfigError(LOG_FILE_ERROR) @staticmethod def add_arguments(parser): @@ -118,6 +134,10 @@ class LoggingConfig(Config): help="Do not redirect stdout/stderr to the log", ) + logging_group.add_argument( + "-f", "--log-file", dest="log_file", help=argparse.SUPPRESS, + ) + def generate_files(self, config, config_dir_path): log_config = config.get("log_config") if log_config and not os.path.exists(log_config): diff --git a/synapse/config/repository.py b/synapse/config/repository.py index d0205e14b9..7d2dd27fd0 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -156,7 +156,6 @@ class ContentRepositoryConfig(Config): (provider_class, parsed_config, wrapper_config) ) - self.uploads_path = self.ensure_directory(config.get("uploads_path", "uploads")) self.dynamic_thumbnails = config.get("dynamic_thumbnails", False) self.thumbnail_requirements = parse_thumbnail_requirements( config.get("thumbnail_sizes", DEFAULT_THUMBNAIL_SIZES) @@ -231,10 +230,6 @@ class ContentRepositoryConfig(Config): # config: # directory: /mnt/some/other/directory - # Directory where in-progress uploads are stored. - # - uploads_path: "%(uploads_path)s" - # The largest allowed upload size in bytes # #max_upload_size: 10M diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index ccaa8a9920..e65bd61d97 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. - +import collections.abc import hashlib import logging @@ -40,8 +40,11 @@ def check_event_content_hash(event, hash_algorithm=hashlib.sha256): # some malformed events lack a 'hashes'. Protect against it being missing # or a weird type by basically treating it the same as an unhashed event. hashes = event.get("hashes") - if not isinstance(hashes, dict): - raise SynapseError(400, "Malformed 'hashes'", Codes.UNAUTHORIZED) + # nb it might be a frozendict or a dict + if not isinstance(hashes, collections.abc.Mapping): + raise SynapseError( + 400, "Malformed 'hashes': %s" % (type(hashes),), Codes.UNAUTHORIZED + ) if name not in hashes: raise SynapseError( diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 7cfad192e8..6fe5a6a26a 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -511,17 +511,18 @@ class BaseV2KeyFetcher(object): server_name = response_json["server_name"] verified = False for key_id in response_json["signatures"].get(server_name, {}): - # each of the keys used for the signature must be present in the response - # json. key = verify_keys.get(key_id) if not key: - raise KeyLookupError( - "Key response is signed by key id %s:%s but that key is not " - "present in the response" % (server_name, key_id) - ) + # the key may not be present in verify_keys if: + # * we got the key from the notary server, and: + # * the key belongs to the notary server, and: + # * the notary server is using a different key to sign notary + # responses. + continue verify_signed_json(response_json, server_name, key.verify_key) verified = True + break if not verified: raise KeyLookupError( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4ad752205f..8ea3aca2f4 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -48,7 +48,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter -from synapse.types import RoomAlias, UserID, create_requester +from synapse.types import Collection, RoomAlias, UserID, create_requester from synapse.util.async_helpers import Linearizer from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.metrics import measure_func @@ -422,7 +422,7 @@ class EventCreationHandler(object): event_dict, token_id=None, txn_id=None, - prev_events_and_hashes=None, + prev_event_ids: Optional[Collection[str]] = None, require_consent=True, ): """ @@ -439,10 +439,9 @@ class EventCreationHandler(object): token_id (str) txn_id (str) - prev_events_and_hashes (list[(str, dict[str, str], int)]|None): + prev_event_ids: the forward extremities to use as the prev_events for the - new event. For each event, a tuple of (event_id, hashes, depth) - where *hashes* is a map from algorithm to hash. + new event. If None, they will be requested from the database. @@ -498,9 +497,7 @@ class EventCreationHandler(object): builder.internal_metadata.txn_id = txn_id event, context = yield self.create_new_client_event( - builder=builder, - requester=requester, - prev_events_and_hashes=prev_events_and_hashes, + builder=builder, requester=requester, prev_event_ids=prev_event_ids, ) # In an ideal world we wouldn't need the second part of this condition. However, @@ -714,7 +711,7 @@ class EventCreationHandler(object): @measure_func("create_new_client_event") @defer.inlineCallbacks def create_new_client_event( - self, builder, requester=None, prev_events_and_hashes=None + self, builder, requester=None, prev_event_ids: Optional[Collection[str]] = None ): """Create a new event for a local client @@ -723,10 +720,9 @@ class EventCreationHandler(object): requester (synapse.types.Requester|None): - prev_events_and_hashes (list[(str, dict[str, str], int)]|None): + prev_event_ids: the forward extremities to use as the prev_events for the - new event. For each event, a tuple of (event_id, hashes, depth) - where *hashes* is a map from algorithm to hash. + new event. If None, they will be requested from the database. @@ -734,22 +730,15 @@ class EventCreationHandler(object): Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)] """ - if prev_events_and_hashes is not None: - assert len(prev_events_and_hashes) <= 10, ( + if prev_event_ids is not None: + assert len(prev_event_ids) <= 10, ( "Attempting to create an event with %i prev_events" - % (len(prev_events_and_hashes),) + % (len(prev_event_ids),) ) else: - prev_events_and_hashes = yield self.store.get_prev_events_for_room( - builder.room_id - ) - - prev_events = [ - (event_id, prev_hashes) - for event_id, prev_hashes, _ in prev_events_and_hashes - ] + prev_event_ids = yield self.store.get_prev_events_for_room(builder.room_id) - event = yield builder.build(prev_event_ids=[p for p, _ in prev_events]) + event = yield builder.build(prev_event_ids=prev_event_ids) context = yield self.state.compute_event_context(event) if requester: context.app_service = requester.app_service @@ -1042,9 +1031,7 @@ class EventCreationHandler(object): # For each room we need to find a joined member we can use to send # the dummy event with. - prev_events_and_hashes = yield self.store.get_prev_events_for_room(room_id) - - latest_event_ids = (event_id for (event_id, _, _) in prev_events_and_hashes) + latest_event_ids = yield self.store.get_prev_events_for_room(room_id) members = yield self.state.get_current_users_in_room( room_id, latest_event_ids=latest_event_ids @@ -1063,7 +1050,7 @@ class EventCreationHandler(object): "room_id": room_id, "sender": user_id, }, - prev_events_and_hashes=prev_events_and_hashes, + prev_event_ids=latest_event_ids, ) event.internal_metadata.proactively_send = False diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 240c4add12..202aa9294f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -95,12 +95,7 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER class PresenceHandler(object): - def __init__(self, hs): - """ - - Args: - hs (synapse.server.HomeServer): - """ + def __init__(self, hs: "synapse.server.HomeServer"): self.hs = hs self.is_mine = hs.is_mine self.is_mine_id = hs.is_mine_id @@ -230,7 +225,7 @@ class PresenceHandler(object): is some spurious presence changes that will self-correct. """ # If the DB pool has already terminated, don't try updating - if not self.store.database.is_running(): + if not self.store.db.is_running(): return logger.info( diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 89c9118b26..9cab2adbfb 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -16,6 +16,7 @@ # limitations under the License. """Contains functions for performing events on rooms.""" + import itertools import logging import math @@ -271,7 +272,7 @@ class RoomCreationHandler(BaseHandler): except AuthError as e: logger.warning("Unable to update PLs in old room: %s", e) - logger.info("Setting correct PLs in new room") + logger.info("Setting correct PLs in new room to %s", old_room_pl_state.content) yield self.event_creation_handler.create_and_send_nonmember_event( requester, { @@ -365,13 +366,18 @@ class RoomCreationHandler(BaseHandler): needed_power_level = max(state_default, ban, max(event_power_levels.values())) # Raise the requester's power level in the new room if necessary - current_power_level = power_levels["users"][requester.user.to_string()] + current_power_level = power_levels["users"][user_id] if current_power_level < needed_power_level: - # Assign this power level to the requester - power_levels["users"][requester.user.to_string()] = needed_power_level + # make sure we copy the event content rather than overwriting it. + # note that if frozen_dicts are enabled, `power_levels` will be a frozen + # dict so we can't just copy.deepcopy it. - # Set the power levels to the modified state - initial_state[(EventTypes.PowerLevels, "")] = power_levels + new_power_levels = {k: v for k, v in power_levels.items() if k != "users"} + new_power_levels["users"] = { + k: v for k, v in power_levels.get("users", {}).items() if k != user_id + } + new_power_levels["users"][user_id] = needed_power_level + initial_state[(EventTypes.PowerLevels, "")] = new_power_levels yield self._send_events_for_new_room( requester, @@ -733,7 +739,7 @@ class RoomCreationHandler(BaseHandler): initial_state, creation_content, room_alias=None, - power_level_content_override=None, + power_level_content_override=None, # Doesn't apply when initial state has power level state event content creator_join_profile=None, ): def create(etype, content, **kwargs): diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 44c5e3239c..03bb52ccfb 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -25,7 +25,7 @@ from twisted.internet import defer from synapse import types from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError -from synapse.types import RoomID, UserID +from synapse.types import Collection, RoomID, UserID from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room, user_left_room @@ -149,7 +149,7 @@ class RoomMemberHandler(object): target, room_id, membership, - prev_events_and_hashes, + prev_event_ids: Collection[str], txn_id=None, ratelimit=True, content=None, @@ -177,7 +177,7 @@ class RoomMemberHandler(object): }, token_id=requester.access_token_id, txn_id=txn_id, - prev_events_and_hashes=prev_events_and_hashes, + prev_event_ids=prev_event_ids, require_consent=require_consent, ) @@ -370,8 +370,7 @@ class RoomMemberHandler(object): if block_invite: raise SynapseError(403, "Invites have been disabled on this server") - prev_events_and_hashes = yield self.store.get_prev_events_for_room(room_id) - latest_event_ids = (event_id for (event_id, _, _) in prev_events_and_hashes) + latest_event_ids = yield self.store.get_prev_events_for_room(room_id) current_state_ids = yield self.state_handler.get_current_state_ids( room_id, latest_event_ids=latest_event_ids @@ -485,7 +484,7 @@ class RoomMemberHandler(object): membership=effective_membership_state, txn_id=txn_id, ratelimit=ratelimit, - prev_events_and_hashes=prev_events_and_hashes, + prev_event_ids=latest_event_ids, content=content, require_consent=require_consent, ) @@ -507,6 +506,8 @@ class RoomMemberHandler(object): Returns: Deferred """ + logger.info("Transferring room state from %s to %s", old_room_id, room_id) + # Find all local users that were in the old room and copy over each user's state users = yield self.store.get_users_in_room(old_room_id) yield self.copy_user_state_on_room_upgrade(old_room_id, room_id, users) diff --git a/synapse/http/site.py b/synapse/http/site.py index ff8184a3d0..9f2d035fa0 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -47,9 +47,9 @@ class SynapseRequest(Request): logcontext(LoggingContext) : the log context for this request """ - def __init__(self, site, channel, *args, **kw): + def __init__(self, channel, *args, **kw): Request.__init__(self, channel, *args, **kw) - self.site = site + self.site = channel.site self._channel = channel # this is used by the tests self.authenticated_entity = None self.start_time = 0 @@ -331,18 +331,6 @@ class XForwardedForRequest(SynapseRequest): ) -class SynapseRequestFactory(object): - def __init__(self, site, x_forwarded_for): - self.site = site - self.x_forwarded_for = x_forwarded_for - - def __call__(self, *args, **kwargs): - if self.x_forwarded_for: - return XForwardedForRequest(self.site, *args, **kwargs) - else: - return SynapseRequest(self.site, *args, **kwargs) - - class SynapseSite(Site): """ Subclass of a twisted http Site that does access logging with python's @@ -364,7 +352,7 @@ class SynapseSite(Site): self.site_tag = site_tag proxied = config.get("x_forwarded", False) - self.requestFactory = SynapseRequestFactory(self, proxied) + self.requestFactory = XForwardedForRequest if proxied else SynapseRequest self.access_logger = logging.getLogger(logger_name) self.server_version_string = server_version_string.encode("ascii") diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index fead78388c..bbcb84646c 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -46,7 +46,8 @@ class ReplicationClientFactory(ReconnectingClientFactory): is required. """ - maxDelay = 30 # Try at least once every N seconds + initialDelay = 0.1 + maxDelay = 1 # Try at least once every N seconds def __init__(self, hs, client_name, handler: AbstractReplicationClientHandler): self.client_name = client_name diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index e7fc3f0431..bf5e0eb844 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -15,6 +15,7 @@ import logging from canonicaljson import encode_canonical_json, json +from signedjson.key import encode_verify_key_base64 from signedjson.sign import sign_json from twisted.internet import defer @@ -216,15 +217,28 @@ class RemoteKey(DirectServeResource): if cache_misses and query_remote_on_cache_miss: yield self.fetcher.get_keys(cache_misses) yield self.query_keys(request, query, query_remote_on_cache_miss=False) - else: - signed_keys = [] - for key_json in json_results: - key_json = json.loads(key_json) + return + + signed_keys = [] + for key_json in json_results: + key_json = json.loads(key_json) + + # backwards-compatibility hack for #6596: if the requested key belongs + # to us, make sure that all of the signing keys appear in the + # "verify_keys" section. + if key_json["server_name"] == self.config.server_name: + verify_keys = key_json["verify_keys"] for signing_key in self.config.key_server_signing_keys: - key_json = sign_json(key_json, self.config.server_name, signing_key) + key_id = "%s:%s" % (signing_key.alg, signing_key.version) + verify_keys[key_id] = { + "key": encode_verify_key_base64(signing_key.verify_key) + } + + for signing_key in self.config.key_server_signing_keys: + key_json = sign_json(key_json, self.config.server_name, signing_key) - signed_keys.append(key_json) + signed_keys.append(key_json) - results = {"server_keys": signed_keys} + results = {"server_keys": signed_keys} - respond_with_json_bytes(request, 200, encode_canonical_json(results)) + respond_with_json_bytes(request, 200, encode_canonical_json(results)) diff --git a/synapse/rest/media/v0/__init__.py b/synapse/rest/media/v0/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 --- a/synapse/rest/media/v0/__init__.py +++ /dev/null diff --git a/synapse/rest/media/v0/content_repository.py b/synapse/rest/media/v0/content_repository.py deleted file mode 100644 index 86884c0ef4..0000000000 --- a/synapse/rest/media/v0/content_repository.py +++ /dev/null @@ -1,103 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014-2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import base64 -import logging -import os -import re - -from canonicaljson import json - -from twisted.protocols.basic import FileSender -from twisted.web import resource, server - -from synapse.api.errors import Codes, cs_error -from synapse.http.server import finish_request, respond_with_json_bytes - -logger = logging.getLogger(__name__) - - -class ContentRepoResource(resource.Resource): - """Provides file uploading and downloading. - - Uploads are POSTed to wherever this Resource is linked to. This resource - returns a "content token" which can be used to GET this content again. The - token is typically a path, but it may not be. Tokens can expire, be - one-time uses, etc. - - In this case, the token is a path to the file and contains 3 interesting - sections: - - User ID base64d (for namespacing content to each user) - - random 24 char string - - Content type base64d (so we can return it when clients GET it) - - """ - - isLeaf = True - - def __init__(self, hs, directory): - resource.Resource.__init__(self) - self.hs = hs - self.directory = directory - - def render_GET(self, request): - # no auth here on purpose, to allow anyone to view, even across home - # servers. - - # TODO: A little crude here, we could do this better. - filename = request.path.decode("ascii").split("/")[-1] - # be paranoid - filename = re.sub("[^0-9A-z.-_]", "", filename) - - file_path = self.directory + "/" + filename - - logger.debug("Searching for %s", file_path) - - if os.path.isfile(file_path): - # filename has the content type - base64_contentype = filename.split(".")[1] - content_type = base64.urlsafe_b64decode(base64_contentype) - logger.info("Sending file %s", file_path) - f = open(file_path, "rb") - request.setHeader("Content-Type", content_type) - - # cache for at least a day. - # XXX: we might want to turn this off for data we don't want to - # recommend caching as it's sensitive or private - or at least - # select private. don't bother setting Expires as all our matrix - # clients are smart enough to be happy with Cache-Control (right?) - request.setHeader(b"Cache-Control", b"public,max-age=86400,s-maxage=86400") - - d = FileSender().beginFileTransfer(f, request) - - # after the file has been sent, clean up and finish the request - def cbFinished(ignored): - f.close() - finish_request(request) - - d.addCallback(cbFinished) - else: - respond_with_json_bytes( - request, - 404, - json.dumps(cs_error("Not found", code=Codes.NOT_FOUND)), - send_cors=True, - ) - - return server.NOT_DONE_YET - - def render_OPTIONS(self, request): - respond_with_json_bytes(request, 200, {}, send_cors=True) - return server.NOT_DONE_YET diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 88546ad614..3bb9381663 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -16,6 +16,7 @@ # limitations under the License. import logging import random +from abc import ABCMeta from six import PY2 from six.moves import builtins @@ -30,7 +31,8 @@ from synapse.types import get_domain_from_id logger = logging.getLogger(__name__) -class SQLBaseStore(object): +# some of our subclasses have abstract methods, so we use the ABCMeta metaclass. +class SQLBaseStore(metaclass=ABCMeta): """Base class for data stores that holds helper functions. Note that multiple instances of this class will exist as there will be one diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 4f97fd5ab6..bd547f35cf 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +from typing import Optional from canonicaljson import json @@ -97,15 +98,14 @@ class BackgroundUpdater(object): def start_doing_background_updates(self): run_as_background_process("background_updates", self.run_background_updates) - @defer.inlineCallbacks - def run_background_updates(self, sleep=True): + async def run_background_updates(self, sleep=True): logger.info("Starting background schema updates") while True: if sleep: - yield self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0) + await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0) try: - result = yield self.do_next_background_update( + result = await self.do_next_background_update( self.BACKGROUND_UPDATE_DURATION_MS ) except Exception: @@ -170,20 +170,21 @@ class BackgroundUpdater(object): return not update_exists - @defer.inlineCallbacks - def do_next_background_update(self, desired_duration_ms): + async def do_next_background_update( + self, desired_duration_ms: float + ) -> Optional[int]: """Does some amount of work on the next queued background update + Returns once some amount of work is done. + Args: desired_duration_ms(float): How long we want to spend updating. Returns: - A deferred that completes once some amount of work is done. - The deferred will have a value of None if there is currently - no more work to do. + None if there is no more work to do, otherwise an int """ if not self._background_update_queue: - updates = yield self.db.simple_select_list( + updates = await self.db.simple_select_list( "background_updates", keyvalues=None, retcols=("update_name", "depends_on"), @@ -201,11 +202,12 @@ class BackgroundUpdater(object): update_name = self._background_update_queue.pop(0) self._background_update_queue.append(update_name) - res = yield self._do_background_update(update_name, desired_duration_ms) + res = await self._do_background_update(update_name, desired_duration_ms) return res - @defer.inlineCallbacks - def _do_background_update(self, update_name, desired_duration_ms): + async def _do_background_update( + self, update_name: str, desired_duration_ms: float + ) -> int: logger.info("Starting update batch on background update '%s'", update_name) update_handler = self._background_update_handlers[update_name] @@ -225,7 +227,7 @@ class BackgroundUpdater(object): else: batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE - progress_json = yield self.db.simple_select_one_onecol( + progress_json = await self.db.simple_select_one_onecol( "background_updates", keyvalues={"update_name": update_name}, retcol="progress_json", @@ -234,7 +236,7 @@ class BackgroundUpdater(object): progress = json.loads(progress_json) time_start = self._clock.time_msec() - items_updated = yield update_handler(progress, batch_size) + items_updated = await update_handler(progress, batch_size) time_stop = self._clock.time_msec() duration_ms = time_stop - time_start @@ -263,7 +265,9 @@ class BackgroundUpdater(object): * A dict of the current progress * An integer count of the number of items to update in this batch. - The handler should return a deferred integer count of items updated. + The handler should return a deferred or coroutine which returns an integer count + of items updated. + The handler is responsible for updating the progress of the update. Args: @@ -432,6 +436,21 @@ class BackgroundUpdater(object): "background_updates", keyvalues={"update_name": update_name} ) + def _background_update_progress(self, update_name: str, progress: dict): + """Update the progress of a background update + + Args: + update_name: The name of the background update task + progress: The progress of the update. + """ + + return self.db.runInteraction( + "background_update_progress", + self._background_update_progress_txn, + update_name, + progress, + ) + def _background_update_progress_txn(self, txn, update_name, progress): """Update the progress of a background update diff --git a/synapse/storage/data_stores/__init__.py b/synapse/storage/data_stores/__init__.py index d20df5f076..092e803799 100644 --- a/synapse/storage/data_stores/__init__.py +++ b/synapse/storage/data_stores/__init__.py @@ -37,6 +37,8 @@ class DataStores(object): # store. self.databases = [] + self.main = None + self.state = None for database_config in hs.config.database.databases: db_name = database_config.name @@ -54,10 +56,22 @@ class DataStores(object): if "main" in database_config.data_stores: logger.info("Starting 'main' data store") + + # Sanity check we don't try and configure the main store on + # multiple databases. + if self.main: + raise Exception("'main' data store already configured") + self.main = main_store_class(database, db_conn, hs) if "state" in database_config.data_stores: logger.info("Starting 'state' data store") + + # Sanity check we don't try and configure the state store on + # multiple databases. + if self.state: + raise Exception("'state' data store already configured") + self.state = StateGroupDataStore(database, db_conn, hs) db_conn.commit() @@ -65,3 +79,10 @@ class DataStores(object): self.databases.append(database) logger.info("Database %r prepared", db_name) + + # Sanity check that we have actually configured all the required stores. + if not self.main: + raise Exception("No 'main' data store configured") + + if not self.state: + raise Exception("No 'main' data store configured") diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py index 1d18f13801..60c67457b4 100644 --- a/synapse/storage/data_stores/main/event_federation.py +++ b/synapse/storage/data_stores/main/event_federation.py @@ -14,13 +14,10 @@ # limitations under the License. import itertools import logging -import random from six.moves import range from six.moves.queue import Empty, PriorityQueue -from unpaddedbase64 import encode_base64 - from twisted.internet import defer from synapse.api.errors import StoreError @@ -148,8 +145,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas retcol="event_id", ) - @defer.inlineCallbacks - def get_prev_events_for_room(self, room_id): + def get_prev_events_for_room(self, room_id: str): """ Gets a subset of the current forward extremities in the given room. @@ -160,40 +156,29 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas room_id (str): room_id Returns: - Deferred[list[(str, dict[str, str], int)]] - for each event, a tuple of (event_id, hashes, depth) - where *hashes* is a map from algorithm to hash. + Deferred[List[str]]: the event ids of the forward extremites + """ - res = yield self.get_latest_event_ids_and_hashes_in_room(room_id) - if len(res) > 10: - # Sort by reverse depth, so we point to the most recent. - res.sort(key=lambda a: -a[2]) - # we use half of the limit for the actual most recent events, and - # the other half to randomly point to some of the older events, to - # make sure that we don't completely ignore the older events. - res = res[0:5] + random.sample(res[5:], 5) + return self.db.runInteraction( + "get_prev_events_for_room", self._get_prev_events_for_room_txn, room_id + ) - return res + def _get_prev_events_for_room_txn(self, txn, room_id: str): + # we just use the 10 newest events. Older events will become + # prev_events of future events. - def get_latest_event_ids_and_hashes_in_room(self, room_id): + sql = """ + SELECT e.event_id FROM event_forward_extremities AS f + INNER JOIN events AS e USING (event_id) + WHERE f.room_id = ? + ORDER BY e.depth DESC + LIMIT 10 """ - Gets the current forward extremities in the given room - Args: - room_id (str): room_id + txn.execute(sql, (room_id,)) - Returns: - Deferred[list[(str, dict[str, str], int)]] - for each event, a tuple of (event_id, hashes, depth) - where *hashes* is a map from algorithm to hash. - """ - - return self.db.runInteraction( - "get_latest_event_ids_and_hashes_in_room", - self._get_latest_event_ids_and_hashes_in_room, - room_id, - ) + return [row[0] for row in txn] def get_rooms_with_many_extremities(self, min_count, limit, room_id_filter): """Get the top rooms with at least N extremities. @@ -243,27 +228,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas desc="get_latest_event_ids_in_room", ) - def _get_latest_event_ids_and_hashes_in_room(self, txn, room_id): - sql = ( - "SELECT e.event_id, e.depth FROM events as e " - "INNER JOIN event_forward_extremities as f " - "ON e.event_id = f.event_id " - "AND e.room_id = f.room_id " - "WHERE f.room_id = ?" - ) - - txn.execute(sql, (room_id,)) - - results = [] - for event_id, depth in txn.fetchall(): - hashes = self._get_event_reference_hashes_txn(txn, event_id) - prev_hashes = { - k: encode_base64(v) for k, v in hashes.items() if k == "sha256" - } - results.append((event_id, prev_hashes, depth)) - - return results - def get_min_depth(self, room_id): """ For hte given room, get the minimum depth we have seen for it. """ diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 2c9142814c..0cce5232f5 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -137,7 +137,7 @@ class EventsWorkerStore(SQLBaseStore): @defer.inlineCallbacks def get_event( self, - event_id: List[str], + event_id: str, redact_behaviour: EventRedactBehaviour = EventRedactBehaviour.REDACT, get_prev_content: bool = False, allow_rejected: bool = False, @@ -148,15 +148,22 @@ class EventsWorkerStore(SQLBaseStore): Args: event_id: The event_id of the event to fetch + redact_behaviour: Determine what to do with a redacted event. Possible values: * AS_IS - Return the full event body with no redacted content * REDACT - Return the event but with a redacted body - * DISALLOW - Do not return redacted events + * DISALLOW - Do not return redacted events (behave as per allow_none + if the event is redacted) + get_prev_content: If True and event is a state event, include the previous states content in the unsigned field. - allow_rejected: If True return rejected events. + + allow_rejected: If True, return rejected events. Otherwise, + behave as per allow_none. + allow_none: If True, return None if no event found, if False throw a NotFoundError + check_room_id: if not None, check the room of the found event. If there is a mismatch, behave as per allow_none. @@ -196,14 +203,18 @@ class EventsWorkerStore(SQLBaseStore): Args: event_ids: The event_ids of the events to fetch + redact_behaviour: Determine what to do with a redacted event. Possible values: * AS_IS - Return the full event body with no redacted content * REDACT - Return the event but with a redacted body - * DISALLOW - Do not return redacted events + * DISALLOW - Do not return redacted events (omit them from the response) + get_prev_content: If True and event is a state event, include the previous states content in the unsigned field. - allow_rejected: If True return rejected events. + + allow_rejected: If True, return rejected events. Otherwise, + omits rejeted events from the response. Returns: Deferred : Dict from event_id to event. @@ -228,15 +239,21 @@ class EventsWorkerStore(SQLBaseStore): """Get events from the database and return in a list in the same order as given by `event_ids` arg. + Unknown events will be omitted from the response. + Args: event_ids: The event_ids of the events to fetch + redact_behaviour: Determine what to do with a redacted event. Possible values: * AS_IS - Return the full event body with no redacted content * REDACT - Return the event but with a redacted body - * DISALLOW - Do not return redacted events + * DISALLOW - Do not return redacted events (omit them from the response) + get_prev_content: If True and event is a state event, include the previous states content in the unsigned field. - allow_rejected: If True, return rejected events. + + allow_rejected: If True, return rejected events. Otherwise, + omits rejected events from the response. Returns: Deferred[list[EventBase]]: List of events fetched from the database. The @@ -369,9 +386,14 @@ class EventsWorkerStore(SQLBaseStore): If events are pulled from the database, they will be cached for future lookups. + Unknown events are omitted from the response. + Args: + event_ids (Iterable[str]): The event_ids of the events to fetch - allow_rejected (bool): Whether to include rejected events + + allow_rejected (bool): Whether to include rejected events. If False, + rejected events are omitted from the response. Returns: Deferred[Dict[str, _EventCacheEntry]]: @@ -506,9 +528,13 @@ class EventsWorkerStore(SQLBaseStore): Returned events will be added to the cache for future lookups. + Unknown events are omitted from the response. + Args: event_ids (Iterable[str]): The event_ids of the events to fetch - allow_rejected (bool): Whether to include rejected events + + allow_rejected (bool): Whether to include rejected events. If False, + rejected events are omitted from the response. Returns: Deferred[Dict[str, _EventCacheEntry]]: diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py index aa476d0fbf..79cfd39194 100644 --- a/synapse/storage/data_stores/main/room.py +++ b/synapse/storage/data_stores/main/room.py @@ -17,6 +17,7 @@ import collections import logging import re +from abc import abstractmethod from typing import Optional, Tuple from six import integer_types @@ -367,6 +368,8 @@ class RoomWorkerStore(SQLBaseStore): class RoomBackgroundUpdateStore(SQLBaseStore): + REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory" + def __init__(self, database: Database, db_conn, hs): super(RoomBackgroundUpdateStore, self).__init__(database, db_conn, hs) @@ -376,6 +379,11 @@ class RoomBackgroundUpdateStore(SQLBaseStore): "insert_room_retention", self._background_insert_retention, ) + self.db.updates.register_background_update_handler( + self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, + self._remove_tombstoned_rooms_from_directory, + ) + @defer.inlineCallbacks def _background_insert_retention(self, progress, batch_size): """Retrieves a list of all rooms within a range and inserts an entry for each of @@ -444,6 +452,62 @@ class RoomBackgroundUpdateStore(SQLBaseStore): defer.returnValue(batch_size) + async def _remove_tombstoned_rooms_from_directory( + self, progress, batch_size + ) -> int: + """Removes any rooms with tombstone events from the room directory + + Nowadays this is handled by the room upgrade handler, but we may have some + that got left behind + """ + + last_room = progress.get("room_id", "") + + def _get_rooms(txn): + txn.execute( + """ + SELECT room_id + FROM rooms r + INNER JOIN current_state_events cse USING (room_id) + WHERE room_id > ? AND r.is_public + AND cse.type = '%s' AND cse.state_key = '' + ORDER BY room_id ASC + LIMIT ?; + """ + % EventTypes.Tombstone, + (last_room, batch_size), + ) + + return [row[0] for row in txn] + + rooms = await self.db.runInteraction( + "get_tombstoned_directory_rooms", _get_rooms + ) + + if not rooms: + await self.db.updates._end_background_update( + self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE + ) + return 0 + + for room_id in rooms: + logger.info("Removing tombstoned room %s from the directory", room_id) + await self.set_room_is_public(room_id, False) + + await self.db.updates._background_update_progress( + self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]} + ) + + return len(rooms) + + @abstractmethod + def set_room_is_public(self, room_id, is_public): + # this will need to be implemented if a background update is performed with + # existing (tombstoned, public) rooms in the database. + # + # It's overridden by RoomStore for the synapse master. + raise NotImplementedError() + class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): def __init__(self, database: Database, db_conn, hs): diff --git a/synapse/storage/data_stores/main/schema/delta/56/remove_tombstoned_rooms_from_directory.sql b/synapse/storage/data_stores/main/schema/delta/56/remove_tombstoned_rooms_from_directory.sql new file mode 100644 index 0000000000..aeb17813d3 --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/56/remove_tombstoned_rooms_from_directory.sql @@ -0,0 +1,18 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Now that #6232 is a thing, we can remove old rooms from the directory. +INSERT INTO background_updates (update_name, progress_json) VALUES + ('remove_tombstoned_rooms_from_directory', '{}'); diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py index 0dc39f139c..d07440e3ed 100644 --- a/synapse/storage/data_stores/main/state.py +++ b/synapse/storage/data_stores/main/state.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import collections.abc import logging from collections import namedtuple from typing import Iterable, Tuple @@ -107,7 +107,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): predecessor = create_event.content.get("predecessor", None) # Ensure the key is a dictionary - if not isinstance(predecessor, dict): + if not isinstance(predecessor, collections.abc.Mapping): return None return predecessor diff --git a/synapse/types.py b/synapse/types.py index aafc3ffe74..cd996c0b5a 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -15,6 +15,7 @@ # limitations under the License. import re import string +import sys from collections import namedtuple import attr @@ -23,6 +24,17 @@ from unpaddedbase64 import decode_base64 from synapse.api.errors import SynapseError +# define a version of typing.Collection that works on python 3.5 +if sys.version_info[:3] >= (3, 6, 0): + from typing import Collection +else: + from typing import Sized, Iterable, Container, TypeVar + + T_co = TypeVar("T_co", covariant=True) + + class Collection(Iterable[T_co], Container[T_co], Sized): + __slots__ = () + class Requester( namedtuple( diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index 8efd39c7f7..34d5895f18 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -19,6 +19,7 @@ from mock import Mock import canonicaljson import signedjson.key import signedjson.sign +from nacl.signing import SigningKey from signedjson.key import encode_verify_key_base64, get_verify_key from twisted.internet import defer @@ -412,34 +413,37 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase): handlers=None, http_client=self.http_client, config=config ) - def test_get_keys_from_perspectives(self): - # arbitrarily advance the clock a bit - self.reactor.advance(100) - - fetcher = PerspectivesKeyFetcher(self.hs) - - SERVER_NAME = "server2" - testkey = signedjson.key.generate_signing_key("ver1") - testverifykey = signedjson.key.get_verify_key(testkey) - testverifykey_id = "ed25519:ver1" - VALID_UNTIL_TS = 200 * 1000 + def build_perspectives_response( + self, server_name: str, signing_key: SigningKey, valid_until_ts: int, + ) -> dict: + """ + Build a valid perspectives server response to a request for the given key + """ + verify_key = signedjson.key.get_verify_key(signing_key) + verifykey_id = "%s:%s" % (verify_key.alg, verify_key.version) - # valid response response = { - "server_name": SERVER_NAME, + "server_name": server_name, "old_verify_keys": {}, - "valid_until_ts": VALID_UNTIL_TS, + "valid_until_ts": valid_until_ts, "verify_keys": { - testverifykey_id: { - "key": signedjson.key.encode_verify_key_base64(testverifykey) + verifykey_id: { + "key": signedjson.key.encode_verify_key_base64(verify_key) } }, } - # the response must be signed by both the origin server and the perspectives # server. - signedjson.sign.sign_json(response, SERVER_NAME, testkey) + signedjson.sign.sign_json(response, server_name, signing_key) self.mock_perspective_server.sign_response(response) + return response + + def expect_outgoing_key_query( + self, expected_server_name: str, expected_key_id: str, response: dict + ) -> None: + """ + Tell the mock http client to expect a perspectives-server key query + """ def post_json(destination, path, data, **kwargs): self.assertEqual(destination, self.mock_perspective_server.server_name) @@ -447,11 +451,79 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase): # check that the request is for the expected key q = data["server_keys"] - self.assertEqual(list(q[SERVER_NAME].keys()), ["key1"]) + self.assertEqual(list(q[expected_server_name].keys()), [expected_key_id]) return {"server_keys": [response]} self.http_client.post_json.side_effect = post_json + def test_get_keys_from_perspectives(self): + # arbitrarily advance the clock a bit + self.reactor.advance(100) + + fetcher = PerspectivesKeyFetcher(self.hs) + + SERVER_NAME = "server2" + testkey = signedjson.key.generate_signing_key("ver1") + testverifykey = signedjson.key.get_verify_key(testkey) + testverifykey_id = "ed25519:ver1" + VALID_UNTIL_TS = 200 * 1000 + + response = self.build_perspectives_response( + SERVER_NAME, testkey, VALID_UNTIL_TS, + ) + + self.expect_outgoing_key_query(SERVER_NAME, "key1", response) + + keys_to_fetch = {SERVER_NAME: {"key1": 0}} + keys = self.get_success(fetcher.get_keys(keys_to_fetch)) + self.assertIn(SERVER_NAME, keys) + k = keys[SERVER_NAME][testverifykey_id] + self.assertEqual(k.valid_until_ts, VALID_UNTIL_TS) + self.assertEqual(k.verify_key, testverifykey) + self.assertEqual(k.verify_key.alg, "ed25519") + self.assertEqual(k.verify_key.version, "ver1") + + # check that the perspectives store is correctly updated + lookup_triplet = (SERVER_NAME, testverifykey_id, None) + key_json = self.get_success( + self.hs.get_datastore().get_server_keys_json([lookup_triplet]) + ) + res = key_json[lookup_triplet] + self.assertEqual(len(res), 1) + res = res[0] + self.assertEqual(res["key_id"], testverifykey_id) + self.assertEqual(res["from_server"], self.mock_perspective_server.server_name) + self.assertEqual(res["ts_added_ms"], self.reactor.seconds() * 1000) + self.assertEqual(res["ts_valid_until_ms"], VALID_UNTIL_TS) + + self.assertEqual( + bytes(res["key_json"]), canonicaljson.encode_canonical_json(response) + ) + + def test_get_perspectives_own_key(self): + """Check that we can get the perspectives server's own keys + + This is slightly complicated by the fact that the perspectives server may + use different keys for signing notary responses. + """ + + # arbitrarily advance the clock a bit + self.reactor.advance(100) + + fetcher = PerspectivesKeyFetcher(self.hs) + + SERVER_NAME = self.mock_perspective_server.server_name + testkey = signedjson.key.generate_signing_key("ver1") + testverifykey = signedjson.key.get_verify_key(testkey) + testverifykey_id = "ed25519:ver1" + VALID_UNTIL_TS = 200 * 1000 + + response = self.build_perspectives_response( + SERVER_NAME, testkey, VALID_UNTIL_TS + ) + + self.expect_outgoing_key_query(SERVER_NAME, "key1", response) + keys_to_fetch = {SERVER_NAME: {"key1": 0}} keys = self.get_success(fetcher.get_keys(keys_to_fetch)) self.assertIn(SERVER_NAME, keys) @@ -490,35 +562,14 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase): VALID_UNTIL_TS = 200 * 1000 def build_response(): - # valid response - response = { - "server_name": SERVER_NAME, - "old_verify_keys": {}, - "valid_until_ts": VALID_UNTIL_TS, - "verify_keys": { - testverifykey_id: { - "key": signedjson.key.encode_verify_key_base64(testverifykey) - } - }, - } - - # the response must be signed by both the origin server and the perspectives - # server. - signedjson.sign.sign_json(response, SERVER_NAME, testkey) - self.mock_perspective_server.sign_response(response) - return response + return self.build_perspectives_response( + SERVER_NAME, testkey, VALID_UNTIL_TS + ) def get_key_from_perspectives(response): fetcher = PerspectivesKeyFetcher(self.hs) keys_to_fetch = {SERVER_NAME: {"key1": 0}} - - def post_json(destination, path, data, **kwargs): - self.assertEqual(destination, self.mock_perspective_server.server_name) - self.assertEqual(path, "/_matrix/key/v2/query") - return {"server_keys": [response]} - - self.http_client.post_json.side_effect = post_json - + self.expect_outgoing_key_query(SERVER_NAME, "key1", response) return self.get_success(fetcher.get_keys(keys_to_fetch)) # start with a valid response so we can check we are testing the right thing diff --git a/tests/rest/key/v2/test_remote_key_resource.py b/tests/rest/key/v2/test_remote_key_resource.py new file mode 100644 index 0000000000..d8246b4e78 --- /dev/null +++ b/tests/rest/key/v2/test_remote_key_resource.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import urllib.parse +from io import BytesIO + +from mock import Mock + +import signedjson.key +from nacl.signing import SigningKey +from signedjson.sign import sign_json + +from twisted.web.resource import NoResource + +from synapse.http.site import SynapseRequest +from synapse.rest.key.v2 import KeyApiV2Resource +from synapse.util.httpresourcetree import create_resource_tree + +from tests import unittest +from tests.server import FakeChannel, wait_until_result + + +class RemoteKeyResourceTestCase(unittest.HomeserverTestCase): + def make_homeserver(self, reactor, clock): + self.http_client = Mock() + return self.setup_test_homeserver(http_client=self.http_client) + + def create_test_json_resource(self): + return create_resource_tree( + {"/_matrix/key/v2": KeyApiV2Resource(self.hs)}, root_resource=NoResource() + ) + + def expect_outgoing_key_request( + self, server_name: str, signing_key: SigningKey + ) -> None: + """ + Tell the mock http client to expect an outgoing GET request for the given key + """ + + def get_json(destination, path, ignore_backoff=False, **kwargs): + self.assertTrue(ignore_backoff) + self.assertEqual(destination, server_name) + key_id = "%s:%s" % (signing_key.alg, signing_key.version) + self.assertEqual( + path, "/_matrix/key/v2/server/%s" % (urllib.parse.quote(key_id),) + ) + + response = { + "server_name": server_name, + "old_verify_keys": {}, + "valid_until_ts": 200 * 1000, + "verify_keys": { + key_id: { + "key": signedjson.key.encode_verify_key_base64( + signing_key.verify_key + ) + } + }, + } + sign_json(response, server_name, signing_key) + return response + + self.http_client.get_json.side_effect = get_json + + def make_notary_request(self, server_name: str, key_id: str) -> dict: + """Send a GET request to the test server requesting the given key. + + Checks that the response is a 200 and returns the decoded json body. + """ + channel = FakeChannel(self.site, self.reactor) + req = SynapseRequest(channel) + req.content = BytesIO(b"") + req.requestReceived( + b"GET", + b"/_matrix/key/v2/query/%s/%s" + % (server_name.encode("utf-8"), key_id.encode("utf-8")), + b"1.1", + ) + wait_until_result(self.reactor, req) + self.assertEqual(channel.code, 200) + resp = channel.json_body + return resp + + def test_get_key(self): + """Fetch a remote key""" + SERVER_NAME = "remote.server" + testkey = signedjson.key.generate_signing_key("ver1") + self.expect_outgoing_key_request(SERVER_NAME, testkey) + + resp = self.make_notary_request(SERVER_NAME, "ed25519:ver1") + keys = resp["server_keys"] + self.assertEqual(len(keys), 1) + + self.assertIn("ed25519:ver1", keys[0]["verify_keys"]) + self.assertEqual(len(keys[0]["verify_keys"]), 1) + + # it should be signed by both the origin server and the notary + self.assertIn(SERVER_NAME, keys[0]["signatures"]) + self.assertIn(self.hs.hostname, keys[0]["signatures"]) + + def test_get_own_key(self): + """Fetch our own key""" + testkey = signedjson.key.generate_signing_key("ver1") + self.expect_outgoing_key_request(self.hs.hostname, testkey) + + resp = self.make_notary_request(self.hs.hostname, "ed25519:ver1") + keys = resp["server_keys"] + self.assertEqual(len(keys), 1) + + # it should be signed by both itself, and the notary signing key + sigs = keys[0]["signatures"] + self.assertEqual(len(sigs), 1) + self.assertIn(self.hs.hostname, sigs) + oursigs = sigs[self.hs.hostname] + self.assertEqual(len(oursigs), 2) + + # and both keys should be present in the verify_keys section + self.assertIn("ed25519:ver1", keys[0]["verify_keys"]) + self.assertIn("ed25519:a_lPym", keys[0]["verify_keys"]) diff --git a/tests/server.py b/tests/server.py index a554dfdd57..1644710aa0 100644 --- a/tests/server.py +++ b/tests/server.py @@ -20,6 +20,7 @@ from twisted.python.failure import Failure from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock from twisted.web.http import unquote from twisted.web.http_headers import Headers +from twisted.web.server import Site from synapse.http.site import SynapseRequest from synapse.util import Clock @@ -42,6 +43,7 @@ class FakeChannel(object): wire). """ + site = attr.ib(type=Site) _reactor = attr.ib() result = attr.ib(default=attr.Factory(dict)) _producer = None @@ -176,9 +178,9 @@ def make_request( content = content.encode("utf8") site = FakeSite() - channel = FakeChannel(reactor) + channel = FakeChannel(site, reactor) - req = request(site, channel) + req = request(channel) req.process = lambda: b"" req.content = BytesIO(content) req.postpath = list(map(unquote, path[1:].split(b"/"))) diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index aec76f4ab1..ae14fb407d 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -2,44 +2,37 @@ from mock import Mock from twisted.internet import defer +from synapse.storage.background_updates import BackgroundUpdater + from tests import unittest -from tests.utils import setup_test_homeserver -class BackgroundUpdateTestCase(unittest.TestCase): - @defer.inlineCallbacks - def setUp(self): - hs = yield setup_test_homeserver(self.addCleanup) - self.store = hs.get_datastore() - self.clock = hs.get_clock() +class BackgroundUpdateTestCase(unittest.HomeserverTestCase): + def prepare(self, reactor, clock, homeserver): + self.updates = self.hs.get_datastore().db.updates # type: BackgroundUpdater + # the base test class should have run the real bg updates for us + self.assertTrue(self.updates.has_completed_background_updates()) self.update_handler = Mock() - - yield self.store.db.updates.register_background_update_handler( + self.updates.register_background_update_handler( "test_update", self.update_handler ) - # run the real background updates, to get them out the way - # (perhaps we should run them as part of the test HS setup, since we - # run all of the other schema setup stuff there?) - while True: - res = yield self.store.db.updates.do_next_background_update(1000) - if res is None: - break - - @defer.inlineCallbacks def test_do_background_update(self): - desired_count = 1000 + # the time we claim each update takes duration_ms = 42 + # the target runtime for each bg update + target_background_update_duration_ms = 50000 + # first step: make a bit of progress @defer.inlineCallbacks def update(progress, count): - self.clock.advance_time_msec(count * duration_ms) + yield self.clock.sleep((count * duration_ms) / 1000) progress = {"my_key": progress["my_key"] + 1} - yield self.store.db.runInteraction( + yield self.hs.get_datastore().db.runInteraction( "update_progress", - self.store.db.updates._background_update_progress_txn, + self.updates._background_update_progress_txn, "test_update", progress, ) @@ -47,37 +40,46 @@ class BackgroundUpdateTestCase(unittest.TestCase): self.update_handler.side_effect = update - yield self.store.db.updates.start_background_update( - "test_update", {"my_key": 1} + self.get_success( + self.updates.start_background_update("test_update", {"my_key": 1}) ) - self.update_handler.reset_mock() - result = yield self.store.db.updates.do_next_background_update( - duration_ms * desired_count + res = self.get_success( + self.updates.do_next_background_update( + target_background_update_duration_ms + ), + by=0.1, ) - self.assertIsNotNone(result) + self.assertIsNotNone(res) + + # on the first call, we should get run with the default background update size self.update_handler.assert_called_once_with( - {"my_key": 1}, self.store.db.updates.DEFAULT_BACKGROUND_BATCH_SIZE + {"my_key": 1}, self.updates.DEFAULT_BACKGROUND_BATCH_SIZE ) # second step: complete the update + # we should now get run with a much bigger number of items to update @defer.inlineCallbacks def update(progress, count): - yield self.store.db.updates._end_background_update("test_update") + self.assertEqual(progress, {"my_key": 2}) + self.assertAlmostEqual( + count, target_background_update_duration_ms / duration_ms, places=0, + ) + yield self.updates._end_background_update("test_update") return count self.update_handler.side_effect = update self.update_handler.reset_mock() - result = yield self.store.db.updates.do_next_background_update( - duration_ms * desired_count + result = self.get_success( + self.updates.do_next_background_update(target_background_update_duration_ms) ) self.assertIsNotNone(result) - self.update_handler.assert_called_once_with({"my_key": 2}, desired_count) + self.update_handler.assert_called_once() # third step: we don't expect to be called any more self.update_handler.reset_mock() - result = yield self.store.db.updates.do_next_background_update( - duration_ms * desired_count + result = self.get_success( + self.updates.do_next_background_update(target_background_update_duration_ms) ) self.assertIsNone(result) self.assertFalse(self.update_handler.called) diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index eadfb90a22..a331517f4d 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -60,21 +60,14 @@ class EventFederationWorkerStoreTestCase(tests.unittest.TestCase): (event_id, bytearray(b"ffff")), ) - for i in range(0, 11): + for i in range(0, 20): yield self.store.db.runInteraction("insert", insert_event, i) - # this should get the last five and five others + # this should get the last ten r = yield self.store.get_prev_events_for_room(room_id) self.assertEqual(10, len(r)) - for i in range(0, 5): - el = r[i] - depth = el[2] - self.assertEqual(10 - i, depth) - - for i in range(5, 5): - el = r[i] - depth = el[2] - self.assertLessEqual(5, depth) + for i in range(0, 10): + self.assertEqual("$event_%i:local" % (19 - i), r[i]) @defer.inlineCallbacks def test_get_rooms_with_many_extremities(self): diff --git a/tests/unittest.py b/tests/unittest.py index b30b7d1718..ddcd4becfe 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -36,7 +36,7 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.ratelimiting import FederationRateLimitConfig from synapse.federation.transport import server as federation_server from synapse.http.server import JsonResource -from synapse.http.site import SynapseRequest +from synapse.http.site import SynapseRequest, SynapseSite from synapse.logging.context import LoggingContext from synapse.server import HomeServer from synapse.types import Requester, UserID, create_requester @@ -210,6 +210,15 @@ class HomeserverTestCase(TestCase): # Register the resources self.resource = self.create_test_json_resource() + # create a site to wrap the resource. + self.site = SynapseSite( + logger_name="synapse.access.http.fake", + site_tag="test", + config={}, + resource=self.resource, + server_version_string="1", + ) + from tests.rest.client.v1.utils import RestHelper self.helper = RestHelper(self.hs, self.resource, getattr(self, "user_id", None)) @@ -522,10 +531,6 @@ class HomeserverTestCase(TestCase): secrets = self.hs.get_secrets() requester = Requester(user, None, False, None, None) - prev_events_and_hashes = None - if prev_event_ids: - prev_events_and_hashes = [[p, {}, 0] for p in prev_event_ids] - event, context = self.get_success( event_creator.create_event( requester, @@ -535,7 +540,7 @@ class HomeserverTestCase(TestCase): "sender": user.to_string(), "content": {"body": secrets.token_hex(), "msgtype": "m.text"}, }, - prev_events_and_hashes=prev_events_and_hashes, + prev_event_ids=prev_event_ids, ) ) diff --git a/tox.ini b/tox.ini index 1d6428f64f..0ab6d5666b 100644 --- a/tox.ini +++ b/tox.ini @@ -182,7 +182,6 @@ commands = mypy \ synapse/logging/ \ synapse/module_api \ synapse/rest/consent \ - synapse/rest/media/v0 \ synapse/rest/saml2 \ synapse/spam_checker_api \ synapse/storage/engines \ |