diff options
Diffstat (limited to 'synapse/logging/_terse_json.py')
-rw-r--r-- | synapse/logging/_terse_json.py | 199 |
1 files changed, 13 insertions, 186 deletions
diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index 1b8916cfa2..9b46956ca9 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -18,26 +18,11 @@ Log formatters that output terse JSON. """ import json -import sys -import traceback -from collections import deque -from ipaddress import IPv4Address, IPv6Address, ip_address -from math import floor -from typing import IO, Optional +from typing import IO -import attr -from zope.interface import implementer +from twisted.logger import FileLogObserver -from twisted.application.internet import ClientService -from twisted.internet.defer import Deferred -from twisted.internet.endpoints import ( - HostnameEndpoint, - TCP4ClientEndpoint, - TCP6ClientEndpoint, -) -from twisted.internet.interfaces import IPushProducer, ITransport -from twisted.internet.protocol import Factory, Protocol -from twisted.logger import FileLogObserver, ILogObserver, Logger +from synapse.logging._remote import TCPLogObserver _encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":")) @@ -150,180 +135,22 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb return FileLogObserver(outFile, formatEvent) -@attr.s -@implementer(IPushProducer) -class LogProducer: +def TerseJSONToTCPLogObserver( + hs, host: str, port: int, metadata: dict, maximum_buffer: int +) -> FileLogObserver: """ - An IPushProducer that writes logs from its buffer to its transport when it - is resumed. - - Args: - buffer: Log buffer to read logs from. - transport: Transport to write to. - """ - - transport = attr.ib(type=ITransport) - _buffer = attr.ib(type=deque) - _paused = attr.ib(default=False, type=bool, init=False) - - def pauseProducing(self): - self._paused = True - - def stopProducing(self): - self._paused = True - self._buffer = deque() - - def resumeProducing(self): - self._paused = False - - while self._paused is False and (self._buffer and self.transport.connected): - try: - event = self._buffer.popleft() - self.transport.write(_encoder.encode(event).encode("utf8")) - self.transport.write(b"\n") - except Exception: - # Something has gone wrong writing to the transport -- log it - # and break out of the while. - traceback.print_exc(file=sys.__stderr__) - break - - -@attr.s -@implementer(ILogObserver) -class TerseJSONToTCPLogObserver: - """ - An IObserver that writes JSON logs to a TCP target. + A log observer that formats events to a flattened JSON representation. Args: hs (HomeServer): The homeserver that is being logged for. host: The host of the logging target. port: The logging target's port. - metadata: Metadata to be added to each log entry. + metadata: Metadata to be added to each log object. + maximum_buffer: The maximum buffer size. """ - hs = attr.ib() - host = attr.ib(type=str) - port = attr.ib(type=int) - metadata = attr.ib(type=dict) - maximum_buffer = attr.ib(type=int) - _buffer = attr.ib(default=attr.Factory(deque), type=deque) - _connection_waiter = attr.ib(default=None, type=Optional[Deferred]) - _logger = attr.ib(default=attr.Factory(Logger)) - _producer = attr.ib(default=None, type=Optional[LogProducer]) - - def start(self) -> None: - - # Connect without DNS lookups if it's a direct IP. - try: - ip = ip_address(self.host) - if isinstance(ip, IPv4Address): - endpoint = TCP4ClientEndpoint( - self.hs.get_reactor(), self.host, self.port - ) - elif isinstance(ip, IPv6Address): - endpoint = TCP6ClientEndpoint( - self.hs.get_reactor(), self.host, self.port - ) - except ValueError: - endpoint = HostnameEndpoint(self.hs.get_reactor(), self.host, self.port) - - factory = Factory.forProtocol(Protocol) - self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor()) - self._service.startService() - self._connect() - - def stop(self): - self._service.stopService() - - def _connect(self) -> None: - """ - Triggers an attempt to connect then write to the remote if not already writing. - """ - if self._connection_waiter: - return - - self._connection_waiter = self._service.whenConnected(failAfterFailures=1) - - @self._connection_waiter.addErrback - def fail(r): - r.printTraceback(file=sys.__stderr__) - self._connection_waiter = None - self._connect() - - @self._connection_waiter.addCallback - def writer(r): - # We have a connection. If we already have a producer, and its - # transport is the same, just trigger a resumeProducing. - if self._producer and r.transport is self._producer.transport: - self._producer.resumeProducing() - self._connection_waiter = None - return - - # If the producer is still producing, stop it. - if self._producer: - self._producer.stopProducing() - - # Make a new producer and start it. - self._producer = LogProducer(buffer=self._buffer, transport=r.transport) - r.transport.registerProducer(self._producer, True) - self._producer.resumeProducing() - self._connection_waiter = None - - def _handle_pressure(self) -> None: - """ - Handle backpressure by shedding events. - - The buffer will, in this order, until the buffer is below the maximum: - - Shed DEBUG events - - Shed INFO events - - Shed the middle 50% of the events. - """ - if len(self._buffer) <= self.maximum_buffer: - return - - # Strip out DEBUGs - self._buffer = deque( - filter(lambda event: event["level"] != "DEBUG", self._buffer) - ) - - if len(self._buffer) <= self.maximum_buffer: - return - - # Strip out INFOs - self._buffer = deque( - filter(lambda event: event["level"] != "INFO", self._buffer) - ) - - if len(self._buffer) <= self.maximum_buffer: - return - - # Cut the middle entries out - buffer_split = floor(self.maximum_buffer / 2) - - old_buffer = self._buffer - self._buffer = deque() - - for i in range(buffer_split): - self._buffer.append(old_buffer.popleft()) - - end_buffer = [] - for i in range(buffer_split): - end_buffer.append(old_buffer.pop()) - - self._buffer.extend(reversed(end_buffer)) - - def __call__(self, event: dict) -> None: - flattened = flatten_event(event, self.metadata, include_time=True) - self._buffer.append(flattened) - - # Handle backpressure, if it exists. - try: - self._handle_pressure() - except Exception: - # If handling backpressure fails,clear the buffer and log the - # exception. - self._buffer.clear() - self._logger.failure("Failed clearing backpressure") + def formatEvent(_event: dict) -> str: + flattened = flatten_event(_event, metadata, include_time=True) + return _encoder.encode(flattened) + "\n" - # Try and write immediately. - self._connect() + return TCPLogObserver(hs, host, port, formatEvent, maximum_buffer) |