From 73d091be48c6bfa1c41a8d31068d21fc28c26f6e Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Tue, 12 Nov 2019 13:12:25 +0000 Subject: A couple more instances --- synapse/logging/_terse_json.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/logging/_terse_json.py') diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index 0ebbde06f2..76ce7d8808 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -153,7 +153,7 @@ class TerseJSONToTCPLogObserver(object): An IObserver that writes JSON logs to a TCP target. Args: - hs (HomeServer): The Homeserver that is being logged for. + hs (HomeServer): The homeserver that is being logged for. host: The host of the logging target. port: The logging target's port. metadata: Metadata to be added to each log entry. -- cgit 1.4.1 From 9eebd46048d0b34767047b2156760a1467f19ae6 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 26 Nov 2019 03:45:50 +1100 Subject: Improve the performance of structured logging (#6322) --- changelog.d/6322.misc | 1 + synapse/logging/_structured.py | 14 +++++- synapse/logging/_terse_json.py | 106 ++++++++++++++++++++++++++++++----------- tests/server.py | 2 + 4 files changed, 93 insertions(+), 30 deletions(-) create mode 100644 changelog.d/6322.misc (limited to 'synapse/logging/_terse_json.py') diff --git a/changelog.d/6322.misc b/changelog.d/6322.misc new file mode 100644 index 0000000000..70ef36ca80 --- /dev/null +++ b/changelog.d/6322.misc @@ -0,0 +1 @@ +Improve the performance of outputting structured logging. diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py index 334ddaf39a..ffa7b20ca8 100644 --- a/synapse/logging/_structured.py +++ b/synapse/logging/_structured.py @@ -261,6 +261,18 @@ def parse_drain_configs( ) +class StoppableLogPublisher(LogPublisher): + """ + A log publisher that can tell its observers to shut down any external + communications. + """ + + def stop(self): + for obs in self._observers: + if hasattr(obs, "stop"): + obs.stop() + + def setup_structured_logging( hs, config, @@ -336,7 +348,7 @@ def setup_structured_logging( # We should never get here, but, just in case, throw an error. raise ConfigError("%s drain type cannot be configured" % (observer.type,)) - publisher = LogPublisher(*observers) + publisher = StoppableLogPublisher(*observers) log_filter = LogLevelFilterPredicate() for namespace, namespace_config in log_config.get( diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index 76ce7d8808..05fc64f409 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -17,25 +17,29 @@ Log formatters that output terse JSON. """ +import json import sys +import traceback from collections import deque from ipaddress import IPv4Address, IPv6Address, ip_address from math import floor -from typing import IO +from typing import IO, Optional import attr -from simplejson import dumps from zope.interface import implementer from twisted.application.internet import ClientService +from twisted.internet.defer import Deferred from twisted.internet.endpoints import ( HostnameEndpoint, TCP4ClientEndpoint, TCP6ClientEndpoint, ) +from twisted.internet.interfaces import IPushProducer, ITransport from twisted.internet.protocol import Factory, Protocol from twisted.logger import FileLogObserver, ILogObserver, Logger -from twisted.python.failure import Failure + +_encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":")) def flatten_event(event: dict, metadata: dict, include_time: bool = False): @@ -141,11 +145,49 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb def formatEvent(_event: dict) -> str: flattened = flatten_event(_event, metadata) - return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n" + return _encoder.encode(flattened) + "\n" return FileLogObserver(outFile, formatEvent) +@attr.s +@implementer(IPushProducer) +class LogProducer(object): + """ + An IPushProducer that writes logs from its buffer to its transport when it + is resumed. + + Args: + buffer: Log buffer to read logs from. + transport: Transport to write to. + """ + + transport = attr.ib(type=ITransport) + _buffer = attr.ib(type=deque) + _paused = attr.ib(default=False, type=bool, init=False) + + def pauseProducing(self): + self._paused = True + + def stopProducing(self): + self._paused = True + self._buffer = None + + def resumeProducing(self): + self._paused = False + + while self._paused is False and (self._buffer and self.transport.connected): + try: + event = self._buffer.popleft() + self.transport.write(_encoder.encode(event).encode("utf8")) + self.transport.write(b"\n") + except Exception: + # Something has gone wrong writing to the transport -- log it + # and break out of the while. + traceback.print_exc(file=sys.__stderr__) + break + + @attr.s @implementer(ILogObserver) class TerseJSONToTCPLogObserver(object): @@ -165,8 +207,9 @@ class TerseJSONToTCPLogObserver(object): metadata = attr.ib(type=dict) maximum_buffer = attr.ib(type=int) _buffer = attr.ib(default=attr.Factory(deque), type=deque) - _writer = attr.ib(default=None) + _connection_waiter = attr.ib(default=None, type=Optional[Deferred]) _logger = attr.ib(default=attr.Factory(Logger)) + _producer = attr.ib(default=None, type=Optional[LogProducer]) def start(self) -> None: @@ -187,38 +230,43 @@ class TerseJSONToTCPLogObserver(object): factory = Factory.forProtocol(Protocol) self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor()) self._service.startService() + self._connect() - def _write_loop(self) -> None: + def stop(self): + self._service.stopService() + + def _connect(self) -> None: """ - Implement the write loop. + Triggers an attempt to connect then write to the remote if not already writing. """ - if self._writer: + if self._connection_waiter: return - self._writer = self._service.whenConnected() + self._connection_waiter = self._service.whenConnected(failAfterFailures=1) + + @self._connection_waiter.addErrback + def fail(r): + r.printTraceback(file=sys.__stderr__) + self._connection_waiter = None + self._connect() - @self._writer.addBoth + @self._connection_waiter.addCallback def writer(r): - if isinstance(r, Failure): - r.printTraceback(file=sys.__stderr__) - self._writer = None - self.hs.get_reactor().callLater(1, self._write_loop) + # We have a connection. If we already have a producer, and its + # transport is the same, just trigger a resumeProducing. + if self._producer and r.transport is self._producer.transport: + self._producer.resumeProducing() return - try: - for event in self._buffer: - r.transport.write( - dumps(event, ensure_ascii=False, separators=(",", ":")).encode( - "utf8" - ) - ) - r.transport.write(b"\n") - self._buffer.clear() - except Exception as e: - sys.__stderr__.write("Failed writing out logs with %s\n" % (str(e),)) - - self._writer = False - self.hs.get_reactor().callLater(1, self._write_loop) + # If the producer is still producing, stop it. + if self._producer: + self._producer.stopProducing() + + # Make a new producer and start it. + self._producer = LogProducer(buffer=self._buffer, transport=r.transport) + r.transport.registerProducer(self._producer, True) + self._producer.resumeProducing() + self._connection_waiter = None def _handle_pressure(self) -> None: """ @@ -277,4 +325,4 @@ class TerseJSONToTCPLogObserver(object): self._logger.failure("Failed clearing backpressure") # Try and write immediately. - self._write_loop() + self._connect() diff --git a/tests/server.py b/tests/server.py index f878aeaada..2b7cf4242e 100644 --- a/tests/server.py +++ b/tests/server.py @@ -379,6 +379,7 @@ class FakeTransport(object): disconnecting = False disconnected = False + connected = True buffer = attr.ib(default=b"") producer = attr.ib(default=None) autoflush = attr.ib(default=True) @@ -402,6 +403,7 @@ class FakeTransport(object): "FakeTransport: Delaying disconnect until buffer is flushed" ) else: + self.connected = False self.disconnected = True def abortConnection(self): -- cgit 1.4.1 From fdec84aa427e2e3b806eb15f462d652f8554cc8d Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Tue, 3 Dec 2019 20:21:25 +1100 Subject: Add benchmarks for structured logging performance (#6266) --- changelog.d/6266.misc | 1 + synapse/logging/_terse_json.py | 1 + synmark/__init__.py | 72 +++++++++++++++++++++++++ synmark/__main__.py | 90 +++++++++++++++++++++++++++++++ synmark/suites/__init__.py | 3 ++ synmark/suites/logging.py | 118 +++++++++++++++++++++++++++++++++++++++++ tox.ini | 9 ++++ 7 files changed, 294 insertions(+) create mode 100644 changelog.d/6266.misc create mode 100644 synmark/__init__.py create mode 100644 synmark/__main__.py create mode 100644 synmark/suites/__init__.py create mode 100644 synmark/suites/logging.py (limited to 'synapse/logging/_terse_json.py') diff --git a/changelog.d/6266.misc b/changelog.d/6266.misc new file mode 100644 index 0000000000..634e421a79 --- /dev/null +++ b/changelog.d/6266.misc @@ -0,0 +1 @@ +Add benchmarks for structured logging and improve output performance. diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index 05fc64f409..03934956f4 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -256,6 +256,7 @@ class TerseJSONToTCPLogObserver(object): # transport is the same, just trigger a resumeProducing. if self._producer and r.transport is self._producer.transport: self._producer.resumeProducing() + self._connection_waiter = None return # If the producer is still producing, stop it. diff --git a/synmark/__init__.py b/synmark/__init__.py new file mode 100644 index 0000000000..570eb818d9 --- /dev/null +++ b/synmark/__init__.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from twisted.internet import epollreactor +from twisted.internet.main import installReactor + +from synapse.config.homeserver import HomeServerConfig +from synapse.util import Clock + +from tests.utils import default_config, setup_test_homeserver + + +async def make_homeserver(reactor, config=None): + """ + Make a Homeserver suitable for running benchmarks against. + + Args: + reactor: A Twisted reactor to run under. + config: A HomeServerConfig to use, or None. + """ + cleanup_tasks = [] + clock = Clock(reactor) + + if not config: + config = default_config("test") + + config_obj = HomeServerConfig() + config_obj.parse_config_dict(config, "", "") + + hs = await setup_test_homeserver( + cleanup_tasks.append, config=config_obj, reactor=reactor, clock=clock + ) + stor = hs.get_datastore() + + # Run the database background updates. + if hasattr(stor, "do_next_background_update"): + while not await stor.has_completed_background_updates(): + await stor.do_next_background_update(1) + + def cleanup(): + for i in cleanup_tasks: + i() + + return hs, clock.sleep, cleanup + + +def make_reactor(): + """ + Instantiate and install a Twisted reactor suitable for testing (i.e. not the + default global one). + """ + reactor = epollreactor.EPollReactor() + + if "twisted.internet.reactor" in sys.modules: + del sys.modules["twisted.internet.reactor"] + installReactor(reactor) + + return reactor diff --git a/synmark/__main__.py b/synmark/__main__.py new file mode 100644 index 0000000000..ac59befbd4 --- /dev/null +++ b/synmark/__main__.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +from contextlib import redirect_stderr +from io import StringIO + +import pyperf +from synmark import make_reactor +from synmark.suites import SUITES + +from twisted.internet.defer import ensureDeferred +from twisted.logger import globalLogBeginner, textFileLogObserver +from twisted.python.failure import Failure + +from tests.utils import setupdb + + +def make_test(main): + """ + Take a benchmark function and wrap it in a reactor start and stop. + """ + + def _main(loops): + + reactor = make_reactor() + + file_out = StringIO() + with redirect_stderr(file_out): + + d = ensureDeferred(main(reactor, loops)) + + def on_done(_): + if isinstance(_, Failure): + _.printTraceback() + print(file_out.getvalue()) + reactor.stop() + return _ + + d.addBoth(on_done) + reactor.run() + + return d.result + + return _main + + +if __name__ == "__main__": + + def add_cmdline_args(cmd, args): + if args.log: + cmd.extend(["--log"]) + + runner = pyperf.Runner( + processes=3, min_time=2, show_name=True, add_cmdline_args=add_cmdline_args + ) + runner.argparser.add_argument("--log", action="store_true") + runner.parse_args() + + orig_loops = runner.args.loops + runner.args.inherit_environ = ["SYNAPSE_POSTGRES"] + + if runner.args.worker: + if runner.args.log: + globalLogBeginner.beginLoggingTo( + [textFileLogObserver(sys.__stdout__)], redirectStandardIO=False + ) + setupdb() + + for suite, loops in SUITES: + if loops: + runner.args.loops = loops + else: + runner.args.loops = orig_loops + loops = "auto" + runner.bench_time_func( + suite.__name__ + "_" + str(loops), make_test(suite.main), + ) diff --git a/synmark/suites/__init__.py b/synmark/suites/__init__.py new file mode 100644 index 0000000000..cfa3b0ba38 --- /dev/null +++ b/synmark/suites/__init__.py @@ -0,0 +1,3 @@ +from . import logging + +SUITES = [(logging, 1000), (logging, 10000), (logging, None)] diff --git a/synmark/suites/logging.py b/synmark/suites/logging.py new file mode 100644 index 0000000000..d8e4c7d58f --- /dev/null +++ b/synmark/suites/logging.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import warnings +from io import StringIO + +from mock import Mock + +from pyperf import perf_counter +from synmark import make_homeserver + +from twisted.internet.defer import Deferred +from twisted.internet.protocol import ServerFactory +from twisted.logger import LogBeginner, Logger, LogPublisher +from twisted.protocols.basic import LineOnlyReceiver + +from synapse.logging._structured import setup_structured_logging + + +class LineCounter(LineOnlyReceiver): + + delimiter = b"\n" + + def __init__(self, *args, **kwargs): + self.count = 0 + super().__init__(*args, **kwargs) + + def lineReceived(self, line): + self.count += 1 + + if self.count >= self.factory.wait_for and self.factory.on_done: + on_done = self.factory.on_done + self.factory.on_done = None + on_done.callback(True) + + +async def main(reactor, loops): + """ + Benchmark how long it takes to send `loops` messages. + """ + servers = [] + + def protocol(): + p = LineCounter() + servers.append(p) + return p + + logger_factory = ServerFactory.forProtocol(protocol) + logger_factory.wait_for = loops + logger_factory.on_done = Deferred() + port = reactor.listenTCP(0, logger_factory, interface="127.0.0.1") + + hs, wait, cleanup = await make_homeserver(reactor) + + errors = StringIO() + publisher = LogPublisher() + mock_sys = Mock() + beginner = LogBeginner( + publisher, errors, mock_sys, warnings, initialBufferSize=loops + ) + + log_config = { + "loggers": {"synapse": {"level": "DEBUG"}}, + "drains": { + "tersejson": { + "type": "network_json_terse", + "host": "127.0.0.1", + "port": port.getHost().port, + "maximum_buffer": 100, + } + }, + } + + logger = Logger(namespace="synapse.logging.test_terse_json", observer=publisher) + logging_system = setup_structured_logging( + hs, hs.config, log_config, logBeginner=beginner, redirect_stdlib_logging=False + ) + + # Wait for it to connect... + await logging_system._observers[0]._service.whenConnected() + + start = perf_counter() + + # Send a bunch of useful messages + for i in range(0, loops): + logger.info("test message %s" % (i,)) + + if ( + len(logging_system._observers[0]._buffer) + == logging_system._observers[0].maximum_buffer + ): + while ( + len(logging_system._observers[0]._buffer) + > logging_system._observers[0].maximum_buffer / 2 + ): + await wait(0.01) + + await logger_factory.on_done + + end = perf_counter() - start + + logging_system.stop() + port.stopListening() + cleanup() + + return end diff --git a/tox.ini b/tox.ini index 62b350ea6a..903a245fb0 100644 --- a/tox.ini +++ b/tox.ini @@ -102,6 +102,15 @@ commands = {envbindir}/coverage run "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:} +[testenv:benchmark] +deps = + {[base]deps} + pyperf +setenv = + SYNAPSE_POSTGRES = 1 +commands = + python -m synmark {posargs:} + [testenv:packaging] skip_install=True deps = -- cgit 1.4.1 From 495005360cd37009818ad4214a5462c3dd0b15a6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Dec 2019 15:21:12 +0000 Subject: Bump version of mypy --- synapse/config/emailconfig.py | 3 ++- synapse/config/ratelimiting.py | 3 +-- synapse/config/server.py | 2 +- synapse/logging/_terse_json.py | 2 +- synapse/logging/context.py | 3 +++ synapse/streams/events.py | 4 +++- tox.ini | 2 +- 7 files changed, 12 insertions(+), 7 deletions(-) (limited to 'synapse/logging/_terse_json.py') diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py index 18f42a87f9..35756bed87 100644 --- a/synapse/config/emailconfig.py +++ b/synapse/config/emailconfig.py @@ -21,6 +21,7 @@ from __future__ import print_function import email.utils import os from enum import Enum +from typing import Optional import pkg_resources @@ -101,7 +102,7 @@ class EmailConfig(Config): # both in RegistrationConfig and here. We should factor this bit out self.account_threepid_delegate_email = self.trusted_third_party_id_servers[ 0 - ] + ] # type: Optional[str] self.using_identity_server_from_trusted_list = True else: raise ConfigError( diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py index 947f653e03..4a3bfc4354 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py @@ -83,10 +83,9 @@ class RatelimitConfig(Config): ) rc_admin_redaction = config.get("rc_admin_redaction") + self.rc_admin_redaction = None if rc_admin_redaction: self.rc_admin_redaction = RateLimitConfig(rc_admin_redaction) - else: - self.rc_admin_redaction = None def generate_config_section(self, **kwargs): return """\ diff --git a/synapse/config/server.py b/synapse/config/server.py index a4bef00936..50af858c76 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -200,7 +200,7 @@ class ServerConfig(Config): self.admin_contact = config.get("admin_contact", None) # FIXME: federation_domain_whitelist needs sytests - self.federation_domain_whitelist = None + self.federation_domain_whitelist = None # type: Optional[dict] federation_domain_whitelist = config.get("federation_domain_whitelist", None) if federation_domain_whitelist is not None: diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index 03934956f4..c0b9384189 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -171,7 +171,7 @@ class LogProducer(object): def stopProducing(self): self._paused = True - self._buffer = None + self._buffer = deque() def resumeProducing(self): self._paused = False diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 6747f29e6a..33b322209d 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -405,6 +405,9 @@ class LoggingContext(object): """ current = get_thread_resource_usage() + # Indicate to mypy that we know that self.usage_start is None. + assert self.usage_start is not None + utime_delta = current.ru_utime - self.usage_start.ru_utime stime_delta = current.ru_stime - self.usage_start.ru_stime diff --git a/synapse/streams/events.py b/synapse/streams/events.py index b91fb2db7b..fcd2aaa9c9 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Any, Dict + from twisted.internet import defer from synapse.handlers.account_data import AccountDataEventSource @@ -35,7 +37,7 @@ class EventSources(object): def __init__(self, hs): self.sources = { name: cls(hs) for name, cls in EventSources.SOURCE_TYPES.items() - } + } # type: Dict[str, Any] self.store = hs.get_datastore() @defer.inlineCallbacks diff --git a/tox.ini b/tox.ini index 2ae82d674f..1d6428f64f 100644 --- a/tox.ini +++ b/tox.ini @@ -171,7 +171,7 @@ basepython = python3.7 skip_install = True deps = {[base]deps} - mypy==0.730 + mypy==0.750 mypy-zope env = MYPYPATH = stubs/ -- cgit 1.4.1