summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py1240
1 files changed, 608 insertions, 632 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 7002b3752e..906a405031 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,64 +13,59 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from ._base import SQLBaseStore
 
-from twisted.internet import defer, reactor
+import itertools
+import logging
+from collections import OrderedDict, deque, namedtuple
+from functools import wraps
 
-from synapse.events import FrozenEvent, USE_FROZEN_DICTS
-from synapse.events.utils import prune_event
+from six import iteritems
+from six.moves import range
 
-from synapse.util.async import ObservableDeferred
-from synapse.util.logcontext import (
-    preserve_fn, PreserveLoggingContext, preserve_context_over_deferred
-)
-from synapse.util.logutils import log_function
-from synapse.util.metrics import Measure
-from synapse.api.constants import EventTypes
-from synapse.api.errors import SynapseError
-from synapse.state import resolve_events
-from synapse.util.caches.descriptors import cached
-from synapse.types import get_domain_from_id
+from canonicaljson import json
+from prometheus_client import Counter
 
-from canonicaljson import encode_canonical_json
-from collections import deque, namedtuple, OrderedDict
-from functools import wraps
+from twisted.internet import defer
 
 import synapse.metrics
-
-import logging
-import ujson as json
-
+from synapse.api.constants import EventTypes
+from synapse.api.errors import SynapseError
 # these are only included to make the type annotations work
-from synapse.events import EventBase    # noqa: F401
-from synapse.events.snapshot import EventContext   # noqa: F401
+from synapse.events import EventBase  # noqa: F401
+from synapse.events.snapshot import EventContext  # noqa: F401
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.events_worker import EventsWorkerStore
+from synapse.types import RoomStreamToken, get_domain_from_id
+from synapse.util.async import ObservableDeferred
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from synapse.util.frozenutils import frozendict_json_encoder
+from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
+from synapse.util.logutils import log_function
+from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
+persist_event_counter = Counter("synapse_storage_events_persisted_events", "")
+event_counter = Counter("synapse_storage_events_persisted_events_sep", "",
+                        ["type", "origin_type", "origin_entity"])
 
-metrics = synapse.metrics.get_metrics_for(__name__)
-persist_event_counter = metrics.register_counter("persisted_events")
-event_counter = metrics.register_counter(
-    "persisted_events_sep", labels=["type", "origin_type", "origin_entity"]
-)
+# The number of times we are recalculating the current state
+state_delta_counter = Counter("synapse_storage_events_state_delta", "")
 
+# The number of times we are recalculating state when there is only a
+# single forward extremity
+state_delta_single_event_counter = Counter(
+    "synapse_storage_events_state_delta_single_event", "")
 
-def encode_json(json_object):
-    if USE_FROZEN_DICTS:
-        # ujson doesn't like frozen_dicts
-        return encode_canonical_json(json_object)
-    else:
-        return json.dumps(json_object, ensure_ascii=False)
+# The number of times we are reculating state when we could have resonably
+# calculated the delta when we calculated the state for an event we were
+# persisting.
+state_delta_reuse_delta_counter = Counter(
+    "synapse_storage_events_state_delta_reuse_delta", "")
 
 
-# These values are used in the `enqueus_event` and `_do_fetch` methods to
-# control how we batch/bulk fetch events from the database.
-# The values are plucked out of thing air to make initial sync run faster
-# on jki.re
-# TODO: Make these configurable.
-EVENT_QUEUE_THREADS = 3  # Max number of threads that will fetch events
-EVENT_QUEUE_ITERATIONS = 3  # No. times we block waiting for requests for events
-EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events
+def encode_json(json_object):
+    return frozendict_json_encoder.encode(json_object)
 
 
 class _EventPeristenceQueue(object):
@@ -88,19 +84,29 @@ class _EventPeristenceQueue(object):
     def add_to_queue(self, room_id, events_and_contexts, backfilled):
         """Add events to the queue, with the given persist_event options.
 
+        NB: due to the normal usage pattern of this method, it does *not*
+        follow the synapse logcontext rules, and leaves the logcontext in
+        place whether or not the returned deferred is ready.
+
         Args:
             room_id (str):
             events_and_contexts (list[(EventBase, EventContext)]):
             backfilled (bool):
+
+        Returns:
+            defer.Deferred: a deferred which will resolve once the events are
+                persisted. Runs its callbacks *without* a logcontext.
         """
         queue = self._event_persist_queues.setdefault(room_id, deque())
         if queue:
+            # if the last item in the queue has the same `backfilled` setting,
+            # we can just add these new events to that item.
             end_item = queue[-1]
             if end_item.backfilled == backfilled:
                 end_item.events_and_contexts.extend(events_and_contexts)
                 return end_item.deferred.observe()
 
-        deferred = ObservableDeferred(defer.Deferred())
+        deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
 
         queue.append(self._EventPersistQueueItem(
             events_and_contexts=events_and_contexts,
@@ -113,11 +119,11 @@ class _EventPeristenceQueue(object):
     def handle_queue(self, room_id, per_item_callback):
         """Attempts to handle the queue for a room if not already being handled.
 
-        The given callback will be invoked with for each item in the queue,1
+        The given callback will be invoked with for each item in the queue,
         of type _EventPersistQueueItem. The per_item_callback will continuously
         be called with new items, unless the queue becomnes empty. The return
         value of the function will be given to the deferreds waiting on the item,
-        exceptions will be passed to the deferres as well.
+        exceptions will be passed to the deferreds as well.
 
         This function should therefore be called whenever anything is added
         to the queue.
@@ -136,18 +142,23 @@ class _EventPeristenceQueue(object):
             try:
                 queue = self._get_drainining_queue(room_id)
                 for item in queue:
+                    # handle_queue_loop runs in the sentinel logcontext, so
+                    # there is no need to preserve_fn when running the
+                    # callbacks on the deferred.
                     try:
                         ret = yield per_item_callback(item)
-                        item.deferred.callback(ret)
-                    except Exception as e:
-                        item.deferred.errback(e)
+                        with PreserveLoggingContext():
+                            item.deferred.callback(ret)
+                    except Exception:
+                        item.deferred.errback()
             finally:
                 queue = self._event_persist_queues.pop(room_id, None)
                 if queue:
                     self._event_persist_queues[room_id] = queue
                 self._currently_persisting_rooms.discard(room_id)
 
-        preserve_fn(handle_queue_loop)()
+        # set handle_queue_loop off in the background
+        run_as_background_process("persist_events", handle_queue_loop)
 
     def _get_drainining_queue(self, room_id):
         queue = self._event_persist_queues.setdefault(room_id, deque())
@@ -183,13 +194,12 @@ def _retry_on_integrity_error(func):
     return f
 
 
-class EventsStore(SQLBaseStore):
+class EventsStore(EventsWorkerStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
     EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
 
-    def __init__(self, hs):
-        super(EventsStore, self).__init__(hs)
-        self._clock = hs.get_clock()
+    def __init__(self, db_conn, hs):
+        super(EventsStore, self).__init__(db_conn, hs)
         self.register_background_update_handler(
             self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
         )
@@ -220,6 +230,8 @@ class EventsStore(SQLBaseStore):
 
         self._event_persist_queue = _EventPeristenceQueue()
 
+        self._state_resolution_handler = hs.get_state_resolution_handler()
+
     def persist_events(self, events_and_contexts, backfilled=False):
         """
         Write events to the database
@@ -232,8 +244,8 @@ class EventsStore(SQLBaseStore):
             partitioned.setdefault(event.room_id, []).append((event, ctx))
 
         deferreds = []
-        for room_id, evs_ctxs in partitioned.iteritems():
-            d = preserve_fn(self._event_persist_queue.add_to_queue)(
+        for room_id, evs_ctxs in iteritems(partitioned):
+            d = self._event_persist_queue.add_to_queue(
                 room_id, evs_ctxs,
                 backfilled=backfilled,
             )
@@ -242,7 +254,7 @@ class EventsStore(SQLBaseStore):
         for room_id in partitioned:
             self._maybe_start_persisting(room_id)
 
-        return preserve_context_over_deferred(
+        return make_deferred_yieldable(
             defer.gatherResults(deferreds, consumeErrors=True)
         )
 
@@ -267,7 +279,7 @@ class EventsStore(SQLBaseStore):
 
         self._maybe_start_persisting(event.room_id)
 
-        yield preserve_context_over_deferred(deferred)
+        yield make_deferred_yieldable(deferred)
 
         max_persisted_id = yield self._stream_id_gen.get_current_token()
         defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
@@ -275,10 +287,11 @@ class EventsStore(SQLBaseStore):
     def _maybe_start_persisting(self, room_id):
         @defer.inlineCallbacks
         def persisting_queue(item):
-            yield self._persist_events(
-                item.events_and_contexts,
-                backfilled=item.backfilled,
-            )
+            with Measure(self._clock, "persist_events"):
+                yield self._persist_events(
+                    item.events_and_contexts,
+                    backfilled=item.backfilled,
+                )
 
         self._event_persist_queue.handle_queue(room_id, persisting_queue)
 
@@ -316,7 +329,7 @@ class EventsStore(SQLBaseStore):
 
             chunks = [
                 events_and_contexts[x:x + 100]
-                for x in xrange(0, len(events_and_contexts), 100)
+                for x in range(0, len(events_and_contexts), 100)
             ]
 
             for chunk in chunks:
@@ -325,8 +338,23 @@ class EventsStore(SQLBaseStore):
 
                 # NB: Assumes that we are only persisting events for one room
                 # at a time.
+
+                # map room_id->list[event_ids] giving the new forward
+                # extremities in each room
                 new_forward_extremeties = {}
+
+                # map room_id->(type,state_key)->event_id tracking the full
+                # state in each room after adding these events.
+                # This is simply used to prefill the get_current_state_ids
+                # cache
                 current_state_for_room = {}
+
+                # map room_id->(to_delete, to_insert) where to_delete is a list
+                # of type/state keys to remove from current state, and to_insert
+                # is a map (type,key)->event_id giving the state delta in each
+                # room
+                state_delta_for_room = {}
+
                 if not backfilled:
                     with Measure(self._clock, "_calculate_state_and_extrem"):
                         # Work out the new "current state" for each room.
@@ -338,7 +366,7 @@ class EventsStore(SQLBaseStore):
                                 (event, context)
                             )
 
-                        for room_id, ev_ctx_rm in events_by_room.iteritems():
+                        for room_id, ev_ctx_rm in iteritems(events_by_room):
                             # Work out new extremities by recursively adding and removing
                             # the new events.
                             latest_event_ids = yield self.get_latest_event_ids_in_room(
@@ -348,7 +376,8 @@ class EventsStore(SQLBaseStore):
                                 room_id, ev_ctx_rm, latest_event_ids
                             )
 
-                            if new_latest_event_ids == set(latest_event_ids):
+                            latest_event_ids = set(latest_event_ids)
+                            if new_latest_event_ids == latest_event_ids:
                                 # No change in extremities, so no change in state
                                 continue
 
@@ -369,11 +398,63 @@ class EventsStore(SQLBaseStore):
                                 if all_single_prev_not_state:
                                     continue
 
-                            state = yield self._calculate_state_delta(
-                                room_id, ev_ctx_rm, new_latest_event_ids
+                            state_delta_counter.inc()
+                            if len(new_latest_event_ids) == 1:
+                                state_delta_single_event_counter.inc()
+
+                                # This is a fairly handwavey check to see if we could
+                                # have guessed what the delta would have been when
+                                # processing one of these events.
+                                # What we're interested in is if the latest extremities
+                                # were the same when we created the event as they are
+                                # now. When this server creates a new event (as opposed
+                                # to receiving it over federation) it will use the
+                                # forward extremities as the prev_events, so we can
+                                # guess this by looking at the prev_events and checking
+                                # if they match the current forward extremities.
+                                for ev, _ in ev_ctx_rm:
+                                    prev_event_ids = set(e for e, _ in ev.prev_events)
+                                    if latest_event_ids == prev_event_ids:
+                                        state_delta_reuse_delta_counter.inc()
+                                        break
+
+                            logger.info(
+                                "Calculating state delta for room %s", room_id,
                             )
-                            if state:
-                                current_state_for_room[room_id] = state
+                            with Measure(
+                                self._clock,
+                                "persist_events.get_new_state_after_events",
+                            ):
+                                res = yield self._get_new_state_after_events(
+                                    room_id,
+                                    ev_ctx_rm,
+                                    latest_event_ids,
+                                    new_latest_event_ids,
+                                )
+                                current_state, delta_ids = res
+
+                            # If either are not None then there has been a change,
+                            # and we need to work out the delta (or use that
+                            # given)
+                            if delta_ids is not None:
+                                # If there is a delta we know that we've
+                                # only added or replaced state, never
+                                # removed keys entirely.
+                                state_delta_for_room[room_id] = ([], delta_ids)
+                            elif current_state is not None:
+                                with Measure(
+                                    self._clock,
+                                    "persist_events.calculate_state_delta",
+                                ):
+                                    delta = yield self._calculate_state_delta(
+                                        room_id, current_state,
+                                    )
+                                state_delta_for_room[room_id] = delta
+
+                            # If we have the current_state then lets prefill
+                            # the cache with it.
+                            if current_state is not None:
+                                current_state_for_room[room_id] = current_state
 
                 yield self.runInteraction(
                     "persist_events",
@@ -381,10 +462,13 @@ class EventsStore(SQLBaseStore):
                     events_and_contexts=chunk,
                     backfilled=backfilled,
                     delete_existing=delete_existing,
-                    current_state_for_room=current_state_for_room,
+                    state_delta_for_room=state_delta_for_room,
                     new_forward_extremeties=new_forward_extremeties,
                 )
-                persist_event_counter.inc_by(len(chunk))
+                persist_event_counter.inc(len(chunk))
+                synapse.metrics.event_persisted_position.set(
+                    chunk[-1][0].internal_metadata.stream_ordering,
+                )
                 for event, context in chunk:
                     if context.app_service:
                         origin_type = "local"
@@ -396,14 +480,14 @@ class EventsStore(SQLBaseStore):
                         origin_type = "remote"
                         origin_entity = get_domain_from_id(event.sender)
 
-                    event_counter.inc(event.type, origin_type, origin_entity)
+                    event_counter.labels(event.type, origin_type, origin_entity).inc()
 
-                for room_id, (_, _, new_state) in current_state_for_room.iteritems():
+                for room_id, new_state in iteritems(current_state_for_room):
                     self.get_current_state_ids.prefill(
                         (room_id, ), new_state
                     )
 
-                for room_id, latest_event_ids in new_forward_extremeties.iteritems():
+                for room_id, latest_event_ids in iteritems(new_forward_extremeties):
                     self.get_latest_event_ids_in_room.prefill(
                         (room_id,), list(latest_event_ids)
                     )
@@ -450,183 +534,187 @@ class EventsStore(SQLBaseStore):
         defer.returnValue(new_latest_event_ids)
 
     @defer.inlineCallbacks
-    def _calculate_state_delta(self, room_id, events_context, new_latest_event_ids):
-        """Calculate the new state deltas for a room.
+    def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids,
+                                    new_latest_event_ids):
+        """Calculate the current state dict after adding some new events to
+        a room
 
-        Assumes that we are only persisting events for one room at a time.
+        Args:
+            room_id (str):
+                room to which the events are being added. Used for logging etc
+
+            events_context (list[(EventBase, EventContext)]):
+                events and contexts which are being added to the room
+
+            old_latest_event_ids (iterable[str]):
+                the old forward extremities for the room.
+
+            new_latest_event_ids (iterable[str]):
+                the new forward extremities for the room.
 
         Returns:
-            3-tuple (to_delete, to_insert, new_state) where both are state dicts,
-            i.e. (type, state_key) -> event_id. `to_delete` are the entries to
-            first be deleted from current_state_events, `to_insert` are entries
-            to insert. `new_state` is the full set of state.
-            May return None if there are no changes to be applied.
+            Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]:
+            Returns a tuple of two state maps, the first being the full new current
+            state and the second being the delta to the existing current state.
+            If both are None then there has been no change.
+
+            If there has been a change then we only return the delta if its
+            already been calculated. Conversely if we do know the delta then
+            the new current state is only returned if we've already calculated
+            it.
         """
-        # Now we need to work out the different state sets for
-        # each state extremities
-        state_sets = []
-        state_groups = set()
-        missing_event_ids = []
-        was_updated = False
+
+        if not new_latest_event_ids:
+            return
+
+        # map from state_group to ((type, key) -> event_id) state map
+        state_groups_map = {}
+
+        # Map from (prev state group, new state group) -> delta state dict
+        state_group_deltas = {}
+
+        for ev, ctx in events_context:
+            if ctx.state_group is None:
+                # I don't think this can happen, but let's double-check
+                raise Exception(
+                    "Context for new extremity event %s has no state "
+                    "group" % (ev.event_id, ),
+                )
+
+            if ctx.state_group in state_groups_map:
+                continue
+
+            # We're only interested in pulling out state that has already
+            # been cached in the context. We'll pull stuff out of the DB later
+            # if necessary.
+            current_state_ids = ctx.get_cached_current_state_ids()
+            if current_state_ids is not None:
+                state_groups_map[ctx.state_group] = current_state_ids
+
+            if ctx.prev_group:
+                state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids
+
+        # We need to map the event_ids to their state groups. First, let's
+        # check if the event is one we're persisting, in which case we can
+        # pull the state group from its context.
+        # Otherwise we need to pull the state group from the database.
+
+        # Set of events we need to fetch groups for. (We know none of the old
+        # extremities are going to be in events_context).
+        missing_event_ids = set(old_latest_event_ids)
+
+        event_id_to_state_group = {}
         for event_id in new_latest_event_ids:
-            # First search in the list of new events we're adding,
-            # and then use the current state from that
+            # First search in the list of new events we're adding.
             for ev, ctx in events_context:
                 if event_id == ev.event_id:
-                    if ctx.current_state_ids is None:
-                        raise Exception("Unknown current state")
-
-                    # If we've already seen the state group don't bother adding
-                    # it to the state sets again
-                    if ctx.state_group not in state_groups:
-                        state_sets.append(ctx.current_state_ids)
-                        if ctx.delta_ids or hasattr(ev, "state_key"):
-                            was_updated = True
-                        if ctx.state_group:
-                            # Add this as a seen state group (if it has a state
-                            # group)
-                            state_groups.add(ctx.state_group)
+                    event_id_to_state_group[event_id] = ctx.state_group
                     break
             else:
                 # If we couldn't find it, then we'll need to pull
                 # the state from the database
-                was_updated = True
-                missing_event_ids.append(event_id)
+                missing_event_ids.add(event_id)
 
         if missing_event_ids:
-            # Now pull out the state for any missing events from DB
+            # Now pull out the state groups for any missing events from DB
             event_to_groups = yield self._get_state_group_for_events(
                 missing_event_ids,
             )
+            event_id_to_state_group.update(event_to_groups)
 
-            groups = set(event_to_groups.itervalues()) - state_groups
+        # State groups of old_latest_event_ids
+        old_state_groups = set(
+            event_id_to_state_group[evid] for evid in old_latest_event_ids
+        )
 
-            if groups:
-                group_to_state = yield self._get_state_for_groups(groups)
-                state_sets.extend(group_to_state.itervalues())
+        # State groups of new_latest_event_ids
+        new_state_groups = set(
+            event_id_to_state_group[evid] for evid in new_latest_event_ids
+        )
 
-        if not new_latest_event_ids:
-            current_state = {}
-        elif was_updated:
-            if len(state_sets) == 1:
-                # If there is only one state set, then we know what the current
-                # state is.
-                current_state = state_sets[0]
-            else:
-                # We work out the current state by passing the state sets to the
-                # state resolution algorithm. It may ask for some events, including
-                # the events we have yet to persist, so we need a slightly more
-                # complicated event lookup function than simply looking the events
-                # up in the db.
-                events_map = {ev.event_id: ev for ev, _ in events_context}
-
-                @defer.inlineCallbacks
-                def get_events(ev_ids):
-                    # We get the events by first looking at the list of events we
-                    # are trying to persist, and then fetching the rest from the DB.
-                    db = []
-                    to_return = {}
-                    for ev_id in ev_ids:
-                        ev = events_map.get(ev_id, None)
-                        if ev:
-                            to_return[ev_id] = ev
-                        else:
-                            db.append(ev_id)
-
-                    if db:
-                        evs = yield self.get_events(
-                            ev_ids, get_prev_content=False, check_redacted=False,
-                        )
-                        to_return.update(evs)
-                    defer.returnValue(to_return)
-
-                current_state = yield resolve_events(
-                    state_sets,
-                    state_map_factory=get_events,
-                )
-        else:
-            return
+        # If they old and new groups are the same then we don't need to do
+        # anything.
+        if old_state_groups == new_state_groups:
+            defer.returnValue((None, None))
 
-        existing_state = yield self.get_current_state_ids(room_id)
+        if len(new_state_groups) == 1 and len(old_state_groups) == 1:
+            # If we're going from one state group to another, lets check if
+            # we have a delta for that transition. If we do then we can just
+            # return that.
 
-        existing_events = set(existing_state.itervalues())
-        new_events = set(ev_id for ev_id in current_state.itervalues())
-        changed_events = existing_events ^ new_events
+            new_state_group = next(iter(new_state_groups))
+            old_state_group = next(iter(old_state_groups))
 
-        if not changed_events:
-            return
+            delta_ids = state_group_deltas.get(
+                (old_state_group, new_state_group,), None
+            )
+            if delta_ids is not None:
+                # We have a delta from the existing to new current state,
+                # so lets just return that. If we happen to already have
+                # the current state in memory then lets also return that,
+                # but it doesn't matter if we don't.
+                new_state = state_groups_map.get(new_state_group)
+                defer.returnValue((new_state, delta_ids))
+
+        # Now that we have calculated new_state_groups we need to get
+        # their state IDs so we can resolve to a single state set.
+        missing_state = new_state_groups - set(state_groups_map)
+        if missing_state:
+            group_to_state = yield self._get_state_for_groups(missing_state)
+            state_groups_map.update(group_to_state)
+
+        if len(new_state_groups) == 1:
+            # If there is only one state group, then we know what the current
+            # state is.
+            defer.returnValue((state_groups_map[new_state_groups.pop()], None))
+
+        # Ok, we need to defer to the state handler to resolve our state sets.
+
+        def get_events(ev_ids):
+            return self.get_events(
+                ev_ids, get_prev_content=False, check_redacted=False,
+            )
 
-        to_delete = {
-            key: ev_id for key, ev_id in existing_state.iteritems()
-            if ev_id in changed_events
+        state_groups = {
+            sg: state_groups_map[sg] for sg in new_state_groups
         }
-        events_to_insert = (new_events - existing_events)
-        to_insert = {
-            key: ev_id for key, ev_id in current_state.iteritems()
-            if ev_id in events_to_insert
-        }
-
-        defer.returnValue((to_delete, to_insert, current_state))
-
-    @defer.inlineCallbacks
-    def get_event(self, event_id, check_redacted=True,
-                  get_prev_content=False, allow_rejected=False,
-                  allow_none=False):
-        """Get an event from the database by event_id.
-
-        Args:
-            event_id (str): The event_id of the event to fetch
-            check_redacted (bool): If True, check if event has been redacted
-                and redact it.
-            get_prev_content (bool): If True and event is a state event,
-                include the previous states content in the unsigned field.
-            allow_rejected (bool): If True return rejected events.
-            allow_none (bool): If True, return None if no event found, if
-                False throw an exception.
 
-        Returns:
-            Deferred : A FrozenEvent.
-        """
-        events = yield self._get_events(
-            [event_id],
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
-            allow_rejected=allow_rejected,
+        events_map = {ev.event_id: ev for ev, _ in events_context}
+        logger.debug("calling resolve_state_groups from preserve_events")
+        res = yield self._state_resolution_handler.resolve_state_groups(
+            room_id, state_groups, events_map, get_events
         )
 
-        if not events and not allow_none:
-            raise SynapseError(404, "Could not find event %s" % (event_id,))
-
-        defer.returnValue(events[0] if events else None)
+        defer.returnValue((res.state, None))
 
     @defer.inlineCallbacks
-    def get_events(self, event_ids, check_redacted=True,
-                   get_prev_content=False, allow_rejected=False):
-        """Get events from the database
+    def _calculate_state_delta(self, room_id, current_state):
+        """Calculate the new state deltas for a room.
 
-        Args:
-            event_ids (list): The event_ids of the events to fetch
-            check_redacted (bool): If True, check if event has been redacted
-                and redact it.
-            get_prev_content (bool): If True and event is a state event,
-                include the previous states content in the unsigned field.
-            allow_rejected (bool): If True return rejected events.
+        Assumes that we are only persisting events for one room at a time.
 
         Returns:
-            Deferred : Dict from event_id to event.
+            tuple[list, dict] (to_delete, to_insert): where to_delete are the
+            type/state_keys to remove from current_state_events and `to_insert`
+            are the updates to current_state_events.
         """
-        events = yield self._get_events(
-            event_ids,
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
-            allow_rejected=allow_rejected,
-        )
+        existing_state = yield self.get_current_state_ids(room_id)
+
+        to_delete = [
+            key for key in existing_state
+            if key not in current_state
+        ]
+
+        to_insert = {
+            key: ev_id for key, ev_id in iteritems(current_state)
+            if ev_id != existing_state.get(key)
+        }
 
-        defer.returnValue({e.event_id: e for e in events})
+        defer.returnValue((to_delete, to_insert))
 
     @log_function
     def _persist_events_txn(self, txn, events_and_contexts, backfilled,
-                            delete_existing=False, current_state_for_room={},
+                            delete_existing=False, state_delta_for_room={},
                             new_forward_extremeties={}):
         """Insert some number of room events into the necessary database tables.
 
@@ -642,19 +730,21 @@ class EventsStore(SQLBaseStore):
             delete_existing (bool): True to purge existing table rows for the
                 events from the database. This is useful when retrying due to
                 IntegrityError.
-            current_state_for_room (dict[str, (list[str], list[str])]):
+            state_delta_for_room (dict[str, (list, dict)]):
                 The current-state delta for each room. For each room, a tuple
-                (to_delete, to_insert), being a list of event ids to be removed
-                from the current state, and a list of event ids to be added to
+                (to_delete, to_insert), being a list of type/state keys to be
+                removed from the current state, and a state set to be added to
                 the current state.
             new_forward_extremeties (dict[str, list[str]]):
                 The new forward extremities for each room. For each room, a
                 list of the event ids which are the forward extremities.
 
         """
+        all_events_and_contexts = events_and_contexts
+
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
 
-        self._update_current_state_txn(txn, current_state_for_room, max_stream_order)
+        self._update_current_state_txn(txn, state_delta_for_room, max_stream_order)
 
         self._update_forward_extremities_txn(
             txn,
@@ -698,9 +788,8 @@ class EventsStore(SQLBaseStore):
             events_and_contexts=events_and_contexts,
         )
 
-        # Insert into the state_groups, state_groups_state, and
-        # event_to_state_groups tables.
-        self._store_mult_state_groups_txn(txn, events_and_contexts)
+        # Insert into event_to_state_groups.
+        self._store_event_state_mappings_txn(txn, events_and_contexts)
 
         # _store_rejected_events_txn filters out any events which were
         # rejected, and returns the filtered list.
@@ -715,15 +804,53 @@ class EventsStore(SQLBaseStore):
         self._update_metadata_tables_txn(
             txn,
             events_and_contexts=events_and_contexts,
+            all_events_and_contexts=all_events_and_contexts,
             backfilled=backfilled,
         )
 
     def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
-        for room_id, current_state_tuple in state_delta_by_room.iteritems():
-                to_delete, to_insert, _ = current_state_tuple
+        for room_id, current_state_tuple in iteritems(state_delta_by_room):
+                to_delete, to_insert = current_state_tuple
+
+                # First we add entries to the current_state_delta_stream. We
+                # do this before updating the current_state_events table so
+                # that we can use it to calculate the `prev_event_id`. (This
+                # allows us to not have to pull out the existing state
+                # unnecessarily).
+                sql = """
+                    INSERT INTO current_state_delta_stream
+                    (stream_id, room_id, type, state_key, event_id, prev_event_id)
+                    SELECT ?, ?, ?, ?, ?, (
+                        SELECT event_id FROM current_state_events
+                        WHERE room_id = ? AND type = ? AND state_key = ?
+                    )
+                """
+                txn.executemany(sql, (
+                    (
+                        max_stream_order, room_id, etype, state_key, None,
+                        room_id, etype, state_key,
+                    )
+                    for etype, state_key in to_delete
+                    # We sanity check that we're deleting rather than updating
+                    if (etype, state_key) not in to_insert
+                ))
+                txn.executemany(sql, (
+                    (
+                        max_stream_order, room_id, etype, state_key, ev_id,
+                        room_id, etype, state_key,
+                    )
+                    for (etype, state_key), ev_id in iteritems(to_insert)
+                ))
+
+                # Now we actually update the current_state_events table
+
                 txn.executemany(
-                    "DELETE FROM current_state_events WHERE event_id = ?",
-                    [(ev_id,) for ev_id in to_delete.itervalues()],
+                    "DELETE FROM current_state_events"
+                    " WHERE room_id = ? AND type = ? AND state_key = ?",
+                    (
+                        (room_id, etype, state_key)
+                        for etype, state_key in itertools.chain(to_delete, to_insert)
+                    ),
                 )
 
                 self._simple_insert_many_txn(
@@ -736,30 +863,12 @@ class EventsStore(SQLBaseStore):
                             "type": key[0],
                             "state_key": key[1],
                         }
-                        for key, ev_id in to_insert.iteritems()
+                        for key, ev_id in iteritems(to_insert)
                     ],
                 )
 
-                state_deltas = {key: None for key in to_delete}
-                state_deltas.update(to_insert)
-
-                self._simple_insert_many_txn(
-                    txn,
-                    table="current_state_delta_stream",
-                    values=[
-                        {
-                            "stream_id": max_stream_order,
-                            "room_id": room_id,
-                            "type": key[0],
-                            "state_key": key[1],
-                            "event_id": ev_id,
-                            "prev_event_id": to_delete.get(key, None),
-                        }
-                        for key, ev_id in state_deltas.iteritems()
-                    ]
-                )
-
-                self._curr_state_delta_stream_cache.entity_has_changed(
+                txn.call_after(
+                    self._curr_state_delta_stream_cache.entity_has_changed,
                     room_id, max_stream_order,
                 )
 
@@ -771,19 +880,23 @@ class EventsStore(SQLBaseStore):
                 # and which we have added, then we invlidate the caches for all
                 # those users.
                 members_changed = set(
-                    state_key for ev_type, state_key in state_deltas
+                    state_key
+                    for ev_type, state_key in itertools.chain(to_delete, to_insert)
                     if ev_type == EventTypes.Member
                 )
 
                 for member in members_changed:
                     self._invalidate_cache_and_stream(
-                        txn, self.get_rooms_for_user, (member,)
+                        txn, self.get_rooms_for_user_with_stream_ordering, (member,)
                     )
 
                 for host in set(get_domain_from_id(u) for u in members_changed):
                     self._invalidate_cache_and_stream(
                         txn, self.is_host_joined, (room_id, host)
                     )
+                    self._invalidate_cache_and_stream(
+                        txn, self.was_host_joined, (room_id, host)
+                    )
 
                 self._invalidate_cache_and_stream(
                     txn, self.get_users_in_room, (room_id,)
@@ -795,7 +908,7 @@ class EventsStore(SQLBaseStore):
 
     def _update_forward_extremities_txn(self, txn, new_forward_extremities,
                                         max_stream_order):
-        for room_id, new_extrem in new_forward_extremities.iteritems():
+        for room_id, new_extrem in iteritems(new_forward_extremities):
             self._simple_delete_txn(
                 txn,
                 table="event_forward_extremities",
@@ -813,7 +926,7 @@ class EventsStore(SQLBaseStore):
                     "event_id": ev_id,
                     "room_id": room_id,
                 }
-                for room_id, new_extrem in new_forward_extremities.iteritems()
+                for room_id, new_extrem in iteritems(new_forward_extremities)
                 for ev_id in new_extrem
             ],
         )
@@ -830,7 +943,7 @@ class EventsStore(SQLBaseStore):
                     "event_id": event_id,
                     "stream_ordering": max_stream_order,
                 }
-                for room_id, new_extrem in new_forward_extremities.iteritems()
+                for room_id, new_extrem in iteritems(new_forward_extremities)
                 for event_id in new_extrem
             ]
         )
@@ -858,7 +971,7 @@ class EventsStore(SQLBaseStore):
                         new_events_and_contexts[event.event_id] = (event, context)
             else:
                 new_events_and_contexts[event.event_id] = (event, context)
-        return new_events_and_contexts.values()
+        return list(new_events_and_contexts.values())
 
     def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
         """Update min_depth for each room
@@ -884,7 +997,7 @@ class EventsStore(SQLBaseStore):
                     event.depth, depth_updates.get(event.room_id, event.depth)
                 )
 
-        for room_id, depth in depth_updates.iteritems():
+        for room_id, depth in iteritems(depth_updates):
             self._update_min_depth_for_room_txn(txn, room_id, depth)
 
     def _update_outliers_txn(self, txn, events_and_contexts):
@@ -932,10 +1045,9 @@ class EventsStore(SQLBaseStore):
                 # an outlier in the database. We now have some state at that
                 # so we need to update the state_groups table with that state.
 
-                # insert into the state_group, state_groups_state and
-                # event_to_state_groups tables.
+                # insert into event_to_state_groups.
                 try:
-                    self._store_mult_state_groups_txn(txn, ((event, context),))
+                    self._store_event_state_mappings_txn(txn, ((event, context),))
                 except Exception:
                     logger.exception("")
                     raise
@@ -1001,7 +1113,6 @@ class EventsStore(SQLBaseStore):
                 "event_edge_hashes",
                 "event_edges",
                 "event_forward_extremities",
-                "event_push_actions",
                 "event_reference_hashes",
                 "event_search",
                 "event_signatures",
@@ -1021,6 +1132,14 @@ class EventsStore(SQLBaseStore):
                 [(ev.event_id,) for ev, _ in events_and_contexts]
             )
 
+        for table in (
+            "event_push_actions",
+        ):
+            txn.executemany(
+                "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
+                [(ev.event_id,) for ev, _ in events_and_contexts]
+            )
+
     def _store_event_txn(self, txn, events_and_contexts):
         """Insert new events into the event and event_json tables
 
@@ -1110,27 +1229,33 @@ class EventsStore(SQLBaseStore):
             ec for ec in events_and_contexts if ec[0] not in to_remove
         ]
 
-    def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled):
+    def _update_metadata_tables_txn(self, txn, events_and_contexts,
+                                    all_events_and_contexts, backfilled):
         """Update all the miscellaneous tables for new events
 
         Args:
             txn (twisted.enterprise.adbapi.Connection): db connection
             events_and_contexts (list[(EventBase, EventContext)]): events
                 we are persisting
+            all_events_and_contexts (list[(EventBase, EventContext)]): all
+                events that we were going to persist. This includes events
+                we've already persisted, etc, that wouldn't appear in
+                events_and_context.
             backfilled (bool): True if the events were backfilled
         """
 
+        # Insert all the push actions into the event_push_actions table.
+        self._set_push_actions_for_event_and_users_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+            all_events_and_contexts=all_events_and_contexts,
+        )
+
         if not events_and_contexts:
             # nothing to do here
             return
 
         for event, context in events_and_contexts:
-            # Insert all the push actions into the event_push_actions table.
-            if context.push_actions:
-                self._set_push_actions_for_event_and_users_txn(
-                    txn, event, context.push_actions
-                )
-
             if event.type == EventTypes.Redaction and event.redacts is not None:
                 # Remove the entries in the event_push_actions table for the
                 # redacted event.
@@ -1263,7 +1388,7 @@ class EventsStore(SQLBaseStore):
                 " WHERE e.event_id IN (%s)"
             ) % (",".join(["?"] * len(ev_map)),)
 
-            txn.execute(sql, ev_map.keys())
+            txn.execute(sql, list(ev_map))
             rows = self.cursor_to_dict(txn)
             for row in rows:
                 event = ev_map[row["event_id"]]
@@ -1302,13 +1427,49 @@ class EventsStore(SQLBaseStore):
 
         defer.returnValue(set(r["event_id"] for r in rows))
 
-    def have_events(self, event_ids):
+    @defer.inlineCallbacks
+    def have_seen_events(self, event_ids):
         """Given a list of event ids, check if we have already processed them.
 
+        Args:
+            event_ids (iterable[str]):
+
         Returns:
-            dict: Has an entry for each event id we already have seen. Maps to
-            the rejected reason string if we rejected the event, else maps to
-            None.
+            Deferred[set[str]]: The events we have already seen.
+        """
+        results = set()
+
+        def have_seen_events_txn(txn, chunk):
+            sql = (
+                "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
+                % (",".join("?" * len(chunk)), )
+            )
+            txn.execute(sql, chunk)
+            for (event_id, ) in txn:
+                results.add(event_id)
+
+        # break the input up into chunks of 100
+        input_iterator = iter(event_ids)
+        for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
+                          []):
+            yield self.runInteraction(
+                "have_seen_events",
+                have_seen_events_txn,
+                chunk,
+            )
+        defer.returnValue(results)
+
+    def get_seen_events_with_rejections(self, event_ids):
+        """Given a list of event ids, check if we rejected them.
+
+        Args:
+            event_ids (list[str])
+
+        Returns:
+            Deferred[dict[str, str|None):
+                Has an entry for each event id we already have seen. Maps to
+                the rejected reason string if we rejected the event, else maps
+                to None.
         """
         if not event_ids:
             return defer.succeed({})
@@ -1330,295 +1491,7 @@ class EventsStore(SQLBaseStore):
 
             return res
 
-        return self.runInteraction(
-            "have_events", f,
-        )
-
-    @defer.inlineCallbacks
-    def _get_events(self, event_ids, check_redacted=True,
-                    get_prev_content=False, allow_rejected=False):
-        if not event_ids:
-            defer.returnValue([])
-
-        event_id_list = event_ids
-        event_ids = set(event_ids)
-
-        event_entry_map = self._get_events_from_cache(
-            event_ids,
-            allow_rejected=allow_rejected,
-        )
-
-        missing_events_ids = [e for e in event_ids if e not in event_entry_map]
-
-        if missing_events_ids:
-            missing_events = yield self._enqueue_events(
-                missing_events_ids,
-                check_redacted=check_redacted,
-                allow_rejected=allow_rejected,
-            )
-
-            event_entry_map.update(missing_events)
-
-        events = []
-        for event_id in event_id_list:
-            entry = event_entry_map.get(event_id, None)
-            if not entry:
-                continue
-
-            if allow_rejected or not entry.event.rejected_reason:
-                if check_redacted and entry.redacted_event:
-                    event = entry.redacted_event
-                else:
-                    event = entry.event
-
-                events.append(event)
-
-                if get_prev_content:
-                    if "replaces_state" in event.unsigned:
-                        prev = yield self.get_event(
-                            event.unsigned["replaces_state"],
-                            get_prev_content=False,
-                            allow_none=True,
-                        )
-                        if prev:
-                            event.unsigned = dict(event.unsigned)
-                            event.unsigned["prev_content"] = prev.content
-                            event.unsigned["prev_sender"] = prev.sender
-
-        defer.returnValue(events)
-
-    def _invalidate_get_event_cache(self, event_id):
-            self._get_event_cache.invalidate((event_id,))
-
-    def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
-        """Fetch events from the caches
-
-        Args:
-            events (list(str)): list of event_ids to fetch
-            allow_rejected (bool): Whether to teturn events that were rejected
-            update_metrics (bool): Whether to update the cache hit ratio metrics
-
-        Returns:
-            dict of event_id -> _EventCacheEntry for each event_id in cache. If
-            allow_rejected is `False` then there will still be an entry but it
-            will be `None`
-        """
-        event_map = {}
-
-        for event_id in events:
-            ret = self._get_event_cache.get(
-                (event_id,), None,
-                update_metrics=update_metrics,
-            )
-            if not ret:
-                continue
-
-            if allow_rejected or not ret.event.rejected_reason:
-                event_map[event_id] = ret
-            else:
-                event_map[event_id] = None
-
-        return event_map
-
-    def _do_fetch(self, conn):
-        """Takes a database connection and waits for requests for events from
-        the _event_fetch_list queue.
-        """
-        event_list = []
-        i = 0
-        while True:
-            try:
-                with self._event_fetch_lock:
-                    event_list = self._event_fetch_list
-                    self._event_fetch_list = []
-
-                    if not event_list:
-                        single_threaded = self.database_engine.single_threaded
-                        if single_threaded or i > EVENT_QUEUE_ITERATIONS:
-                            self._event_fetch_ongoing -= 1
-                            return
-                        else:
-                            self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
-                            i += 1
-                            continue
-                    i = 0
-
-                event_id_lists = zip(*event_list)[0]
-                event_ids = [
-                    item for sublist in event_id_lists for item in sublist
-                ]
-
-                rows = self._new_transaction(
-                    conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids
-                )
-
-                row_dict = {
-                    r["event_id"]: r
-                    for r in rows
-                }
-
-                # We only want to resolve deferreds from the main thread
-                def fire(lst, res):
-                    for ids, d in lst:
-                        if not d.called:
-                            try:
-                                with PreserveLoggingContext():
-                                    d.callback([
-                                        res[i]
-                                        for i in ids
-                                        if i in res
-                                    ])
-                            except:
-                                logger.exception("Failed to callback")
-                with PreserveLoggingContext():
-                    reactor.callFromThread(fire, event_list, row_dict)
-            except Exception as e:
-                logger.exception("do_fetch")
-
-                # We only want to resolve deferreds from the main thread
-                def fire(evs):
-                    for _, d in evs:
-                        if not d.called:
-                            with PreserveLoggingContext():
-                                d.errback(e)
-
-                if event_list:
-                    with PreserveLoggingContext():
-                        reactor.callFromThread(fire, event_list)
-
-    @defer.inlineCallbacks
-    def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
-        """Fetches events from the database using the _event_fetch_list. This
-        allows batch and bulk fetching of events - it allows us to fetch events
-        without having to create a new transaction for each request for events.
-        """
-        if not events:
-            defer.returnValue({})
-
-        events_d = defer.Deferred()
-        with self._event_fetch_lock:
-            self._event_fetch_list.append(
-                (events, events_d)
-            )
-
-            self._event_fetch_lock.notify()
-
-            if self._event_fetch_ongoing < EVENT_QUEUE_THREADS:
-                self._event_fetch_ongoing += 1
-                should_start = True
-            else:
-                should_start = False
-
-        if should_start:
-            with PreserveLoggingContext():
-                self.runWithConnection(
-                    self._do_fetch
-                )
-
-        logger.debug("Loading %d events", len(events))
-        with PreserveLoggingContext():
-            rows = yield events_d
-        logger.debug("Loaded %d events (%d rows)", len(events), len(rows))
-
-        if not allow_rejected:
-            rows[:] = [r for r in rows if not r["rejects"]]
-
-        res = yield preserve_context_over_deferred(defer.gatherResults(
-            [
-                preserve_fn(self._get_event_from_row)(
-                    row["internal_metadata"], row["json"], row["redacts"],
-                    rejected_reason=row["rejects"],
-                )
-                for row in rows
-            ],
-            consumeErrors=True
-        ))
-
-        defer.returnValue({
-            e.event.event_id: e
-            for e in res if e
-        })
-
-    def _fetch_event_rows(self, txn, events):
-        rows = []
-        N = 200
-        for i in range(1 + len(events) / N):
-            evs = events[i * N:(i + 1) * N]
-            if not evs:
-                break
-
-            sql = (
-                "SELECT "
-                " e.event_id as event_id, "
-                " e.internal_metadata,"
-                " e.json,"
-                " r.redacts as redacts,"
-                " rej.event_id as rejects "
-                " FROM event_json as e"
-                " LEFT JOIN rejections as rej USING (event_id)"
-                " LEFT JOIN redactions as r ON e.event_id = r.redacts"
-                " WHERE e.event_id IN (%s)"
-            ) % (",".join(["?"] * len(evs)),)
-
-            txn.execute(sql, evs)
-            rows.extend(self.cursor_to_dict(txn))
-
-        return rows
-
-    @defer.inlineCallbacks
-    def _get_event_from_row(self, internal_metadata, js, redacted,
-                            rejected_reason=None):
-        with Measure(self._clock, "_get_event_from_row"):
-            d = json.loads(js)
-            internal_metadata = json.loads(internal_metadata)
-
-            if rejected_reason:
-                rejected_reason = yield self._simple_select_one_onecol(
-                    table="rejections",
-                    keyvalues={"event_id": rejected_reason},
-                    retcol="reason",
-                    desc="_get_event_from_row_rejected_reason",
-                )
-
-            original_ev = FrozenEvent(
-                d,
-                internal_metadata_dict=internal_metadata,
-                rejected_reason=rejected_reason,
-            )
-
-            redacted_event = None
-            if redacted:
-                redacted_event = prune_event(original_ev)
-
-                redaction_id = yield self._simple_select_one_onecol(
-                    table="redactions",
-                    keyvalues={"redacts": redacted_event.event_id},
-                    retcol="event_id",
-                    desc="_get_event_from_row_redactions",
-                )
-
-                redacted_event.unsigned["redacted_by"] = redaction_id
-                # Get the redaction event.
-
-                because = yield self.get_event(
-                    redaction_id,
-                    check_redacted=False,
-                    allow_none=True,
-                )
-
-                if because:
-                    # It's fine to do add the event directly, since get_pdu_json
-                    # will serialise this field correctly
-                    redacted_event.unsigned["redacted_because"] = because
-
-            cache_entry = _EventCacheEntry(
-                event=original_ev,
-                redacted_event=redacted_event,
-            )
-
-            self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
-
-        defer.returnValue(cache_entry)
+        return self.runInteraction("get_rejection_reasons", f)
 
     @defer.inlineCallbacks
     def count_daily_messages(self):
@@ -1778,7 +1651,7 @@ class EventsStore(SQLBaseStore):
 
             chunks = [
                 event_ids[i:i + 100]
-                for i in xrange(0, len(event_ids), 100)
+                for i in range(0, len(event_ids), 100)
             ]
             for chunk in chunks:
                 ev_rows = self._simple_select_many_txn(
@@ -2005,15 +1878,32 @@ class EventsStore(SQLBaseStore):
             )
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
 
-    def delete_old_state(self, room_id, topological_ordering):
+    def purge_history(
+        self, room_id, token, delete_local_events,
+    ):
+        """Deletes room history before a certain point
+
+        Args:
+            room_id (str):
+
+            token (str): A topological token to delete events before
+
+            delete_local_events (bool):
+                if True, we will delete local events as well as remote ones
+                (instead of just marking them as outliers and deleting their
+                state groups).
+        """
+
         return self.runInteraction(
-            "delete_old_state",
-            self._delete_old_state_txn, room_id, topological_ordering
+            "purge_history",
+            self._purge_history_txn, room_id, token,
+            delete_local_events,
         )
 
-    def _delete_old_state_txn(self, txn, room_id, topological_ordering):
-        """Deletes old room state
-        """
+    def _purge_history_txn(
+        self, txn, room_id, token_str, delete_local_events,
+    ):
+        token = RoomStreamToken.parse(token_str)
 
         # Tables that should be pruned:
         #     event_auth
@@ -2035,6 +1925,37 @@ class EventsStore(SQLBaseStore):
         #     state_groups
         #     state_groups_state
 
+        # we will build a temporary table listing the events so that we don't
+        # have to keep shovelling the list back and forth across the
+        # connection. Annoyingly the python sqlite driver commits the
+        # transaction on CREATE, so let's do this first.
+        #
+        # furthermore, we might already have the table from a previous (failed)
+        # purge attempt, so let's drop the table first.
+
+        txn.execute("DROP TABLE IF EXISTS events_to_purge")
+
+        txn.execute(
+            "CREATE TEMPORARY TABLE events_to_purge ("
+            "    event_id TEXT NOT NULL,"
+            "    should_delete BOOLEAN NOT NULL"
+            ")"
+        )
+
+        # create an index on should_delete because later we'll be looking for
+        # the should_delete / shouldn't_delete subsets
+        txn.execute(
+            "CREATE INDEX events_to_purge_should_delete"
+            " ON events_to_purge(should_delete)",
+        )
+
+        # We do joins against events_to_purge for e.g. calculating state
+        # groups to purge, etc., so lets make an index.
+        txn.execute(
+            "CREATE INDEX events_to_purge_id"
+            " ON events_to_purge(event_id)",
+        )
+
         # First ensure that we're not about to delete all the forward extremeties
         txn.execute(
             "SELECT e.event_id, e.depth FROM events as e "
@@ -2047,7 +1968,7 @@ class EventsStore(SQLBaseStore):
         rows = txn.fetchall()
         max_depth = max(row[0] for row in rows)
 
-        if max_depth <= topological_ordering:
+        if max_depth <= token.topological:
             # We need to ensure we don't delete all the events from the datanase
             # otherwise we wouldn't be able to send any events (due to not
             # having any backwards extremeties)
@@ -2055,42 +1976,48 @@ class EventsStore(SQLBaseStore):
                 400, "topological_ordering is greater than forward extremeties"
             )
 
-        logger.debug("[purge] looking for events to delete")
+        logger.info("[purge] looking for events to delete")
+
+        should_delete_expr = "state_key IS NULL"
+        should_delete_params = ()
+        if not delete_local_events:
+            should_delete_expr += " AND event_id NOT LIKE ?"
+            should_delete_params += ("%:" + self.hs.hostname, )
+
+        should_delete_params += (room_id, token.topological)
 
         txn.execute(
-            "SELECT event_id, state_key FROM events"
-            " LEFT JOIN state_events USING (room_id, event_id)"
-            " WHERE room_id = ? AND topological_ordering < ?",
-            (room_id, topological_ordering,)
+            "INSERT INTO events_to_purge"
+            " SELECT event_id, %s"
+            " FROM events AS e LEFT JOIN state_events USING (event_id)"
+            " WHERE e.room_id = ? AND topological_ordering < ?" % (
+                should_delete_expr,
+            ),
+            should_delete_params,
+        )
+        txn.execute(
+            "SELECT event_id, should_delete FROM events_to_purge"
         )
         event_rows = txn.fetchall()
-
-        to_delete = [
-            (event_id,) for event_id, state_key in event_rows
-            if state_key is None and not self.hs.is_mine_id(event_id)
-        ]
         logger.info(
-            "[purge] found %i events before cutoff, of which %i are remote"
-            " non-state events to delete", len(event_rows), len(to_delete))
-
-        for event_id, state_key in event_rows:
-            txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
+            "[purge] found %i events before cutoff, of which %i can be deleted",
+            len(event_rows), sum(1 for e in event_rows if e[1]),
+        )
 
-        logger.debug("[purge] Finding new backward extremities")
+        logger.info("[purge] Finding new backward extremities")
 
         # We calculate the new entries for the backward extremeties by finding
-        # all events that point to events that are to be purged
+        # events to be purged that are pointed to by events we're not going to
+        # purge.
         txn.execute(
-            "SELECT DISTINCT e.event_id FROM events as e"
-            " INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
-            " INNER JOIN events as e2 ON e2.event_id = ed.event_id"
-            " WHERE e.room_id = ? AND e.topological_ordering < ?"
-            " AND e2.topological_ordering >= ?",
-            (room_id, topological_ordering, topological_ordering)
+            "SELECT DISTINCT e.event_id FROM events_to_purge AS e"
+            " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
+            " LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id"
+            " WHERE ep2.event_id IS NULL",
         )
         new_backwards_extrems = txn.fetchall()
 
-        logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems)
+        logger.info("[purge] replacing backward extremities: %r", new_backwards_extrems)
 
         txn.execute(
             "DELETE FROM event_backward_extremities WHERE room_id = ?",
@@ -2106,34 +2033,39 @@ class EventsStore(SQLBaseStore):
             ]
         )
 
-        logger.debug("[purge] finding redundant state groups")
+        logger.info("[purge] finding redundant state groups")
 
         # Get all state groups that are only referenced by events that are
         # to be deleted.
-        txn.execute(
-            "SELECT state_group FROM event_to_state_groups"
-            " INNER JOIN events USING (event_id)"
-            " WHERE state_group IN ("
-            "   SELECT DISTINCT state_group FROM events"
-            "   INNER JOIN event_to_state_groups USING (event_id)"
-            "   WHERE room_id = ? AND topological_ordering < ?"
-            " )"
-            " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
-            (room_id, topological_ordering, topological_ordering)
-        )
+        # This works by first getting state groups that we may want to delete,
+        # joining against event_to_state_groups to get events that use that
+        # state group, then left joining against events_to_purge again. Any
+        # state group where the left join produce *no nulls* are referenced
+        # only by events that are going to be purged.
+        txn.execute("""
+            SELECT state_group FROM
+            (
+                SELECT DISTINCT state_group FROM events_to_purge
+                INNER JOIN event_to_state_groups USING (event_id)
+            ) AS sp
+            INNER JOIN event_to_state_groups USING (state_group)
+            LEFT JOIN events_to_purge AS ep USING (event_id)
+            GROUP BY state_group
+            HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0
+        """)
 
         state_rows = txn.fetchall()
-        logger.debug("[purge] found %i redundant state groups", len(state_rows))
+        logger.info("[purge] found %i redundant state groups", len(state_rows))
 
         # make a set of the redundant state groups, so that we can look them up
         # efficiently
         state_groups_to_delete = set([sg for sg, in state_rows])
 
         # Now we get all the state groups that rely on these state groups
-        logger.debug("[purge] finding state groups which depend on redundant"
-                     " state groups")
+        logger.info("[purge] finding state groups which depend on redundant"
+                    " state groups")
         remaining_state_groups = []
-        for i in xrange(0, len(state_rows), 100):
+        for i in range(0, len(state_rows), 100):
             chunk = [sg for sg, in state_rows[i:i + 100]]
             # look for state groups whose prev_state_group is one we are about
             # to delete
@@ -2156,7 +2088,7 @@ class EventsStore(SQLBaseStore):
         # Now we turn the state groups that reference to-be-deleted state
         # groups to non delta versions.
         for sg in remaining_state_groups:
-            logger.debug("[purge] de-delta-ing remaining state group %s", sg)
+            logger.info("[purge] de-delta-ing remaining state group %s", sg)
             curr_state = self._get_state_groups_from_groups_txn(
                 txn, [sg], types=None
             )
@@ -2189,11 +2121,11 @@ class EventsStore(SQLBaseStore):
                         "state_key": key[1],
                         "event_id": state_id,
                     }
-                    for key, state_id in curr_state.iteritems()
+                    for key, state_id in iteritems(curr_state)
                 ],
             )
 
-        logger.debug("[purge] removing redundant state groups")
+        logger.info("[purge] removing redundant state groups")
         txn.executemany(
             "DELETE FROM state_groups_state WHERE state_group = ?",
             state_rows
@@ -2203,18 +2135,15 @@ class EventsStore(SQLBaseStore):
             state_rows
         )
 
-        # Delete all non-state
-        logger.debug("[purge] removing events from event_to_state_groups")
-        txn.executemany(
-            "DELETE FROM event_to_state_groups WHERE event_id = ?",
-            [(event_id,) for event_id, _ in event_rows]
-        )
-
-        logger.debug("[purge] updating room_depth")
+        logger.info("[purge] removing events from event_to_state_groups")
         txn.execute(
-            "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
-            (topological_ordering, room_id,)
+            "DELETE FROM event_to_state_groups "
+            "WHERE event_id IN (SELECT event_id from events_to_purge)"
         )
+        for event_id, _ in event_rows:
+            txn.call_after(self._get_state_group_for_event.invalidate, (
+                event_id,
+            ))
 
         # Delete all remote non-state events
         for table in (
@@ -2226,28 +2155,75 @@ class EventsStore(SQLBaseStore):
             "event_edge_hashes",
             "event_edges",
             "event_forward_extremities",
-            "event_push_actions",
             "event_reference_hashes",
             "event_search",
             "event_signatures",
             "rejections",
         ):
-            logger.debug("[purge] removing remote non-state events from %s", table)
+            logger.info("[purge] removing events from %s", table)
 
-            txn.executemany(
-                "DELETE FROM %s WHERE event_id = ?" % (table,),
-                to_delete
+            txn.execute(
+                "DELETE FROM %s WHERE event_id IN ("
+                "    SELECT event_id FROM events_to_purge WHERE should_delete"
+                ")" % (table,),
+            )
+
+        # event_push_actions lacks an index on event_id, and has one on
+        # (room_id, event_id) instead.
+        for table in (
+            "event_push_actions",
+        ):
+            logger.info("[purge] removing events from %s", table)
+
+            txn.execute(
+                "DELETE FROM %s WHERE room_id = ? AND event_id IN ("
+                "    SELECT event_id FROM events_to_purge WHERE should_delete"
+                ")" % (table,),
+                (room_id, )
             )
 
         # Mark all state and own events as outliers
-        logger.debug("[purge] marking remaining events as outliers")
-        txn.executemany(
+        logger.info("[purge] marking remaining events as outliers")
+        txn.execute(
             "UPDATE events SET outlier = ?"
-            " WHERE event_id = ?",
-            [
-                (True, event_id,) for event_id, state_key in event_rows
-                if state_key is not None or self.hs.is_mine_id(event_id)
-            ]
+            " WHERE event_id IN ("
+            "    SELECT event_id FROM events_to_purge "
+            "    WHERE NOT should_delete"
+            ")",
+            (True,),
+        )
+
+        # synapse tries to take out an exclusive lock on room_depth whenever it
+        # persists events (because upsert), and once we run this update, we
+        # will block that for the rest of our transaction.
+        #
+        # So, let's stick it at the end so that we don't block event
+        # persistence.
+        #
+        # We do this by calculating the minimum depth of the backwards
+        # extremities. However, the events in event_backward_extremities
+        # are ones we don't have yet so we need to look at the events that
+        # point to it via event_edges table.
+        txn.execute("""
+            SELECT COALESCE(MIN(depth), 0)
+            FROM event_backward_extremities AS eb
+            INNER JOIN event_edges AS eg ON eg.prev_event_id = eb.event_id
+            INNER JOIN events AS e ON e.event_id = eg.event_id
+            WHERE eb.room_id = ?
+        """, (room_id,))
+        min_depth, = txn.fetchone()
+
+        logger.info("[purge] updating room_depth to %d", min_depth)
+
+        txn.execute(
+            "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
+            (min_depth, room_id,)
+        )
+
+        # finally, drop the temp table. this will commit the txn in sqlite,
+        # so make sure to keep this actually last.
+        txn.execute(
+            "DROP TABLE events_to_purge"
         )
 
         logger.info("[purge] done")
@@ -2260,7 +2236,7 @@ class EventsStore(SQLBaseStore):
         to_2, so_2 = yield self._get_event_ordering(event_id2)
         defer.returnValue((to_1, so_1) > (to_2, so_2))
 
-    @defer.inlineCallbacks
+    @cachedInlineCallbacks(max_entries=5000)
     def _get_event_ordering(self, event_id):
         res = yield self._simple_select_one(
             table="events",