summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/appservice.py42
-rw-r--r--synapse/handlers/e2e_keys.py10
-rw-r--r--synapse/handlers/federation.py141
-rw-r--r--synapse/handlers/initial_sync.py12
-rw-r--r--synapse/handlers/message.py230
-rw-r--r--synapse/handlers/presence.py19
-rw-r--r--synapse/handlers/receipts.py61
-rw-r--r--synapse/handlers/room_list.py43
-rw-r--r--synapse/handlers/room_member.py13
-rw-r--r--synapse/handlers/sync.py25
-rw-r--r--synapse/handlers/typing.py50
11 files changed, 398 insertions, 248 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 3dd3fa2a27..6cc2388306 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -18,7 +18,9 @@ from twisted.internet import defer
 import synapse
 from synapse.api.constants import EventTypes
 from synapse.util.metrics import Measure
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import (
+    make_deferred_yieldable, preserve_fn, run_in_background,
+)
 
 import logging
 
@@ -84,11 +86,16 @@ class ApplicationServicesHandler(object):
                     if not events:
                         break
 
+                    events_by_room = {}
                     for event in events:
+                        events_by_room.setdefault(event.room_id, []).append(event)
+
+                    @defer.inlineCallbacks
+                    def handle_event(event):
                         # Gather interested services
                         services = yield self._get_services_for_event(event)
                         if len(services) == 0:
-                            continue  # no services need notifying
+                            return  # no services need notifying
 
                         # Do we know this user exists? If not, poke the user
                         # query API for all services which match that user regex.
@@ -108,9 +115,33 @@ class ApplicationServicesHandler(object):
                                 service, event
                             )
 
-                    events_processed_counter.inc_by(len(events))
+                    @defer.inlineCallbacks
+                    def handle_room_events(events):
+                        for event in events:
+                            yield handle_event(event)
+
+                    yield make_deferred_yieldable(defer.gatherResults([
+                        run_in_background(handle_room_events, evs)
+                        for evs in events_by_room.itervalues()
+                    ], consumeErrors=True))
 
                     yield self.store.set_appservice_last_pos(upper_bound)
+
+                    now = self.clock.time_msec()
+                    ts = yield self.store.get_received_ts(events[-1].event_id)
+
+                    synapse.metrics.event_processing_positions.set(
+                        upper_bound, "appservice_sender",
+                    )
+
+                    events_processed_counter.inc_by(len(events))
+
+                    synapse.metrics.event_processing_lag.set(
+                        now - ts, "appservice_sender",
+                    )
+                    synapse.metrics.event_processing_last_ts.set(
+                        ts, "appservice_sender",
+                    )
             finally:
                 self.is_processing = False
 
@@ -167,7 +198,10 @@ class ApplicationServicesHandler(object):
         services = yield self._get_services_for_3pn(protocol)
 
         results = yield make_deferred_yieldable(defer.DeferredList([
-            preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
+            run_in_background(
+                self.appservice_api.query_3pe,
+                service, kind, protocol, fields,
+            )
             for service in services
         ], consumeErrors=True))
 
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 325c0c4a9f..25aec624af 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -24,7 +24,7 @@ from synapse.api.errors import (
     SynapseError, CodeMessageException, FederationDeniedError,
 )
 from synapse.types import get_domain_from_id, UserID
-from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.retryutils import NotRetryingDestination
 
 logger = logging.getLogger(__name__)
@@ -139,9 +139,9 @@ class E2eKeysHandler(object):
                 failures[destination] = _exception_to_failure(e)
 
         yield make_deferred_yieldable(defer.gatherResults([
-            preserve_fn(do_remote_query)(destination)
+            run_in_background(do_remote_query, destination)
             for destination in remote_queries_not_in_cache
-        ]))
+        ], consumeErrors=True))
 
         defer.returnValue({
             "device_keys": results, "failures": failures,
@@ -242,9 +242,9 @@ class E2eKeysHandler(object):
                 failures[destination] = _exception_to_failure(e)
 
         yield make_deferred_yieldable(defer.gatherResults([
-            preserve_fn(claim_client_keys)(destination)
+            run_in_background(claim_client_keys, destination)
             for destination in remote_queries
-        ]))
+        ], consumeErrors=True))
 
         logger.info(
             "Claimed one-time-keys: %s",
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 080aca3d71..bab0db2904 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -15,8 +15,16 @@
 # limitations under the License.
 
 """Contains handlers for federation events."""
+
+import httplib
+import itertools
+import logging
+import sys
+
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
+import six
+from twisted.internet import defer
 from unpaddedbase64 import decode_base64
 
 from ._base import BaseHandler
@@ -43,10 +51,6 @@ from synapse.util.retryutils import NotRetryingDestination
 
 from synapse.util.distributor import user_joined_room
 
-from twisted.internet import defer
-
-import itertools
-import logging
 
 logger = logging.getLogger(__name__)
 
@@ -115,6 +119,19 @@ class FederationHandler(BaseHandler):
             logger.debug("Already seen pdu %s", pdu.event_id)
             return
 
+        # do some initial sanity-checking of the event. In particular, make
+        # sure it doesn't have hundreds of prev_events or auth_events, which
+        # could cause a huge state resolution or cascade of event fetches.
+        try:
+            self._sanity_check_event(pdu)
+        except SynapseError as err:
+            raise FederationError(
+                "ERROR",
+                err.code,
+                err.msg,
+                affected=pdu.event_id,
+            )
+
         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
         if pdu.room_id in self.room_queues:
@@ -149,10 +166,6 @@ class FederationHandler(BaseHandler):
 
         auth_chain = []
 
-        have_seen = yield self.store.have_events(
-            [ev for ev, _ in pdu.prev_events]
-        )
-
         fetch_state = False
 
         # Get missing pdus if necessary.
@@ -168,7 +181,7 @@ class FederationHandler(BaseHandler):
             )
 
             prevs = {e_id for e_id, _ in pdu.prev_events}
-            seen = set(have_seen.keys())
+            seen = yield self.store.have_seen_events(prevs)
 
             if min_depth and pdu.depth < min_depth:
                 # This is so that we don't notify the user about this
@@ -196,8 +209,7 @@ class FederationHandler(BaseHandler):
 
                         # Update the set of things we've seen after trying to
                         # fetch the missing stuff
-                        have_seen = yield self.store.have_events(prevs)
-                        seen = set(have_seen.iterkeys())
+                        seen = yield self.store.have_seen_events(prevs)
 
                         if not prevs - seen:
                             logger.info(
@@ -248,8 +260,7 @@ class FederationHandler(BaseHandler):
             min_depth (int): Minimum depth of events to return.
         """
         # We recalculate seen, since it may have changed.
-        have_seen = yield self.store.have_events(prevs)
-        seen = set(have_seen.keys())
+        seen = yield self.store.have_seen_events(prevs)
 
         if not prevs - seen:
             return
@@ -361,9 +372,7 @@ class FederationHandler(BaseHandler):
             if auth_chain:
                 event_ids |= {e.event_id for e in auth_chain}
 
-            seen_ids = set(
-                (yield self.store.have_events(event_ids)).keys()
-            )
+            seen_ids = yield self.store.have_seen_events(event_ids)
 
             if state and auth_chain is not None:
                 # If we have any state or auth_chain given to us by the replication
@@ -527,9 +536,16 @@ class FederationHandler(BaseHandler):
     def backfill(self, dest, room_id, limit, extremities):
         """ Trigger a backfill request to `dest` for the given `room_id`
 
-        This will attempt to get more events from the remote. This may return
-        be successfull and still return no events if the other side has no new
-        events to offer.
+        This will attempt to get more events from the remote. If the other side
+        has no new events to offer, this will return an empty list.
+
+        As the events are received, we check their signatures, and also do some
+        sanity-checking on them. If any of the backfilled events are invalid,
+        this method throws a SynapseError.
+
+        TODO: make this more useful to distinguish failures of the remote
+        server from invalid events (there is probably no point in trying to
+        re-fetch invalid events from every other HS in the room.)
         """
         if dest == self.server_name:
             raise SynapseError(400, "Can't backfill from self.")
@@ -541,6 +557,16 @@ class FederationHandler(BaseHandler):
             extremities=extremities,
         )
 
+        # ideally we'd sanity check the events here for excess prev_events etc,
+        # but it's hard to reject events at this point without completely
+        # breaking backfill in the same way that it is currently broken by
+        # events whose signature we cannot verify (#3121).
+        #
+        # So for now we accept the events anyway. #3124 tracks this.
+        #
+        # for ev in events:
+        #     self._sanity_check_event(ev)
+
         # Don't bother processing events we already have.
         seen_events = yield self.store.have_events_in_timeline(
             set(e.event_id for e in events)
@@ -613,7 +639,8 @@ class FederationHandler(BaseHandler):
 
                 results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
                     [
-                        logcontext.preserve_fn(self.replication_layer.get_pdu)(
+                        logcontext.run_in_background(
+                            self.replication_layer.get_pdu,
                             [dest],
                             event_id,
                             outlier=True,
@@ -633,7 +660,7 @@ class FederationHandler(BaseHandler):
 
                 failed_to_fetch = missing_auth - set(auth_events)
 
-        seen_events = yield self.store.have_events(
+        seen_events = yield self.store.have_seen_events(
             set(auth_events.keys()) | set(state_events.keys())
         )
 
@@ -843,6 +870,38 @@ class FederationHandler(BaseHandler):
 
         defer.returnValue(False)
 
+    def _sanity_check_event(self, ev):
+        """
+        Do some early sanity checks of a received event
+
+        In particular, checks it doesn't have an excessive number of
+        prev_events or auth_events, which could cause a huge state resolution
+        or cascade of event fetches.
+
+        Args:
+            ev (synapse.events.EventBase): event to be checked
+
+        Returns: None
+
+        Raises:
+            SynapseError if the event does not pass muster
+        """
+        if len(ev.prev_events) > 20:
+            logger.warn("Rejecting event %s which has %i prev_events",
+                        ev.event_id, len(ev.prev_events))
+            raise SynapseError(
+                httplib.BAD_REQUEST,
+                "Too many prev_events",
+            )
+
+        if len(ev.auth_events) > 10:
+            logger.warn("Rejecting event %s which has %i auth_events",
+                        ev.event_id, len(ev.auth_events))
+            raise SynapseError(
+                httplib.BAD_REQUEST,
+                "Too many auth_events",
+            )
+
     @defer.inlineCallbacks
     def send_invite(self, target_host, event):
         """ Sends the invite to the remote server for signing.
@@ -967,7 +1026,7 @@ class FederationHandler(BaseHandler):
             # lots of requests for missing prev_events which we do actually
             # have. Hence we fire off the deferred, but don't wait for it.
 
-            logcontext.preserve_fn(self._handle_queued_pdus)(room_queue)
+            logcontext.run_in_background(self._handle_queued_pdus, room_queue)
 
         defer.returnValue(True)
 
@@ -1457,18 +1516,21 @@ class FederationHandler(BaseHandler):
                 backfilled=backfilled,
             )
         except:  # noqa: E722, as we reraise the exception this is fine.
-            # Ensure that we actually remove the entries in the push actions
-            # staging area
-            logcontext.preserve_fn(
-                self.store.remove_push_actions_from_staging
-            )(event.event_id)
-            raise
+            tp, value, tb = sys.exc_info()
+
+            logcontext.run_in_background(
+                self.store.remove_push_actions_from_staging,
+                event.event_id,
+            )
+
+            six.reraise(tp, value, tb)
 
         if not backfilled:
             # this intentionally does not yield: we don't care about the result
             # and don't need to wait for it.
-            logcontext.preserve_fn(self.pusher_pool.on_new_notifications)(
-                event_stream_id, max_stream_id
+            logcontext.run_in_background(
+                self.pusher_pool.on_new_notifications,
+                event_stream_id, max_stream_id,
             )
 
         defer.returnValue((context, event_stream_id, max_stream_id))
@@ -1482,7 +1544,8 @@ class FederationHandler(BaseHandler):
         """
         contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                logcontext.preserve_fn(self._prep_event)(
+                logcontext.run_in_background(
+                    self._prep_event,
                     origin,
                     ev_info["event"],
                     state=ev_info.get("state"),
@@ -1736,7 +1799,8 @@ class FederationHandler(BaseHandler):
             event_key = None
 
         if event_auth_events - current_state:
-            have_events = yield self.store.have_events(
+            # TODO: can we use store.have_seen_events here instead?
+            have_events = yield self.store.get_seen_events_with_rejections(
                 event_auth_events - current_state
             )
         else:
@@ -1759,12 +1823,12 @@ class FederationHandler(BaseHandler):
                     origin, event.room_id, event.event_id
                 )
 
-                seen_remotes = yield self.store.have_events(
+                seen_remotes = yield self.store.have_seen_events(
                     [e.event_id for e in remote_auth_chain]
                 )
 
                 for e in remote_auth_chain:
-                    if e.event_id in seen_remotes.keys():
+                    if e.event_id in seen_remotes:
                         continue
 
                     if e.event_id == event.event_id:
@@ -1791,7 +1855,7 @@ class FederationHandler(BaseHandler):
                     except AuthError:
                         pass
 
-                have_events = yield self.store.have_events(
+                have_events = yield self.store.get_seen_events_with_rejections(
                     [e_id for e_id, _ in event.auth_events]
                 )
                 seen_events = set(have_events.keys())
@@ -1810,7 +1874,8 @@ class FederationHandler(BaseHandler):
 
             different_events = yield logcontext.make_deferred_yieldable(
                 defer.gatherResults([
-                    logcontext.preserve_fn(self.store.get_event)(
+                    logcontext.run_in_background(
+                        self.store.get_event,
                         d,
                         allow_none=True,
                         allow_rejected=False,
@@ -1876,13 +1941,13 @@ class FederationHandler(BaseHandler):
                         local_auth_chain,
                     )
 
-                    seen_remotes = yield self.store.have_events(
+                    seen_remotes = yield self.store.have_seen_events(
                         [e.event_id for e in result["auth_chain"]]
                     )
 
                     # 3. Process any remote auth chain events we haven't seen.
                     for ev in result["auth_chain"]:
-                        if ev.event_id in seen_remotes.keys():
+                        if ev.event_id in seen_remotes:
                             continue
 
                         if ev.event_id == event.event_id:
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index c5267b4b84..cd33a86599 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -27,7 +27,7 @@ from synapse.types import (
 from synapse.util import unwrapFirstError
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
@@ -166,7 +166,8 @@ class InitialSyncHandler(BaseHandler):
                 (messages, token), current_state = yield make_deferred_yieldable(
                     defer.gatherResults(
                         [
-                            preserve_fn(self.store.get_recent_events_for_room)(
+                            run_in_background(
+                                self.store.get_recent_events_for_room,
                                 event.room_id,
                                 limit=limit,
                                 end_token=room_end_token,
@@ -391,9 +392,10 @@ class InitialSyncHandler(BaseHandler):
 
         presence, receipts, (messages, token) = yield defer.gatherResults(
             [
-                preserve_fn(get_presence)(),
-                preserve_fn(get_receipts)(),
-                preserve_fn(self.store.get_recent_events_for_room)(
+                run_in_background(get_presence),
+                run_in_background(get_receipts),
+                run_in_background(
+                    self.store.get_recent_events_for_room,
                     room_id,
                     limit=limit,
                     end_token=now_token.room_key,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6de6e13b7b..23502eda70 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -13,6 +13,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import simplejson
+import sys
+
+from canonicaljson import encode_canonical_json
+import six
 from twisted.internet import defer, reactor
 from twisted.python.failure import Failure
 
@@ -25,7 +31,7 @@ from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
 from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
-from synapse.util.logcontext import preserve_fn, run_in_background
+from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.stringutils import random_string
@@ -34,12 +40,6 @@ from synapse.replication.http.send_event import send_event_to_master
 
 from ._base import BaseHandler
 
-from canonicaljson import encode_canonical_json
-
-import logging
-import random
-import simplejson
-
 logger = logging.getLogger(__name__)
 
 
@@ -433,7 +433,7 @@ class EventCreationHandler(object):
 
     @defer.inlineCallbacks
     def create_event(self, requester, event_dict, token_id=None, txn_id=None,
-                     prev_event_ids=None):
+                     prev_events_and_hashes=None):
         """
         Given a dict from a client, create a new event.
 
@@ -447,47 +447,52 @@ class EventCreationHandler(object):
             event_dict (dict): An entire event
             token_id (str)
             txn_id (str)
-            prev_event_ids (list): The prev event ids to use when creating the event
+
+            prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+                the forward extremities to use as the prev_events for the
+                new event. For each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+
+                If None, they will be requested from the database.
 
         Returns:
             Tuple of created event (FrozenEvent), Context
         """
         builder = self.event_builder_factory.new(event_dict)
 
-        with (yield self.limiter.queue(builder.room_id)):
-            self.validator.validate_new(builder)
-
-            if builder.type == EventTypes.Member:
-                membership = builder.content.get("membership", None)
-                target = UserID.from_string(builder.state_key)
-
-                if membership in {Membership.JOIN, Membership.INVITE}:
-                    # If event doesn't include a display name, add one.
-                    profile = self.profile_handler
-                    content = builder.content
-
-                    try:
-                        if "displayname" not in content:
-                            content["displayname"] = yield profile.get_displayname(target)
-                        if "avatar_url" not in content:
-                            content["avatar_url"] = yield profile.get_avatar_url(target)
-                    except Exception as e:
-                        logger.info(
-                            "Failed to get profile information for %r: %s",
-                            target, e
-                        )
+        self.validator.validate_new(builder)
+
+        if builder.type == EventTypes.Member:
+            membership = builder.content.get("membership", None)
+            target = UserID.from_string(builder.state_key)
+
+            if membership in {Membership.JOIN, Membership.INVITE}:
+                # If event doesn't include a display name, add one.
+                profile = self.profile_handler
+                content = builder.content
+
+                try:
+                    if "displayname" not in content:
+                        content["displayname"] = yield profile.get_displayname(target)
+                    if "avatar_url" not in content:
+                        content["avatar_url"] = yield profile.get_avatar_url(target)
+                except Exception as e:
+                    logger.info(
+                        "Failed to get profile information for %r: %s",
+                        target, e
+                    )
 
-            if token_id is not None:
-                builder.internal_metadata.token_id = token_id
+        if token_id is not None:
+            builder.internal_metadata.token_id = token_id
 
-            if txn_id is not None:
-                builder.internal_metadata.txn_id = txn_id
+        if txn_id is not None:
+            builder.internal_metadata.txn_id = txn_id
 
-            event, context = yield self.create_new_client_event(
-                builder=builder,
-                requester=requester,
-                prev_event_ids=prev_event_ids,
-            )
+        event, context = yield self.create_new_client_event(
+            builder=builder,
+            requester=requester,
+            prev_events_and_hashes=prev_events_and_hashes,
+        )
 
         defer.returnValue((event, context))
 
@@ -557,64 +562,76 @@ class EventCreationHandler(object):
 
         See self.create_event and self.send_nonmember_event.
         """
-        event, context = yield self.create_event(
-            requester,
-            event_dict,
-            token_id=requester.access_token_id,
-            txn_id=txn_id
-        )
 
-        spam_error = self.spam_checker.check_event_for_spam(event)
-        if spam_error:
-            if not isinstance(spam_error, basestring):
-                spam_error = "Spam is not permitted here"
-            raise SynapseError(
-                403, spam_error, Codes.FORBIDDEN
+        # We limit the number of concurrent event sends in a room so that we
+        # don't fork the DAG too much. If we don't limit then we can end up in
+        # a situation where event persistence can't keep up, causing
+        # extremities to pile up, which in turn leads to state resolution
+        # taking longer.
+        with (yield self.limiter.queue(event_dict["room_id"])):
+            event, context = yield self.create_event(
+                requester,
+                event_dict,
+                token_id=requester.access_token_id,
+                txn_id=txn_id
             )
 
-        yield self.send_nonmember_event(
-            requester,
-            event,
-            context,
-            ratelimit=ratelimit,
-        )
+            spam_error = self.spam_checker.check_event_for_spam(event)
+            if spam_error:
+                if not isinstance(spam_error, basestring):
+                    spam_error = "Spam is not permitted here"
+                raise SynapseError(
+                    403, spam_error, Codes.FORBIDDEN
+                )
+
+            yield self.send_nonmember_event(
+                requester,
+                event,
+                context,
+                ratelimit=ratelimit,
+            )
         defer.returnValue(event)
 
     @measure_func("create_new_client_event")
     @defer.inlineCallbacks
-    def create_new_client_event(self, builder, requester=None, prev_event_ids=None):
-        if prev_event_ids:
-            prev_events = yield self.store.add_event_hashes(prev_event_ids)
-            prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
-            depth = prev_max_depth + 1
-        else:
-            latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
-                builder.room_id,
+    def create_new_client_event(self, builder, requester=None,
+                                prev_events_and_hashes=None):
+        """Create a new event for a local client
+
+        Args:
+            builder (EventBuilder):
+
+            requester (synapse.types.Requester|None):
+
+            prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+                the forward extremities to use as the prev_events for the
+                new event. For each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+
+                If None, they will be requested from the database.
+
+        Returns:
+            Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
+        """
+
+        if prev_events_and_hashes is not None:
+            assert len(prev_events_and_hashes) <= 10, \
+                "Attempting to create an event with %i prev_events" % (
+                    len(prev_events_and_hashes),
             )
+        else:
+            prev_events_and_hashes = \
+                yield self.store.get_prev_events_for_room(builder.room_id)
 
-            # We want to limit the max number of prev events we point to in our
-            # new event
-            if len(latest_ret) > 10:
-                # Sort by reverse depth, so we point to the most recent.
-                latest_ret.sort(key=lambda a: -a[2])
-                new_latest_ret = latest_ret[:5]
-
-                # We also randomly point to some of the older events, to make
-                # sure that we don't completely ignore the older events.
-                if latest_ret[5:]:
-                    sample_size = min(5, len(latest_ret[5:]))
-                    new_latest_ret.extend(random.sample(latest_ret[5:], sample_size))
-                latest_ret = new_latest_ret
-
-            if latest_ret:
-                depth = max([d for _, _, d in latest_ret]) + 1
-            else:
-                depth = 1
+        if prev_events_and_hashes:
+            depth = max([d for _, _, d in prev_events_and_hashes]) + 1
+        else:
+            depth = 1
 
-            prev_events = [
-                (event_id, prev_hashes)
-                for event_id, prev_hashes, _ in latest_ret
-            ]
+        prev_events = [
+            (event_id, prev_hashes)
+            for event_id, prev_hashes, _ in prev_events_and_hashes
+        ]
 
         builder.prev_events = prev_events
         builder.depth = depth
@@ -713,8 +730,14 @@ class EventCreationHandler(object):
         except:  # noqa: E722, as we reraise the exception this is fine.
             # Ensure that we actually remove the entries in the push actions
             # staging area, if we calculated them.
-            preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id)
-            raise
+            tp, value, tb = sys.exc_info()
+
+            run_in_background(
+                self.store.remove_push_actions_from_staging,
+                event.event_id,
+            )
+
+            six.reraise(tp, value, tb)
 
     @defer.inlineCallbacks
     def persist_and_notify_client_event(
@@ -834,22 +857,33 @@ class EventCreationHandler(object):
 
         # this intentionally does not yield: we don't care about the result
         # and don't need to wait for it.
-        preserve_fn(self.pusher_pool.on_new_notifications)(
+        run_in_background(
+            self.pusher_pool.on_new_notifications,
             event_stream_id, max_stream_id
         )
 
         @defer.inlineCallbacks
         def _notify():
             yield run_on_reactor()
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id,
-                extra_users=extra_users
-            )
+            try:
+                self.notifier.on_new_room_event(
+                    event, event_stream_id, max_stream_id,
+                    extra_users=extra_users
+                )
+            except Exception:
+                logger.exception("Error notifying about new room event")
 
-        preserve_fn(_notify)()
+        run_in_background(_notify)
 
         if event.type == EventTypes.Message:
-            presence = self.hs.get_presence_handler()
             # We don't want to block sending messages on any presence code. This
             # matters as sometimes presence code can take a while.
-            preserve_fn(presence.bump_presence_active_time)(requester.user)
+            run_in_background(self._bump_active_time, requester.user)
+
+    @defer.inlineCallbacks
+    def _bump_active_time(self, user):
+        try:
+            presence = self.hs.get_presence_handler()
+            yield presence.bump_presence_active_time(user)
+        except Exception:
+            logger.exception("Error bumping presence active time")
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index a5e501897c..585f3e4da2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -31,7 +31,7 @@ from synapse.storage.presence import UserPresenceState
 
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.async import Linearizer
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
@@ -255,6 +255,14 @@ class PresenceHandler(object):
         logger.info("Finished _persist_unpersisted_changes")
 
     @defer.inlineCallbacks
+    def _update_states_and_catch_exception(self, new_states):
+        try:
+            res = yield self._update_states(new_states)
+            defer.returnValue(res)
+        except Exception:
+            logger.exception("Error updating presence")
+
+    @defer.inlineCallbacks
     def _update_states(self, new_states):
         """Updates presence of users. Sets the appropriate timeouts. Pokes
         the notifier and federation if and only if the changed presence state
@@ -364,7 +372,7 @@ class PresenceHandler(object):
                     now=now,
                 )
 
-            preserve_fn(self._update_states)(changes)
+            run_in_background(self._update_states_and_catch_exception, changes)
         except Exception:
             logger.exception("Exception in _handle_timeouts loop")
 
@@ -422,20 +430,23 @@ class PresenceHandler(object):
 
         @defer.inlineCallbacks
         def _end():
-            if affect_presence:
+            try:
                 self.user_to_num_current_syncs[user_id] -= 1
 
                 prev_state = yield self.current_state_for_user(user_id)
                 yield self._update_states([prev_state.copy_and_replace(
                     last_user_sync_ts=self.clock.time_msec(),
                 )])
+            except Exception:
+                logger.exception("Error updating presence after sync")
 
         @contextmanager
         def _user_syncing():
             try:
                 yield
             finally:
-                preserve_fn(_end)()
+                if affect_presence:
+                    run_in_background(_end)
 
         defer.returnValue(_user_syncing())
 
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 3f215c2b4e..2e0672161c 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -135,37 +135,40 @@ class ReceiptsHandler(BaseHandler):
         """Given a list of receipts, works out which remote servers should be
         poked and pokes them.
         """
-        # TODO: Some of this stuff should be coallesced.
-        for receipt in receipts:
-            room_id = receipt["room_id"]
-            receipt_type = receipt["receipt_type"]
-            user_id = receipt["user_id"]
-            event_ids = receipt["event_ids"]
-            data = receipt["data"]
-
-            users = yield self.state.get_current_user_in_room(room_id)
-            remotedomains = set(get_domain_from_id(u) for u in users)
-            remotedomains = remotedomains.copy()
-            remotedomains.discard(self.server_name)
-
-            logger.debug("Sending receipt to: %r", remotedomains)
-
-            for domain in remotedomains:
-                self.federation.send_edu(
-                    destination=domain,
-                    edu_type="m.receipt",
-                    content={
-                        room_id: {
-                            receipt_type: {
-                                user_id: {
-                                    "event_ids": event_ids,
-                                    "data": data,
+        try:
+            # TODO: Some of this stuff should be coallesced.
+            for receipt in receipts:
+                room_id = receipt["room_id"]
+                receipt_type = receipt["receipt_type"]
+                user_id = receipt["user_id"]
+                event_ids = receipt["event_ids"]
+                data = receipt["data"]
+
+                users = yield self.state.get_current_user_in_room(room_id)
+                remotedomains = set(get_domain_from_id(u) for u in users)
+                remotedomains = remotedomains.copy()
+                remotedomains.discard(self.server_name)
+
+                logger.debug("Sending receipt to: %r", remotedomains)
+
+                for domain in remotedomains:
+                    self.federation.send_edu(
+                        destination=domain,
+                        edu_type="m.receipt",
+                        content={
+                            room_id: {
+                                receipt_type: {
+                                    user_id: {
+                                        "event_ids": event_ids,
+                                        "data": data,
+                                    }
                                 }
-                            }
+                            },
                         },
-                    },
-                    key=(room_id, receipt_type, user_id),
-                )
+                        key=(room_id, receipt_type, user_id),
+                    )
+        except Exception:
+            logger.exception("Error pushing receipts to remote servers")
 
     @defer.inlineCallbacks
     def get_receipts_for_room(self, room_id, to_key):
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 5d81f59b44..add3f9b009 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -20,7 +20,6 @@ from ._base import BaseHandler
 from synapse.api.constants import (
     EventTypes, JoinRules,
 )
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
@@ -44,8 +43,9 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
-        self.response_cache = ResponseCache(hs)
-        self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
+        self.response_cache = ResponseCache(hs, "room_list")
+        self.remote_response_cache = ResponseCache(hs, "remote_room_list",
+                                                   timeout_ms=30 * 1000)
 
     def get_local_public_room_list(self, limit=None, since_token=None,
                                    search_filter=None,
@@ -77,18 +77,11 @@ class RoomListHandler(BaseHandler):
             )
 
         key = (limit, since_token, network_tuple)
-        result = self.response_cache.get(key)
-        if not result:
-            logger.info("No cached result, calculating one.")
-            result = self.response_cache.set(
-                key,
-                preserve_fn(self._get_public_room_list)(
-                    limit, since_token, network_tuple=network_tuple
-                )
-            )
-        else:
-            logger.info("Using cached deferred result.")
-        return make_deferred_yieldable(result)
+        return self.response_cache.wrap(
+            key,
+            self._get_public_room_list,
+            limit, since_token, network_tuple=network_tuple,
+        )
 
     @defer.inlineCallbacks
     def _get_public_room_list(self, limit=None, since_token=None,
@@ -422,18 +415,14 @@ class RoomListHandler(BaseHandler):
             server_name, limit, since_token, include_all_networks,
             third_party_instance_id,
         )
-        result = self.remote_response_cache.get(key)
-        if not result:
-            result = self.remote_response_cache.set(
-                key,
-                repl_layer.get_public_rooms(
-                    server_name, limit=limit, since_token=since_token,
-                    search_filter=search_filter,
-                    include_all_networks=include_all_networks,
-                    third_party_instance_id=third_party_instance_id,
-                )
-            )
-        return result
+        return self.remote_response_cache.wrap(
+            key,
+            repl_layer.get_public_rooms,
+            server_name, limit=limit, since_token=since_token,
+            search_filter=search_filter,
+            include_all_networks=include_all_networks,
+            third_party_instance_id=third_party_instance_id,
+        )
 
 
 class RoomListNextBatch(namedtuple("RoomListNextBatch", (
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index c45142d38d..714583f1d5 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -149,7 +149,7 @@ class RoomMemberHandler(object):
     @defer.inlineCallbacks
     def _local_membership_update(
         self, requester, target, room_id, membership,
-        prev_event_ids,
+        prev_events_and_hashes,
         txn_id=None,
         ratelimit=True,
         content=None,
@@ -175,7 +175,7 @@ class RoomMemberHandler(object):
             },
             token_id=requester.access_token_id,
             txn_id=txn_id,
-            prev_event_ids=prev_event_ids,
+            prev_events_and_hashes=prev_events_and_hashes,
         )
 
         # Check if this event matches the previous membership event for the user.
@@ -314,7 +314,12 @@ class RoomMemberHandler(object):
                     403, "Invites have been disabled on this server",
                 )
 
-        latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
+        prev_events_and_hashes = yield self.store.get_prev_events_for_room(
+            room_id,
+        )
+        latest_event_ids = (
+            event_id for (event_id, _, _) in prev_events_and_hashes
+        )
         current_state_ids = yield self.state_handler.get_current_state_ids(
             room_id, latest_event_ids=latest_event_ids,
         )
@@ -403,7 +408,7 @@ class RoomMemberHandler(object):
             membership=effective_membership_state,
             txn_id=txn_id,
             ratelimit=ratelimit,
-            prev_event_ids=latest_event_ids,
+            prev_events_and_hashes=prev_events_and_hashes,
             content=content,
         )
         defer.returnValue(res)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0f713ce038..b52e4c2aff 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -15,7 +15,7 @@
 
 from synapse.api.constants import Membership, EventTypes
 from synapse.util.async import concurrently_execute
-from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure, measure_func
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
@@ -52,6 +52,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [
         to tell if room needs to be part of the sync result.
         """
         return bool(self.events)
+    __bool__ = __nonzero__  # python3
 
 
 class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
@@ -76,6 +77,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
             # nb the notification count does not, er, count: if there's nothing
             # else in the result, we don't need to send it.
         )
+    __bool__ = __nonzero__  # python3
 
 
 class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
@@ -95,6 +97,7 @@ class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
             or self.state
             or self.account_data
         )
+    __bool__ = __nonzero__  # python3
 
 
 class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
@@ -106,6 +109,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
     def __nonzero__(self):
         """Invited rooms should always be reported to the client"""
         return True
+    __bool__ = __nonzero__  # python3
 
 
 class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
@@ -117,6 +121,7 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
 
     def __nonzero__(self):
         return bool(self.join or self.invite or self.leave)
+    __bool__ = __nonzero__  # python3
 
 
 class DeviceLists(collections.namedtuple("DeviceLists", [
@@ -127,6 +132,7 @@ class DeviceLists(collections.namedtuple("DeviceLists", [
 
     def __nonzero__(self):
         return bool(self.changed or self.left)
+    __bool__ = __nonzero__  # python3
 
 
 class SyncResult(collections.namedtuple("SyncResult", [
@@ -159,6 +165,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
             self.device_lists or
             self.groups
         )
+    __bool__ = __nonzero__  # python3
 
 
 class SyncHandler(object):
@@ -169,7 +176,7 @@ class SyncHandler(object):
         self.presence_handler = hs.get_presence_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
-        self.response_cache = ResponseCache(hs)
+        self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
 
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
@@ -180,15 +187,11 @@ class SyncHandler(object):
         Returns:
             A Deferred SyncResult.
         """
-        result = self.response_cache.get(sync_config.request_key)
-        if not result:
-            result = self.response_cache.set(
-                sync_config.request_key,
-                preserve_fn(self._wait_for_sync_for_user)(
-                    sync_config, since_token, timeout, full_state
-                )
-            )
-        return make_deferred_yieldable(result)
+        return self.response_cache.wrap(
+            sync_config.request_key,
+            self._wait_for_sync_for_user,
+            sync_config, since_token, timeout, full_state,
+        )
 
     @defer.inlineCallbacks
     def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 77c0cf146f..5d9736e88f 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -16,7 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError, AuthError
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
 from synapse.types import UserID, get_domain_from_id
@@ -97,7 +97,8 @@ class TypingHandler(object):
             if self.hs.is_mine_id(member.user_id):
                 last_fed_poke = self._member_last_federation_poke.get(member, None)
                 if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
-                    preserve_fn(self._push_remote)(
+                    run_in_background(
+                        self._push_remote,
                         member=member,
                         typing=True
                     )
@@ -196,7 +197,7 @@ class TypingHandler(object):
     def _push_update(self, member, typing):
         if self.hs.is_mine_id(member.user_id):
             # Only send updates for changes to our own users.
-            preserve_fn(self._push_remote)(member, typing)
+            run_in_background(self._push_remote, member, typing)
 
         self._push_update_local(
             member=member,
@@ -205,28 +206,31 @@ class TypingHandler(object):
 
     @defer.inlineCallbacks
     def _push_remote(self, member, typing):
-        users = yield self.state.get_current_user_in_room(member.room_id)
-        self._member_last_federation_poke[member] = self.clock.time_msec()
+        try:
+            users = yield self.state.get_current_user_in_room(member.room_id)
+            self._member_last_federation_poke[member] = self.clock.time_msec()
 
-        now = self.clock.time_msec()
-        self.wheel_timer.insert(
-            now=now,
-            obj=member,
-            then=now + FEDERATION_PING_INTERVAL,
-        )
+            now = self.clock.time_msec()
+            self.wheel_timer.insert(
+                now=now,
+                obj=member,
+                then=now + FEDERATION_PING_INTERVAL,
+            )
 
-        for domain in set(get_domain_from_id(u) for u in users):
-            if domain != self.server_name:
-                self.federation.send_edu(
-                    destination=domain,
-                    edu_type="m.typing",
-                    content={
-                        "room_id": member.room_id,
-                        "user_id": member.user_id,
-                        "typing": typing,
-                    },
-                    key=member,
-                )
+            for domain in set(get_domain_from_id(u) for u in users):
+                if domain != self.server_name:
+                    self.federation.send_edu(
+                        destination=domain,
+                        edu_type="m.typing",
+                        content={
+                            "room_id": member.room_id,
+                            "user_id": member.user_id,
+                            "typing": typing,
+                        },
+                        key=member,
+                    )
+        except Exception:
+            logger.exception("Error pushing typing notif to remotes")
 
     @defer.inlineCallbacks
     def _recv_edu(self, origin, content):