diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py
new file mode 100644
index 0000000000..fb937b3f28
--- /dev/null
+++ b/synapse/logging/_remote.py
@@ -0,0 +1,241 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import sys
+import traceback
+from collections import deque
+from ipaddress import IPv4Address, IPv6Address, ip_address
+from math import floor
+from typing import Callable, Optional
+
+import attr
+from typing_extensions import Deque
+from zope.interface import implementer
+
+from twisted.application.internet import ClientService
+from twisted.internet.defer import CancelledError, Deferred
+from twisted.internet.endpoints import (
+ HostnameEndpoint,
+ TCP4ClientEndpoint,
+ TCP6ClientEndpoint,
+)
+from twisted.internet.interfaces import IPushProducer, ITransport
+from twisted.internet.protocol import Factory, Protocol
+from twisted.python.failure import Failure
+
+logger = logging.getLogger(__name__)
+
+
+@attr.s
+@implementer(IPushProducer)
+class LogProducer:
+ """
+ An IPushProducer that writes logs from its buffer to its transport when it
+ is resumed.
+
+ Args:
+ buffer: Log buffer to read logs from.
+ transport: Transport to write to.
+ format: A callable to format the log record to a string.
+ """
+
+ transport = attr.ib(type=ITransport)
+ _format = attr.ib(type=Callable[[logging.LogRecord], str])
+ _buffer = attr.ib(type=deque)
+ _paused = attr.ib(default=False, type=bool, init=False)
+
+ def pauseProducing(self):
+ self._paused = True
+
+ def stopProducing(self):
+ self._paused = True
+ self._buffer = deque()
+
+ def resumeProducing(self):
+ # If we're already producing, nothing to do.
+ self._paused = False
+
+ # Loop until paused.
+ while self._paused is False and (self._buffer and self.transport.connected):
+ try:
+ # Request the next record and format it.
+ record = self._buffer.popleft()
+ msg = self._format(record)
+
+ # Send it as a new line over the transport.
+ self.transport.write(msg.encode("utf8"))
+ self.transport.write(b"\n")
+ except Exception:
+ # Something has gone wrong writing to the transport -- log it
+ # and break out of the while.
+ traceback.print_exc(file=sys.__stderr__)
+ break
+
+
+class RemoteHandler(logging.Handler):
+ """
+ An logging handler that writes logs to a TCP target.
+
+ Args:
+ host: The host of the logging target.
+ port: The logging target's port.
+ maximum_buffer: The maximum buffer size.
+ """
+
+ def __init__(
+ self,
+ host: str,
+ port: int,
+ maximum_buffer: int = 1000,
+ level=logging.NOTSET,
+ _reactor=None,
+ ):
+ super().__init__(level=level)
+ self.host = host
+ self.port = port
+ self.maximum_buffer = maximum_buffer
+
+ self._buffer = deque() # type: Deque[logging.LogRecord]
+ self._connection_waiter = None # type: Optional[Deferred]
+ self._producer = None # type: Optional[LogProducer]
+
+ # Connect without DNS lookups if it's a direct IP.
+ if _reactor is None:
+ from twisted.internet import reactor
+
+ _reactor = reactor
+
+ try:
+ ip = ip_address(self.host)
+ if isinstance(ip, IPv4Address):
+ endpoint = TCP4ClientEndpoint(_reactor, self.host, self.port)
+ elif isinstance(ip, IPv6Address):
+ endpoint = TCP6ClientEndpoint(_reactor, self.host, self.port)
+ else:
+ raise ValueError("Unknown IP address provided: %s" % (self.host,))
+ except ValueError:
+ endpoint = HostnameEndpoint(_reactor, self.host, self.port)
+
+ factory = Factory.forProtocol(Protocol)
+ self._service = ClientService(endpoint, factory, clock=_reactor)
+ self._service.startService()
+ self._stopping = False
+ self._connect()
+
+ def close(self):
+ self._stopping = True
+ self._service.stopService()
+
+ def _connect(self) -> None:
+ """
+ Triggers an attempt to connect then write to the remote if not already writing.
+ """
+ # Do not attempt to open multiple connections.
+ if self._connection_waiter:
+ return
+
+ self._connection_waiter = self._service.whenConnected(failAfterFailures=1)
+
+ def fail(failure: Failure) -> None:
+ # If the Deferred was cancelled (e.g. during shutdown) do not try to
+ # reconnect (this will cause an infinite loop of errors).
+ if failure.check(CancelledError) and self._stopping:
+ return
+
+ # For a different error, print the traceback and re-connect.
+ failure.printTraceback(file=sys.__stderr__)
+ self._connection_waiter = None
+ self._connect()
+
+ def writer(result: Protocol) -> None:
+ # We have a connection. If we already have a producer, and its
+ # transport is the same, just trigger a resumeProducing.
+ if self._producer and result.transport is self._producer.transport:
+ self._producer.resumeProducing()
+ self._connection_waiter = None
+ return
+
+ # If the producer is still producing, stop it.
+ if self._producer:
+ self._producer.stopProducing()
+
+ # Make a new producer and start it.
+ self._producer = LogProducer(
+ buffer=self._buffer, transport=result.transport, format=self.format,
+ )
+ result.transport.registerProducer(self._producer, True)
+ self._producer.resumeProducing()
+ self._connection_waiter = None
+
+ self._connection_waiter.addCallbacks(writer, fail)
+
+ def _handle_pressure(self) -> None:
+ """
+ Handle backpressure by shedding records.
+
+ The buffer will, in this order, until the buffer is below the maximum:
+ - Shed DEBUG records.
+ - Shed INFO records.
+ - Shed the middle 50% of the records.
+ """
+ if len(self._buffer) <= self.maximum_buffer:
+ return
+
+ # Strip out DEBUGs
+ self._buffer = deque(
+ filter(lambda record: record.levelno > logging.DEBUG, self._buffer)
+ )
+
+ if len(self._buffer) <= self.maximum_buffer:
+ return
+
+ # Strip out INFOs
+ self._buffer = deque(
+ filter(lambda record: record.levelno > logging.INFO, self._buffer)
+ )
+
+ if len(self._buffer) <= self.maximum_buffer:
+ return
+
+ # Cut the middle entries out
+ buffer_split = floor(self.maximum_buffer / 2)
+
+ old_buffer = self._buffer
+ self._buffer = deque()
+
+ for i in range(buffer_split):
+ self._buffer.append(old_buffer.popleft())
+
+ end_buffer = []
+ for i in range(buffer_split):
+ end_buffer.append(old_buffer.pop())
+
+ self._buffer.extend(reversed(end_buffer))
+
+ def emit(self, record: logging.LogRecord) -> None:
+ self._buffer.append(record)
+
+ # Handle backpressure, if it exists.
+ try:
+ self._handle_pressure()
+ except Exception:
+ # If handling backpressure fails, clear the buffer and log the
+ # exception.
+ self._buffer.clear()
+ logger.warning("Failed clearing backpressure")
+
+ # Try and write immediately.
+ self._connect()
|