summary refs log tree commit diff
path: root/synapse/logging/_remote.py
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2020-10-29 07:27:37 -0400
committerGitHub <noreply@github.com>2020-10-29 07:27:37 -0400
commit00b24aa545091395f9a92d531836f6bf7b4460e0 (patch)
tree10d7333f2d1d9aaa0a6888c9ce3afb7d6feebf58 /synapse/logging/_remote.py
parentDon't require hiredis to run unit tests (#8680) (diff)
downloadsynapse-00b24aa545091395f9a92d531836f6bf7b4460e0.tar.xz
Support generating structured logs in addition to standard logs. (#8607)
This modifies the configuration of structured logging to be usable from
the standard Python logging configuration.

This also separates the formatting of logs from the transport allowing
JSON logs to files or standard logs to sockets.
Diffstat (limited to 'synapse/logging/_remote.py')
-rw-r--r--synapse/logging/_remote.py97
1 files changed, 52 insertions, 45 deletions
diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py
index 0caf325916..ba45424f02 100644
--- a/synapse/logging/_remote.py
+++ b/synapse/logging/_remote.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 import sys
 import traceback
 from collections import deque
@@ -21,6 +22,7 @@ from math import floor
 from typing import Callable, Optional
 
 import attr
+from typing_extensions import Deque
 from zope.interface import implementer
 
 from twisted.application.internet import ClientService
@@ -32,7 +34,8 @@ from twisted.internet.endpoints import (
 )
 from twisted.internet.interfaces import IPushProducer, ITransport
 from twisted.internet.protocol import Factory, Protocol
-from twisted.logger import ILogObserver, Logger, LogLevel
+
+logger = logging.getLogger(__name__)
 
 
 @attr.s
@@ -45,11 +48,11 @@ class LogProducer:
     Args:
         buffer: Log buffer to read logs from.
         transport: Transport to write to.
-        format_event: A callable to format the log entry to a string.
+        format: A callable to format the log record to a string.
     """
 
     transport = attr.ib(type=ITransport)
-    format_event = attr.ib(type=Callable[[dict], str])
+    _format = attr.ib(type=Callable[[logging.LogRecord], str])
     _buffer = attr.ib(type=deque)
     _paused = attr.ib(default=False, type=bool, init=False)
 
@@ -61,16 +64,19 @@ class LogProducer:
         self._buffer = deque()
 
     def resumeProducing(self):
+        # If we're already producing, nothing to do.
         self._paused = False
 
+        # Loop until paused.
         while self._paused is False and (self._buffer and self.transport.connected):
             try:
-                # Request the next event and format it.
-                event = self._buffer.popleft()
-                msg = self.format_event(event)
+                # Request the next record and format it.
+                record = self._buffer.popleft()
+                msg = self._format(record)
 
                 # Send it as a new line over the transport.
                 self.transport.write(msg.encode("utf8"))
+                self.transport.write(b"\n")
             except Exception:
                 # Something has gone wrong writing to the transport -- log it
                 # and break out of the while.
@@ -78,60 +84,63 @@ class LogProducer:
                 break
 
 
-@attr.s
-@implementer(ILogObserver)
-class TCPLogObserver:
+class RemoteHandler(logging.Handler):
     """
-    An IObserver that writes JSON logs to a TCP target.
+    An logging handler that writes logs to a TCP target.
 
     Args:
-        hs (HomeServer): The homeserver that is being logged for.
         host: The host of the logging target.
         port: The logging target's port.
-        format_event: A callable to format the log entry to a string.
         maximum_buffer: The maximum buffer size.
     """
 
-    hs = attr.ib()
-    host = attr.ib(type=str)
-    port = attr.ib(type=int)
-    format_event = attr.ib(type=Callable[[dict], str])
-    maximum_buffer = attr.ib(type=int)
-    _buffer = attr.ib(default=attr.Factory(deque), type=deque)
-    _connection_waiter = attr.ib(default=None, type=Optional[Deferred])
-    _logger = attr.ib(default=attr.Factory(Logger))
-    _producer = attr.ib(default=None, type=Optional[LogProducer])
-
-    def start(self) -> None:
+    def __init__(
+        self,
+        host: str,
+        port: int,
+        maximum_buffer: int = 1000,
+        level=logging.NOTSET,
+        _reactor=None,
+    ):
+        super().__init__(level=level)
+        self.host = host
+        self.port = port
+        self.maximum_buffer = maximum_buffer
+
+        self._buffer = deque()  # type: Deque[logging.LogRecord]
+        self._connection_waiter = None  # type: Optional[Deferred]
+        self._producer = None  # type: Optional[LogProducer]
 
         # Connect without DNS lookups if it's a direct IP.
+        if _reactor is None:
+            from twisted.internet import reactor
+
+            _reactor = reactor
+
         try:
             ip = ip_address(self.host)
             if isinstance(ip, IPv4Address):
-                endpoint = TCP4ClientEndpoint(
-                    self.hs.get_reactor(), self.host, self.port
-                )
+                endpoint = TCP4ClientEndpoint(_reactor, self.host, self.port)
             elif isinstance(ip, IPv6Address):
-                endpoint = TCP6ClientEndpoint(
-                    self.hs.get_reactor(), self.host, self.port
-                )
+                endpoint = TCP6ClientEndpoint(_reactor, self.host, self.port)
             else:
                 raise ValueError("Unknown IP address provided: %s" % (self.host,))
         except ValueError:
-            endpoint = HostnameEndpoint(self.hs.get_reactor(), self.host, self.port)
+            endpoint = HostnameEndpoint(_reactor, self.host, self.port)
 
         factory = Factory.forProtocol(Protocol)
-        self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
+        self._service = ClientService(endpoint, factory, clock=_reactor)
         self._service.startService()
         self._connect()
 
-    def stop(self):
+    def close(self):
         self._service.stopService()
 
     def _connect(self) -> None:
         """
         Triggers an attempt to connect then write to the remote if not already writing.
         """
+        # Do not attempt to open multiple connections.
         if self._connection_waiter:
             return
 
@@ -158,9 +167,7 @@ class TCPLogObserver:
 
             # Make a new producer and start it.
             self._producer = LogProducer(
-                buffer=self._buffer,
-                transport=r.transport,
-                format_event=self.format_event,
+                buffer=self._buffer, transport=r.transport, format=self.format,
             )
             r.transport.registerProducer(self._producer, True)
             self._producer.resumeProducing()
@@ -168,19 +175,19 @@ class TCPLogObserver:
 
     def _handle_pressure(self) -> None:
         """
-        Handle backpressure by shedding events.
+        Handle backpressure by shedding records.
 
         The buffer will, in this order, until the buffer is below the maximum:
-            - Shed DEBUG events
-            - Shed INFO events
-            - Shed the middle 50% of the events.
+            - Shed DEBUG records.
+            - Shed INFO records.
+            - Shed the middle 50% of the records.
         """
         if len(self._buffer) <= self.maximum_buffer:
             return
 
         # Strip out DEBUGs
         self._buffer = deque(
-            filter(lambda event: event["log_level"] != LogLevel.debug, self._buffer)
+            filter(lambda record: record.levelno > logging.DEBUG, self._buffer)
         )
 
         if len(self._buffer) <= self.maximum_buffer:
@@ -188,7 +195,7 @@ class TCPLogObserver:
 
         # Strip out INFOs
         self._buffer = deque(
-            filter(lambda event: event["log_level"] != LogLevel.info, self._buffer)
+            filter(lambda record: record.levelno > logging.INFO, self._buffer)
         )
 
         if len(self._buffer) <= self.maximum_buffer:
@@ -209,17 +216,17 @@ class TCPLogObserver:
 
         self._buffer.extend(reversed(end_buffer))
 
-    def __call__(self, event: dict) -> None:
-        self._buffer.append(event)
+    def emit(self, record: logging.LogRecord) -> None:
+        self._buffer.append(record)
 
         # Handle backpressure, if it exists.
         try:
             self._handle_pressure()
         except Exception:
-            # If handling backpressure fails,clear the buffer and log the
+            # If handling backpressure fails, clear the buffer and log the
             # exception.
             self._buffer.clear()
-            self._logger.failure("Failed clearing backpressure")
+            logger.warning("Failed clearing backpressure")
 
         # Try and write immediately.
         self._connect()