From 39230d217104f3cd7aba9065dc478f935ce1e614 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 24 Mar 2020 14:45:33 +0000 Subject: Clean up some LoggingContext stuff (#7120) * Pull Sentinel out of LoggingContext ... and drop a few unnecessary references to it * Factor out LoggingContext.current_context move `current_context` and `set_context` out to top-level functions. Mostly this means that I can more easily trace what's actually referring to LoggingContext, but I think it's generally neater. * move copy-to-parent into `stop` this really just makes `start` and `stop` more symetric. It also means that it behaves correctly if you manually `set_log_context` rather than using the context manager. * Replace `LoggingContext.alive` with `finished` Turn `alive` into `finished` and make it a bit better defined. --- synapse/logging/_structured.py | 4 +- synapse/logging/context.py | 234 +++++++++++++++++---------------- synapse/logging/scopecontextmanager.py | 13 +- 3 files changed, 125 insertions(+), 126 deletions(-) (limited to 'synapse/logging') diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py index ffa7b20ca8..7372450b45 100644 --- a/synapse/logging/_structured.py +++ b/synapse/logging/_structured.py @@ -42,7 +42,7 @@ from synapse.logging._terse_json import ( TerseJSONToConsoleLogObserver, TerseJSONToTCPLogObserver, ) -from synapse.logging.context import LoggingContext +from synapse.logging.context import current_context def stdlib_log_level_to_twisted(level: str) -> LogLevel: @@ -86,7 +86,7 @@ class LogContextObserver(object): ].startswith("Timing out client"): return - context = LoggingContext.current_context() + context = current_context() # Copy the context information to the log event. if context is not None: diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 860b99a4c6..a8eafb1c7c 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -175,7 +175,54 @@ class ContextResourceUsage(object): return res -LoggingContextOrSentinel = Union["LoggingContext", "LoggingContext.Sentinel"] +LoggingContextOrSentinel = Union["LoggingContext", "_Sentinel"] + + +class _Sentinel(object): + """Sentinel to represent the root context""" + + __slots__ = ["previous_context", "finished", "request", "scope", "tag"] + + def __init__(self) -> None: + # Minimal set for compatibility with LoggingContext + self.previous_context = None + self.finished = False + self.request = None + self.scope = None + self.tag = None + + def __str__(self): + return "sentinel" + + def copy_to(self, record): + pass + + def copy_to_twisted_log_entry(self, record): + record["request"] = None + record["scope"] = None + + def start(self): + pass + + def stop(self): + pass + + def add_database_transaction(self, duration_sec): + pass + + def add_database_scheduled(self, sched_sec): + pass + + def record_event_fetch(self, event_count): + pass + + def __nonzero__(self): + return False + + __bool__ = __nonzero__ # python3 + + +SENTINEL_CONTEXT = _Sentinel() class LoggingContext(object): @@ -199,76 +246,33 @@ class LoggingContext(object): "_resource_usage", "usage_start", "main_thread", - "alive", + "finished", "request", "tag", "scope", ] - thread_local = threading.local() - - class Sentinel(object): - """Sentinel to represent the root context""" - - __slots__ = ["previous_context", "alive", "request", "scope", "tag"] - - def __init__(self) -> None: - # Minimal set for compatibility with LoggingContext - self.previous_context = None - self.alive = None - self.request = None - self.scope = None - self.tag = None - - def __str__(self): - return "sentinel" - - def copy_to(self, record): - pass - - def copy_to_twisted_log_entry(self, record): - record["request"] = None - record["scope"] = None - - def start(self): - pass - - def stop(self): - pass - - def add_database_transaction(self, duration_sec): - pass - - def add_database_scheduled(self, sched_sec): - pass - - def record_event_fetch(self, event_count): - pass - - def __nonzero__(self): - return False - - __bool__ = __nonzero__ # python3 - - sentinel = Sentinel() - def __init__(self, name=None, parent_context=None, request=None) -> None: - self.previous_context = LoggingContext.current_context() + self.previous_context = current_context() self.name = name # track the resources used by this context so far self._resource_usage = ContextResourceUsage() - # If alive has the thread resource usage when the logcontext last - # became active. + # The thread resource usage when the logcontext became active. None + # if the context is not currently active. self.usage_start = None self.main_thread = get_thread_id() self.request = None self.tag = "" - self.alive = True self.scope = None # type: Optional[_LogContextScope] + # keep track of whether we have hit the __exit__ block for this context + # (suggesting that the the thing that created the context thinks it should + # be finished, and that re-activating it would suggest an error). + self.finished = False + self.parent_context = parent_context if self.parent_context is not None: @@ -283,44 +287,15 @@ class LoggingContext(object): return str(self.request) return "%s@%x" % (self.name, id(self)) - @classmethod - def current_context(cls) -> LoggingContextOrSentinel: - """Get the current logging context from thread local storage - - Returns: - LoggingContext: the current logging context - """ - return getattr(cls.thread_local, "current_context", cls.sentinel) - - @classmethod - def set_current_context( - cls, context: LoggingContextOrSentinel - ) -> LoggingContextOrSentinel: - """Set the current logging context in thread local storage - Args: - context(LoggingContext): The context to activate. - Returns: - The context that was previously active - """ - current = cls.current_context() - - if current is not context: - current.stop() - cls.thread_local.current_context = context - context.start() - return current - def __enter__(self) -> "LoggingContext": """Enters this logging context into thread local storage""" - old_context = self.set_current_context(self) + old_context = set_current_context(self) if self.previous_context != old_context: logger.warning( "Expected previous context %r, found %r", self.previous_context, old_context, ) - self.alive = True - return self def __exit__(self, type, value, traceback) -> None: @@ -329,24 +304,19 @@ class LoggingContext(object): Returns: None to avoid suppressing any exceptions that were thrown. """ - current = self.set_current_context(self.previous_context) + current = set_current_context(self.previous_context) if current is not self: - if current is self.sentinel: + if current is SENTINEL_CONTEXT: logger.warning("Expected logging context %s was lost", self) else: logger.warning( "Expected logging context %s but found %s", self, current ) - self.alive = False - - # if we have a parent, pass our CPU usage stats on - if self.parent_context is not None and hasattr( - self.parent_context, "_resource_usage" - ): - self.parent_context._resource_usage += self._resource_usage - # reset them in case we get entered again - self._resource_usage.reset() + # the fact that we are here suggests that the caller thinks that everything + # is done and dusted for this logcontext, and further activity will not get + # recorded against the correct metrics. + self.finished = True def copy_to(self, record) -> None: """Copy logging fields from this context to a log record or @@ -371,9 +341,14 @@ class LoggingContext(object): logger.warning("Started logcontext %s on different thread", self) return + if self.finished: + logger.warning("Re-starting finished log context %s", self) + # If we haven't already started record the thread resource usage so # far - if not self.usage_start: + if self.usage_start: + logger.warning("Re-starting already-active log context %s", self) + else: self.usage_start = get_thread_resource_usage() def stop(self) -> None: @@ -396,6 +371,15 @@ class LoggingContext(object): self.usage_start = None + # if we have a parent, pass our CPU usage stats on + if self.parent_context is not None and hasattr( + self.parent_context, "_resource_usage" + ): + self.parent_context._resource_usage += self._resource_usage + + # reset them in case we get entered again + self._resource_usage.reset() + def get_resource_usage(self) -> ContextResourceUsage: """Get resources used by this logcontext so far. @@ -409,7 +393,7 @@ class LoggingContext(object): # If we are on the correct thread and we're currently running then we # can include resource usage so far. is_main_thread = get_thread_id() == self.main_thread - if self.alive and self.usage_start and is_main_thread: + if self.usage_start and is_main_thread: utime_delta, stime_delta = self._get_cputime() res.ru_utime += utime_delta res.ru_stime += stime_delta @@ -492,7 +476,7 @@ class LoggingContextFilter(logging.Filter): Returns: True to include the record in the log output. """ - context = LoggingContext.current_context() + context = current_context() for key, value in self.defaults.items(): setattr(record, key, value) @@ -512,27 +496,24 @@ class PreserveLoggingContext(object): __slots__ = ["current_context", "new_context", "has_parent"] - def __init__(self, new_context: Optional[LoggingContextOrSentinel] = None) -> None: - if new_context is None: - self.new_context = LoggingContext.sentinel # type: LoggingContextOrSentinel - else: - self.new_context = new_context + def __init__( + self, new_context: LoggingContextOrSentinel = SENTINEL_CONTEXT + ) -> None: + self.new_context = new_context def __enter__(self) -> None: """Captures the current logging context""" - self.current_context = LoggingContext.set_current_context(self.new_context) + self.current_context = set_current_context(self.new_context) if self.current_context: self.has_parent = self.current_context.previous_context is not None - if not self.current_context.alive: - logger.debug("Entering dead context: %s", self.current_context) def __exit__(self, type, value, traceback) -> None: """Restores the current logging context""" - context = LoggingContext.set_current_context(self.current_context) + context = set_current_context(self.current_context) if context != self.new_context: - if context is LoggingContext.sentinel: + if not context: logger.warning("Expected logging context %s was lost", self.new_context) else: logger.warning( @@ -541,9 +522,30 @@ class PreserveLoggingContext(object): context, ) - if self.current_context is not LoggingContext.sentinel: - if not self.current_context.alive: - logger.debug("Restoring dead context: %s", self.current_context) + +_thread_local = threading.local() +_thread_local.current_context = SENTINEL_CONTEXT + + +def current_context() -> LoggingContextOrSentinel: + """Get the current logging context from thread local storage""" + return getattr(_thread_local, "current_context", SENTINEL_CONTEXT) + + +def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSentinel: + """Set the current logging context in thread local storage + Args: + context(LoggingContext): The context to activate. + Returns: + The context that was previously active + """ + current = current_context() + + if current is not context: + current.stop() + _thread_local.current_context = context + context.start() + return current def nested_logging_context( @@ -572,7 +574,7 @@ def nested_logging_context( if parent_context is not None: context = parent_context # type: LoggingContextOrSentinel else: - context = LoggingContext.current_context() + context = current_context() return LoggingContext( parent_context=context, request=str(context.request) + "-" + suffix ) @@ -604,7 +606,7 @@ def run_in_background(f, *args, **kwargs): CRITICAL error about an unhandled error will be logged without much indication about where it came from. """ - current = LoggingContext.current_context() + current = current_context() try: res = f(*args, **kwargs) except: # noqa: E722 @@ -625,7 +627,7 @@ def run_in_background(f, *args, **kwargs): # The function may have reset the context before returning, so # we need to restore it now. - ctx = LoggingContext.set_current_context(current) + ctx = set_current_context(current) # The original context will be restored when the deferred # completes, but there is nothing waiting for it, so it will @@ -674,7 +676,7 @@ def make_deferred_yieldable(deferred): # ok, we can't be sure that a yield won't block, so let's reset the # logcontext, and add a callback to the deferred to restore it. - prev_context = LoggingContext.set_current_context(LoggingContext.sentinel) + prev_context = set_current_context(SENTINEL_CONTEXT) deferred.addBoth(_set_context_cb, prev_context) return deferred @@ -684,7 +686,7 @@ ResultT = TypeVar("ResultT") def _set_context_cb(result: ResultT, context: LoggingContext) -> ResultT: """A callback function which just sets the logging context""" - LoggingContext.set_current_context(context) + set_current_context(context) return result @@ -752,7 +754,7 @@ def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs): Deferred: A Deferred which fires a callback with the result of `f`, or an errback if `f` throws an exception. """ - logcontext = LoggingContext.current_context() + logcontext = current_context() def g(): with LoggingContext(parent_context=logcontext): diff --git a/synapse/logging/scopecontextmanager.py b/synapse/logging/scopecontextmanager.py index 4eed4f2338..dc3ab00cbb 100644 --- a/synapse/logging/scopecontextmanager.py +++ b/synapse/logging/scopecontextmanager.py @@ -19,7 +19,7 @@ from opentracing import Scope, ScopeManager import twisted -from synapse.logging.context import LoggingContext, nested_logging_context +from synapse.logging.context import current_context, nested_logging_context logger = logging.getLogger(__name__) @@ -49,11 +49,8 @@ class LogContextScopeManager(ScopeManager): (Scope) : the Scope that is active, or None if not available. """ - ctx = LoggingContext.current_context() - if ctx is LoggingContext.sentinel: - return None - else: - return ctx.scope + ctx = current_context() + return ctx.scope def activate(self, span, finish_on_close): """ @@ -70,9 +67,9 @@ class LogContextScopeManager(ScopeManager): """ enter_logcontext = False - ctx = LoggingContext.current_context() + ctx = current_context() - if ctx is LoggingContext.sentinel: + if not ctx: # We don't want this scope to affect. logger.error("Tried to activate scope outside of loggingcontext") return Scope(None, span) -- cgit 1.5.1 From 60adcbed919afd5c85442775eca822fec43d816d Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 31 Mar 2020 15:18:41 +0100 Subject: Fix "'NoneType' has no attribute start|stop" logcontext errors (#7181) Fixes #7179. --- changelog.d/7181.misc | 1 + synapse/http/site.py | 13 ++++++------- synapse/logging/context.py | 5 +++++ 3 files changed, 12 insertions(+), 7 deletions(-) create mode 100644 changelog.d/7181.misc (limited to 'synapse/logging') diff --git a/changelog.d/7181.misc b/changelog.d/7181.misc new file mode 100644 index 0000000000..731f4dcb52 --- /dev/null +++ b/changelog.d/7181.misc @@ -0,0 +1 @@ +Clean up some LoggingContext code. diff --git a/synapse/http/site.py b/synapse/http/site.py index e092193c9c..32feb0d968 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -193,6 +193,12 @@ class SynapseRequest(Request): self.finish_time = time.time() Request.connectionLost(self, reason) + if self.logcontext is None: + logger.info( + "Connection from %s lost before request headers were read", self.client + ) + return + # we only get here if the connection to the client drops before we send # the response. # @@ -236,13 +242,6 @@ class SynapseRequest(Request): def _finished_processing(self): """Log the completion of this request and update the metrics """ - - if self.logcontext is None: - # this can happen if the connection closed before we read the - # headers (so render was never called). In that case we'll already - # have logged a warning, so just bail out. - return - usage = self.logcontext.get_resource_usage() if self._processing_finished_time is None: diff --git a/synapse/logging/context.py b/synapse/logging/context.py index a8eafb1c7c..3254d6a8df 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -539,6 +539,11 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe Returns: The context that was previously active """ + # everything blows up if we allow current_context to be set to None, so sanity-check + # that now. + if context is None: + raise TypeError("'context' argument may not be None") + current = current_context() if current is not context: -- cgit 1.5.1 From 0f05fd15304f1931ef167351de63cc8ffa1d3a98 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 3 Apr 2020 13:21:30 +0100 Subject: Reduce the number of calls to `resource.getrusage` (#7183) Let's just call `getrusage` once on each logcontext change, rather than twice. --- changelog.d/7183.misc | 1 + synapse/logging/context.py | 102 ++++++++++++++++++++++++++++----------------- 2 files changed, 64 insertions(+), 39 deletions(-) create mode 100644 changelog.d/7183.misc (limited to 'synapse/logging') diff --git a/changelog.d/7183.misc b/changelog.d/7183.misc new file mode 100644 index 0000000000..731f4dcb52 --- /dev/null +++ b/changelog.d/7183.misc @@ -0,0 +1 @@ +Clean up some LoggingContext code. diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 3254d6a8df..a8f674d13d 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -51,7 +51,7 @@ try: is_thread_resource_usage_supported = True - def get_thread_resource_usage(): + def get_thread_resource_usage() -> "Optional[resource._RUsage]": return resource.getrusage(RUSAGE_THREAD) @@ -60,7 +60,7 @@ except Exception: # won't track resource usage. is_thread_resource_usage_supported = False - def get_thread_resource_usage(): + def get_thread_resource_usage() -> "Optional[resource._RUsage]": return None @@ -201,10 +201,10 @@ class _Sentinel(object): record["request"] = None record["scope"] = None - def start(self): + def start(self, rusage: "Optional[resource._RUsage]"): pass - def stop(self): + def stop(self, rusage: "Optional[resource._RUsage]"): pass def add_database_transaction(self, duration_sec): @@ -261,7 +261,7 @@ class LoggingContext(object): # The thread resource usage when the logcontext became active. None # if the context is not currently active. - self.usage_start = None + self.usage_start = None # type: Optional[resource._RUsage] self.main_thread = get_thread_id() self.request = None @@ -336,7 +336,17 @@ class LoggingContext(object): record["request"] = self.request record["scope"] = self.scope - def start(self) -> None: + def start(self, rusage: "Optional[resource._RUsage]") -> None: + """ + Record that this logcontext is currently running. + + This should not be called directly: use set_current_context + + Args: + rusage: the resources used by the current thread, at the point of + switching to this logcontext. May be None if this platform doesn't + support getrusuage. + """ if get_thread_id() != self.main_thread: logger.warning("Started logcontext %s on different thread", self) return @@ -349,36 +359,48 @@ class LoggingContext(object): if self.usage_start: logger.warning("Re-starting already-active log context %s", self) else: - self.usage_start = get_thread_resource_usage() + self.usage_start = rusage - def stop(self) -> None: - if get_thread_id() != self.main_thread: - logger.warning("Stopped logcontext %s on different thread", self) - return + def stop(self, rusage: "Optional[resource._RUsage]") -> None: + """ + Record that this logcontext is no longer running. + + This should not be called directly: use set_current_context + + Args: + rusage: the resources used by the current thread, at the point of + switching away from this logcontext. May be None if this platform + doesn't support getrusuage. + """ + + try: + if get_thread_id() != self.main_thread: + logger.warning("Stopped logcontext %s on different thread", self) + return + + if not rusage: + return - # When we stop, let's record the cpu used since we started - if not self.usage_start: - # Log a warning on platforms that support thread usage tracking - if is_thread_resource_usage_supported: + # Record the cpu used since we started + if not self.usage_start: logger.warning( - "Called stop on logcontext %s without calling start", self + "Called stop on logcontext %s without recording a start rusage", + self, ) - return - - utime_delta, stime_delta = self._get_cputime() - self._resource_usage.ru_utime += utime_delta - self._resource_usage.ru_stime += stime_delta + return - self.usage_start = None + utime_delta, stime_delta = self._get_cputime(rusage) + self._resource_usage.ru_utime += utime_delta + self._resource_usage.ru_stime += stime_delta - # if we have a parent, pass our CPU usage stats on - if self.parent_context is not None and hasattr( - self.parent_context, "_resource_usage" - ): - self.parent_context._resource_usage += self._resource_usage + # if we have a parent, pass our CPU usage stats on + if self.parent_context: + self.parent_context._resource_usage += self._resource_usage - # reset them in case we get entered again - self._resource_usage.reset() + # reset them in case we get entered again + self._resource_usage.reset() + finally: + self.usage_start = None def get_resource_usage(self) -> ContextResourceUsage: """Get resources used by this logcontext so far. @@ -394,24 +416,24 @@ class LoggingContext(object): # can include resource usage so far. is_main_thread = get_thread_id() == self.main_thread if self.usage_start and is_main_thread: - utime_delta, stime_delta = self._get_cputime() + rusage = get_thread_resource_usage() + assert rusage is not None + utime_delta, stime_delta = self._get_cputime(rusage) res.ru_utime += utime_delta res.ru_stime += stime_delta return res - def _get_cputime(self) -> Tuple[float, float]: - """Get the cpu usage time so far + def _get_cputime(self, current: "resource._RUsage") -> Tuple[float, float]: + """Get the cpu usage time between start() and the given rusage + + Args: + rusage: the current resource usage Returns: Tuple[float, float]: seconds in user mode, seconds in system mode """ assert self.usage_start is not None - current = get_thread_resource_usage() - - # Indicate to mypy that we know that self.usage_start is None. - assert self.usage_start is not None - utime_delta = current.ru_utime - self.usage_start.ru_utime stime_delta = current.ru_stime - self.usage_start.ru_stime @@ -547,9 +569,11 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe current = current_context() if current is not context: - current.stop() + rusage = get_thread_resource_usage() + current.stop(rusage) _thread_local.current_context = context - context.start() + context.start(rusage) + return current -- cgit 1.5.1 From 37f6823f5b91f27b9dd8de8fc0e52d5ea889647c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 29 Apr 2020 16:23:08 +0100 Subject: Add instance name to RDATA/POSITION commands (#7364) This is primarily for allowing us to send those commands from workers, but for now simply allows us to ignore echoed RDATA/POSITION commands that we sent (we get echoes of sent commands when using redis). Currently we log a WARNING on the master process every time we receive an echoed RDATA. --- changelog.d/7364.misc | 1 + docs/tcp_replication.md | 41 +++++++++++++++++++------------- synapse/app/_base.py | 4 ++-- synapse/logging/opentracing.py | 23 ++++++++---------- synapse/replication/tcp/commands.py | 37 +++++++++++++++++++--------- synapse/replication/tcp/handler.py | 17 ++++++++++--- synapse/server.py | 13 ++++++++-- synapse/server.pyi | 2 ++ tests/replication/slave/storage/_base.py | 1 + tests/replication/tcp/test_commands.py | 6 +++-- 10 files changed, 95 insertions(+), 50 deletions(-) create mode 100644 changelog.d/7364.misc (limited to 'synapse/logging') diff --git a/changelog.d/7364.misc b/changelog.d/7364.misc new file mode 100644 index 0000000000..bb5d727cf4 --- /dev/null +++ b/changelog.d/7364.misc @@ -0,0 +1 @@ +Add an `instance_name` to `RDATA` and `POSITION` replication commands. diff --git a/docs/tcp_replication.md b/docs/tcp_replication.md index b922d9cf7e..ab2fffbfe4 100644 --- a/docs/tcp_replication.md +++ b/docs/tcp_replication.md @@ -15,15 +15,17 @@ example flow would be (where '>' indicates master to worker and > SERVER example.com < REPLICATE - > POSITION events 53 - > RDATA events 54 ["$foo1:bar.com", ...] - > RDATA events 55 ["$foo4:bar.com", ...] + > POSITION events master 53 + > RDATA events master 54 ["$foo1:bar.com", ...] + > RDATA events master 55 ["$foo4:bar.com", ...] The example shows the server accepting a new connection and sending its identity with the `SERVER` command, followed by the client server to respond with the position of all streams. The server then periodically sends `RDATA` commands -which have the format `RDATA `, where the format of -`` is defined by the individual streams. +which have the format `RDATA `, where +the format of `` is defined by the individual streams. The +`` is the name of the Synapse process that generated the data +(usually "master"). Error reporting happens by either the client or server sending an ERROR command, and usually the connection will be closed. @@ -52,7 +54,7 @@ The basic structure of the protocol is line based, where the initial word of each line specifies the command. The rest of the line is parsed based on the command. For example, the RDATA command is defined as: - RDATA + RDATA (Note that may contains spaces, but cannot contain newlines.) @@ -136,11 +138,11 @@ the wire: < NAME synapse.app.appservice < PING 1490197665618 < REPLICATE - > POSITION events 1 - > POSITION backfill 1 - > POSITION caches 1 - > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513] - > RDATA events 14 ["$149019767112vOHxz:localhost:8823", + > POSITION events master 1 + > POSITION backfill master 1 + > POSITION caches master 1 + > RDATA caches master 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513] + > RDATA events master 14 ["$149019767112vOHxz:localhost:8823", "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null] < PING 1490197675618 > ERROR server stopping @@ -151,10 +153,10 @@ position without needing to send data with the `RDATA` command. An example of a batched set of `RDATA` is: - > RDATA caches batch ["get_user_by_id",["@test:localhost:8823"],1490197670513] - > RDATA caches batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513] - > RDATA caches batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513] - > RDATA caches 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513] + > RDATA caches master batch ["get_user_by_id",["@test:localhost:8823"],1490197670513] + > RDATA caches master batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513] + > RDATA caches master batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513] + > RDATA caches master 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513] In this case the client shouldn't advance their caches token until it sees the the last `RDATA`. @@ -178,6 +180,11 @@ client (C): updates, and if so then fetch them out of band. Sent in response to a REPLICATE command (but can happen at any time). + The POSITION command includes the source of the stream. Currently all streams + are written by a single process (usually "master"). If fetching missing + updates via HTTP API, rather than via the DB, then processes should make the + request to the appropriate process. + #### ERROR (S, C) There was an error @@ -234,12 +241,12 @@ Each individual cache invalidation results in a row being sent down replication, which includes the cache name (the name of the function) and they key to invalidate. For example: - > RDATA caches 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251] + > RDATA caches master 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251] Alternatively, an entire cache can be invalidated by sending down a `null` instead of the key. For example: - > RDATA caches 550953772 ["get_user_by_id", null, 1550574873252] + > RDATA caches master 550953772 ["get_user_by_id", null, 1550574873252] However, there are times when a number of caches need to be invalidated at the same time with the same key. To reduce traffic we batch those diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 4d84f4595a..628292b890 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -270,7 +270,7 @@ def start(hs, listeners=None): # Start the tracer synapse.logging.opentracing.init_tracer( # type: ignore[attr-defined] # noqa - hs.config + hs ) # It is now safe to start your Synapse. @@ -316,7 +316,7 @@ def setup_sentry(hs): scope.set_tag("matrix_server_name", hs.config.server_name) app = hs.config.worker_app if hs.config.worker_app else "synapse.app.homeserver" - name = hs.config.worker_name if hs.config.worker_name else "master" + name = hs.get_instance_name() scope.set_tag("worker_app", app) scope.set_tag("worker_name", name) diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 0638cec429..5dddf57008 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -171,7 +171,7 @@ import logging import re import types from functools import wraps -from typing import Dict +from typing import TYPE_CHECKING, Dict from canonicaljson import json @@ -179,6 +179,9 @@ from twisted.internet import defer from synapse.config import ConfigError +if TYPE_CHECKING: + from synapse.server import HomeServer + # Helper class @@ -297,14 +300,11 @@ def _noop_context_manager(*args, **kwargs): # Setup -def init_tracer(config): +def init_tracer(hs: "HomeServer"): """Set the whitelists and initialise the JaegerClient tracer - - Args: - config (HomeserverConfig): The config used by the homeserver """ global opentracing - if not config.opentracer_enabled: + if not hs.config.opentracer_enabled: # We don't have a tracer opentracing = None return @@ -315,18 +315,15 @@ def init_tracer(config): "installed." ) - # Include the worker name - name = config.worker_name if config.worker_name else "master" - # Pull out the jaeger config if it was given. Otherwise set it to something sensible. # See https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/config.py - set_homeserver_whitelist(config.opentracer_whitelist) + set_homeserver_whitelist(hs.config.opentracer_whitelist) JaegerConfig( - config=config.jaeger_config, - service_name="{} {}".format(config.server_name, name), - scope_manager=LogContextScopeManager(config), + config=hs.config.jaeger_config, + service_name="{} {}".format(hs.config.server_name, hs.get_instance_name()), + scope_manager=LogContextScopeManager(hs.config), ).initialize_tracer() diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index c7880d4b63..f58e384d17 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -95,7 +95,7 @@ class RdataCommand(Command): Format:: - RDATA + RDATA The `` may either be a numeric stream id OR "batch". The latter case is used to support sending multiple updates with the same stream ID. This @@ -105,33 +105,40 @@ class RdataCommand(Command): The client should batch all incoming RDATA with a token of "batch" (per stream_name) until it sees an RDATA with a numeric stream ID. + The `` is the source of the new data (usually "master"). + `` of "batch" maps to the instance variable `token` being None. An example of a batched series of RDATA:: - RDATA presence batch ["@foo:example.com", "online", ...] - RDATA presence batch ["@bar:example.com", "online", ...] - RDATA presence 59 ["@baz:example.com", "online", ...] + RDATA presence master batch ["@foo:example.com", "online", ...] + RDATA presence master batch ["@bar:example.com", "online", ...] + RDATA presence master 59 ["@baz:example.com", "online", ...] """ NAME = "RDATA" - def __init__(self, stream_name, token, row): + def __init__(self, stream_name, instance_name, token, row): self.stream_name = stream_name + self.instance_name = instance_name self.token = token self.row = row @classmethod def from_line(cls, line): - stream_name, token, row_json = line.split(" ", 2) + stream_name, instance_name, token, row_json = line.split(" ", 3) return cls( - stream_name, None if token == "batch" else int(token), json.loads(row_json) + stream_name, + instance_name, + None if token == "batch" else int(token), + json.loads(row_json), ) def to_line(self): return " ".join( ( self.stream_name, + self.instance_name, str(self.token) if self.token is not None else "batch", _json_encoder.encode(self.row), ) @@ -145,23 +152,31 @@ class PositionCommand(Command): """Sent by the server to tell the client the stream postition without needing to send an RDATA. + Format:: + + POSITION + On receipt of a POSITION command clients should check if they have missed any updates, and if so then fetch them out of band. + + The `` is the process that sent the command and is the source + of the stream. """ NAME = "POSITION" - def __init__(self, stream_name, token): + def __init__(self, stream_name, instance_name, token): self.stream_name = stream_name + self.instance_name = instance_name self.token = token @classmethod def from_line(cls, line): - stream_name, token = line.split(" ", 1) - return cls(stream_name, int(token)) + stream_name, instance_name, token = line.split(" ", 2) + return cls(stream_name, instance_name, int(token)) def to_line(self): - return " ".join((self.stream_name, str(self.token))) + return " ".join((self.stream_name, self.instance_name, str(self.token))) class ErrorCommand(_SimpleCommand): diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index b8f49a8d0f..6f7054d5af 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -79,6 +79,7 @@ class ReplicationCommandHandler: self._notifier = hs.get_notifier() self._clock = hs.get_clock() self._instance_id = hs.get_instance_id() + self._instance_name = hs.get_instance_name() # Set of streams that we've caught up with. self._streams_connected = set() # type: Set[str] @@ -156,7 +157,7 @@ class ReplicationCommandHandler: hs.config.redis.redis_host, hs.config.redis.redis_port, self._factory, ) else: - client_name = hs.config.worker_name + client_name = hs.get_instance_name() self._factory = DirectTcpReplicationClientFactory(hs, client_name, self) host = hs.config.worker_replication_host port = hs.config.worker_replication_port @@ -170,7 +171,9 @@ class ReplicationCommandHandler: for stream_name, stream in self._streams.items(): current_token = stream.current_token() - self.send_command(PositionCommand(stream_name, current_token)) + self.send_command( + PositionCommand(stream_name, self._instance_name, current_token) + ) async def on_USER_SYNC(self, conn: AbstractConnection, cmd: UserSyncCommand): user_sync_counter.inc() @@ -235,6 +238,10 @@ class ReplicationCommandHandler: await self._server_notices_sender.on_user_ip(cmd.user_id) async def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand): + if cmd.instance_name == self._instance_name: + # Ignore RDATA that are just our own echoes + return + stream_name = cmd.stream_name inbound_rdata_count.labels(stream_name).inc() @@ -286,6 +293,10 @@ class ReplicationCommandHandler: await self._replication_data_handler.on_rdata(stream_name, token, rows) async def on_POSITION(self, conn: AbstractConnection, cmd: PositionCommand): + if cmd.instance_name == self._instance_name: + # Ignore POSITION that are just our own echoes + return + stream = self._streams.get(cmd.stream_name) if not stream: logger.error("Got POSITION for unknown stream: %s", cmd.stream_name) @@ -485,7 +496,7 @@ class ReplicationCommandHandler: We need to check if the client is interested in the stream or not """ - self.send_command(RdataCommand(stream_name, token, data)) + self.send_command(RdataCommand(stream_name, self._instance_name, token, data)) UpdateToken = TypeVar("UpdateToken") diff --git a/synapse/server.py b/synapse/server.py index 9d273c980c..bf97a16c09 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -234,7 +234,8 @@ class HomeServer(object): self._listening_services = [] self.start_time = None - self.instance_id = random_string(5) + self._instance_id = random_string(5) + self._instance_name = config.worker_name or "master" self.clock = Clock(reactor) self.distributor = Distributor() @@ -254,7 +255,15 @@ class HomeServer(object): This is used to distinguish running instances in worker-based deployments. """ - return self.instance_id + return self._instance_id + + def get_instance_name(self) -> str: + """A unique name for this synapse process. + + Used to identify the process over replication and in config. Does not + change over restarts. + """ + return self._instance_name def setup(self): logger.info("Setting up.") diff --git a/synapse/server.pyi b/synapse/server.pyi index fc5886f762..18043a2593 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -122,6 +122,8 @@ class HomeServer(object): pass def get_instance_id(self) -> str: pass + def get_instance_name(self) -> str: + pass def get_event_builder_factory(self) -> EventBuilderFactory: pass def get_storage(self) -> synapse.storage.Storage: diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index 395c7d0306..1615dfab5e 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -57,6 +57,7 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase): # We now do some gut wrenching so that we have a client that is based # off of the slave store rather than the main store. self.replication_handler = ReplicationCommandHandler(self.hs) + self.replication_handler._instance_name = "worker" self.replication_handler._replication_data_handler = ReplicationDataHandler( self.slaved_store ) diff --git a/tests/replication/tcp/test_commands.py b/tests/replication/tcp/test_commands.py index 3cbcb513cc..7ddfd0a733 100644 --- a/tests/replication/tcp/test_commands.py +++ b/tests/replication/tcp/test_commands.py @@ -28,15 +28,17 @@ class ParseCommandTestCase(TestCase): self.assertIsInstance(cmd, ReplicateCommand) def test_parse_rdata(self): - line = 'RDATA events 6287863 ["ev", ["$eventid", "!roomid", "type", null, null, null]]' + line = 'RDATA events master 6287863 ["ev", ["$eventid", "!roomid", "type", null, null, null]]' cmd = parse_command_from_line(line) self.assertIsInstance(cmd, RdataCommand) self.assertEqual(cmd.stream_name, "events") + self.assertEqual(cmd.instance_name, "master") self.assertEqual(cmd.token, 6287863) def test_parse_rdata_batch(self): - line = 'RDATA presence batch ["@foo:example.com", "online"]' + line = 'RDATA presence master batch ["@foo:example.com", "online"]' cmd = parse_command_from_line(line) self.assertIsInstance(cmd, RdataCommand) self.assertEqual(cmd.stream_name, "presence") + self.assertEqual(cmd.instance_name, "master") self.assertIsNone(cmd.token) -- cgit 1.5.1 From fe69fb6263989b570366adf23d20091a0b91fb80 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 5 May 2020 09:21:34 -0400 Subject: Add backwards compatibility codepath to LoggingContext. (#7408) --- changelog.d/7408.misc | 1 + synapse/logging/context.py | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) create mode 100644 changelog.d/7408.misc (limited to 'synapse/logging') diff --git a/changelog.d/7408.misc b/changelog.d/7408.misc new file mode 100644 index 0000000000..731f4dcb52 --- /dev/null +++ b/changelog.d/7408.misc @@ -0,0 +1 @@ +Clean up some LoggingContext code. diff --git a/synapse/logging/context.py b/synapse/logging/context.py index a8f674d13d..856534e91a 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -27,6 +27,7 @@ import inspect import logging import threading import types +import warnings from typing import TYPE_CHECKING, Optional, Tuple, TypeVar, Union from typing_extensions import Literal @@ -287,6 +288,46 @@ class LoggingContext(object): return str(self.request) return "%s@%x" % (self.name, id(self)) + @classmethod + def current_context(cls) -> LoggingContextOrSentinel: + """Get the current logging context from thread local storage + + This exists for backwards compatibility. ``current_context()`` should be + called directly. + + Returns: + LoggingContext: the current logging context + """ + warnings.warn( + "synapse.logging.context.LoggingContext.current_context() is deprecated " + "in favor of synapse.logging.context.current_context().", + DeprecationWarning, + stacklevel=2, + ) + return current_context() + + @classmethod + def set_current_context( + cls, context: LoggingContextOrSentinel + ) -> LoggingContextOrSentinel: + """Set the current logging context in thread local storage + + This exists for backwards compatibility. ``set_current_context()`` should be + called directly. + + Args: + context(LoggingContext): The context to activate. + Returns: + The context that was previously active + """ + warnings.warn( + "synapse.logging.context.LoggingContext.set_current_context() is deprecated " + "in favor of synapse.logging.context.set_current_context().", + DeprecationWarning, + stacklevel=2, + ) + return set_current_context(context) + def __enter__(self) -> "LoggingContext": """Enters this logging context into thread local storage""" old_context = set_current_context(self) -- cgit 1.5.1 From d7c2df2fa3691069cc4fdeabd5028e246882d70c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 6 May 2020 16:43:39 -0400 Subject: Improve per-block CPU and DB usage metrics (#7426) --- changelog.d/7426.misc | 1 + synapse/logging/context.py | 38 ++++++++++++++++++++++++++------------ 2 files changed, 27 insertions(+), 12 deletions(-) create mode 100644 changelog.d/7426.misc (limited to 'synapse/logging') diff --git a/changelog.d/7426.misc b/changelog.d/7426.misc new file mode 100644 index 0000000000..731f4dcb52 --- /dev/null +++ b/changelog.d/7426.misc @@ -0,0 +1 @@ +Clean up some LoggingContext code. diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 856534e91a..8b9c4e38bd 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -431,15 +431,7 @@ class LoggingContext(object): return utime_delta, stime_delta = self._get_cputime(rusage) - self._resource_usage.ru_utime += utime_delta - self._resource_usage.ru_stime += stime_delta - - # if we have a parent, pass our CPU usage stats on - if self.parent_context: - self.parent_context._resource_usage += self._resource_usage - - # reset them in case we get entered again - self._resource_usage.reset() + self.add_cputime(utime_delta, stime_delta) finally: self.usage_start = None @@ -497,30 +489,52 @@ class LoggingContext(object): return utime_delta, stime_delta + def add_cputime(self, utime_delta: float, stime_delta: float) -> None: + """Update the CPU time usage of this context (and any parents, recursively). + + Args: + utime_delta: additional user time, in seconds, spent in this context. + stime_delta: additional system time, in seconds, spent in this context. + """ + self._resource_usage.ru_utime += utime_delta + self._resource_usage.ru_stime += stime_delta + if self.parent_context: + self.parent_context.add_cputime(utime_delta, stime_delta) + def add_database_transaction(self, duration_sec: float) -> None: + """Record the use of a database transaction and the length of time it took. + + Args: + duration_sec: The number of seconds the database transaction took. + """ if duration_sec < 0: raise ValueError("DB txn time can only be non-negative") self._resource_usage.db_txn_count += 1 self._resource_usage.db_txn_duration_sec += duration_sec + if self.parent_context: + self.parent_context.add_database_transaction(duration_sec) def add_database_scheduled(self, sched_sec: float) -> None: """Record a use of the database pool Args: - sched_sec (float): number of seconds it took us to get a - connection + sched_sec: number of seconds it took us to get a connection """ if sched_sec < 0: raise ValueError("DB scheduling time can only be non-negative") self._resource_usage.db_sched_duration_sec += sched_sec + if self.parent_context: + self.parent_context.add_database_scheduled(sched_sec) def record_event_fetch(self, event_count: int) -> None: """Record a number of events being fetched from the db Args: - event_count (int): number of events being fetched + event_count: number of events being fetched """ self._resource_usage.evt_db_fetch_count += event_count + if self.parent_context: + self.parent_context.record_event_fetch(event_count) class LoggingContextFilter(logging.Filter): -- cgit 1.5.1