diff options
Diffstat (limited to 'synapse/logging/_remote.py')
-rw-r--r-- | synapse/logging/_remote.py | 27 |
1 files changed, 18 insertions, 9 deletions
diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py index ba45424f02..fb937b3f28 100644 --- a/synapse/logging/_remote.py +++ b/synapse/logging/_remote.py @@ -26,7 +26,7 @@ from typing_extensions import Deque from zope.interface import implementer from twisted.application.internet import ClientService -from twisted.internet.defer import Deferred +from twisted.internet.defer import CancelledError, Deferred from twisted.internet.endpoints import ( HostnameEndpoint, TCP4ClientEndpoint, @@ -34,6 +34,7 @@ from twisted.internet.endpoints import ( ) from twisted.internet.interfaces import IPushProducer, ITransport from twisted.internet.protocol import Factory, Protocol +from twisted.python.failure import Failure logger = logging.getLogger(__name__) @@ -131,9 +132,11 @@ class RemoteHandler(logging.Handler): factory = Factory.forProtocol(Protocol) self._service = ClientService(endpoint, factory, clock=_reactor) self._service.startService() + self._stopping = False self._connect() def close(self): + self._stopping = True self._service.stopService() def _connect(self) -> None: @@ -146,17 +149,21 @@ class RemoteHandler(logging.Handler): self._connection_waiter = self._service.whenConnected(failAfterFailures=1) - @self._connection_waiter.addErrback - def fail(r): - r.printTraceback(file=sys.__stderr__) + def fail(failure: Failure) -> None: + # If the Deferred was cancelled (e.g. during shutdown) do not try to + # reconnect (this will cause an infinite loop of errors). + if failure.check(CancelledError) and self._stopping: + return + + # For a different error, print the traceback and re-connect. + failure.printTraceback(file=sys.__stderr__) self._connection_waiter = None self._connect() - @self._connection_waiter.addCallback - def writer(r): + def writer(result: Protocol) -> None: # We have a connection. If we already have a producer, and its # transport is the same, just trigger a resumeProducing. - if self._producer and r.transport is self._producer.transport: + if self._producer and result.transport is self._producer.transport: self._producer.resumeProducing() self._connection_waiter = None return @@ -167,12 +174,14 @@ class RemoteHandler(logging.Handler): # Make a new producer and start it. self._producer = LogProducer( - buffer=self._buffer, transport=r.transport, format=self.format, + buffer=self._buffer, transport=result.transport, format=self.format, ) - r.transport.registerProducer(self._producer, True) + result.transport.registerProducer(self._producer, True) self._producer.resumeProducing() self._connection_waiter = None + self._connection_waiter.addCallbacks(writer, fail) + def _handle_pressure(self) -> None: """ Handle backpressure by shedding records. |