Diffstat (limited to 'synapse/logging/_remote.py')
-rw-r--r--  synapse/logging/_remote.py  97
1 file changed, 52 insertions(+), 45 deletions(-)
diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py
index 0caf325916..ba45424f02 100644
--- a/synapse/logging/_remote.py
+++ b/synapse/logging/_remote.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 import sys
 import traceback
 from collections import deque
@@ -21,6 +22,7 @@ from math import floor
 from typing import Callable, Optional
 
 import attr
+from typing_extensions import Deque
 from zope.interface import implementer
 
 from twisted.application.internet import ClientService
@@ -32,7 +34,8 @@ from twisted.internet.endpoints import (
 )
 from twisted.internet.interfaces import IPushProducer, ITransport
 from twisted.internet.protocol import Factory, Protocol
-from twisted.logger import ILogObserver, Logger, LogLevel
+
+logger = logging.getLogger(__name__)
 
 
 @attr.s
@@ -45,11 +48,11 @@ class LogProducer:
     Args:
         buffer: Log buffer to read logs from.
         transport: Transport to write to.
-        format_event: A callable to format the log entry to a string.
+        format: A callable to format the log record to a string.
     """
 
     transport = attr.ib(type=ITransport)
-    format_event = attr.ib(type=Callable[[dict], str])
+    _format = attr.ib(type=Callable[[logging.LogRecord], str])
     _buffer = attr.ib(type=deque)
     _paused = attr.ib(default=False, type=bool, init=False)
 
@@ -61,16 +64,19 @@ class LogProducer:
         self._buffer = deque()
 
     def resumeProducing(self):
+        # Unpause the producer; this is a no-op if we were already producing.
         self._paused = False
 
+        # Loop until paused.
         while self._paused is False and (self._buffer and self.transport.connected):
             try:
-                # Request the next event and format it.
-                event = self._buffer.popleft()
-                msg = self.format_event(event)
+                # Request the next record and format it.
+                record = self._buffer.popleft()
+                msg = self._format(record)
 
                 # Send it as a new line over the transport.
                 self.transport.write(msg.encode("utf8"))
+                self.transport.write(b"\n")
             except Exception:
                 # Something has gone wrong writing to the transport -- log it
                 # and break out of the while.
@@ -78,60 +84,63 @@ class LogProducer:
                 break
 
 
-@attr.s
-@implementer(ILogObserver)
-class TCPLogObserver:
+class RemoteHandler(logging.Handler):
     """
-    An IObserver that writes JSON logs to a TCP target.
+    A logging handler that writes logs to a TCP target.
 
     Args:
-        hs (HomeServer): The homeserver that is being logged for.
         host: The host of the logging target.
         port: The logging target's port.
-        format_event: A callable to format the log entry to a string.
         maximum_buffer: The maximum buffer size.
     """
 
-    hs = attr.ib()
-    host = attr.ib(type=str)
-    port = attr.ib(type=int)
-    format_event = attr.ib(type=Callable[[dict], str])
-    maximum_buffer = attr.ib(type=int)
-    _buffer = attr.ib(default=attr.Factory(deque), type=deque)
-    _connection_waiter = attr.ib(default=None, type=Optional[Deferred])
-    _logger = attr.ib(default=attr.Factory(Logger))
-    _producer = attr.ib(default=None, type=Optional[LogProducer])
-
-    def start(self) -> None:
+    def __init__(
+        self,
+        host: str,
+        port: int,
+        maximum_buffer: int = 1000,
+        level=logging.NOTSET,
+        _reactor=None,
+    ):
+        super().__init__(level=level)
+        self.host = host
+        self.port = port
+        self.maximum_buffer = maximum_buffer
+
+        self._buffer = deque()  # type: Deque[logging.LogRecord]
+        self._connection_waiter = None  # type: Optional[Deferred]
+        self._producer = None  # type: Optional[LogProducer]
 
         # Connect without DNS lookups if it's a direct IP.
+        if _reactor is None:
+            from twisted.internet import reactor
+
+            _reactor = reactor
+
         try:
             ip = ip_address(self.host)
             if isinstance(ip, IPv4Address):
-                endpoint = TCP4ClientEndpoint(
-                    self.hs.get_reactor(), self.host, self.port
-                )
+                endpoint = TCP4ClientEndpoint(_reactor, self.host, self.port)
             elif isinstance(ip, IPv6Address):
-                endpoint = TCP6ClientEndpoint(
-                    self.hs.get_reactor(), self.host, self.port
-                )
+                endpoint = TCP6ClientEndpoint(_reactor, self.host, self.port)
             else:
                 raise ValueError("Unknown IP address provided: %s" % (self.host,))
         except ValueError:
-            endpoint = HostnameEndpoint(self.hs.get_reactor(), self.host, self.port)
+            endpoint = HostnameEndpoint(_reactor, self.host, self.port)
 
         factory = Factory.forProtocol(Protocol)
-        self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
+        self._service = ClientService(endpoint, factory, clock=_reactor)
         self._service.startService()
         self._connect()
 
-    def stop(self):
+    def close(self):
         self._service.stopService()
 
     def _connect(self) -> None:
         """
         Triggers an attempt to connect then write to the remote if not already writing.
         """
+        # Do not attempt to open multiple connections.
         if self._connection_waiter:
             return
 
@@ -158,9 +167,7 @@ class TCPLogObserver:
 
             # Make a new producer and start it.
             self._producer = LogProducer(
-                buffer=self._buffer,
-                transport=r.transport,
-                format_event=self.format_event,
+                buffer=self._buffer, transport=r.transport, format=self.format,
             )
             r.transport.registerProducer(self._producer, True)
             self._producer.resumeProducing()
@@ -168,19 +175,19 @@ class TCPLogObserver:
 
     def _handle_pressure(self) -> None:
         """
-        Handle backpressure by shedding events.
+        Handle backpressure by shedding records.
 
         The buffer will be shed, in this order, until the buffer is below the maximum:
-            - Shed DEBUG events
-            - Shed INFO events
-            - Shed the middle 50% of the events.
+            - Shed DEBUG records.
+            - Shed INFO records.
+            - Shed the middle 50% of the records.
         """
         if len(self._buffer) <= self.maximum_buffer:
             return
 
         # Strip out DEBUGs
         self._buffer = deque(
-            filter(lambda event: event["log_level"] != LogLevel.debug, self._buffer)
+            filter(lambda record: record.levelno > logging.DEBUG, self._buffer)
         )
 
         if len(self._buffer) <= self.maximum_buffer:
@@ -188,7 +195,7 @@ class TCPLogObserver:
 
         # Strip out INFOs
         self._buffer = deque(
-            filter(lambda event: event["log_level"] != LogLevel.info, self._buffer)
+            filter(lambda record: record.levelno > logging.INFO, self._buffer)
         )
 
         if len(self._buffer) <= self.maximum_buffer:
@@ -209,17 +216,17 @@ class TCPLogObserver:
 
         self._buffer.extend(reversed(end_buffer))
 
-    def __call__(self, event: dict) -> None:
-        self._buffer.append(event)
+    def emit(self, record: logging.LogRecord) -> None:
+        self._buffer.append(record)
 
         # Handle backpressure, if it exists.
         try:
             self._handle_pressure()
         except Exception:
-            # If handling backpressure fails,clear the buffer and log the
+            # If handling backpressure fails, clear the buffer and log the
             # exception.
             self._buffer.clear()
-            self._logger.failure("Failed clearing backpressure")
+            logger.warning("Failed clearing backpressure")
 
         # Try and write immediately.
         self._connect()
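
For context, here is a minimal sketch of how the new RemoteHandler could be attached through the
standard library's logging.config.dictConfig. The import path synapse.logging._remote.RemoteHandler
simply follows the module shown in this diff; the host, port, and buffer values are placeholders,
and a running Twisted reactor is assumed for the connection to actually be made.

    import logging
    import logging.config

    logging.config.dictConfig(
        {
            "version": 1,
            "formatters": {
                # RemoteHandler writes whatever Handler.format() returns, one record per line.
                "plain": {"format": "%(asctime)s %(levelname)s %(name)s %(message)s"},
            },
            "handlers": {
                "remote": {
                    # Module path taken from this diff; remaining keys are passed to __init__.
                    "class": "synapse.logging._remote.RemoteHandler",
                    "formatter": "plain",
                    "host": "127.0.0.1",    # placeholder log aggregator address
                    "port": 9000,           # placeholder port
                    "maximum_buffer": 1000, # records kept before back-pressure shedding
                },
            },
            "root": {"level": "INFO", "handlers": ["remote"]},
        }
    )

    logging.getLogger("synapse.demo").info("sent to the TCP target once the reactor runs")

Because emit() only appends to the bounded deque and _handle_pressure() sheds DEBUG records, then
INFO records, then the middle 50% of whatever remains, a slow or unreachable target degrades the
remote logging gracefully instead of growing memory without limit.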