diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ce0814bc25..0245197c02 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -18,7 +18,9 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import (
+ make_deferred_yieldable, preserve_fn, run_in_background,
+)
import logging
@@ -84,11 +86,16 @@ class ApplicationServicesHandler(object):
if not events:
break
+ events_by_room = {}
for event in events:
+ events_by_room.setdefault(event.room_id, []).append(event)
+
+ @defer.inlineCallbacks
+ def handle_event(event):
# Gather interested services
services = yield self._get_services_for_event(event)
if len(services) == 0:
- continue # no services need notifying
+ return # no services need notifying
# Do we know this user exists? If not, poke the user
# query API for all services which match that user regex.
@@ -108,6 +115,16 @@ class ApplicationServicesHandler(object):
service, event
)
+ @defer.inlineCallbacks
+ def handle_room_events(events):
+ for event in events:
+ yield handle_event(event)
+
+ yield make_deferred_yieldable(defer.gatherResults([
+ run_in_background(handle_room_events, evs)
+ for evs in events_by_room.itervalues()
+ ], consumeErrors=True))
+
yield self.store.set_appservice_last_pos(upper_bound)
now = self.clock.time_msec()
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 00af539880..4ecd91deb5 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -31,6 +31,18 @@ class ResponseCache(object):
self.timeout_sec = timeout_ms / 1000.
def get(self, key):
+ """Look up the given key.
+
+ Returns a deferred which doesn't follow the synapse logcontext rules,
+ so you'll probably want to make_deferred_yieldable it.
+
+ Args:
+ key (str):
+
+ Returns:
+ twisted.internet.defer.Deferred|None: None if there is no entry
+ for this key; otherwise a deferred result.
+ """
result = self.pending_result_cache.get(key)
if result is not None:
return result.observe()
@@ -38,6 +50,26 @@ class ResponseCache(object):
return None
def set(self, key, deferred):
+ """Set the entry for the given key to the given deferred.
+
+ *deferred* should run its callbacks in the sentinel logcontext (ie,
+ you should wrap normal synapse deferreds with
+ logcontext.run_in_background).
+
+ Returns a new Deferred which also doesn't follow the synapse logcontext
+ rules, so you will want to make_deferred_yieldable it
+
+ (TODO: before using this more widely, it might make sense to refactor
+ it and get() so that they do the necessary wrapping rather than having
+ to do it everywhere ResponseCache is used.)
+
+ Args:
+ key (str):
+ deferred (twisted.internet.defer.Deferred):
+
+ Returns:
+ twisted.internet.defer.Deferred
+ """
result = ObservableDeferred(deferred, consumeErrors=True)
self.pending_result_cache[key] = result
|