summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/10407.feature1
-rw-r--r--docs/sample_log_config.yaml5
-rw-r--r--synapse/config/logger.py5
-rw-r--r--synapse/logging/handlers.py88
4 files changed, 97 insertions, 2 deletions
diff --git a/changelog.d/10407.feature b/changelog.d/10407.feature
new file mode 100644
index 0000000000..db277d9ecd
--- /dev/null
+++ b/changelog.d/10407.feature
@@ -0,0 +1 @@
+Add a buffered logging handler which periodically flushes itself.
diff --git a/docs/sample_log_config.yaml b/docs/sample_log_config.yaml
index 669e600081..b088c83405 100644
--- a/docs/sample_log_config.yaml
+++ b/docs/sample_log_config.yaml
@@ -28,7 +28,7 @@ handlers:
     # will be a delay for INFO/DEBUG logs to get written, but WARNING/ERROR
     # logs will still be flushed immediately.
     buffer:
-        class: logging.handlers.MemoryHandler
+        class: synapse.logging.handlers.PeriodicallyFlushingMemoryHandler
         target: file
         # The capacity is the number of log lines that are buffered before
         # being written to disk. Increasing this will lead to better
@@ -36,6 +36,9 @@ handlers:
         # be written to disk.
         capacity: 10
         flushLevel: 30  # Flush for WARNING logs as well
+        # The period of time, in seconds, between forced flushes.
+        # Messages will not be delayed for longer than this time.
+        period: 5
 
     # A handler that writes logs to stderr. Unused by default, but can be used
     # instead of "buffer" and "file" in the logger handlers.
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index ad4e6e61c3..dcd3ed1dac 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -71,7 +71,7 @@ handlers:
     # will be a delay for INFO/DEBUG logs to get written, but WARNING/ERROR
     # logs will still be flushed immediately.
     buffer:
-        class: logging.handlers.MemoryHandler
+        class: synapse.logging.handlers.PeriodicallyFlushingMemoryHandler
         target: file
         # The capacity is the number of log lines that are buffered before
         # being written to disk. Increasing this will lead to better
@@ -79,6 +79,9 @@ handlers:
         # be written to disk.
         capacity: 10
         flushLevel: 30  # Flush for WARNING logs as well
+        # The period of time, in seconds, between forced flushes.
+        # Messages will not be delayed for longer than this time.
+        period: 5
 
     # A handler that writes logs to stderr. Unused by default, but can be used
     # instead of "buffer" and "file" in the logger handlers.
diff --git a/synapse/logging/handlers.py b/synapse/logging/handlers.py
new file mode 100644
index 0000000000..a6c212f300
--- /dev/null
+++ b/synapse/logging/handlers.py
@@ -0,0 +1,88 @@
+import logging
+import time
+from logging import Handler, LogRecord
+from logging.handlers import MemoryHandler
+from threading import Thread
+from typing import Optional
+
+from twisted.internet.interfaces import IReactorCore
+
+
+class PeriodicallyFlushingMemoryHandler(MemoryHandler):
+    """
+    This is a subclass of MemoryHandler that additionally spawns a background
+    thread to periodically flush the buffer.
+
+    This prevents messages from being buffered for too long.
+
+    Additionally, all messages will be immediately flushed if the reactor has
+    not yet been started.
+    """
+
+    def __init__(
+        self,
+        capacity: int,
+        flushLevel: int = logging.ERROR,
+        target: Optional[Handler] = None,
+        flushOnClose: bool = True,
+        period: float = 5.0,
+        reactor: Optional[IReactorCore] = None,
+    ) -> None:
+        """
+        period: the period between automatic flushes
+
+        reactor: if specified, a custom reactor to use. If not specifies,
+            defaults to the globally-installed reactor.
+            Log entries will be flushed immediately until this reactor has
+            started.
+        """
+        super().__init__(capacity, flushLevel, target, flushOnClose)
+
+        self._flush_period: float = period
+        self._active: bool = True
+        self._reactor_started = False
+
+        self._flushing_thread: Thread = Thread(
+            name="PeriodicallyFlushingMemoryHandler flushing thread",
+            target=self._flush_periodically,
+        )
+        self._flushing_thread.start()
+
+        def on_reactor_running():
+            self._reactor_started = True
+
+        reactor_to_use: IReactorCore
+        if reactor is None:
+            from twisted.internet import reactor as global_reactor
+
+            reactor_to_use = global_reactor  # type: ignore[assignment]
+        else:
+            reactor_to_use = reactor
+
+        # call our hook when the reactor start up
+        reactor_to_use.callWhenRunning(on_reactor_running)
+
+    def shouldFlush(self, record: LogRecord) -> bool:
+        """
+        Before reactor start-up, log everything immediately.
+        Otherwise, fall back to original behaviour of waiting for the buffer to fill.
+        """
+
+        if self._reactor_started:
+            return super().shouldFlush(record)
+        else:
+            return True
+
+    def _flush_periodically(self):
+        """
+        Whilst this handler is active, flush the handler periodically.
+        """
+
+        while self._active:
+            # flush is thread-safe; it acquires and releases the lock internally
+            self.flush()
+            time.sleep(self._flush_period)
+
+    def close(self) -> None:
+        self._active = False
+        super().close()