summary refs log tree commit diff
path: root/synapse/handlers/appservice.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/appservice.py')
-rw-r--r--synapse/handlers/appservice.py71
1 files changed, 55 insertions, 16 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 05af54d31b..b596f098fd 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -15,14 +15,21 @@
 
 from twisted.internet import defer
 
+import synapse
 from synapse.api.constants import EventTypes
 from synapse.util.metrics import Measure
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.logcontext import (
+    make_deferred_yieldable, run_in_background,
+)
 
 import logging
 
 logger = logging.getLogger(__name__)
 
+metrics = synapse.metrics.get_metrics_for(__name__)
+
+events_processed_counter = metrics.register_counter("events_processed")
+
 
 def log_failure(failure):
     logger.error(
@@ -70,21 +77,25 @@ class ApplicationServicesHandler(object):
         with Measure(self.clock, "notify_interested_services"):
             self.is_processing = True
             try:
-                upper_bound = self.current_max
                 limit = 100
                 while True:
                     upper_bound, events = yield self.store.get_new_events_for_appservice(
-                        upper_bound, limit
+                        self.current_max, limit
                     )
 
                     if not events:
                         break
 
+                    events_by_room = {}
                     for event in events:
+                        events_by_room.setdefault(event.room_id, []).append(event)
+
+                    @defer.inlineCallbacks
+                    def handle_event(event):
                         # Gather interested services
                         services = yield self._get_services_for_event(event)
                         if len(services) == 0:
-                            continue  # no services need notifying
+                            return  # no services need notifying
 
                         # Do we know this user exists? If not, poke the user
                         # query API for all services which match that user regex.
@@ -100,14 +111,35 @@ class ApplicationServicesHandler(object):
 
                         # Fork off pushes to these services
                         for service in services:
-                            preserve_fn(self.scheduler.submit_event_for_as)(
-                                service, event
-                            )
+                            self.scheduler.submit_event_for_as(service, event)
+
+                    @defer.inlineCallbacks
+                    def handle_room_events(events):
+                        for event in events:
+                            yield handle_event(event)
+
+                    yield make_deferred_yieldable(defer.gatherResults([
+                        run_in_background(handle_room_events, evs)
+                        for evs in events_by_room.itervalues()
+                    ], consumeErrors=True))
 
                     yield self.store.set_appservice_last_pos(upper_bound)
 
-                    if len(events) < limit:
-                        break
+                    now = self.clock.time_msec()
+                    ts = yield self.store.get_received_ts(events[-1].event_id)
+
+                    synapse.metrics.event_processing_positions.set(
+                        upper_bound, "appservice_sender",
+                    )
+
+                    events_processed_counter.inc_by(len(events))
+
+                    synapse.metrics.event_processing_lag.set(
+                        now - ts, "appservice_sender",
+                    )
+                    synapse.metrics.event_processing_last_ts.set(
+                        ts, "appservice_sender",
+                    )
             finally:
                 self.is_processing = False
 
@@ -163,8 +195,11 @@ class ApplicationServicesHandler(object):
     def query_3pe(self, kind, protocol, fields):
         services = yield self._get_services_for_3pn(protocol)
 
-        results = yield preserve_context_over_deferred(defer.DeferredList([
-            preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
+        results = yield make_deferred_yieldable(defer.DeferredList([
+            run_in_background(
+                self.appservice_api.query_3pe,
+                service, kind, protocol, fields,
+            )
             for service in services
         ], consumeErrors=True))
 
@@ -225,11 +260,15 @@ class ApplicationServicesHandler(object):
             event based on the service regex.
         """
         services = self.store.get_app_services()
-        interested_list = [
-            s for s in services if (
-                yield s.is_interested(event, self.store)
-            )
-        ]
+
+        # we can't use a list comprehension here. Since python 3, list
+        # comprehensions use a generator internally. This means you can't yield
+        # inside of a list comprehension anymore.
+        interested_list = []
+        for s in services:
+            if (yield s.is_interested(event, self.store)):
+                interested_list.append(s)
+
         defer.returnValue(interested_list)
 
     def _get_services_for_user(self, user_id):