diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 07240d3a14..7826387e53 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -14,7 +14,7 @@
# limitations under the License.
import logging
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional, Union
from prometheus_client import Counter
@@ -30,7 +30,10 @@ from synapse.metrics import (
event_processing_loop_counter,
event_processing_loop_room_count,
)
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+ run_as_background_process,
+ wrap_as_background_process,
+)
from synapse.types import Collection, JsonDict, RoomStreamToken, UserID
from synapse.util.metrics import Measure
@@ -53,7 +56,7 @@ class ApplicationServicesHandler:
self.current_max = 0
self.is_processing = False
- async def notify_interested_services(self, max_token: RoomStreamToken):
+ def notify_interested_services(self, max_token: RoomStreamToken):
"""Notifies (pushes) all application services interested in this event.
Pushing is done asynchronously, so this method won't block for any
@@ -72,6 +75,12 @@ class ApplicationServicesHandler:
if self.is_processing:
return
+ # We only start a new background process if necessary rather than
+ # optimistically (to cut down on overhead).
+ self._notify_interested_services(max_token)
+
+ @wrap_as_background_process("notify_interested_services")
+ async def _notify_interested_services(self, max_token: RoomStreamToken):
with Measure(self.clock, "notify_interested_services"):
self.is_processing = True
try:
@@ -166,8 +175,11 @@ class ApplicationServicesHandler:
finally:
self.is_processing = False
- async def notify_interested_services_ephemeral(
- self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [],
+ def notify_interested_services_ephemeral(
+ self,
+ stream_key: str,
+ new_token: Optional[int],
+ users: Collection[Union[str, UserID]] = [],
):
"""This is called by the notifier in the background
when a ephemeral event handled by the homeserver.
@@ -183,13 +195,34 @@ class ApplicationServicesHandler:
new_token: The latest stream token
users: The user(s) involved with the event.
"""
+ if not self.notify_appservices:
+ return
+
+ if stream_key not in ("typing_key", "receipt_key", "presence_key"):
+ return
+
services = [
service
for service in self.store.get_app_services()
if service.supports_ephemeral
]
- if not services or not self.notify_appservices:
+ if not services:
return
+
+ # We only start a new background process if necessary rather than
+ # optimistically (to cut down on overhead).
+ self._notify_interested_services_ephemeral(
+ services, stream_key, new_token, users
+ )
+
+ @wrap_as_background_process("notify_interested_services_ephemeral")
+ async def _notify_interested_services_ephemeral(
+ self,
+ services: List[ApplicationService],
+ stream_key: str,
+ new_token: Optional[int],
+ users: Collection[Union[str, UserID]],
+ ):
logger.info("Checking interested services for %s" % (stream_key))
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
@@ -237,7 +270,7 @@ class ApplicationServicesHandler:
return receipts
async def _handle_presence(
- self, service: ApplicationService, users: Collection[UserID]
+ self, service: ApplicationService, users: Collection[Union[str, UserID]]
):
events = [] # type: List[JsonDict]
presence_source = self.event_sources.sources["presence"]
@@ -245,6 +278,9 @@ class ApplicationServicesHandler:
service, "presence"
)
for user in users:
+ if isinstance(user, str):
+ user = UserID.from_string(user)
+
interested = await service.is_interested_in_presence(user, self.store)
if not interested:
continue
|