summary refs log tree commit diff
path: root/synapse/handlers/appservice.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2018-04-12 10:42:36 +0100
committerGitHub <noreply@github.com>2018-04-12 10:42:36 +0100
commit971059a733776e92f6b5abc0829c5b5bc634c1ab (patch)
treec5d5331f3c846ca837b29a82e58d62e166ad7b63 /synapse/handlers/appservice.py
parentMerge pull request #2760 from Valodim/pypy (diff)
parentSend events to ASes concurrently (diff)
downloadsynapse-971059a733776e92f6b5abc0829c5b5bc634c1ab.tar.xz
Merge pull request #3088 from matrix-org/erikj/as_parallel
Send events to ASes concurrently
Diffstat (limited to 'synapse/handlers/appservice.py')
-rw-r--r--synapse/handlers/appservice.py21
1 files changed, 19 insertions, 2 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 3dd3fa2a27..69eacff949 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -18,7 +18,9 @@ from twisted.internet import defer
 import synapse
 from synapse.api.constants import EventTypes
 from synapse.util.metrics import Measure
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import (
+    make_deferred_yieldable, preserve_fn, run_in_background,
+)
 
 import logging
 
@@ -84,11 +86,16 @@ class ApplicationServicesHandler(object):
                     if not events:
                         break
 
+                    events_by_room = {}
                     for event in events:
+                        events_by_room.setdefault(event.room_id, []).append(event)
+
+                    @defer.inlineCallbacks
+                    def handle_event(event):
                         # Gather interested services
                         services = yield self._get_services_for_event(event)
                         if len(services) == 0:
-                            continue  # no services need notifying
+                            return  # no services need notifying
 
                         # Do we know this user exists? If not, poke the user
                         # query API for all services which match that user regex.
@@ -108,6 +115,16 @@ class ApplicationServicesHandler(object):
                                 service, event
                             )
 
+                    @defer.inlineCallbacks
+                    def handle_room_events(events):
+                        for event in events:
+                            yield handle_event(event)
+
+                    yield make_deferred_yieldable(defer.gatherResults([
+                        run_in_background(handle_room_events, evs)
+                        for evs in events_by_room.itervalues()
+                    ], consumeErrors=True))
+
                     events_processed_counter.inc_by(len(events))
 
                     yield self.store.set_appservice_last_pos(upper_bound)