summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/handlers/appservice.py50
-rw-r--r--synapse/logging/opentracing.py10
-rw-r--r--synapse/metrics/background_process_metrics.py12
-rw-r--r--synapse/notifier.py34
-rw-r--r--synapse/push/pusherpool.py18
-rw-r--r--synapse/replication/tcp/redis.py4
6 files changed, 87 insertions, 41 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 07240d3a14..7826387e53 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 import logging
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional, Union
 
 from prometheus_client import Counter
 
@@ -30,7 +30,10 @@ from synapse.metrics import (
     event_processing_loop_counter,
     event_processing_loop_room_count,
 )
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+    run_as_background_process,
+    wrap_as_background_process,
+)
 from synapse.types import Collection, JsonDict, RoomStreamToken, UserID
 from synapse.util.metrics import Measure
 
@@ -53,7 +56,7 @@ class ApplicationServicesHandler:
         self.current_max = 0
         self.is_processing = False
 
-    async def notify_interested_services(self, max_token: RoomStreamToken):
+    def notify_interested_services(self, max_token: RoomStreamToken):
         """Notifies (pushes) all application services interested in this event.
 
         Pushing is done asynchronously, so this method won't block for any
@@ -72,6 +75,12 @@ class ApplicationServicesHandler:
         if self.is_processing:
             return
 
+        # We only start a new background process if necessary rather than
+        # optimistically (to cut down on overhead).
+        self._notify_interested_services(max_token)
+
+    @wrap_as_background_process("notify_interested_services")
+    async def _notify_interested_services(self, max_token: RoomStreamToken):
         with Measure(self.clock, "notify_interested_services"):
             self.is_processing = True
             try:
@@ -166,8 +175,11 @@ class ApplicationServicesHandler:
             finally:
                 self.is_processing = False
 
-    async def notify_interested_services_ephemeral(
-        self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [],
+    def notify_interested_services_ephemeral(
+        self,
+        stream_key: str,
+        new_token: Optional[int],
+        users: Collection[Union[str, UserID]] = [],
     ):
         """This is called by the notifier in the background
         when a ephemeral event handled by the homeserver.
@@ -183,13 +195,34 @@ class ApplicationServicesHandler:
             new_token: The latest stream token
             users: The user(s) involved with the event.
         """
+        if not self.notify_appservices:
+            return
+
+        if stream_key not in ("typing_key", "receipt_key", "presence_key"):
+            return
+
         services = [
             service
             for service in self.store.get_app_services()
             if service.supports_ephemeral
         ]
-        if not services or not self.notify_appservices:
+        if not services:
             return
+
+        # We only start a new background process if necessary rather than
+        # optimistically (to cut down on overhead).
+        self._notify_interested_services_ephemeral(
+            services, stream_key, new_token, users
+        )
+
+    @wrap_as_background_process("notify_interested_services_ephemeral")
+    async def _notify_interested_services_ephemeral(
+        self,
+        services: List[ApplicationService],
+        stream_key: str,
+        new_token: Optional[int],
+        users: Collection[Union[str, UserID]],
+    ):
         logger.info("Checking interested services for %s" % (stream_key))
         with Measure(self.clock, "notify_interested_services_ephemeral"):
             for service in services:
@@ -237,7 +270,7 @@ class ApplicationServicesHandler:
         return receipts
 
     async def _handle_presence(
-        self, service: ApplicationService, users: Collection[UserID]
+        self, service: ApplicationService, users: Collection[Union[str, UserID]]
     ):
         events = []  # type: List[JsonDict]
         presence_source = self.event_sources.sources["presence"]
@@ -245,6 +278,9 @@ class ApplicationServicesHandler:
             service, "presence"
         )
         for user in users:
+            if isinstance(user, str):
+                user = UserID.from_string(user)
+
             interested = await service.is_interested_in_presence(user, self.store)
             if not interested:
                 continue
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index e58850faff..ab586c318c 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -317,7 +317,7 @@ def ensure_active_span(message, ret=None):
 
 
 @contextlib.contextmanager
-def _noop_context_manager(*args, **kwargs):
+def noop_context_manager(*args, **kwargs):
     """Does exactly what it says on the tin"""
     yield
 
@@ -413,7 +413,7 @@ def start_active_span(
     """
 
     if opentracing is None:
-        return _noop_context_manager()
+        return noop_context_manager()
 
     return opentracing.tracer.start_active_span(
         operation_name,
@@ -428,7 +428,7 @@ def start_active_span(
 
 def start_active_span_follows_from(operation_name, contexts):
     if opentracing is None:
-        return _noop_context_manager()
+        return noop_context_manager()
 
     references = [opentracing.follows_from(context) for context in contexts]
     scope = start_active_span(operation_name, references=references)
@@ -459,7 +459,7 @@ def start_active_span_from_request(
     # Also, twisted uses byte arrays while opentracing expects strings.
 
     if opentracing is None:
-        return _noop_context_manager()
+        return noop_context_manager()
 
     header_dict = {
         k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
@@ -497,7 +497,7 @@ def start_active_span_from_edu(
     """
 
     if opentracing is None:
-        return _noop_context_manager()
+        return noop_context_manager()
 
     carrier = json_decoder.decode(edu_content.get("context", "{}")).get(
         "opentracing", {}
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 08fbf78eee..658f6ecd72 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -24,7 +24,7 @@ from prometheus_client.core import REGISTRY, Counter, Gauge
 from twisted.internet import defer
 
 from synapse.logging.context import LoggingContext, PreserveLoggingContext
-from synapse.logging.opentracing import start_active_span
+from synapse.logging.opentracing import noop_context_manager, start_active_span
 
 if TYPE_CHECKING:
     import resource
@@ -167,7 +167,7 @@ class _BackgroundProcess:
         )
 
 
-def run_as_background_process(desc: str, func, *args, **kwargs):
+def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwargs):
     """Run the given function in its own logcontext, with resource metrics
 
     This should be used to wrap processes which are fired off to run in the
@@ -181,6 +181,9 @@ def run_as_background_process(desc: str, func, *args, **kwargs):
     Args:
         desc: a description for this background process type
         func: a function, which may return a Deferred or a coroutine
+        bg_start_span: Whether to start an opentracing span. Defaults to True.
+            Should only be disabled for processes that will not log to or tag
+            a span.
         args: positional args for func
         kwargs: keyword args for func
 
@@ -199,7 +202,10 @@ def run_as_background_process(desc: str, func, *args, **kwargs):
         with BackgroundProcessLoggingContext(desc) as context:
             context.request = "%s-%i" % (desc, count)
             try:
-                with start_active_span(desc, tags={"request_id": context.request}):
+                ctx = noop_context_manager()
+                if bg_start_span:
+                    ctx = start_active_span(desc, tags={"request_id": context.request})
+                with ctx:
                     result = func(*args, **kwargs)
 
                     if inspect.isawaitable(result):
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 858b487bec..eb56b26f21 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -40,7 +40,6 @@ from synapse.handlers.presence import format_user_presence_state
 from synapse.logging.context import PreserveLoggingContext
 from synapse.logging.utils import log_function
 from synapse.metrics import LaterGauge
-from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.streams.config import PaginationConfig
 from synapse.types import (
     Collection,
@@ -310,44 +309,37 @@ class Notifier:
         """
 
         # poke any interested application service.
-        run_as_background_process(
-            "_notify_app_services", self._notify_app_services, max_room_stream_token
-        )
-
-        run_as_background_process(
-            "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token
-        )
+        self._notify_app_services(max_room_stream_token)
+        self._notify_pusher_pool(max_room_stream_token)
 
         if self.federation_sender:
             self.federation_sender.notify_new_events(max_room_stream_token)
 
-    async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
+    def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
         try:
-            await self.appservice_handler.notify_interested_services(
-                max_room_stream_token
-            )
+            self.appservice_handler.notify_interested_services(max_room_stream_token)
         except Exception:
             logger.exception("Error notifying application services of event")
 
-    async def _notify_app_services_ephemeral(
+    def _notify_app_services_ephemeral(
         self,
         stream_key: str,
         new_token: Union[int, RoomStreamToken],
-        users: Collection[UserID] = [],
+        users: Collection[Union[str, UserID]] = [],
     ):
         try:
             stream_token = None
             if isinstance(new_token, int):
                 stream_token = new_token
-            await self.appservice_handler.notify_interested_services_ephemeral(
+            self.appservice_handler.notify_interested_services_ephemeral(
                 stream_key, stream_token, users
             )
         except Exception:
             logger.exception("Error notifying application services of event")
 
-    async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
+    def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
         try:
-            await self._pusher_pool.on_new_notifications(max_room_stream_token)
+            self._pusher_pool.on_new_notifications(max_room_stream_token)
         except Exception:
             logger.exception("Error pusher pool of event")
 
@@ -384,12 +376,8 @@ class Notifier:
                 self.notify_replication()
 
                 # Notify appservices
-                run_as_background_process(
-                    "_notify_app_services_ephemeral",
-                    self._notify_app_services_ephemeral,
-                    stream_key,
-                    new_token,
-                    users,
+                self._notify_app_services_ephemeral(
+                    stream_key, new_token, users,
                 )
 
     def on_new_replication_data(self) -> None:
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 0080c68ce2..f325964983 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,7 +19,10 @@ from typing import TYPE_CHECKING, Dict, Union
 
 from prometheus_client import Gauge
 
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+    run_as_background_process,
+    wrap_as_background_process,
+)
 from synapse.push import PusherConfigException
 from synapse.push.emailpusher import EmailPusher
 from synapse.push.httppusher import HttpPusher
@@ -187,7 +190,7 @@ class PusherPool:
                 )
                 await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
 
-    async def on_new_notifications(self, max_token: RoomStreamToken):
+    def on_new_notifications(self, max_token: RoomStreamToken):
         if not self.pushers:
             # nothing to do here.
             return
@@ -201,6 +204,17 @@ class PusherPool:
             # Nothing to do
             return
 
+        # We only start a new background process if necessary rather than
+        # optimistically (to cut down on overhead).
+        self._on_new_notifications(max_token)
+
+    @wrap_as_background_process("on_new_notifications")
+    async def _on_new_notifications(self, max_token: RoomStreamToken):
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        max_stream_id = max_token.stream
+
         prev_stream_id = self._last_room_stream_id_seen
         self._last_room_stream_id_seen = max_stream_id
 
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index de19705c1f..bc6ba709a7 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -166,7 +166,9 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
         Args:
             cmd (Command)
         """
-        run_as_background_process("send-cmd", self._async_send_command, cmd)
+        run_as_background_process(
+            "send-cmd", self._async_send_command, cmd, bg_start_span=False
+        )
 
     async def _async_send_command(self, cmd: Command):
         """Encode a replication command and send it over our outbound connection"""