summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-04-12 11:36:07 +0100
committerErik Johnston <erik@matrix.org>2018-04-12 11:36:07 +0100
commit19ceb4851f8cea3494b64dfdf0db18d92f75f538 (patch)
tree1441deb2b7b439688993e540c4d2e86ef352ca95
parentDoc we raise on unknown event (diff)
parentMerge pull request #3059 from matrix-org/rav/doc_response_cache (diff)
downloadsynapse-19ceb4851f8cea3494b64dfdf0db18d92f75f538.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/processed_event_lag
-rw-r--r--synapse/handlers/appservice.py21
-rw-r--r--synapse/util/caches/response_cache.py32
2 files changed, 51 insertions, 2 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ce0814bc25..0245197c02 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -18,7 +18,9 @@ from twisted.internet import defer
 import synapse
 from synapse.api.constants import EventTypes
 from synapse.util.metrics import Measure
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import (
+    make_deferred_yieldable, preserve_fn, run_in_background,
+)
 
 import logging
 
@@ -84,11 +86,16 @@ class ApplicationServicesHandler(object):
                     if not events:
                         break
 
+                    events_by_room = {}
                     for event in events:
+                        events_by_room.setdefault(event.room_id, []).append(event)
+
+                    @defer.inlineCallbacks
+                    def handle_event(event):
                         # Gather interested services
                         services = yield self._get_services_for_event(event)
                         if len(services) == 0:
-                            continue  # no services need notifying
+                            return  # no services need notifying
 
                         # Do we know this user exists? If not, poke the user
                         # query API for all services which match that user regex.
@@ -108,6 +115,16 @@ class ApplicationServicesHandler(object):
                                 service, event
                             )
 
+                    @defer.inlineCallbacks
+                    def handle_room_events(events):
+                        for event in events:
+                            yield handle_event(event)
+
+                    yield make_deferred_yieldable(defer.gatherResults([
+                        run_in_background(handle_room_events, evs)
+                        for evs in events_by_room.itervalues()
+                    ], consumeErrors=True))
+
                     yield self.store.set_appservice_last_pos(upper_bound)
 
                     now = self.clock.time_msec()
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 00af539880..4ecd91deb5 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -31,6 +31,18 @@ class ResponseCache(object):
         self.timeout_sec = timeout_ms / 1000.
 
     def get(self, key):
+        """Look up the given key.
+
+        Returns a deferred which doesn't follow the synapse logcontext rules,
+        so you'll probably want to make_deferred_yieldable it.
+
+        Args:
+            key (str):
+
+        Returns:
+            twisted.internet.defer.Deferred|None: None if there is no entry
+            for this key; otherwise a deferred result.
+        """
         result = self.pending_result_cache.get(key)
         if result is not None:
             return result.observe()
@@ -38,6 +50,26 @@ class ResponseCache(object):
             return None
 
     def set(self, key, deferred):
+        """Set the entry for the given key to the given deferred.
+
+        *deferred* should run its callbacks in the sentinel logcontext (ie,
+        you should wrap normal synapse deferreds with
+        logcontext.run_in_background).
+
+        Returns a new Deferred which also doesn't follow the synapse logcontext
+        rules, so you will want to make_deferred_yieldable it
+
+        (TODO: before using this more widely, it might make sense to refactor
+        it and get() so that they do the necessary wrapping rather than having
+        to do it everywhere ResponseCache is used.)
+
+        Args:
+            key (str):
+            deferred (twisted.internet.defer.Deferred):
+
+        Returns:
+            twisted.internet.defer.Deferred
+        """
         result = ObservableDeferred(deferred, consumeErrors=True)
         self.pending_result_cache[key] = result