diff --git a/changelog.d/8607.feature b/changelog.d/8607.feature
new file mode 100644
index 0000000000..fef1eccb92
--- /dev/null
+++ b/changelog.d/8607.feature
@@ -0,0 +1 @@
+Support generating structured logs via the standard logging configuration.
diff --git a/changelog.d/8607.misc b/changelog.d/8607.misc
deleted file mode 100644
index 9e56551a34..0000000000
--- a/changelog.d/8607.misc
+++ /dev/null
@@ -1 +0,0 @@
-Re-organize the structured logging code to separate the TCP transport handling from the JSON formatting.
diff --git a/changelog.d/8685.feature b/changelog.d/8685.feature
new file mode 100644
index 0000000000..fef1eccb92
--- /dev/null
+++ b/changelog.d/8685.feature
@@ -0,0 +1 @@
+Support generating structured logs via the standard logging configuration.
diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py
index ba45424f02..fb937b3f28 100644
--- a/synapse/logging/_remote.py
+++ b/synapse/logging/_remote.py
@@ -26,7 +26,7 @@ from typing_extensions import Deque
from zope.interface import implementer
from twisted.application.internet import ClientService
-from twisted.internet.defer import Deferred
+from twisted.internet.defer import CancelledError, Deferred
from twisted.internet.endpoints import (
HostnameEndpoint,
TCP4ClientEndpoint,
@@ -34,6 +34,7 @@ from twisted.internet.endpoints import (
)
from twisted.internet.interfaces import IPushProducer, ITransport
from twisted.internet.protocol import Factory, Protocol
+from twisted.python.failure import Failure
logger = logging.getLogger(__name__)
@@ -131,9 +132,11 @@ class RemoteHandler(logging.Handler):
factory = Factory.forProtocol(Protocol)
self._service = ClientService(endpoint, factory, clock=_reactor)
self._service.startService()
+ self._stopping = False
self._connect()
def close(self):
+ self._stopping = True
self._service.stopService()
def _connect(self) -> None:
@@ -146,17 +149,21 @@ class RemoteHandler(logging.Handler):
self._connection_waiter = self._service.whenConnected(failAfterFailures=1)
- @self._connection_waiter.addErrback
- def fail(r):
- r.printTraceback(file=sys.__stderr__)
+ def fail(failure: Failure) -> None:
+ # If the Deferred was cancelled (e.g. during shutdown) do not try to
+ # reconnect (this will cause an infinite loop of errors).
+ if failure.check(CancelledError) and self._stopping:
+ return
+
+ # For a different error, print the traceback and re-connect.
+ failure.printTraceback(file=sys.__stderr__)
self._connection_waiter = None
self._connect()
- @self._connection_waiter.addCallback
- def writer(r):
+ def writer(result: Protocol) -> None:
# We have a connection. If we already have a producer, and its
# transport is the same, just trigger a resumeProducing.
- if self._producer and r.transport is self._producer.transport:
+ if self._producer and result.transport is self._producer.transport:
self._producer.resumeProducing()
self._connection_waiter = None
return
@@ -167,12 +174,14 @@ class RemoteHandler(logging.Handler):
# Make a new producer and start it.
self._producer = LogProducer(
- buffer=self._buffer, transport=r.transport, format=self.format,
+ buffer=self._buffer, transport=result.transport, format=self.format,
)
- r.transport.registerProducer(self._producer, True)
+ result.transport.registerProducer(self._producer, True)
self._producer.resumeProducing()
self._connection_waiter = None
+ self._connection_waiter.addCallbacks(writer, fail)
+
def _handle_pressure(self) -> None:
"""
Handle backpressure by shedding records.
diff --git a/tests/logging/test_remote_handler.py b/tests/logging/test_remote_handler.py
index 58ee1f2f3c..4bc27a1d7d 100644
--- a/tests/logging/test_remote_handler.py
+++ b/tests/logging/test_remote_handler.py
@@ -151,3 +151,19 @@ class RemoteHandlerTestCase(LoggerCleanupMixin, TestCase):
+ ["warn %s" % (i,) for i in range(15, 20)],
logs,
)
+
+ def test_cancel_connection(self):
+ """
+ Gracefully handle the connection being cancelled.
+ """
+ handler = RemoteHandler(
+ "127.0.0.1", 9000, maximum_buffer=10, _reactor=self.reactor
+ )
+ logger = self.get_logger(handler)
+
+ # Send a message.
+ logger.info("Hello there, %s!", "wally")
+
+ # Do not accept the connection and shutdown. This causes the pending
+ # connection to be cancelled (and should not raise any exceptions).
+ handler.close()
|