summary refs log tree commit diff
path: root/synapse/logging
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2020-10-29 12:53:57 -0400
committerGitHub <noreply@github.com>2020-10-29 12:53:57 -0400
commit8b42a4eefda409dfef9a3d10c9f55a778cae46a1 (patch)
tree91e4cc793d4d7bd5a96273f611847786e3c462cb /synapse/logging
parentAdd ability for access tokens to belong to one user but grant access to anoth... (diff)
downloadsynapse-8b42a4eefda409dfef9a3d10c9f55a778cae46a1.tar.xz
Gracefully handle a pending logging connection during shutdown. (#8685)
Diffstat (limited to 'synapse/logging')
-rw-r--r--synapse/logging/_remote.py27
1 files changed, 18 insertions, 9 deletions
diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py
index ba45424f02..fb937b3f28 100644
--- a/synapse/logging/_remote.py
+++ b/synapse/logging/_remote.py
@@ -26,7 +26,7 @@ from typing_extensions import Deque
 from zope.interface import implementer
 
 from twisted.application.internet import ClientService
-from twisted.internet.defer import Deferred
+from twisted.internet.defer import CancelledError, Deferred
 from twisted.internet.endpoints import (
     HostnameEndpoint,
     TCP4ClientEndpoint,
@@ -34,6 +34,7 @@ from twisted.internet.endpoints import (
 )
 from twisted.internet.interfaces import IPushProducer, ITransport
 from twisted.internet.protocol import Factory, Protocol
+from twisted.python.failure import Failure
 
 logger = logging.getLogger(__name__)
 
@@ -131,9 +132,11 @@ class RemoteHandler(logging.Handler):
         factory = Factory.forProtocol(Protocol)
         self._service = ClientService(endpoint, factory, clock=_reactor)
         self._service.startService()
+        self._stopping = False
         self._connect()
 
     def close(self):
+        self._stopping = True
         self._service.stopService()
 
     def _connect(self) -> None:
@@ -146,17 +149,21 @@ class RemoteHandler(logging.Handler):
 
         self._connection_waiter = self._service.whenConnected(failAfterFailures=1)
 
-        @self._connection_waiter.addErrback
-        def fail(r):
-            r.printTraceback(file=sys.__stderr__)
+        def fail(failure: Failure) -> None:
+            # If the Deferred was cancelled (e.g. during shutdown) do not try to
+            # reconnect (this will cause an infinite loop of errors).
+            if failure.check(CancelledError) and self._stopping:
+                return
+
+            # For a different error, print the traceback and re-connect.
+            failure.printTraceback(file=sys.__stderr__)
             self._connection_waiter = None
             self._connect()
 
-        @self._connection_waiter.addCallback
-        def writer(r):
+        def writer(result: Protocol) -> None:
             # We have a connection. If we already have a producer, and its
             # transport is the same, just trigger a resumeProducing.
-            if self._producer and r.transport is self._producer.transport:
+            if self._producer and result.transport is self._producer.transport:
                 self._producer.resumeProducing()
                 self._connection_waiter = None
                 return
@@ -167,12 +174,14 @@ class RemoteHandler(logging.Handler):
 
             # Make a new producer and start it.
             self._producer = LogProducer(
-                buffer=self._buffer, transport=r.transport, format=self.format,
+                buffer=self._buffer, transport=result.transport, format=self.format,
             )
-            r.transport.registerProducer(self._producer, True)
+            result.transport.registerProducer(self._producer, True)
             self._producer.resumeProducing()
             self._connection_waiter = None
 
+        self._connection_waiter.addCallbacks(writer, fail)
+
     def _handle_pressure(self) -> None:
         """
         Handle backpressure by shedding records.