diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py
index 1b8916cfa2..9b46956ca9 100644
--- a/synapse/logging/_terse_json.py
+++ b/synapse/logging/_terse_json.py
@@ -18,26 +18,11 @@ Log formatters that output terse JSON.
"""
import json
-import sys
-import traceback
-from collections import deque
-from ipaddress import IPv4Address, IPv6Address, ip_address
-from math import floor
-from typing import IO, Optional
+from typing import IO
-import attr
-from zope.interface import implementer
+from twisted.logger import FileLogObserver
-from twisted.application.internet import ClientService
-from twisted.internet.defer import Deferred
-from twisted.internet.endpoints import (
- HostnameEndpoint,
- TCP4ClientEndpoint,
- TCP6ClientEndpoint,
-)
-from twisted.internet.interfaces import IPushProducer, ITransport
-from twisted.internet.protocol import Factory, Protocol
-from twisted.logger import FileLogObserver, ILogObserver, Logger
+from synapse.logging._remote import TCPLogObserver
_encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":"))
@@ -150,180 +135,22 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb
return FileLogObserver(outFile, formatEvent)
-@attr.s
-@implementer(IPushProducer)
-class LogProducer:
+def TerseJSONToTCPLogObserver(
+ hs, host: str, port: int, metadata: dict, maximum_buffer: int
+) -> FileLogObserver:
"""
- An IPushProducer that writes logs from its buffer to its transport when it
- is resumed.
-
- Args:
- buffer: Log buffer to read logs from.
- transport: Transport to write to.
- """
-
- transport = attr.ib(type=ITransport)
- _buffer = attr.ib(type=deque)
- _paused = attr.ib(default=False, type=bool, init=False)
-
- def pauseProducing(self):
- self._paused = True
-
- def stopProducing(self):
- self._paused = True
- self._buffer = deque()
-
- def resumeProducing(self):
- self._paused = False
-
- while self._paused is False and (self._buffer and self.transport.connected):
- try:
- event = self._buffer.popleft()
- self.transport.write(_encoder.encode(event).encode("utf8"))
- self.transport.write(b"\n")
- except Exception:
- # Something has gone wrong writing to the transport -- log it
- # and break out of the while.
- traceback.print_exc(file=sys.__stderr__)
- break
-
-
-@attr.s
-@implementer(ILogObserver)
-class TerseJSONToTCPLogObserver:
- """
- An IObserver that writes JSON logs to a TCP target.
+ A log observer that formats events to a flattened JSON representation.
Args:
hs (HomeServer): The homeserver that is being logged for.
host: The host of the logging target.
port: The logging target's port.
- metadata: Metadata to be added to each log entry.
+ metadata: Metadata to be added to each log object.
+ maximum_buffer: The maximum buffer size.
"""
- hs = attr.ib()
- host = attr.ib(type=str)
- port = attr.ib(type=int)
- metadata = attr.ib(type=dict)
- maximum_buffer = attr.ib(type=int)
- _buffer = attr.ib(default=attr.Factory(deque), type=deque)
- _connection_waiter = attr.ib(default=None, type=Optional[Deferred])
- _logger = attr.ib(default=attr.Factory(Logger))
- _producer = attr.ib(default=None, type=Optional[LogProducer])
-
- def start(self) -> None:
-
- # Connect without DNS lookups if it's a direct IP.
- try:
- ip = ip_address(self.host)
- if isinstance(ip, IPv4Address):
- endpoint = TCP4ClientEndpoint(
- self.hs.get_reactor(), self.host, self.port
- )
- elif isinstance(ip, IPv6Address):
- endpoint = TCP6ClientEndpoint(
- self.hs.get_reactor(), self.host, self.port
- )
- except ValueError:
- endpoint = HostnameEndpoint(self.hs.get_reactor(), self.host, self.port)
-
- factory = Factory.forProtocol(Protocol)
- self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
- self._service.startService()
- self._connect()
-
- def stop(self):
- self._service.stopService()
-
- def _connect(self) -> None:
- """
- Triggers an attempt to connect then write to the remote if not already writing.
- """
- if self._connection_waiter:
- return
-
- self._connection_waiter = self._service.whenConnected(failAfterFailures=1)
-
- @self._connection_waiter.addErrback
- def fail(r):
- r.printTraceback(file=sys.__stderr__)
- self._connection_waiter = None
- self._connect()
-
- @self._connection_waiter.addCallback
- def writer(r):
- # We have a connection. If we already have a producer, and its
- # transport is the same, just trigger a resumeProducing.
- if self._producer and r.transport is self._producer.transport:
- self._producer.resumeProducing()
- self._connection_waiter = None
- return
-
- # If the producer is still producing, stop it.
- if self._producer:
- self._producer.stopProducing()
-
- # Make a new producer and start it.
- self._producer = LogProducer(buffer=self._buffer, transport=r.transport)
- r.transport.registerProducer(self._producer, True)
- self._producer.resumeProducing()
- self._connection_waiter = None
-
- def _handle_pressure(self) -> None:
- """
- Handle backpressure by shedding events.
-
- The buffer will, in this order, until the buffer is below the maximum:
- - Shed DEBUG events
- - Shed INFO events
- - Shed the middle 50% of the events.
- """
- if len(self._buffer) <= self.maximum_buffer:
- return
-
- # Strip out DEBUGs
- self._buffer = deque(
- filter(lambda event: event["level"] != "DEBUG", self._buffer)
- )
-
- if len(self._buffer) <= self.maximum_buffer:
- return
-
- # Strip out INFOs
- self._buffer = deque(
- filter(lambda event: event["level"] != "INFO", self._buffer)
- )
-
- if len(self._buffer) <= self.maximum_buffer:
- return
-
- # Cut the middle entries out
- buffer_split = floor(self.maximum_buffer / 2)
-
- old_buffer = self._buffer
- self._buffer = deque()
-
- for i in range(buffer_split):
- self._buffer.append(old_buffer.popleft())
-
- end_buffer = []
- for i in range(buffer_split):
- end_buffer.append(old_buffer.pop())
-
- self._buffer.extend(reversed(end_buffer))
-
- def __call__(self, event: dict) -> None:
- flattened = flatten_event(event, self.metadata, include_time=True)
- self._buffer.append(flattened)
-
- # Handle backpressure, if it exists.
- try:
- self._handle_pressure()
- except Exception:
- # If handling backpressure fails,clear the buffer and log the
- # exception.
- self._buffer.clear()
- self._logger.failure("Failed clearing backpressure")
+ def formatEvent(_event: dict) -> str:
+ flattened = flatten_event(_event, metadata, include_time=True)
+ return _encoder.encode(flattened) + "\n"
- # Try and write immediately.
- self._connect()
+ return TCPLogObserver(hs, host, port, formatEvent, maximum_buffer)
|