summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--UPGRADE.rst16
-rw-r--r--changelog.d/8559.misc1
-rw-r--r--changelog.d/8607.feature1
-rw-r--r--changelog.d/8616.misc1
-rw-r--r--changelog.d/8676.bugfix1
-rw-r--r--changelog.d/8679.misc1
-rw-r--r--changelog.d/8680.misc1
-rw-r--r--changelog.d/8684.misc1
-rw-r--r--changelog.d/8685.feature1
-rw-r--r--changelog.d/8689.feature1
-rw-r--r--docs/sample_log_config.yaml4
-rw-r--r--docs/structured_logging.md164
-rwxr-xr-xscripts-dev/lint.sh2
-rw-r--r--scripts-dev/mypy_synapse_plugin.py38
-rw-r--r--synapse/api/auth.py113
-rw-r--r--synapse/appservice/__init__.py14
-rw-r--r--synapse/config/logger.py96
-rw-r--r--synapse/events/__init__.py4
-rw-r--r--synapse/federation/transport/server.py2
-rw-r--r--synapse/handlers/account_validity.py2
-rw-r--r--synapse/handlers/auth.py8
-rw-r--r--synapse/handlers/presence.py12
-rw-r--r--synapse/handlers/profile.py8
-rw-r--r--synapse/handlers/register.py16
-rw-r--r--synapse/handlers/room.py31
-rw-r--r--synapse/handlers/room_member.py8
-rw-r--r--synapse/http/site.py30
-rw-r--r--synapse/logging/__init__.py20
-rw-r--r--synapse/logging/_remote.py122
-rw-r--r--synapse/logging/_structured.py329
-rw-r--r--synapse/logging/_terse_json.py192
-rw-r--r--synapse/logging/filter.py33
-rw-r--r--synapse/replication/http/membership.py6
-rw-r--r--synapse/replication/http/send_event.py3
-rw-r--r--synapse/storage/databases/main/registration.py52
-rw-r--r--synapse/storage/databases/main/schema/delta/58/22puppet_token.sql17
-rw-r--r--synapse/types.py33
-rw-r--r--synmark/__init__.py39
-rw-r--r--synmark/__main__.py6
-rw-r--r--synmark/suites/logging.py60
-rw-r--r--tests/api/test_auth.py29
-rw-r--r--tests/api/test_ratelimiting.py4
-rw-r--r--tests/appservice/test_appservice.py1
-rw-r--r--tests/handlers/test_device.py2
-rw-r--r--tests/handlers/test_message.py2
-rw-r--r--tests/logging/__init__.py34
-rw-r--r--tests/logging/test_remote_handler.py169
-rw-r--r--tests/logging/test_structured.py214
-rw-r--r--tests/logging/test_terse_json.py253
-rw-r--r--tests/push/test_email.py2
-rw-r--r--tests/push/test_http.py10
-rw-r--r--tests/replication/_base.py11
-rw-r--r--tests/replication/test_pusher_shard.py2
-rw-r--r--tests/rest/admin/test_user.py2
-rw-r--r--tests/rest/client/v2_alpha/test_register.py1
-rw-r--r--tests/server.py4
-rw-r--r--tests/storage/test_registration.py10
57 files changed, 1031 insertions, 1208 deletions
diff --git a/UPGRADE.rst b/UPGRADE.rst

index 5a68312217..960c2aeb2b 100644 --- a/UPGRADE.rst +++ b/UPGRADE.rst
@@ -75,6 +75,22 @@ for example: wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb +Upgrading to v1.23.0 +==================== + +Structured logging configuration breaking changes +------------------------------------------------- + +This release deprecates use of the ``structured: true`` logging configuration for +structured logging. If your logging configuration contains ``structured: true`` +then it should be modified based on the `structured logging documentation +<https://github.com/matrix-org/synapse/blob/master/docs/structured_logging.md>`_. + +The ``structured`` and ``drains`` logging options are now deprecated and should +be replaced by standard logging configuration of ``handlers`` and ``formatters`. + +A future will release of Synapse will make using ``structured: true`` an error. + Upgrading to v1.22.0 ==================== diff --git a/changelog.d/8559.misc b/changelog.d/8559.misc new file mode 100644
index 0000000000..d7bd00964e --- /dev/null +++ b/changelog.d/8559.misc
@@ -0,0 +1 @@ +Optimise `/createRoom` with multiple invited users. diff --git a/changelog.d/8607.feature b/changelog.d/8607.feature new file mode 100644
index 0000000000..fef1eccb92 --- /dev/null +++ b/changelog.d/8607.feature
@@ -0,0 +1 @@ +Support generating structured logs via the standard logging configuration. diff --git a/changelog.d/8616.misc b/changelog.d/8616.misc new file mode 100644
index 0000000000..385b14063e --- /dev/null +++ b/changelog.d/8616.misc
@@ -0,0 +1 @@ +Change schema to support access tokens belonging to one user but granting access to another. diff --git a/changelog.d/8676.bugfix b/changelog.d/8676.bugfix new file mode 100644
index 0000000000..df16c72761 --- /dev/null +++ b/changelog.d/8676.bugfix
@@ -0,0 +1 @@ +Fix a bug where an appservice may not be forwarded events for a room it was recently invited to. Broken in v1.22.0. diff --git a/changelog.d/8679.misc b/changelog.d/8679.misc new file mode 100644
index 0000000000..662eced4cf --- /dev/null +++ b/changelog.d/8679.misc
@@ -0,0 +1 @@ +Clarify representation of events in logfiles. diff --git a/changelog.d/8680.misc b/changelog.d/8680.misc new file mode 100644
index 0000000000..2ca2975464 --- /dev/null +++ b/changelog.d/8680.misc
@@ -0,0 +1 @@ +Don't require `hiredis` package to be installed to run unit tests. diff --git a/changelog.d/8684.misc b/changelog.d/8684.misc new file mode 100644
index 0000000000..1d23d42926 --- /dev/null +++ b/changelog.d/8684.misc
@@ -0,0 +1 @@ +Fix typing info on cache call signature to accept `on_invalidate`. diff --git a/changelog.d/8685.feature b/changelog.d/8685.feature new file mode 100644
index 0000000000..fef1eccb92 --- /dev/null +++ b/changelog.d/8685.feature
@@ -0,0 +1 @@ +Support generating structured logs via the standard logging configuration. diff --git a/changelog.d/8689.feature b/changelog.d/8689.feature new file mode 100644
index 0000000000..ed8d926964 --- /dev/null +++ b/changelog.d/8689.feature
@@ -0,0 +1 @@ +Add an admin APIs to allow server admins to list users' pushers. Contributed by @dklimpel. \ No newline at end of file diff --git a/docs/sample_log_config.yaml b/docs/sample_log_config.yaml
index e26657f9fe..ff3c747180 100644 --- a/docs/sample_log_config.yaml +++ b/docs/sample_log_config.yaml
@@ -3,7 +3,11 @@ # This is a YAML file containing a standard Python logging configuration # dictionary. See [1] for details on the valid settings. # +# Synapse also supports structured logging for machine readable logs which can +# be ingested by ELK stacks. See [2] for details. +# # [1]: https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema +# [2]: https://github.com/matrix-org/synapse/blob/master/docs/structured_logging.md version: 1 diff --git a/docs/structured_logging.md b/docs/structured_logging.md
index decec9b8fa..b1281667e0 100644 --- a/docs/structured_logging.md +++ b/docs/structured_logging.md
@@ -1,83 +1,161 @@ # Structured Logging -A structured logging system can be useful when your logs are destined for a machine to parse and process. By maintaining its machine-readable characteristics, it enables more efficient searching and aggregations when consumed by software such as the "ELK stack". +A structured logging system can be useful when your logs are destined for a +machine to parse and process. By maintaining its machine-readable characteristics, +it enables more efficient searching and aggregations when consumed by software +such as the "ELK stack". -Synapse's structured logging system is configured via the file that Synapse's `log_config` config option points to. The file must be YAML and contain `structured: true`. It must contain a list of "drains" (places where logs go to). +Synapse's structured logging system is configured via the file that Synapse's +`log_config` config option points to. The file should include a formatter which +uses the `synapse.logging.TerseJsonFormatter` class included with Synapse and a +handler which uses the above formatter. + +There is also a `synapse.logging.JsonFormatter` option which does not include +a timestamp in the resulting JSON. This is useful if the log ingester adds its +own timestamp. A structured logging configuration looks similar to the following: ```yaml -structured: true +version: 1 + +formatters: + structured: + class: synapse.logging.TerseJsonFormatter + +handlers: + file: + class: logging.handlers.TimedRotatingFileHandler + formatter: structured + filename: /path/to/my/logs/homeserver.log + when: midnight + backupCount: 3 # Does not include the current log file. + encoding: utf8 loggers: synapse: level: INFO + handlers: [remote] synapse.storage.SQL: level: WARNING - -drains: - console: - type: console - location: stdout - file: - type: file_json - location: homeserver.log ``` -The above logging config will set Synapse as 'INFO' logging level by default, with the SQL layer at 'WARNING', and will have two logging drains (to the console and to a file, stored as JSON). - -## Drain Types +The above logging config will set Synapse as 'INFO' logging level by default, +with the SQL layer at 'WARNING', and will log to a file, stored as JSON. -Drain types can be specified by the `type` key. +It is also possible to figure Synapse to log to a remote endpoint by using the +`synapse.logging.RemoteHandler` class included with Synapse. It takes the +following arguments: -### `console` +- `host`: Hostname or IP address of the log aggregator. +- `port`: Numerical port to contact on the host. +- `maximum_buffer`: (Optional, defaults to 1000) The maximum buffer size to allow. -Outputs human-readable logs to the console. +A remote structured logging configuration looks similar to the following: -Arguments: +```yaml +version: 1 -- `location`: Either `stdout` or `stderr`. +formatters: + structured: + class: synapse.logging.TerseJsonFormatter -### `console_json` +handlers: + remote: + class: synapse.logging.RemoteHandler + formatter: structured + host: 10.1.2.3 + port: 9999 -Outputs machine-readable JSON logs to the console. +loggers: + synapse: + level: INFO + handlers: [remote] + synapse.storage.SQL: + level: WARNING +``` -Arguments: +The above logging config will set Synapse as 'INFO' logging level by default, +with the SQL layer at 'WARNING', and will log JSON formatted messages to a +remote endpoint at 10.1.2.3:9999. -- `location`: Either `stdout` or `stderr`. +## Upgrading from legacy structured logging configuration -### `console_json_terse` +Versions of Synapse prior to v1.23.0 included a custom structured logging +configuration which is deprecated. It used a `structured: true` flag and +configured `drains` instead of ``handlers`` and `formatters`. -Outputs machine-readable JSON logs to the console, separated by newlines. This -format is not designed to be read and re-formatted into human-readable text, but -is optimal for a logging aggregation system. +Synapse currently automatically converts the old configuration to the new +configuration, but this will be removed in a future version of Synapse. The +following reference can be used to update your configuration. Based on the drain +`type`, we can pick a new handler: -Arguments: +1. For a type of `console`, `console_json`, or `console_json_terse`: a handler + with a class of `logging.StreamHandler` and a `stream` of `ext://sys.stdout` + or `ext://sys.stderr` should be used. +2. For a type of `file` or `file_json`: a handler of `logging.FileHandler` with + a location of the file path should be used. +3. For a type of `network_json_terse`: a handler of `synapse.logging.RemoteHandler` + with the host and port should be used. -- `location`: Either `stdout` or `stderr`. +Then based on the drain `type` we can pick a new formatter: -### `file` +1. For a type of `console` or `file` no formatter is necessary. +2. For a type of `console_json` or `file_json`: a formatter of + `synapse.logging.JsonFormatter` should be used. +3. For a type of `console_json_terse` or `network_json_terse`: a formatter of + `synapse.logging.TerseJsonFormatter` should be used. -Outputs human-readable logs to a file. +For each new handler and formatter they should be added to the logging configuration +and then assigned to either a logger or the root logger. -Arguments: +An example legacy configuration: -- `location`: An absolute path to the file to log to. +```yaml +structured: true -### `file_json` +loggers: + synapse: + level: INFO + synapse.storage.SQL: + level: WARNING -Outputs machine-readable logs to a file. +drains: + console: + type: console + location: stdout + file: + type: file_json + location: homeserver.log +``` -Arguments: +Would be converted into a new configuration: -- `location`: An absolute path to the file to log to. +```yaml +version: 1 -### `network_json_terse` +formatters: + json: + class: synapse.logging.JsonFormatter -Delivers machine-readable JSON logs to a log aggregator over TCP. This is -compatible with LogStash's TCP input with the codec set to `json_lines`. +handlers: + console: + class: logging.StreamHandler + location: ext://sys.stdout + file: + class: logging.FileHandler + formatter: json + filename: homeserver.log -Arguments: +loggers: + synapse: + level: INFO + handlers: [console, file] + synapse.storage.SQL: + level: WARNING +``` -- `host`: Hostname or IP address of the log aggregator. -- `port`: Numerical port to contact on the host. \ No newline at end of file +The new logging configuration is a bit more verbose, but significantly more +flexible. It allows for configuration that were not previously possible, such as +sending plain logs over the network, or using different handlers for different +modules. diff --git a/scripts-dev/lint.sh b/scripts-dev/lint.sh
index f141805519..f328ab57d5 100755 --- a/scripts-dev/lint.sh +++ b/scripts-dev/lint.sh
@@ -80,7 +80,7 @@ else # then lint everything! if [[ -z ${files+x} ]]; then # Lint all source code files and directories - files=("synapse" "tests" "scripts-dev" "scripts" "contrib" "synctl" "setup.py") + files=("synapse" "tests" "scripts-dev" "scripts" "contrib" "synctl" "setup.py" "synmark") fi fi diff --git a/scripts-dev/mypy_synapse_plugin.py b/scripts-dev/mypy_synapse_plugin.py
index a5b88731f1..5882f3a0b0 100644 --- a/scripts-dev/mypy_synapse_plugin.py +++ b/scripts-dev/mypy_synapse_plugin.py
@@ -19,9 +19,10 @@ can crop up, e.g the cache descriptors. from typing import Callable, Optional +from mypy.nodes import ARG_NAMED_OPT from mypy.plugin import MethodSigContext, Plugin from mypy.typeops import bind_self -from mypy.types import CallableType +from mypy.types import CallableType, NoneType class SynapsePlugin(Plugin): @@ -40,8 +41,9 @@ def cached_function_method_signature(ctx: MethodSigContext) -> CallableType: It already has *almost* the correct signature, except: - 1. the `self` argument needs to be marked as "bound"; and - 2. any `cache_context` argument should be removed. + 1. the `self` argument needs to be marked as "bound"; + 2. any `cache_context` argument should be removed; + 3. an optional keyword argument `on_invalidated` should be added. """ # First we mark this as a bound function signature. @@ -58,19 +60,33 @@ def cached_function_method_signature(ctx: MethodSigContext) -> CallableType: context_arg_index = idx break + arg_types = list(signature.arg_types) + arg_names = list(signature.arg_names) + arg_kinds = list(signature.arg_kinds) + if context_arg_index: - arg_types = list(signature.arg_types) arg_types.pop(context_arg_index) - - arg_names = list(signature.arg_names) arg_names.pop(context_arg_index) - - arg_kinds = list(signature.arg_kinds) arg_kinds.pop(context_arg_index) - signature = signature.copy_modified( - arg_types=arg_types, arg_names=arg_names, arg_kinds=arg_kinds, - ) + # Third, we add an optional "on_invalidate" argument. + # + # This is a callable which accepts no input and returns nothing. + calltyp = CallableType( + arg_types=[], + arg_kinds=[], + arg_names=[], + ret_type=NoneType(), + fallback=ctx.api.named_generic_type("builtins.function", []), + ) + + arg_types.append(calltyp) + arg_names.append("on_invalidate") + arg_kinds.append(ARG_NAMED_OPT) # Arg is an optional kwarg. + + signature = signature.copy_modified( + arg_types=arg_types, arg_names=arg_names, arg_kinds=arg_kinds, + ) return signature diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index b23f8cc227..bc73982dc2 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py
@@ -33,6 +33,7 @@ from synapse.api.errors import ( from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase from synapse.logging import opentracing as opentracing +from synapse.storage.databases.main.registration import TokenLookupResult from synapse.types import StateMap, UserID from synapse.util.caches.lrucache import LruCache from synapse.util.metrics import Measure @@ -190,10 +191,6 @@ class Auth: user_id, app_service = self._get_appservice_user_id(request) if user_id: - request.authenticated_entity = user_id - opentracing.set_tag("authenticated_entity", user_id) - opentracing.set_tag("appservice_id", app_service.id) - if ip_addr and self._track_appservice_user_ips: await self.store.insert_client_ip( user_id=user_id, @@ -203,31 +200,38 @@ class Auth: device_id="dummy-device", # stubbed ) - return synapse.types.create_requester(user_id, app_service=app_service) + requester = synapse.types.create_requester( + user_id, app_service=app_service + ) + + request.requester = user_id + opentracing.set_tag("authenticated_entity", user_id) + opentracing.set_tag("user_id", user_id) + opentracing.set_tag("appservice_id", app_service.id) + + return requester user_info = await self.get_user_by_access_token( access_token, rights, allow_expired=allow_expired ) - user = user_info["user"] - token_id = user_info["token_id"] - is_guest = user_info["is_guest"] - shadow_banned = user_info["shadow_banned"] + token_id = user_info.token_id + is_guest = user_info.is_guest + shadow_banned = user_info.shadow_banned # Deny the request if the user account has expired. if self._account_validity_enabled and not allow_expired: - user_id = user.to_string() - if await self.store.is_account_expired(user_id, self.clock.time_msec()): + if await self.store.is_account_expired( + user_info.user_id, self.clock.time_msec() + ): raise AuthError( 403, "User account has expired", errcode=Codes.EXPIRED_ACCOUNT ) - # device_id may not be present if get_user_by_access_token has been - # stubbed out. - device_id = user_info.get("device_id") + device_id = user_info.device_id - if user and access_token and ip_addr: + if access_token and ip_addr: await self.store.insert_client_ip( - user_id=user.to_string(), + user_id=user_info.token_owner, access_token=access_token, ip=ip_addr, user_agent=user_agent, @@ -241,19 +245,23 @@ class Auth: errcode=Codes.GUEST_ACCESS_FORBIDDEN, ) - request.authenticated_entity = user.to_string() - opentracing.set_tag("authenticated_entity", user.to_string()) - if device_id: - opentracing.set_tag("device_id", device_id) - - return synapse.types.create_requester( - user, + requester = synapse.types.create_requester( + user_info.user_id, token_id, is_guest, shadow_banned, device_id, app_service=app_service, + authenticated_entity=user_info.token_owner, ) + + request.requester = requester + opentracing.set_tag("authenticated_entity", user_info.token_owner) + opentracing.set_tag("user_id", user_info.user_id) + if device_id: + opentracing.set_tag("device_id", device_id) + + return requester except KeyError: raise MissingClientTokenError() @@ -289,7 +297,7 @@ class Auth: async def get_user_by_access_token( self, token: str, rights: str = "access", allow_expired: bool = False, - ) -> dict: + ) -> TokenLookupResult: """ Validate access token and get user_id from it Args: @@ -298,13 +306,7 @@ class Auth: allow this allow_expired: If False, raises an InvalidClientTokenError if the token is expired - Returns: - dict that includes: - `user` (UserID) - `is_guest` (bool) - `shadow_banned` (bool) - `token_id` (int|None): access token id. May be None if guest - `device_id` (str|None): device corresponding to access token + Raises: InvalidClientTokenError if a user by that token exists, but the token is expired @@ -314,9 +316,9 @@ class Auth: if rights == "access": # first look in the database - r = await self._look_up_user_by_access_token(token) + r = await self.store.get_user_by_access_token(token) if r: - valid_until_ms = r["valid_until_ms"] + valid_until_ms = r.valid_until_ms if ( not allow_expired and valid_until_ms is not None @@ -333,7 +335,6 @@ class Auth: # otherwise it needs to be a valid macaroon try: user_id, guest = self._parse_and_validate_macaroon(token, rights) - user = UserID.from_string(user_id) if rights == "access": if not guest: @@ -359,23 +360,17 @@ class Auth: raise InvalidClientTokenError( "Guest access token used for regular user" ) - ret = { - "user": user, - "is_guest": True, - "shadow_banned": False, - "token_id": None, + + ret = TokenLookupResult( + user_id=user_id, + is_guest=True, # all guests get the same device id - "device_id": GUEST_DEVICE_ID, - } + device_id=GUEST_DEVICE_ID, + ) elif rights == "delete_pusher": # We don't store these tokens in the database - ret = { - "user": user, - "is_guest": False, - "shadow_banned": False, - "token_id": None, - "device_id": None, - } + + ret = TokenLookupResult(user_id=user_id, is_guest=False) else: raise RuntimeError("Unknown rights setting %s", rights) return ret @@ -484,31 +479,15 @@ class Auth: now = self.hs.get_clock().time_msec() return now < expiry - async def _look_up_user_by_access_token(self, token): - ret = await self.store.get_user_by_access_token(token) - if not ret: - return None - - # we use ret.get() below because *lots* of unit tests stub out - # get_user_by_access_token in a way where it only returns a couple of - # the fields. - user_info = { - "user": UserID.from_string(ret.get("name")), - "token_id": ret.get("token_id", None), - "is_guest": False, - "shadow_banned": ret.get("shadow_banned"), - "device_id": ret.get("device_id"), - "valid_until_ms": ret.get("valid_until_ms"), - } - return user_info - def get_appservice_by_req(self, request): token = self.get_access_token_from_request(request) service = self.store.get_app_service_by_token(token) if not service: logger.warning("Unrecognised appservice access token.") raise InvalidClientTokenError() - request.authenticated_entity = service.sender + request.requester = synapse.types.create_requester( + service.sender, app_service=service + ) return service async def is_server_admin(self, user: UserID) -> bool: diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 3862d9c08f..3944780a42 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py
@@ -19,7 +19,7 @@ from typing import TYPE_CHECKING, Iterable, List, Match, Optional from synapse.api.constants import EventTypes from synapse.events import EventBase from synapse.types import GroupID, JsonDict, UserID, get_domain_from_id -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import _CacheContext, cached if TYPE_CHECKING: from synapse.appservice.api import ApplicationServiceApi @@ -52,11 +52,11 @@ class ApplicationService: self, token, hostname, + id, + sender, url=None, namespaces=None, hs_token=None, - sender=None, - id=None, protocols=None, rate_limited=True, ip_range_whitelist=None, @@ -164,9 +164,9 @@ class ApplicationService: does_match = await self.matches_user_in_member_list(event.room_id, store) return does_match - @cached(num_args=1) + @cached(num_args=1, cache_context=True) async def matches_user_in_member_list( - self, room_id: str, store: "DataStore" + self, room_id: str, store: "DataStore", cache_context: _CacheContext, ) -> bool: """Check if this service is interested a room based upon it's membership @@ -177,7 +177,9 @@ class ApplicationService: Returns: True if this service would like to know about this room. """ - member_list = await store.get_users_in_room(room_id) + member_list = await store.get_users_in_room( + room_id, on_invalidate=cache_context.invalidate + ) # check joined member events for user_id in member_list: diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 6b7be28aee..d4e887a3e0 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py
@@ -23,7 +23,6 @@ from string import Template import yaml from twisted.logger import ( - ILogObserver, LogBeginner, STDLibLogObserver, eventAsText, @@ -32,11 +31,9 @@ from twisted.logger import ( import synapse from synapse.app import _base as appbase -from synapse.logging._structured import ( - reload_structured_logging, - setup_structured_logging, -) +from synapse.logging._structured import setup_structured_logging from synapse.logging.context import LoggingContextFilter +from synapse.logging.filter import MetadataFilter from synapse.util.versionstring import get_version_string from ._base import Config, ConfigError @@ -48,7 +45,11 @@ DEFAULT_LOG_CONFIG = Template( # This is a YAML file containing a standard Python logging configuration # dictionary. See [1] for details on the valid settings. # +# Synapse also supports structured logging for machine readable logs which can +# be ingested by ELK stacks. See [2] for details. +# # [1]: https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema +# [2]: https://github.com/matrix-org/synapse/blob/master/docs/structured_logging.md version: 1 @@ -176,11 +177,11 @@ class LoggingConfig(Config): log_config_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=log_file)) -def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner): +def _setup_stdlib_logging(config, log_config_path, logBeginner: LogBeginner) -> None: """ - Set up Python stdlib logging. + Set up Python standard library logging. """ - if log_config is None: + if log_config_path is None: log_format = ( "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s" " - %(message)s" @@ -196,7 +197,8 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner): handler.setFormatter(formatter) logger.addHandler(handler) else: - logging.config.dictConfig(log_config) + # Load the logging configuration. + _load_logging_config(log_config_path) # We add a log record factory that runs all messages through the # LoggingContextFilter so that we get the context *at the time we log* @@ -204,12 +206,14 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner): # filter options, but care must when using e.g. MemoryHandler to buffer # writes. - log_filter = LoggingContextFilter(request="") + log_context_filter = LoggingContextFilter(request="") + log_metadata_filter = MetadataFilter({"server_name": config.server_name}) old_factory = logging.getLogRecordFactory() def factory(*args, **kwargs): record = old_factory(*args, **kwargs) - log_filter.filter(record) + log_context_filter.filter(record) + log_metadata_filter.filter(record) return record logging.setLogRecordFactory(factory) @@ -255,21 +259,40 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner): if not config.no_redirect_stdio: print("Redirected stdout/stderr to logs") - return observer - -def _reload_stdlib_logging(*args, log_config=None): - logger = logging.getLogger("") +def _load_logging_config(log_config_path: str) -> None: + """ + Configure logging from a log config path. + """ + with open(log_config_path, "rb") as f: + log_config = yaml.safe_load(f.read()) if not log_config: - logger.warning("Reloaded a blank config?") + logging.warning("Loaded a blank logging config?") + + # If the old structured logging configuration is being used, convert it to + # the new style configuration. + if "structured" in log_config and log_config.get("structured"): + log_config = setup_structured_logging(log_config) logging.config.dictConfig(log_config) +def _reload_logging_config(log_config_path): + """ + Reload the log configuration from the file and apply it. + """ + # If no log config path was given, it cannot be reloaded. + if log_config_path is None: + return + + _load_logging_config(log_config_path) + logging.info("Reloaded log config from %s due to SIGHUP", log_config_path) + + def setup_logging( hs, config, use_worker_options=False, logBeginner: LogBeginner = globalLogBeginner -) -> ILogObserver: +) -> None: """ Set up the logging subsystem. @@ -282,41 +305,18 @@ def setup_logging( logBeginner: The Twisted logBeginner to use. - Returns: - The "root" Twisted Logger observer, suitable for sending logs to from a - Logger instance. """ - log_config = config.worker_log_config if use_worker_options else config.log_config - - def read_config(*args, callback=None): - if log_config is None: - return None - - with open(log_config, "rb") as f: - log_config_body = yaml.safe_load(f.read()) - - if callback: - callback(log_config=log_config_body) - logging.info("Reloaded log config from %s due to SIGHUP", log_config) - - return log_config_body + log_config_path = ( + config.worker_log_config if use_worker_options else config.log_config + ) - log_config_body = read_config() + # Perform one-time logging configuration. + _setup_stdlib_logging(config, log_config_path, logBeginner=logBeginner) + # Add a SIGHUP handler to reload the logging configuration, if one is available. + appbase.register_sighup(_reload_logging_config, log_config_path) - if log_config_body and log_config_body.get("structured") is True: - logger = setup_structured_logging( - hs, config, log_config_body, logBeginner=logBeginner - ) - appbase.register_sighup(read_config, callback=reload_structured_logging) - else: - logger = _setup_stdlib_logging(config, log_config_body, logBeginner=logBeginner) - appbase.register_sighup(read_config, callback=_reload_stdlib_logging) - - # make sure that the first thing we log is a thing we can grep backwards - # for + # Log immediately so we can grep backwards. logging.warning("***** STARTING SERVER *****") logging.warning("Server %s version %s", sys.argv[0], get_version_string(synapse)) logging.info("Server hostname: %s", config.server_name) logging.info("Instance name: %s", hs.get_instance_name()) - - return logger diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index e203206865..8028663fa8 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py
@@ -368,7 +368,7 @@ class FrozenEvent(EventBase): return self.__repr__() def __repr__(self): - return "<FrozenEvent event_id='%s', type='%s', state_key='%s'>" % ( + return "<FrozenEvent event_id=%r, type=%r, state_key=%r>" % ( self.get("event_id", None), self.get("type", None), self.get("state_key", None), @@ -451,7 +451,7 @@ class FrozenEventV2(EventBase): return self.__repr__() def __repr__(self): - return "<%s event_id='%s', type='%s', state_key='%s'>" % ( + return "<%s event_id=%r, type=%r, state_key=%r>" % ( self.__class__.__name__, self.event_id, self.get("type", None), diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 7b4baddbf8..1f74c504a3 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py
@@ -155,7 +155,7 @@ class Authenticator: ) logger.debug("Request from %s", origin) - request.authenticated_entity = origin + request.requester = origin # If we get a valid signed request from the other side, its probably # alive diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
index 2ed6d7d248..ce97fa70d7 100644 --- a/synapse/handlers/account_validity.py +++ b/synapse/handlers/account_validity.py
@@ -299,7 +299,7 @@ class AccountValidityHandler: self, user_id: str, expiration_ts: Optional[int] = None, - email_sent: Optional[bool] = False, + email_sent: bool = False, renewal_token: Optional[str] = None, ) -> int: """Renews the account attached to a given user by pushing back the diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 276594f3d9..ff103cbb92 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py
@@ -991,17 +991,17 @@ class AuthHandler(BaseHandler): # This might return an awaitable, if it does block the log out # until it completes. result = provider.on_logged_out( - user_id=str(user_info["user"]), - device_id=user_info["device_id"], + user_id=user_info.user_id, + device_id=user_info.device_id, access_token=access_token, ) if inspect.isawaitable(result): await result # delete pushers associated with this access token - if user_info["token_id"] is not None: + if user_info.token_id is not None: await self.hs.get_pusherpool().remove_pushers_by_access_token( - str(user_info["user"]), (user_info["token_id"],) + user_info.user_id, (user_info.token_id,) ) async def delete_access_tokens_for_user( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 49a00eed9c..8e014c9bb5 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py
@@ -48,7 +48,7 @@ from synapse.util.wheel_timer import WheelTimer MYPY = False if MYPY: - import synapse.server + from synapse.server import HomeServer logger = logging.getLogger(__name__) @@ -101,7 +101,7 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER class BasePresenceHandler(abc.ABC): """Parts of the PresenceHandler that are shared between workers and master""" - def __init__(self, hs: "synapse.server.HomeServer"): + def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() self.store = hs.get_datastore() @@ -199,7 +199,7 @@ class BasePresenceHandler(abc.ABC): class PresenceHandler(BasePresenceHandler): - def __init__(self, hs: "synapse.server.HomeServer"): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.hs = hs self.is_mine_id = hs.is_mine_id @@ -1011,7 +1011,7 @@ def format_user_presence_state(state, now, include_user_id=True): class PresenceEventSource: - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): # We can't call get_presence_handler here because there's a cycle: # # Presence -> Notifier -> PresenceEventSource -> Presence @@ -1071,12 +1071,14 @@ class PresenceEventSource: users_interested_in = await self._get_interested_in(user, explicit_room_id) - user_ids_changed = set() + user_ids_changed = set() # type: Collection[str] changed = None if from_key: changed = stream_change_cache.get_all_entities_changed(from_key) if changed is not None and len(changed) < 500: + assert isinstance(user_ids_changed, set) + # For small deltas, its quicker to get all changes and then # work out if we share a room or they're in our presence list get_updates_counter.labels("stream").inc() diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index c9a3a4a592..ab468c7d92 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py
@@ -274,7 +274,7 @@ class ProfileHandler(BaseHandler): if not self.hs.is_mine(target_user): raise SynapseError(400, "User is not hosted on this homeserver") - if not by_admin and requester and target_user != requester.user: + if not by_admin and target_user != requester.user: raise AuthError(400, "Cannot set another user's displayname") if not by_admin and not self.hs.config.enable_set_displayname: @@ -306,12 +306,6 @@ class ProfileHandler(BaseHandler): else: new_batchnum = None - # If the admin changes the display name of a user, the requesting user cannot send - # the join event to update the displayname in the rooms. - # This must be done by the target user himself. - if by_admin: - requester = create_requester(target_user) - await self.store.set_profile_displayname( target_user.localpart, displayname_to_set, new_batchnum ) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index ae6a0b76d1..59ad075a4b 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py
@@ -130,7 +130,10 @@ class RegistrationHandler(BaseHandler): # Retrieve guest user information from provided access token user_data = await self.auth.get_user_by_access_token(guest_access_token) - if not user_data["is_guest"] or user_data["user"].localpart != localpart: + if ( + not user_data.is_guest + or UserID.from_string(user_data.user_id).localpart != localpart + ): raise AuthError( 403, "Cannot register taken user ID without valid guest " @@ -239,8 +242,9 @@ class RegistrationHandler(BaseHandler): ) if default_display_name: + requester = create_requester(user) await self.profile_handler.set_displayname( - user, None, default_display_name, by_admin=True + user, requester, default_display_name, by_admin=True ) if self.hs.config.user_directory_search_all_users: @@ -274,8 +278,9 @@ class RegistrationHandler(BaseHandler): shadow_banned=shadow_banned, ) + requester = create_requester(user) await self.profile_handler.set_displayname( - user, None, default_display_name, by_admin=True + user, requester, default_display_name, by_admin=True ) # Successfully registered @@ -542,8 +547,9 @@ class RegistrationHandler(BaseHandler): create_profile_with_displayname=display_name, ) + requester = create_requester(user) await self.profile_handler.set_displayname( - user, None, display_name, by_admin=True + user, requester, display_name, by_admin=True ) if self.hs.config.user_directory_search_all_users: @@ -858,7 +864,7 @@ class RegistrationHandler(BaseHandler): # up when the access token is saved, but that's quite an # invasive change I'd rather do separately. user_tuple = await self.store.get_user_by_access_token(token) - token_id = user_tuple["token_id"] + token_id = user_tuple.token_id await self.pusher_pool.add_pusher( user_id=user_id, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 6457d40672..43472b3288 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py
@@ -789,23 +789,30 @@ class RoomCreationHandler(BaseHandler): ratelimit=False, ) - for invitee in invite_list: + # we avoid dropping the lock between invites, as otherwise joins can + # start coming in and making the createRoom slow. + # + # we also don't need to check the requester's shadow-ban here, as we + # have already done so above (and potentially emptied invite_list). + with (await self.room_member_handler.member_linearizer.queue((room_id,))): content = {} is_direct = config.get("is_direct", None) if is_direct: content["is_direct"] = is_direct - # Note that update_membership with an action of "invite" can raise a - # ShadowBanError, but this was handled above by emptying invite_list. - _, last_stream_id = await self.room_member_handler.update_membership( - requester, - UserID.from_string(invitee), - room_id, - "invite", - ratelimit=False, - content=content, - new_room=True, - ) + for invitee in invite_list: + ( + _, + last_stream_id, + ) = await self.room_member_handler.update_membership_locked( + requester, + UserID.from_string(invitee), + room_id, + "invite", + ratelimit=False, + content=content, + new_room=True, + ) for invite_3pid in invite_3pid_list: id_server = invite_3pid["id_server"] diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index bcdd0fbdd6..da2cd6dff2 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py
@@ -308,7 +308,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): key = (room_id,) with (await self.member_linearizer.queue(key)): - result = await self._update_membership( + result = await self.update_membership_locked( requester, target, room_id, @@ -324,7 +324,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): return result - async def _update_membership( + async def update_membership_locked( self, requester: Requester, target: UserID, @@ -338,6 +338,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): new_room: bool = False, require_consent: bool = True, ) -> Tuple[str, int]: + """Helper for update_membership. + + Assumes that the membership linearizer is already held for the room. + """ content_specified = bool(content) if content is None: content = {} diff --git a/synapse/http/site.py b/synapse/http/site.py
index ddb1770b09..5f0581dc3f 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py
@@ -14,7 +14,7 @@ import contextlib import logging import time -from typing import Optional +from typing import Optional, Union from twisted.python.failure import Failure from twisted.web.server import Request, Site @@ -23,6 +23,7 @@ from synapse.config.server import ListenerConfig from synapse.http import redact_uri from synapse.http.request_metrics import RequestMetrics, requests_counter from synapse.logging.context import LoggingContext, PreserveLoggingContext +from synapse.types import Requester logger = logging.getLogger(__name__) @@ -54,9 +55,12 @@ class SynapseRequest(Request): Request.__init__(self, channel, *args, **kw) self.site = channel.site self._channel = channel # this is used by the tests - self.authenticated_entity = None self.start_time = 0.0 + # The requester, if authenticated. For federation requests this is the + # server name, for client requests this is the Requester object. + self.requester = None # type: Optional[Union[Requester, str]] + # we can't yet create the logcontext, as we don't know the method. self.logcontext = None # type: Optional[LoggingContext] @@ -271,11 +275,23 @@ class SynapseRequest(Request): # to the client (nb may be negative) response_send_time = self.finish_time - self._processing_finished_time - # need to decode as it could be raw utf-8 bytes - # from a IDN servname in an auth header - authenticated_entity = self.authenticated_entity - if authenticated_entity is not None and isinstance(authenticated_entity, bytes): - authenticated_entity = authenticated_entity.decode("utf-8", "replace") + # Convert the requester into a string that we can log + authenticated_entity = None + if isinstance(self.requester, str): + authenticated_entity = self.requester + elif isinstance(self.requester, Requester): + authenticated_entity = self.requester.authenticated_entity + + # If this is a request where the target user doesn't match the user who + # authenticated (e.g. and admin is puppetting a user) then we log both. + if self.requester.user.to_string() != authenticated_entity: + authenticated_entity = "{},{}".format( + authenticated_entity, self.requester.user.to_string(), + ) + elif self.requester is not None: + # This shouldn't happen, but we log it so we don't lose information + # and can see that we're doing something wrong. + authenticated_entity = repr(self.requester) # type: ignore[unreachable] # ...or could be raw utf-8 bytes in the User-Agent header. # N.B. if you don't do this, the logger explodes cryptically diff --git a/synapse/logging/__init__.py b/synapse/logging/__init__.py
index e69de29bb2..b28b7b2ef7 100644 --- a/synapse/logging/__init__.py +++ b/synapse/logging/__init__.py
@@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# These are imported to allow for nicer logging configuration files. +from synapse.logging._remote import RemoteHandler +from synapse.logging._terse_json import JsonFormatter, TerseJsonFormatter + +__all__ = ["RemoteHandler", "JsonFormatter", "TerseJsonFormatter"] diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py
index 0caf325916..fb937b3f28 100644 --- a/synapse/logging/_remote.py +++ b/synapse/logging/_remote.py
@@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import sys import traceback from collections import deque @@ -21,10 +22,11 @@ from math import floor from typing import Callable, Optional import attr +from typing_extensions import Deque from zope.interface import implementer from twisted.application.internet import ClientService -from twisted.internet.defer import Deferred +from twisted.internet.defer import CancelledError, Deferred from twisted.internet.endpoints import ( HostnameEndpoint, TCP4ClientEndpoint, @@ -32,7 +34,9 @@ from twisted.internet.endpoints import ( ) from twisted.internet.interfaces import IPushProducer, ITransport from twisted.internet.protocol import Factory, Protocol -from twisted.logger import ILogObserver, Logger, LogLevel +from twisted.python.failure import Failure + +logger = logging.getLogger(__name__) @attr.s @@ -45,11 +49,11 @@ class LogProducer: Args: buffer: Log buffer to read logs from. transport: Transport to write to. - format_event: A callable to format the log entry to a string. + format: A callable to format the log record to a string. """ transport = attr.ib(type=ITransport) - format_event = attr.ib(type=Callable[[dict], str]) + _format = attr.ib(type=Callable[[logging.LogRecord], str]) _buffer = attr.ib(type=deque) _paused = attr.ib(default=False, type=bool, init=False) @@ -61,16 +65,19 @@ class LogProducer: self._buffer = deque() def resumeProducing(self): + # If we're already producing, nothing to do. self._paused = False + # Loop until paused. while self._paused is False and (self._buffer and self.transport.connected): try: - # Request the next event and format it. - event = self._buffer.popleft() - msg = self.format_event(event) + # Request the next record and format it. + record = self._buffer.popleft() + msg = self._format(record) # Send it as a new line over the transport. self.transport.write(msg.encode("utf8")) + self.transport.write(b"\n") except Exception: # Something has gone wrong writing to the transport -- log it # and break out of the while. @@ -78,76 +85,85 @@ class LogProducer: break -@attr.s -@implementer(ILogObserver) -class TCPLogObserver: +class RemoteHandler(logging.Handler): """ - An IObserver that writes JSON logs to a TCP target. + An logging handler that writes logs to a TCP target. Args: - hs (HomeServer): The homeserver that is being logged for. host: The host of the logging target. port: The logging target's port. - format_event: A callable to format the log entry to a string. maximum_buffer: The maximum buffer size. """ - hs = attr.ib() - host = attr.ib(type=str) - port = attr.ib(type=int) - format_event = attr.ib(type=Callable[[dict], str]) - maximum_buffer = attr.ib(type=int) - _buffer = attr.ib(default=attr.Factory(deque), type=deque) - _connection_waiter = attr.ib(default=None, type=Optional[Deferred]) - _logger = attr.ib(default=attr.Factory(Logger)) - _producer = attr.ib(default=None, type=Optional[LogProducer]) - - def start(self) -> None: + def __init__( + self, + host: str, + port: int, + maximum_buffer: int = 1000, + level=logging.NOTSET, + _reactor=None, + ): + super().__init__(level=level) + self.host = host + self.port = port + self.maximum_buffer = maximum_buffer + + self._buffer = deque() # type: Deque[logging.LogRecord] + self._connection_waiter = None # type: Optional[Deferred] + self._producer = None # type: Optional[LogProducer] # Connect without DNS lookups if it's a direct IP. + if _reactor is None: + from twisted.internet import reactor + + _reactor = reactor + try: ip = ip_address(self.host) if isinstance(ip, IPv4Address): - endpoint = TCP4ClientEndpoint( - self.hs.get_reactor(), self.host, self.port - ) + endpoint = TCP4ClientEndpoint(_reactor, self.host, self.port) elif isinstance(ip, IPv6Address): - endpoint = TCP6ClientEndpoint( - self.hs.get_reactor(), self.host, self.port - ) + endpoint = TCP6ClientEndpoint(_reactor, self.host, self.port) else: raise ValueError("Unknown IP address provided: %s" % (self.host,)) except ValueError: - endpoint = HostnameEndpoint(self.hs.get_reactor(), self.host, self.port) + endpoint = HostnameEndpoint(_reactor, self.host, self.port) factory = Factory.forProtocol(Protocol) - self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor()) + self._service = ClientService(endpoint, factory, clock=_reactor) self._service.startService() + self._stopping = False self._connect() - def stop(self): + def close(self): + self._stopping = True self._service.stopService() def _connect(self) -> None: """ Triggers an attempt to connect then write to the remote if not already writing. """ + # Do not attempt to open multiple connections. if self._connection_waiter: return self._connection_waiter = self._service.whenConnected(failAfterFailures=1) - @self._connection_waiter.addErrback - def fail(r): - r.printTraceback(file=sys.__stderr__) + def fail(failure: Failure) -> None: + # If the Deferred was cancelled (e.g. during shutdown) do not try to + # reconnect (this will cause an infinite loop of errors). + if failure.check(CancelledError) and self._stopping: + return + + # For a different error, print the traceback and re-connect. + failure.printTraceback(file=sys.__stderr__) self._connection_waiter = None self._connect() - @self._connection_waiter.addCallback - def writer(r): + def writer(result: Protocol) -> None: # We have a connection. If we already have a producer, and its # transport is the same, just trigger a resumeProducing. - if self._producer and r.transport is self._producer.transport: + if self._producer and result.transport is self._producer.transport: self._producer.resumeProducing() self._connection_waiter = None return @@ -158,29 +174,29 @@ class TCPLogObserver: # Make a new producer and start it. self._producer = LogProducer( - buffer=self._buffer, - transport=r.transport, - format_event=self.format_event, + buffer=self._buffer, transport=result.transport, format=self.format, ) - r.transport.registerProducer(self._producer, True) + result.transport.registerProducer(self._producer, True) self._producer.resumeProducing() self._connection_waiter = None + self._connection_waiter.addCallbacks(writer, fail) + def _handle_pressure(self) -> None: """ - Handle backpressure by shedding events. + Handle backpressure by shedding records. The buffer will, in this order, until the buffer is below the maximum: - - Shed DEBUG events - - Shed INFO events - - Shed the middle 50% of the events. + - Shed DEBUG records. + - Shed INFO records. + - Shed the middle 50% of the records. """ if len(self._buffer) <= self.maximum_buffer: return # Strip out DEBUGs self._buffer = deque( - filter(lambda event: event["log_level"] != LogLevel.debug, self._buffer) + filter(lambda record: record.levelno > logging.DEBUG, self._buffer) ) if len(self._buffer) <= self.maximum_buffer: @@ -188,7 +204,7 @@ class TCPLogObserver: # Strip out INFOs self._buffer = deque( - filter(lambda event: event["log_level"] != LogLevel.info, self._buffer) + filter(lambda record: record.levelno > logging.INFO, self._buffer) ) if len(self._buffer) <= self.maximum_buffer: @@ -209,17 +225,17 @@ class TCPLogObserver: self._buffer.extend(reversed(end_buffer)) - def __call__(self, event: dict) -> None: - self._buffer.append(event) + def emit(self, record: logging.LogRecord) -> None: + self._buffer.append(record) # Handle backpressure, if it exists. try: self._handle_pressure() except Exception: - # If handling backpressure fails,clear the buffer and log the + # If handling backpressure fails, clear the buffer and log the # exception. self._buffer.clear() - self._logger.failure("Failed clearing backpressure") + logger.warning("Failed clearing backpressure") # Try and write immediately. self._connect() diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py
index 0fc2ea609e..14d9c104c2 100644 --- a/synapse/logging/_structured.py +++ b/synapse/logging/_structured.py
@@ -12,138 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logging import os.path -import sys -import typing -import warnings -from typing import List +from typing import Any, Dict, Generator, Optional, Tuple -import attr -from constantly import NamedConstant, Names, ValueConstant, Values -from zope.interface import implementer - -from twisted.logger import ( - FileLogObserver, - FilteringLogObserver, - ILogObserver, - LogBeginner, - Logger, - LogLevel, - LogLevelFilterPredicate, - LogPublisher, - eventAsText, - jsonFileLogObserver, -) +from constantly import NamedConstant, Names from synapse.config._base import ConfigError -from synapse.logging._terse_json import ( - TerseJSONToConsoleLogObserver, - TerseJSONToTCPLogObserver, -) -from synapse.logging.context import current_context - - -def stdlib_log_level_to_twisted(level: str) -> LogLevel: - """ - Convert a stdlib log level to Twisted's log level. - """ - lvl = level.lower().replace("warning", "warn") - return LogLevel.levelWithName(lvl) - - -@attr.s -@implementer(ILogObserver) -class LogContextObserver: - """ - An ILogObserver which adds Synapse-specific log context information. - - Attributes: - observer (ILogObserver): The target parent observer. - """ - - observer = attr.ib() - - def __call__(self, event: dict) -> None: - """ - Consume a log event and emit it to the parent observer after filtering - and adding log context information. - - Args: - event (dict) - """ - # Filter out some useless events that Twisted outputs - if "log_text" in event: - if event["log_text"].startswith("DNSDatagramProtocol starting on "): - return - - if event["log_text"].startswith("(UDP Port "): - return - - if event["log_text"].startswith("Timing out client") or event[ - "log_format" - ].startswith("Timing out client"): - return - - context = current_context() - - # Copy the context information to the log event. - context.copy_to_twisted_log_entry(event) - - self.observer(event) - - -class PythonStdlibToTwistedLogger(logging.Handler): - """ - Transform a Python stdlib log message into a Twisted one. - """ - - def __init__(self, observer, *args, **kwargs): - """ - Args: - observer (ILogObserver): A Twisted logging observer. - *args, **kwargs: Args/kwargs to be passed to logging.Handler. - """ - self.observer = observer - super().__init__(*args, **kwargs) - - def emit(self, record: logging.LogRecord) -> None: - """ - Emit a record to Twisted's observer. - - Args: - record (logging.LogRecord) - """ - - self.observer( - { - "log_time": record.created, - "log_text": record.getMessage(), - "log_format": "{log_text}", - "log_namespace": record.name, - "log_level": stdlib_log_level_to_twisted(record.levelname), - } - ) - - -def SynapseFileLogObserver(outFile: typing.IO[str]) -> FileLogObserver: - """ - A log observer that formats events like the traditional log formatter and - sends them to `outFile`. - - Args: - outFile (file object): The file object to write to. - """ - - def formatEvent(_event: dict) -> str: - event = dict(_event) - event["log_level"] = event["log_level"].name.upper() - event["log_format"] = "- {log_namespace} - {log_level} - {request} - " + ( - event.get("log_format", "{log_text}") or "{log_text}" - ) - return eventAsText(event, includeSystem=False) + "\n" - - return FileLogObserver(outFile, formatEvent) class DrainType(Names): @@ -155,30 +29,12 @@ class DrainType(Names): NETWORK_JSON_TERSE = NamedConstant() -class OutputPipeType(Values): - stdout = ValueConstant(sys.__stdout__) - stderr = ValueConstant(sys.__stderr__) - - -@attr.s -class DrainConfiguration: - name = attr.ib() - type = attr.ib() - location = attr.ib() - options = attr.ib(default=None) - - -@attr.s -class NetworkJSONTerseOptions: - maximum_buffer = attr.ib(type=int) - - -DEFAULT_LOGGERS = {"synapse": {"level": "INFO"}} +DEFAULT_LOGGERS = {"synapse": {"level": "info"}} def parse_drain_configs( drains: dict, -) -> typing.Generator[DrainConfiguration, None, None]: +) -> Generator[Tuple[str, Dict[str, Any]], None, None]: """ Parse the drain configurations. @@ -186,11 +42,12 @@ def parse_drain_configs( drains (dict): A list of drain configurations. Yields: - DrainConfiguration instances. + dict instances representing a logging handler. Raises: ConfigError: If any of the drain configuration items are invalid. """ + for name, config in drains.items(): if "type" not in config: raise ConfigError("Logging drains require a 'type' key.") @@ -202,6 +59,18 @@ def parse_drain_configs( "%s is not a known logging drain type." % (config["type"],) ) + # Either use the default formatter or the tersejson one. + if logging_type in (DrainType.CONSOLE_JSON, DrainType.FILE_JSON,): + formatter = "json" # type: Optional[str] + elif logging_type in ( + DrainType.CONSOLE_JSON_TERSE, + DrainType.NETWORK_JSON_TERSE, + ): + formatter = "tersejson" + else: + # A formatter of None implies using the default formatter. + formatter = None + if logging_type in [ DrainType.CONSOLE, DrainType.CONSOLE_JSON, @@ -217,9 +86,11 @@ def parse_drain_configs( % (logging_type,) ) - pipe = OutputPipeType.lookupByName(location).value - - yield DrainConfiguration(name=name, type=logging_type, location=pipe) + yield name, { + "class": "logging.StreamHandler", + "formatter": formatter, + "stream": "ext://sys." + location, + } elif logging_type in [DrainType.FILE, DrainType.FILE_JSON]: if "location" not in config: @@ -233,18 +104,25 @@ def parse_drain_configs( "File paths need to be absolute, '%s' is a relative path" % (location,) ) - yield DrainConfiguration(name=name, type=logging_type, location=location) + + yield name, { + "class": "logging.FileHandler", + "formatter": formatter, + "filename": location, + } elif logging_type in [DrainType.NETWORK_JSON_TERSE]: host = config.get("host") port = config.get("port") maximum_buffer = config.get("maximum_buffer", 1000) - yield DrainConfiguration( - name=name, - type=logging_type, - location=(host, port), - options=NetworkJSONTerseOptions(maximum_buffer=maximum_buffer), - ) + + yield name, { + "class": "synapse.logging.RemoteHandler", + "formatter": formatter, + "host": host, + "port": port, + "maximum_buffer": maximum_buffer, + } else: raise ConfigError( @@ -253,126 +131,29 @@ def parse_drain_configs( ) -class StoppableLogPublisher(LogPublisher): +def setup_structured_logging(log_config: dict,) -> dict: """ - A log publisher that can tell its observers to shut down any external - communications. - """ - - def stop(self): - for obs in self._observers: - if hasattr(obs, "stop"): - obs.stop() - - -def setup_structured_logging( - hs, - config, - log_config: dict, - logBeginner: LogBeginner, - redirect_stdlib_logging: bool = True, -) -> LogPublisher: - """ - Set up Twisted's structured logging system. - - Args: - hs: The homeserver to use. - config (HomeserverConfig): The configuration of the Synapse homeserver. - log_config (dict): The log configuration to use. + Convert a legacy structured logging configuration (from Synapse < v1.23.0) + to one compatible with the new standard library handlers. """ - if config.no_redirect_stdio: - raise ConfigError( - "no_redirect_stdio cannot be defined using structured logging." - ) - - logger = Logger() - if "drains" not in log_config: raise ConfigError("The logging configuration requires a list of drains.") - observers = [] # type: List[ILogObserver] - - for observer in parse_drain_configs(log_config["drains"]): - # Pipe drains - if observer.type == DrainType.CONSOLE: - logger.debug( - "Starting up the {name} console logger drain", name=observer.name - ) - observers.append(SynapseFileLogObserver(observer.location)) - elif observer.type == DrainType.CONSOLE_JSON: - logger.debug( - "Starting up the {name} JSON console logger drain", name=observer.name - ) - observers.append(jsonFileLogObserver(observer.location)) - elif observer.type == DrainType.CONSOLE_JSON_TERSE: - logger.debug( - "Starting up the {name} terse JSON console logger drain", - name=observer.name, - ) - observers.append( - TerseJSONToConsoleLogObserver(observer.location, metadata={}) - ) - - # File drains - elif observer.type == DrainType.FILE: - logger.debug("Starting up the {name} file logger drain", name=observer.name) - log_file = open(observer.location, "at", buffering=1, encoding="utf8") - observers.append(SynapseFileLogObserver(log_file)) - elif observer.type == DrainType.FILE_JSON: - logger.debug( - "Starting up the {name} JSON file logger drain", name=observer.name - ) - log_file = open(observer.location, "at", buffering=1, encoding="utf8") - observers.append(jsonFileLogObserver(log_file)) - - elif observer.type == DrainType.NETWORK_JSON_TERSE: - metadata = {"server_name": hs.config.server_name} - log_observer = TerseJSONToTCPLogObserver( - hs=hs, - host=observer.location[0], - port=observer.location[1], - metadata=metadata, - maximum_buffer=observer.options.maximum_buffer, - ) - log_observer.start() - observers.append(log_observer) - else: - # We should never get here, but, just in case, throw an error. - raise ConfigError("%s drain type cannot be configured" % (observer.type,)) - - publisher = StoppableLogPublisher(*observers) - log_filter = LogLevelFilterPredicate() - - for namespace, namespace_config in log_config.get( - "loggers", DEFAULT_LOGGERS - ).items(): - # Set the log level for twisted.logger.Logger namespaces - log_filter.setLogLevelForNamespace( - namespace, - stdlib_log_level_to_twisted(namespace_config.get("level", "INFO")), - ) - - # Also set the log levels for the stdlib logger namespaces, to prevent - # them getting to PythonStdlibToTwistedLogger and having to be formatted - if "level" in namespace_config: - logging.getLogger(namespace).setLevel(namespace_config.get("level")) - - f = FilteringLogObserver(publisher, [log_filter]) - lco = LogContextObserver(f) - - if redirect_stdlib_logging: - stuff_into_twisted = PythonStdlibToTwistedLogger(lco) - stdliblogger = logging.getLogger() - stdliblogger.addHandler(stuff_into_twisted) - - # Always redirect standard I/O, otherwise other logging outputs might miss - # it. - logBeginner.beginLoggingTo([lco], redirectStandardIO=True) + new_config = { + "version": 1, + "formatters": { + "json": {"class": "synapse.logging.JsonFormatter"}, + "tersejson": {"class": "synapse.logging.TerseJsonFormatter"}, + }, + "handlers": {}, + "loggers": log_config.get("loggers", DEFAULT_LOGGERS), + "root": {"handlers": []}, + } - return publisher + for handler_name, handler in parse_drain_configs(log_config["drains"]): + new_config["handlers"][handler_name] = handler + # Add each handler to the root logger. + new_config["root"]["handlers"].append(handler_name) -def reload_structured_logging(*args, log_config=None) -> None: - warnings.warn( - "Currently the structured logging system can not be reloaded, doing nothing" - ) + return new_config diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py
index 9b46956ca9..2fbf5549a1 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py
@@ -16,141 +16,65 @@ """ Log formatters that output terse JSON. """ - import json -from typing import IO - -from twisted.logger import FileLogObserver - -from synapse.logging._remote import TCPLogObserver +import logging _encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":")) - -def flatten_event(event: dict, metadata: dict, include_time: bool = False): - """ - Flatten a Twisted logging event to an dictionary capable of being sent - as a log event to a logging aggregation system. - - The format is vastly simplified and is not designed to be a "human readable - string" in the sense that traditional logs are. Instead, the structure is - optimised for searchability and filtering, with human-understandable log - keys. - - Args: - event (dict): The Twisted logging event we are flattening. - metadata (dict): Additional data to include with each log message. This - can be information like the server name. Since the target log - consumer does not know who we are other than by host IP, this - allows us to forward through static information. - include_time (bool): Should we include the `time` key? If False, the - event time is stripped from the event. - """ - new_event = {} - - # If it's a failure, make the new event's log_failure be the traceback text. - if "log_failure" in event: - new_event["log_failure"] = event["log_failure"].getTraceback() - - # If it's a warning, copy over a string representation of the warning. - if "warning" in event: - new_event["warning"] = str(event["warning"]) - - # Stdlib logging events have "log_text" as their human-readable portion, - # Twisted ones have "log_format". For now, include the log_format, so that - # context only given in the log format (e.g. what is being logged) is - # available. - if "log_text" in event: - new_event["log"] = event["log_text"] - else: - new_event["log"] = event["log_format"] - - # We want to include the timestamp when forwarding over the network, but - # exclude it when we are writing to stdout. This is because the log ingester - # (e.g. logstash, fluentd) can add its own timestamp. - if include_time: - new_event["time"] = round(event["log_time"], 2) - - # Convert the log level to a textual representation. - new_event["level"] = event["log_level"].name.upper() - - # Ignore these keys, and do not transfer them over to the new log object. - # They are either useless (isError), transferred manually above (log_time, - # log_level, etc), or contain Python objects which are not useful for output - # (log_logger, log_source). - keys_to_delete = [ - "isError", - "log_failure", - "log_format", - "log_level", - "log_logger", - "log_source", - "log_system", - "log_time", - "log_text", - "observer", - "warning", - ] - - # If it's from the Twisted legacy logger (twisted.python.log), it adds some - # more keys we want to purge. - if event.get("log_namespace") == "log_legacy": - keys_to_delete.extend(["message", "system", "time"]) - - # Rather than modify the dictionary in place, construct a new one with only - # the content we want. The original event should be considered 'frozen'. - for key in event.keys(): - - if key in keys_to_delete: - continue - - if isinstance(event[key], (str, int, bool, float)) or event[key] is None: - # If it's a plain type, include it as is. - new_event[key] = event[key] - else: - # If it's not one of those basic types, write out a string - # representation. This should probably be a warning in development, - # so that we are sure we are only outputting useful data. - new_event[key] = str(event[key]) - - # Add the metadata information to the event (e.g. the server_name). - new_event.update(metadata) - - return new_event - - -def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogObserver: - """ - A log observer that formats events to a flattened JSON representation. - - Args: - outFile: The file object to write to. - metadata: Metadata to be added to each log object. - """ - - def formatEvent(_event: dict) -> str: - flattened = flatten_event(_event, metadata) - return _encoder.encode(flattened) + "\n" - - return FileLogObserver(outFile, formatEvent) - - -def TerseJSONToTCPLogObserver( - hs, host: str, port: int, metadata: dict, maximum_buffer: int -) -> FileLogObserver: - """ - A log observer that formats events to a flattened JSON representation. - - Args: - hs (HomeServer): The homeserver that is being logged for. - host: The host of the logging target. - port: The logging target's port. - metadata: Metadata to be added to each log object. - maximum_buffer: The maximum buffer size. - """ - - def formatEvent(_event: dict) -> str: - flattened = flatten_event(_event, metadata, include_time=True) - return _encoder.encode(flattened) + "\n" - - return TCPLogObserver(hs, host, port, formatEvent, maximum_buffer) +# The properties of a standard LogRecord. +_LOG_RECORD_ATTRIBUTES = { + "args", + "asctime", + "created", + "exc_info", + # exc_text isn't a public attribute, but is used to cache the result of formatException. + "exc_text", + "filename", + "funcName", + "levelname", + "levelno", + "lineno", + "message", + "module", + "msecs", + "msg", + "name", + "pathname", + "process", + "processName", + "relativeCreated", + "stack_info", + "thread", + "threadName", +} + + +class JsonFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + event = { + "log": record.getMessage(), + "namespace": record.name, + "level": record.levelname, + } + + return self._format(record, event) + + def _format(self, record: logging.LogRecord, event: dict) -> str: + # Add any extra attributes to the event. + for key, value in record.__dict__.items(): + if key not in _LOG_RECORD_ATTRIBUTES: + event[key] = value + + return _encoder.encode(event) + + +class TerseJsonFormatter(JsonFormatter): + def format(self, record: logging.LogRecord) -> str: + event = { + "log": record.getMessage(), + "namespace": record.name, + "level": record.levelname, + "time": round(record.created, 2), + } + + return self._format(record, event) diff --git a/synapse/logging/filter.py b/synapse/logging/filter.py new file mode 100644
index 0000000000..1baf8dd679 --- /dev/null +++ b/synapse/logging/filter.py
@@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from typing_extensions import Literal + + +class MetadataFilter(logging.Filter): + """Logging filter that adds constant values to each record. + + Args: + metadata: Key-value pairs to add to each record. + """ + + def __init__(self, metadata: dict): + self._metadata = metadata + + def filter(self, record: logging.LogRecord) -> Literal[True]: + for key, value in self._metadata.items(): + setattr(record, key, value) + return True diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index e7cc74a5d2..f0c37eaf5e 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py
@@ -77,8 +77,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint): requester = Requester.deserialize(self.store, content["requester"]) - if requester.user: - request.authenticated_entity = requester.user.to_string() + request.requester = requester logger.info("remote_join: %s into room: %s", user_id, room_id) @@ -142,8 +141,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): requester = Requester.deserialize(self.store, content["requester"]) - if requester.user: - request.authenticated_entity = requester.user.to_string() + request.requester = requester # hopefully we're now on the master, so this won't recurse! event_id, stream_id = await self.member_handler.remote_reject_invite( diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index fc129dbaa7..8fa104c8d3 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py
@@ -115,8 +115,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): ratelimit = content["ratelimit"] extra_users = [UserID.from_string(u) for u in content["extra_users"]] - if requester.user: - request.authenticated_entity = requester.user.to_string() + request.requester = requester logger.info( "Got event to send with ID: %s into room: %s", event.event_id, event.room_id diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 957415e359..e0a70f3ca1 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py
@@ -18,6 +18,8 @@ import logging import re from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +import attr + from synapse.api.constants import UserTypes from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError from synapse.metrics.background_process_metrics import wrap_as_background_process @@ -38,6 +40,35 @@ THIRTY_MINUTES_IN_MS = 30 * 60 * 1000 logger = logging.getLogger(__name__) +@attr.s(frozen=True, slots=True) +class TokenLookupResult: + """Result of looking up an access token. + + Attributes: + user_id: The user that this token authenticates as + is_guest + shadow_banned + token_id: The ID of the access token looked up + device_id: The device associated with the token, if any. + valid_until_ms: The timestamp the token expires, if any. + token_owner: The "owner" of the token. This is either the same as the + user, or a server admin who is logged in as the user. + """ + + user_id = attr.ib(type=str) + is_guest = attr.ib(type=bool, default=False) + shadow_banned = attr.ib(type=bool, default=False) + token_id = attr.ib(type=Optional[int], default=None) + device_id = attr.ib(type=Optional[str], default=None) + valid_until_ms = attr.ib(type=Optional[int], default=None) + token_owner = attr.ib(type=str) + + # Make the token owner default to the user ID, which is the common case. + @token_owner.default + def _default_token_owner(self): + return self.user_id + + class RegistrationWorkerStore(CacheInvalidationWorkerStore): def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"): super().__init__(database, db_conn, hs) @@ -108,15 +139,13 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): return is_trial @cached() - async def get_user_by_access_token(self, token: str) -> Optional[dict]: + async def get_user_by_access_token(self, token: str) -> Optional[TokenLookupResult]: """Get a user from the given access token. Args: token: The access token of a user. Returns: - None, if the token did not match, otherwise dict - including the keys `name`, `is_guest`, `device_id`, `token_id`, - `valid_until_ms`. + None, if the token did not match, otherwise a `TokenLookupResult` """ return await self.db_pool.runInteraction( "get_user_by_access_token", self._query_for_auth, token @@ -224,7 +253,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): return [UserID.from_string(row[0]) for row in rows] res = await self.db_pool.runInteraction( - "get_expired_users", get_expired_users_txn, self.clock.time_msec() + "get_expired_users", get_expired_users_txn, self._clock.time_msec() ) return res @@ -390,7 +419,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): return { user_id: { "expired": ( - expiration is not None and self.clock.time_msec() >= expiration + expiration is not None and self._clock.time_msec() >= expiration ), "deactivated": deactivated == 1, } @@ -434,23 +463,24 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): await self.db_pool.runInteraction("set_server_admin", set_server_admin_txn) - def _query_for_auth(self, txn, token): + def _query_for_auth(self, txn, token: str) -> Optional[TokenLookupResult]: sql = """ - SELECT users.name, + SELECT users.name as user_id, users.is_guest, users.shadow_banned, access_tokens.id as token_id, access_tokens.device_id, - access_tokens.valid_until_ms + access_tokens.valid_until_ms, + access_tokens.user_id as token_owner FROM users - INNER JOIN access_tokens on users.name = access_tokens.user_id + INNER JOIN access_tokens on users.name = COALESCE(puppets_user_id, access_tokens.user_id) WHERE token = ? """ txn.execute(sql, (token,)) rows = self.db_pool.cursor_to_dict(txn) if rows: - return rows[0] + return TokenLookupResult(**rows[0]) return None diff --git a/synapse/storage/databases/main/schema/delta/58/22puppet_token.sql b/synapse/storage/databases/main/schema/delta/58/22puppet_token.sql new file mode 100644
index 0000000000..00a9431a97 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/22puppet_token.sql
@@ -0,0 +1,17 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Whether the access token is an admin token for controlling another user. +ALTER TABLE access_tokens ADD COLUMN puppets_user_id TEXT; diff --git a/synapse/types.py b/synapse/types.py
index 6c338fe0b7..5e52c1eb12 100644 --- a/synapse/types.py +++ b/synapse/types.py
@@ -29,6 +29,7 @@ from typing import ( Tuple, Type, TypeVar, + Union, ) import attr @@ -39,6 +40,7 @@ from unpaddedbase64 import decode_base64 from synapse.api.errors import Codes, SynapseError if TYPE_CHECKING: + from synapse.appservice.api import ApplicationService from synapse.storage.databases.main import DataStore # define a version of typing.Collection that works on python 3.5 @@ -75,6 +77,7 @@ class Requester( "shadow_banned", "device_id", "app_service", + "authenticated_entity", ], ) ): @@ -105,6 +108,7 @@ class Requester( "shadow_banned": self.shadow_banned, "device_id": self.device_id, "app_server_id": self.app_service.id if self.app_service else None, + "authenticated_entity": self.authenticated_entity, } @staticmethod @@ -130,16 +134,18 @@ class Requester( shadow_banned=input["shadow_banned"], device_id=input["device_id"], app_service=appservice, + authenticated_entity=input["authenticated_entity"], ) def create_requester( - user_id, - access_token_id=None, - is_guest=False, - shadow_banned=False, - device_id=None, - app_service=None, + user_id: Union[str, "UserID"], + access_token_id: Optional[int] = None, + is_guest: Optional[bool] = False, + shadow_banned: Optional[bool] = False, + device_id: Optional[str] = None, + app_service: Optional["ApplicationService"] = None, + authenticated_entity: Optional[str] = None, ): """ Create a new ``Requester`` object @@ -152,14 +158,27 @@ def create_requester( shadow_banned (bool): True if the user making this request is shadow-banned. device_id (str|None): device_id which was set at authentication time app_service (ApplicationService|None): the AS requesting on behalf of the user + authenticated_entity: The entity that authenticated when making the request. + This is different to the user_id when an admin user or the server is + "puppeting" the user. Returns: Requester """ if not isinstance(user_id, UserID): user_id = UserID.from_string(user_id) + + if authenticated_entity is None: + authenticated_entity = user_id.to_string() + return Requester( - user_id, access_token_id, is_guest, shadow_banned, device_id, app_service + user_id, + access_token_id, + is_guest, + shadow_banned, + device_id, + app_service, + authenticated_entity, ) diff --git a/synmark/__init__.py b/synmark/__init__.py
index 09bc7e7927..3d4ec3e184 100644 --- a/synmark/__init__.py +++ b/synmark/__init__.py
@@ -21,45 +21,6 @@ except ImportError: from twisted.internet.pollreactor import PollReactor as Reactor from twisted.internet.main import installReactor -from synapse.config.homeserver import HomeServerConfig -from synapse.util import Clock - -from tests.utils import default_config, setup_test_homeserver - - -async def make_homeserver(reactor, config=None): - """ - Make a Homeserver suitable for running benchmarks against. - - Args: - reactor: A Twisted reactor to run under. - config: A HomeServerConfig to use, or None. - """ - cleanup_tasks = [] - clock = Clock(reactor) - - if not config: - config = default_config("test") - - config_obj = HomeServerConfig() - config_obj.parse_config_dict(config, "", "") - - hs = setup_test_homeserver( - cleanup_tasks.append, config=config_obj, reactor=reactor, clock=clock - ) - stor = hs.get_datastore() - - # Run the database background updates. - if hasattr(stor.db_pool.updates, "do_next_background_update"): - while not await stor.db_pool.updates.has_completed_background_updates(): - await stor.db_pool.updates.do_next_background_update(1) - - def cleanup(): - for i in cleanup_tasks: - i() - - return hs, clock.sleep, cleanup - def make_reactor(): """ diff --git a/synmark/__main__.py b/synmark/__main__.py
index 17df9ddeb7..de13c1a909 100644 --- a/synmark/__main__.py +++ b/synmark/__main__.py
@@ -12,20 +12,20 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import sys from argparse import REMAINDER from contextlib import redirect_stderr from io import StringIO import pyperf -from synmark import make_reactor -from synmark.suites import SUITES from twisted.internet.defer import Deferred, ensureDeferred from twisted.logger import globalLogBeginner, textFileLogObserver from twisted.python.failure import Failure +from synmark import make_reactor +from synmark.suites import SUITES + from tests.utils import setupdb diff --git a/synmark/suites/logging.py b/synmark/suites/logging.py
index d8e4c7d58f..c9d9cf761e 100644 --- a/synmark/suites/logging.py +++ b/synmark/suites/logging.py
@@ -13,20 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import warnings from io import StringIO from mock import Mock from pyperf import perf_counter -from synmark import make_homeserver from twisted.internet.defer import Deferred from twisted.internet.protocol import ServerFactory -from twisted.logger import LogBeginner, Logger, LogPublisher +from twisted.logger import LogBeginner, LogPublisher from twisted.protocols.basic import LineOnlyReceiver -from synapse.logging._structured import setup_structured_logging +from synapse.config.logger import _setup_stdlib_logging +from synapse.logging import RemoteHandler +from synapse.util import Clock class LineCounter(LineOnlyReceiver): @@ -62,7 +64,15 @@ async def main(reactor, loops): logger_factory.on_done = Deferred() port = reactor.listenTCP(0, logger_factory, interface="127.0.0.1") - hs, wait, cleanup = await make_homeserver(reactor) + # A fake homeserver config. + class Config: + server_name = "synmark-" + str(loops) + no_redirect_stdio = True + + hs_config = Config() + + # To be able to sleep. + clock = Clock(reactor) errors = StringIO() publisher = LogPublisher() @@ -72,47 +82,49 @@ async def main(reactor, loops): ) log_config = { - "loggers": {"synapse": {"level": "DEBUG"}}, - "drains": { + "version": 1, + "loggers": {"synapse": {"level": "DEBUG", "handlers": ["tersejson"]}}, + "formatters": {"tersejson": {"class": "synapse.logging.TerseJsonFormatter"}}, + "handlers": { "tersejson": { - "type": "network_json_terse", + "class": "synapse.logging.RemoteHandler", "host": "127.0.0.1", "port": port.getHost().port, "maximum_buffer": 100, + "_reactor": reactor, } }, } - logger = Logger(namespace="synapse.logging.test_terse_json", observer=publisher) - logging_system = setup_structured_logging( - hs, hs.config, log_config, logBeginner=beginner, redirect_stdlib_logging=False + logger = logging.getLogger("synapse.logging.test_terse_json") + _setup_stdlib_logging( + hs_config, log_config, logBeginner=beginner, ) # Wait for it to connect... - await logging_system._observers[0]._service.whenConnected() + for handler in logging.getLogger("synapse").handlers: + if isinstance(handler, RemoteHandler): + break + else: + raise RuntimeError("Improperly configured: no RemoteHandler found.") + + await handler._service.whenConnected() start = perf_counter() # Send a bunch of useful messages for i in range(0, loops): - logger.info("test message %s" % (i,)) - - if ( - len(logging_system._observers[0]._buffer) - == logging_system._observers[0].maximum_buffer - ): - while ( - len(logging_system._observers[0]._buffer) - > logging_system._observers[0].maximum_buffer / 2 - ): - await wait(0.01) + logger.info("test message %s", i) + + if len(handler._buffer) == handler.maximum_buffer: + while len(handler._buffer) > handler.maximum_buffer / 2: + await clock.sleep(0.01) await logger_factory.on_done end = perf_counter() - start - logging_system.stop() + handler.close() port.stopListening() - cleanup() return end diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py
index cb6f29d670..0fd55f428a 100644 --- a/tests/api/test_auth.py +++ b/tests/api/test_auth.py
@@ -29,6 +29,7 @@ from synapse.api.errors import ( MissingClientTokenError, ResourceLimitError, ) +from synapse.storage.databases.main.registration import TokenLookupResult from synapse.types import UserID from tests import unittest @@ -61,7 +62,9 @@ class AuthTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_user_by_req_user_valid_token(self): - user_info = {"name": self.test_user, "token_id": "ditto", "device_id": "device"} + user_info = TokenLookupResult( + user_id=self.test_user, token_id=5, device_id="device" + ) self.store.get_user_by_access_token = Mock( return_value=defer.succeed(user_info) ) @@ -84,7 +87,7 @@ class AuthTestCase(unittest.TestCase): self.assertEqual(f.errcode, "M_UNKNOWN_TOKEN") def test_get_user_by_req_user_missing_token(self): - user_info = {"name": self.test_user, "token_id": "ditto"} + user_info = TokenLookupResult(user_id=self.test_user, token_id=5) self.store.get_user_by_access_token = Mock( return_value=defer.succeed(user_info) ) @@ -221,7 +224,7 @@ class AuthTestCase(unittest.TestCase): def test_get_user_from_macaroon(self): self.store.get_user_by_access_token = Mock( return_value=defer.succeed( - {"name": "@baldrick:matrix.org", "device_id": "device"} + TokenLookupResult(user_id="@baldrick:matrix.org", device_id="device") ) ) @@ -237,12 +240,11 @@ class AuthTestCase(unittest.TestCase): user_info = yield defer.ensureDeferred( self.auth.get_user_by_access_token(macaroon.serialize()) ) - user = user_info["user"] - self.assertEqual(UserID.from_string(user_id), user) + self.assertEqual(user_id, user_info.user_id) # TODO: device_id should come from the macaroon, but currently comes # from the db. - self.assertEqual(user_info["device_id"], "device") + self.assertEqual(user_info.device_id, "device") @defer.inlineCallbacks def test_get_guest_user_from_macaroon(self): @@ -264,10 +266,8 @@ class AuthTestCase(unittest.TestCase): user_info = yield defer.ensureDeferred( self.auth.get_user_by_access_token(serialized) ) - user = user_info["user"] - is_guest = user_info["is_guest"] - self.assertEqual(UserID.from_string(user_id), user) - self.assertTrue(is_guest) + self.assertEqual(user_id, user_info.user_id) + self.assertTrue(user_info.is_guest) self.store.get_user_by_id.assert_called_with(user_id) @defer.inlineCallbacks @@ -289,12 +289,9 @@ class AuthTestCase(unittest.TestCase): if token != tok: return defer.succeed(None) return defer.succeed( - { - "name": USER_ID, - "is_guest": False, - "token_id": 1234, - "device_id": "DEVICE", - } + TokenLookupResult( + user_id=USER_ID, is_guest=False, token_id=1234, device_id="DEVICE", + ) ) self.store.get_user_by_access_token = get_user diff --git a/tests/api/test_ratelimiting.py b/tests/api/test_ratelimiting.py
index 1e1f30d790..fe504d0869 100644 --- a/tests/api/test_ratelimiting.py +++ b/tests/api/test_ratelimiting.py
@@ -43,7 +43,7 @@ class TestRatelimiter(unittest.TestCase): def test_allowed_appservice_ratelimited_via_can_requester_do_action(self): appservice = ApplicationService( - None, "example.com", id="foo", rate_limited=True, + None, "example.com", id="foo", rate_limited=True, sender="@as:example.com", ) as_requester = create_requester("@user:example.com", app_service=appservice) @@ -68,7 +68,7 @@ class TestRatelimiter(unittest.TestCase): def test_allowed_appservice_via_can_requester_do_action(self): appservice = ApplicationService( - None, "example.com", id="foo", rate_limited=False, + None, "example.com", id="foo", rate_limited=False, sender="@as:example.com", ) as_requester = create_requester("@user:example.com", app_service=appservice) diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py
index 236b608d58..0bffeb1150 100644 --- a/tests/appservice/test_appservice.py +++ b/tests/appservice/test_appservice.py
@@ -31,6 +31,7 @@ class ApplicationServiceTestCase(unittest.TestCase): def setUp(self): self.service = ApplicationService( id="unique_identifier", + sender="@as:test", url="some_url", token="some_token", hostname="matrix.org", # only used by get_groups_for_user diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py
index 4512c51311..875aaec2c6 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py
@@ -289,7 +289,7 @@ class DehydrationTestCase(unittest.HomeserverTestCase): # make sure that our device ID has changed user_info = self.get_success(self.auth.get_user_by_access_token(access_token)) - self.assertEqual(user_info["device_id"], retrieved_device_id) + self.assertEqual(user_info.device_id, retrieved_device_id) # make sure the device has the display name that was set from the login res = self.get_success(self.handler.get_device(user_id, retrieved_device_id)) diff --git a/tests/handlers/test_message.py b/tests/handlers/test_message.py
index 9f6f21a6e2..2e0fea04af 100644 --- a/tests/handlers/test_message.py +++ b/tests/handlers/test_message.py
@@ -46,7 +46,7 @@ class EventCreationTestCase(unittest.HomeserverTestCase): self.info = self.get_success( self.hs.get_datastore().get_user_by_access_token(self.access_token,) ) - self.token_id = self.info["token_id"] + self.token_id = self.info.token_id self.requester = create_requester(self.user_id, access_token_id=self.token_id) diff --git a/tests/logging/__init__.py b/tests/logging/__init__.py
index e69de29bb2..a58d51441c 100644 --- a/tests/logging/__init__.py +++ b/tests/logging/__init__.py
@@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + + +class LoggerCleanupMixin: + def get_logger(self, handler): + """ + Attach a handler to a logger and add clean-ups to remove revert this. + """ + # Create a logger and add the handler to it. + logger = logging.getLogger(__name__) + logger.addHandler(handler) + + # Ensure the logger actually logs something. + logger.setLevel(logging.INFO) + + # Ensure the logger gets cleaned-up appropriately. + self.addCleanup(logger.removeHandler, handler) + self.addCleanup(logger.setLevel, logging.NOTSET) + + return logger diff --git a/tests/logging/test_remote_handler.py b/tests/logging/test_remote_handler.py new file mode 100644
index 0000000000..4bc27a1d7d --- /dev/null +++ b/tests/logging/test_remote_handler.py
@@ -0,0 +1,169 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from twisted.test.proto_helpers import AccumulatingProtocol + +from synapse.logging import RemoteHandler + +from tests.logging import LoggerCleanupMixin +from tests.server import FakeTransport, get_clock +from tests.unittest import TestCase + + +def connect_logging_client(reactor, client_id): + # This is essentially tests.server.connect_client, but disabling autoflush on + # the client transport. This is necessary to avoid an infinite loop due to + # sending of data via the logging transport causing additional logs to be + # written. + factory = reactor.tcpClients.pop(client_id)[2] + client = factory.buildProtocol(None) + server = AccumulatingProtocol() + server.makeConnection(FakeTransport(client, reactor)) + client.makeConnection(FakeTransport(server, reactor, autoflush=False)) + + return client, server + + +class RemoteHandlerTestCase(LoggerCleanupMixin, TestCase): + def setUp(self): + self.reactor, _ = get_clock() + + def test_log_output(self): + """ + The remote handler delivers logs over TCP. + """ + handler = RemoteHandler("127.0.0.1", 9000, _reactor=self.reactor) + logger = self.get_logger(handler) + + logger.info("Hello there, %s!", "wally") + + # Trigger the connection + client, server = connect_logging_client(self.reactor, 0) + + # Trigger data being sent + client.transport.flush() + + # One log message, with a single trailing newline + logs = server.data.decode("utf8").splitlines() + self.assertEqual(len(logs), 1) + self.assertEqual(server.data.count(b"\n"), 1) + + # Ensure the data passed through properly. + self.assertEqual(logs[0], "Hello there, wally!") + + def test_log_backpressure_debug(self): + """ + When backpressure is hit, DEBUG logs will be shed. + """ + handler = RemoteHandler( + "127.0.0.1", 9000, maximum_buffer=10, _reactor=self.reactor + ) + logger = self.get_logger(handler) + + # Send some debug messages + for i in range(0, 3): + logger.debug("debug %s" % (i,)) + + # Send a bunch of useful messages + for i in range(0, 7): + logger.info("info %s" % (i,)) + + # The last debug message pushes it past the maximum buffer + logger.debug("too much debug") + + # Allow the reconnection + client, server = connect_logging_client(self.reactor, 0) + client.transport.flush() + + # Only the 7 infos made it through, the debugs were elided + logs = server.data.splitlines() + self.assertEqual(len(logs), 7) + self.assertNotIn(b"debug", server.data) + + def test_log_backpressure_info(self): + """ + When backpressure is hit, DEBUG and INFO logs will be shed. + """ + handler = RemoteHandler( + "127.0.0.1", 9000, maximum_buffer=10, _reactor=self.reactor + ) + logger = self.get_logger(handler) + + # Send some debug messages + for i in range(0, 3): + logger.debug("debug %s" % (i,)) + + # Send a bunch of useful messages + for i in range(0, 10): + logger.warning("warn %s" % (i,)) + + # Send a bunch of info messages + for i in range(0, 3): + logger.info("info %s" % (i,)) + + # The last debug message pushes it past the maximum buffer + logger.debug("too much debug") + + # Allow the reconnection + client, server = connect_logging_client(self.reactor, 0) + client.transport.flush() + + # The 10 warnings made it through, the debugs and infos were elided + logs = server.data.splitlines() + self.assertEqual(len(logs), 10) + self.assertNotIn(b"debug", server.data) + self.assertNotIn(b"info", server.data) + + def test_log_backpressure_cut_middle(self): + """ + When backpressure is hit, and no more DEBUG and INFOs cannot be culled, + it will cut the middle messages out. + """ + handler = RemoteHandler( + "127.0.0.1", 9000, maximum_buffer=10, _reactor=self.reactor + ) + logger = self.get_logger(handler) + + # Send a bunch of useful messages + for i in range(0, 20): + logger.warning("warn %s" % (i,)) + + # Allow the reconnection + client, server = connect_logging_client(self.reactor, 0) + client.transport.flush() + + # The first five and last five warnings made it through, the debugs and + # infos were elided + logs = server.data.decode("utf8").splitlines() + self.assertEqual( + ["warn %s" % (i,) for i in range(5)] + + ["warn %s" % (i,) for i in range(15, 20)], + logs, + ) + + def test_cancel_connection(self): + """ + Gracefully handle the connection being cancelled. + """ + handler = RemoteHandler( + "127.0.0.1", 9000, maximum_buffer=10, _reactor=self.reactor + ) + logger = self.get_logger(handler) + + # Send a message. + logger.info("Hello there, %s!", "wally") + + # Do not accept the connection and shutdown. This causes the pending + # connection to be cancelled (and should not raise any exceptions). + handler.close() diff --git a/tests/logging/test_structured.py b/tests/logging/test_structured.py deleted file mode 100644
index d36f5f426c..0000000000 --- a/tests/logging/test_structured.py +++ /dev/null
@@ -1,214 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2019 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import os -import os.path -import shutil -import sys -import textwrap - -from twisted.logger import Logger, eventAsText, eventsFromJSONLogFile - -from synapse.config.logger import setup_logging -from synapse.logging._structured import setup_structured_logging -from synapse.logging.context import LoggingContext - -from tests.unittest import DEBUG, HomeserverTestCase - - -class FakeBeginner: - def beginLoggingTo(self, observers, **kwargs): - self.observers = observers - - -class StructuredLoggingTestBase: - """ - Test base that registers a cleanup handler to reset the stdlib log handler - to 'unset'. - """ - - def prepare(self, reactor, clock, hs): - def _cleanup(): - logging.getLogger("synapse").setLevel(logging.NOTSET) - - self.addCleanup(_cleanup) - - -class StructuredLoggingTestCase(StructuredLoggingTestBase, HomeserverTestCase): - """ - Tests for Synapse's structured logging support. - """ - - def test_output_to_json_round_trip(self): - """ - Synapse logs can be outputted to JSON and then read back again. - """ - temp_dir = self.mktemp() - os.mkdir(temp_dir) - self.addCleanup(shutil.rmtree, temp_dir) - - json_log_file = os.path.abspath(os.path.join(temp_dir, "out.json")) - - log_config = { - "drains": {"jsonfile": {"type": "file_json", "location": json_log_file}} - } - - # Begin the logger with our config - beginner = FakeBeginner() - setup_structured_logging( - self.hs, self.hs.config, log_config, logBeginner=beginner - ) - - # Make a logger and send an event - logger = Logger( - namespace="tests.logging.test_structured", observer=beginner.observers[0] - ) - logger.info("Hello there, {name}!", name="wally") - - # Read the log file and check it has the event we sent - with open(json_log_file, "r") as f: - logged_events = list(eventsFromJSONLogFile(f)) - self.assertEqual(len(logged_events), 1) - - # The event pulled from the file should render fine - self.assertEqual( - eventAsText(logged_events[0], includeTimestamp=False), - "[tests.logging.test_structured#info] Hello there, wally!", - ) - - def test_output_to_text(self): - """ - Synapse logs can be outputted to text. - """ - temp_dir = self.mktemp() - os.mkdir(temp_dir) - self.addCleanup(shutil.rmtree, temp_dir) - - log_file = os.path.abspath(os.path.join(temp_dir, "out.log")) - - log_config = {"drains": {"file": {"type": "file", "location": log_file}}} - - # Begin the logger with our config - beginner = FakeBeginner() - setup_structured_logging( - self.hs, self.hs.config, log_config, logBeginner=beginner - ) - - # Make a logger and send an event - logger = Logger( - namespace="tests.logging.test_structured", observer=beginner.observers[0] - ) - logger.info("Hello there, {name}!", name="wally") - - # Read the log file and check it has the event we sent - with open(log_file, "r") as f: - logged_events = f.read().strip().split("\n") - self.assertEqual(len(logged_events), 1) - - # The event pulled from the file should render fine - self.assertTrue( - logged_events[0].endswith( - " - tests.logging.test_structured - INFO - None - Hello there, wally!" - ) - ) - - def test_collects_logcontext(self): - """ - Test that log outputs have the attached logging context. - """ - log_config = {"drains": {}} - - # Begin the logger with our config - beginner = FakeBeginner() - publisher = setup_structured_logging( - self.hs, self.hs.config, log_config, logBeginner=beginner - ) - - logs = [] - - publisher.addObserver(logs.append) - - # Make a logger and send an event - logger = Logger( - namespace="tests.logging.test_structured", observer=beginner.observers[0] - ) - - with LoggingContext("testcontext", request="somereq"): - logger.info("Hello there, {name}!", name="steve") - - self.assertEqual(len(logs), 1) - self.assertEqual(logs[0]["request"], "somereq") - - -class StructuredLoggingConfigurationFileTestCase( - StructuredLoggingTestBase, HomeserverTestCase -): - def make_homeserver(self, reactor, clock): - - tempdir = self.mktemp() - os.mkdir(tempdir) - log_config_file = os.path.abspath(os.path.join(tempdir, "log.config.yaml")) - self.homeserver_log = os.path.abspath(os.path.join(tempdir, "homeserver.log")) - - config = self.default_config() - config["log_config"] = log_config_file - - with open(log_config_file, "w") as f: - f.write( - textwrap.dedent( - """\ - structured: true - - drains: - file: - type: file_json - location: %s - """ - % (self.homeserver_log,) - ) - ) - - self.addCleanup(self._sys_cleanup) - - return self.setup_test_homeserver(config=config) - - def _sys_cleanup(self): - sys.stdout = sys.__stdout__ - sys.stderr = sys.__stderr__ - - # Do not remove! We need the logging system to be set other than WARNING. - @DEBUG - def test_log_output(self): - """ - When a structured logging config is given, Synapse will use it. - """ - beginner = FakeBeginner() - publisher = setup_logging(self.hs, self.hs.config, logBeginner=beginner) - - # Make a logger and send an event - logger = Logger(namespace="tests.logging.test_structured", observer=publisher) - - with LoggingContext("testcontext", request="somereq"): - logger.info("Hello there, {name}!", name="steve") - - with open(self.homeserver_log, "r") as f: - logged_events = [ - eventAsText(x, includeTimestamp=False) for x in eventsFromJSONLogFile(f) - ] - - logs = "\n".join(logged_events) - self.assertTrue("***** STARTING SERVER *****" in logs) - self.assertTrue("Hello there, steve!" in logs) diff --git a/tests/logging/test_terse_json.py b/tests/logging/test_terse_json.py
index fd128b88e0..73f469b802 100644 --- a/tests/logging/test_terse_json.py +++ b/tests/logging/test_terse_json.py
@@ -14,57 +14,33 @@ # limitations under the License. import json -from collections import Counter +import logging +from io import StringIO -from twisted.logger import Logger +from synapse.logging._terse_json import JsonFormatter, TerseJsonFormatter -from synapse.logging._structured import setup_structured_logging +from tests.logging import LoggerCleanupMixin +from tests.unittest import TestCase -from tests.server import connect_client -from tests.unittest import HomeserverTestCase -from .test_structured import FakeBeginner, StructuredLoggingTestBase - - -class TerseJSONTCPTestCase(StructuredLoggingTestBase, HomeserverTestCase): - def test_log_output(self): +class TerseJsonTestCase(LoggerCleanupMixin, TestCase): + def test_terse_json_output(self): """ - The Terse JSON outputter delivers simplified structured logs over TCP. + The Terse JSON formatter converts log messages to JSON. """ - log_config = { - "drains": { - "tersejson": { - "type": "network_json_terse", - "host": "127.0.0.1", - "port": 8000, - } - } - } - - # Begin the logger with our config - beginner = FakeBeginner() - setup_structured_logging( - self.hs, self.hs.config, log_config, logBeginner=beginner - ) - - logger = Logger( - namespace="tests.logging.test_terse_json", observer=beginner.observers[0] - ) - logger.info("Hello there, {name}!", name="wally") - - # Trigger the connection - self.pump() + output = StringIO() - _, server = connect_client(self.reactor, 0) + handler = logging.StreamHandler(output) + handler.setFormatter(TerseJsonFormatter()) + logger = self.get_logger(handler) - # Trigger data being sent - self.pump() + logger.info("Hello there, %s!", "wally") - # One log message, with a single trailing newline - logs = server.data.decode("utf8").splitlines() + # One log message, with a single trailing newline. + data = output.getvalue() + logs = data.splitlines() self.assertEqual(len(logs), 1) - self.assertEqual(server.data.count(b"\n"), 1) - + self.assertEqual(data.count("\n"), 1) log = json.loads(logs[0]) # The terse logger should give us these keys. @@ -72,163 +48,74 @@ class TerseJSONTCPTestCase(StructuredLoggingTestBase, HomeserverTestCase): "log", "time", "level", - "log_namespace", - "request", - "scope", - "server_name", - "name", + "namespace", ] self.assertCountEqual(log.keys(), expected_log_keys) + self.assertEqual(log["log"], "Hello there, wally!") - # It contains the data we expect. - self.assertEqual(log["name"], "wally") - - def test_log_backpressure_debug(self): + def test_extra_data(self): """ - When backpressure is hit, DEBUG logs will be shed. + Additional information can be included in the structured logging. """ - log_config = { - "loggers": {"synapse": {"level": "DEBUG"}}, - "drains": { - "tersejson": { - "type": "network_json_terse", - "host": "127.0.0.1", - "port": 8000, - "maximum_buffer": 10, - } - }, - } - - # Begin the logger with our config - beginner = FakeBeginner() - setup_structured_logging( - self.hs, - self.hs.config, - log_config, - logBeginner=beginner, - redirect_stdlib_logging=False, - ) - - logger = Logger( - namespace="synapse.logging.test_terse_json", observer=beginner.observers[0] - ) + output = StringIO() - # Send some debug messages - for i in range(0, 3): - logger.debug("debug %s" % (i,)) + handler = logging.StreamHandler(output) + handler.setFormatter(TerseJsonFormatter()) + logger = self.get_logger(handler) - # Send a bunch of useful messages - for i in range(0, 7): - logger.info("test message %s" % (i,)) - - # The last debug message pushes it past the maximum buffer - logger.debug("too much debug") - - # Allow the reconnection - _, server = connect_client(self.reactor, 0) - self.pump() - - # Only the 7 infos made it through, the debugs were elided - logs = server.data.splitlines() - self.assertEqual(len(logs), 7) - - def test_log_backpressure_info(self): - """ - When backpressure is hit, DEBUG and INFO logs will be shed. - """ - log_config = { - "loggers": {"synapse": {"level": "DEBUG"}}, - "drains": { - "tersejson": { - "type": "network_json_terse", - "host": "127.0.0.1", - "port": 8000, - "maximum_buffer": 10, - } - }, - } - - # Begin the logger with our config - beginner = FakeBeginner() - setup_structured_logging( - self.hs, - self.hs.config, - log_config, - logBeginner=beginner, - redirect_stdlib_logging=False, - ) - - logger = Logger( - namespace="synapse.logging.test_terse_json", observer=beginner.observers[0] + logger.info( + "Hello there, %s!", "wally", extra={"foo": "bar", "int": 3, "bool": True} ) - # Send some debug messages - for i in range(0, 3): - logger.debug("debug %s" % (i,)) - - # Send a bunch of useful messages - for i in range(0, 10): - logger.warn("test warn %s" % (i,)) - - # Send a bunch of info messages - for i in range(0, 3): - logger.info("test message %s" % (i,)) - - # The last debug message pushes it past the maximum buffer - logger.debug("too much debug") - - # Allow the reconnection - client, server = connect_client(self.reactor, 0) - self.pump() + # One log message, with a single trailing newline. + data = output.getvalue() + logs = data.splitlines() + self.assertEqual(len(logs), 1) + self.assertEqual(data.count("\n"), 1) + log = json.loads(logs[0]) - # The 10 warnings made it through, the debugs and infos were elided - logs = list(map(json.loads, server.data.decode("utf8").splitlines())) - self.assertEqual(len(logs), 10) + # The terse logger should give us these keys. + expected_log_keys = [ + "log", + "time", + "level", + "namespace", + # The additional keys given via extra. + "foo", + "int", + "bool", + ] + self.assertCountEqual(log.keys(), expected_log_keys) - self.assertEqual(Counter([x["level"] for x in logs]), {"WARN": 10}) + # Check the values of the extra fields. + self.assertEqual(log["foo"], "bar") + self.assertEqual(log["int"], 3) + self.assertIs(log["bool"], True) - def test_log_backpressure_cut_middle(self): + def test_json_output(self): """ - When backpressure is hit, and no more DEBUG and INFOs cannot be culled, - it will cut the middle messages out. + The Terse JSON formatter converts log messages to JSON. """ - log_config = { - "loggers": {"synapse": {"level": "DEBUG"}}, - "drains": { - "tersejson": { - "type": "network_json_terse", - "host": "127.0.0.1", - "port": 8000, - "maximum_buffer": 10, - } - }, - } - - # Begin the logger with our config - beginner = FakeBeginner() - setup_structured_logging( - self.hs, - self.hs.config, - log_config, - logBeginner=beginner, - redirect_stdlib_logging=False, - ) + output = StringIO() - logger = Logger( - namespace="synapse.logging.test_terse_json", observer=beginner.observers[0] - ) + handler = logging.StreamHandler(output) + handler.setFormatter(JsonFormatter()) + logger = self.get_logger(handler) - # Send a bunch of useful messages - for i in range(0, 20): - logger.warn("test warn", num=i) + logger.info("Hello there, %s!", "wally") - # Allow the reconnection - client, server = connect_client(self.reactor, 0) - self.pump() + # One log message, with a single trailing newline. + data = output.getvalue() + logs = data.splitlines() + self.assertEqual(len(logs), 1) + self.assertEqual(data.count("\n"), 1) + log = json.loads(logs[0]) - # The first five and last five warnings made it through, the debugs and - # infos were elided - logs = list(map(json.loads, server.data.decode("utf8").splitlines())) - self.assertEqual(len(logs), 10) - self.assertEqual(Counter([x["level"] for x in logs]), {"WARN": 10}) - self.assertEqual([0, 1, 2, 3, 4, 15, 16, 17, 18, 19], [x["num"] for x in logs]) + # The terse logger should give us these keys. + expected_log_keys = [ + "log", + "level", + "namespace", + ] + self.assertCountEqual(log.keys(), expected_log_keys) + self.assertEqual(log["log"], "Hello there, wally!") diff --git a/tests/push/test_email.py b/tests/push/test_email.py
index d9993e6245..bcdcafa5a9 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py
@@ -100,7 +100,7 @@ class EmailPusherTests(HomeserverTestCase): user_tuple = self.get_success( self.hs.get_datastore().get_user_by_access_token(self.access_token) ) - token_id = user_tuple["token_id"] + token_id = user_tuple.token_id self.pusher = self.get_success( self.hs.get_pusherpool().add_pusher( diff --git a/tests/push/test_http.py b/tests/push/test_http.py
index 2f56cacc7a..826cebbf0c 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py
@@ -69,7 +69,7 @@ class HTTPPusherTests(HomeserverTestCase): user_tuple = self.get_success( self.hs.get_datastore().get_user_by_access_token(access_token) ) - token_id = user_tuple["token_id"] + token_id = user_tuple.token_id self.get_success( self.hs.get_pusherpool().add_pusher( @@ -181,7 +181,7 @@ class HTTPPusherTests(HomeserverTestCase): user_tuple = self.get_success( self.hs.get_datastore().get_user_by_access_token(access_token) ) - token_id = user_tuple["token_id"] + token_id = user_tuple.token_id self.get_success( self.hs.get_pusherpool().add_pusher( @@ -297,7 +297,7 @@ class HTTPPusherTests(HomeserverTestCase): user_tuple = self.get_success( self.hs.get_datastore().get_user_by_access_token(access_token) ) - token_id = user_tuple["token_id"] + token_id = user_tuple.token_id self.get_success( self.hs.get_pusherpool().add_pusher( @@ -379,7 +379,7 @@ class HTTPPusherTests(HomeserverTestCase): user_tuple = self.get_success( self.hs.get_datastore().get_user_by_access_token(access_token) ) - token_id = user_tuple["token_id"] + token_id = user_tuple.token_id self.get_success( self.hs.get_pusherpool().add_pusher( @@ -452,7 +452,7 @@ class HTTPPusherTests(HomeserverTestCase): user_tuple = self.get_success( self.hs.get_datastore().get_user_by_access_token(access_token) ) - token_id = user_tuple["token_id"] + token_id = user_tuple.token_id self.get_success( self.hs.get_pusherpool().add_pusher( diff --git a/tests/replication/_base.py b/tests/replication/_base.py
index f1e53f33cd..5c633ac6df 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py
@@ -16,7 +16,6 @@ import logging from typing import Any, Callable, List, Optional, Tuple import attr -import hiredis from twisted.internet.interfaces import IConsumer, IPullProducer, IReactorTime from twisted.internet.protocol import Protocol @@ -39,12 +38,22 @@ from synapse.util import Clock from tests import unittest from tests.server import FakeTransport, render +try: + import hiredis +except ImportError: + hiredis = None + logger = logging.getLogger(__name__) class BaseStreamTestCase(unittest.HomeserverTestCase): """Base class for tests of the replication streams""" + # hiredis is an optional dependency so we don't want to require it for running + # the tests. + if not hiredis: + skip = "Requires hiredis" + servlets = [ streams.register_servlets, ] diff --git a/tests/replication/test_pusher_shard.py b/tests/replication/test_pusher_shard.py
index 2bdc6edbb1..67c27a089f 100644 --- a/tests/replication/test_pusher_shard.py +++ b/tests/replication/test_pusher_shard.py
@@ -55,7 +55,7 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase): user_dict = self.get_success( self.hs.get_datastore().get_user_by_access_token(access_token) ) - token_id = user_dict["token_id"] + token_id = user_dict.token_id self.get_success( self.hs.get_pusherpool().add_pusher( diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index 0b728e7ffc..7df32e5093 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py
@@ -1206,7 +1206,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase): user_tuple = self.get_success( self.store.get_user_by_access_token(other_user_token) ) - token_id = user_tuple["token_id"] + token_id = user_tuple.token_id self.get_success( self.hs.get_pusherpool().add_pusher( diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py
index 46d0374ca4..3a1f7c5da9 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py
@@ -59,6 +59,7 @@ class RegisterRestServletTestCase(unittest.HomeserverTestCase): self.hs.config.server_name, id="1234", namespaces={"users": [{"regex": r"@as_user.*", "exclusive": True}]}, + sender="@as:test", ) self.hs.get_datastore().services_cache.append(appservice) diff --git a/tests/server.py b/tests/server.py
index ea9c22bc51..b97003fa5a 100644 --- a/tests/server.py +++ b/tests/server.py
@@ -571,12 +571,10 @@ def connect_client(reactor: IReactorTCP, client_id: int) -> AccumulatingProtocol reactor factory: The connecting factory to build. """ - factory = reactor.tcpClients[client_id][2] + factory = reactor.tcpClients.pop(client_id)[2] client = factory.buildProtocol(None) server = AccumulatingProtocol() server.makeConnection(FakeTransport(client, reactor)) client.makeConnection(FakeTransport(server, reactor)) - reactor.tcpClients.pop(client_id) - return client, server diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py
index 6b582771fe..c8c7a90e5d 100644 --- a/tests/storage/test_registration.py +++ b/tests/storage/test_registration.py
@@ -69,11 +69,9 @@ class RegistrationStoreTestCase(unittest.TestCase): self.store.get_user_by_access_token(self.tokens[1]) ) - self.assertDictContainsSubset( - {"name": self.user_id, "device_id": self.device_id}, result - ) - - self.assertTrue("token_id" in result) + self.assertEqual(result.user_id, self.user_id) + self.assertEqual(result.device_id, self.device_id) + self.assertIsNotNone(result.token_id) @defer.inlineCallbacks def test_user_delete_access_tokens(self): @@ -105,7 +103,7 @@ class RegistrationStoreTestCase(unittest.TestCase): user = yield defer.ensureDeferred( self.store.get_user_by_access_token(self.tokens[0]) ) - self.assertEqual(self.user_id, user["name"]) + self.assertEqual(self.user_id, user.user_id) # now delete the rest yield defer.ensureDeferred(self.store.user_delete_access_tokens(self.user_id))