summary refs log tree commit diff
path: root/synapse/handlers/appservice.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-10-26 09:30:19 +0000
committerGitHub <noreply@github.com>2020-10-26 09:30:19 +0000
commit2b7c180879e5d62145feed88375ba55f18fc2ae5 (patch)
tree6891abafc487b675d6e2fe0f5d602dad2d46d1aa /synapse/handlers/appservice.py
parentFix typos and spelling errors. (#8639) (diff)
downloadsynapse-2b7c180879e5d62145feed88375ba55f18fc2ae5.tar.xz
Start fewer opentracing spans (#8640)
#8567 started a span for every background process. This is good as it means all Synapse code that gets run should be in a span (unless in the sentinel logging context), but it means we generate about 15x the number of spans as we did previously.

This PR attempts to reduce that number by a) not starting one for send commands to Redis, and b) deferring starting background processes until after we're sure they're necessary.

I don't really know how much this will help.
Diffstat (limited to 'synapse/handlers/appservice.py')
-rw-r--r--synapse/handlers/appservice.py50
1 files changed, 43 insertions, 7 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 07240d3a14..7826387e53 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 import logging
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional, Union
 
 from prometheus_client import Counter
 
@@ -30,7 +30,10 @@ from synapse.metrics import (
     event_processing_loop_counter,
     event_processing_loop_room_count,
 )
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+    run_as_background_process,
+    wrap_as_background_process,
+)
 from synapse.types import Collection, JsonDict, RoomStreamToken, UserID
 from synapse.util.metrics import Measure
 
@@ -53,7 +56,7 @@ class ApplicationServicesHandler:
         self.current_max = 0
         self.is_processing = False
 
-    async def notify_interested_services(self, max_token: RoomStreamToken):
+    def notify_interested_services(self, max_token: RoomStreamToken):
         """Notifies (pushes) all application services interested in this event.
 
         Pushing is done asynchronously, so this method won't block for any
@@ -72,6 +75,12 @@ class ApplicationServicesHandler:
         if self.is_processing:
             return
 
+        # We only start a new background process if necessary rather than
+        # optimistically (to cut down on overhead).
+        self._notify_interested_services(max_token)
+
+    @wrap_as_background_process("notify_interested_services")
+    async def _notify_interested_services(self, max_token: RoomStreamToken):
         with Measure(self.clock, "notify_interested_services"):
             self.is_processing = True
             try:
@@ -166,8 +175,11 @@ class ApplicationServicesHandler:
             finally:
                 self.is_processing = False
 
-    async def notify_interested_services_ephemeral(
-        self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [],
+    def notify_interested_services_ephemeral(
+        self,
+        stream_key: str,
+        new_token: Optional[int],
+        users: Collection[Union[str, UserID]] = [],
     ):
         """This is called by the notifier in the background
         when a ephemeral event handled by the homeserver.
@@ -183,13 +195,34 @@ class ApplicationServicesHandler:
             new_token: The latest stream token
             users: The user(s) involved with the event.
         """
+        if not self.notify_appservices:
+            return
+
+        if stream_key not in ("typing_key", "receipt_key", "presence_key"):
+            return
+
         services = [
             service
             for service in self.store.get_app_services()
             if service.supports_ephemeral
         ]
-        if not services or not self.notify_appservices:
+        if not services:
             return
+
+        # We only start a new background process if necessary rather than
+        # optimistically (to cut down on overhead).
+        self._notify_interested_services_ephemeral(
+            services, stream_key, new_token, users
+        )
+
+    @wrap_as_background_process("notify_interested_services_ephemeral")
+    async def _notify_interested_services_ephemeral(
+        self,
+        services: List[ApplicationService],
+        stream_key: str,
+        new_token: Optional[int],
+        users: Collection[Union[str, UserID]],
+    ):
         logger.info("Checking interested services for %s" % (stream_key))
         with Measure(self.clock, "notify_interested_services_ephemeral"):
             for service in services:
@@ -237,7 +270,7 @@ class ApplicationServicesHandler:
         return receipts
 
     async def _handle_presence(
-        self, service: ApplicationService, users: Collection[UserID]
+        self, service: ApplicationService, users: Collection[Union[str, UserID]]
     ):
         events = []  # type: List[JsonDict]
         presence_source = self.event_sources.sources["presence"]
@@ -245,6 +278,9 @@ class ApplicationServicesHandler:
             service, "presence"
         )
         for user in users:
+            if isinstance(user, str):
+                user = UserID.from_string(user)
+
             interested = await service.is_interested_in_presence(user, self.store)
             if not interested:
                 continue