summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2018-07-23 19:21:37 +0100
committerMatthew Hodgson <matthew@matrix.org>2018-07-23 19:21:37 +0100
commitadfe29ec0bedf394772e30583c271f142bccd3ba (patch)
treeba4759be562a0f9f9cddc514968fd16e7b0faa90 /synapse
parentincorporate review (diff)
parentMerge pull request #3584 from matrix-org/erikj/use_cached (diff)
downloadsynapse-adfe29ec0bedf394772e30583c271f142bccd3ba.tar.xz
Merge branch 'develop' into matthew/filter_members
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/api/auth.py6
-rwxr-xr-xsynapse/app/homeserver.py6
-rwxr-xr-xsynapse/app/synctl.py4
-rw-r--r--synapse/events/snapshot.py199
-rw-r--r--synapse/handlers/_base.py3
-rw-r--r--synapse/handlers/federation.py73
-rw-r--r--synapse/handlers/message.py35
-rw-r--r--synapse/handlers/room_member.py9
-rw-r--r--synapse/notifier.py2
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py7
-rw-r--r--synapse/replication/http/send_event.py7
-rw-r--r--synapse/rest/client/v1/admin.py122
-rw-r--r--synapse/secrets.py42
-rw-r--r--synapse/server.py5
-rw-r--r--synapse/state.py103
-rw-r--r--synapse/storage/devices.py53
-rw-r--r--synapse/storage/end_to_end_keys.py27
-rw-r--r--synapse/storage/events.py7
-rw-r--r--synapse/storage/push_rule.py7
-rw-r--r--synapse/storage/roommember.py7
-rw-r--r--synapse/util/async.py145
-rw-r--r--synapse/util/logcontext.py11
-rw-r--r--synapse/util/metrics.py19
-rw-r--r--synapse/visibility.py19
25 files changed, 646 insertions, 274 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 6dc16f0808..5c0f2f83aa 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -17,4 +17,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.33.0rc1"
+__version__ = "0.33.0"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index bc629832d9..535bdb449d 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -65,8 +65,9 @@ class Auth(object):
 
     @defer.inlineCallbacks
     def check_from_context(self, event, context, do_sig_check=True):
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
         auth_events_ids = yield self.compute_auth_events(
-            event, context.prev_state_ids, for_verification=True,
+            event, prev_state_ids, for_verification=True,
         )
         auth_events = yield self.store.get_events(auth_events_ids)
         auth_events = {
@@ -544,7 +545,8 @@ class Auth(object):
 
     @defer.inlineCallbacks
     def add_auth_events(self, builder, context):
-        auth_ids = yield self.compute_auth_events(builder, context.prev_state_ids)
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        auth_ids = yield self.compute_auth_events(builder, prev_state_ids)
 
         auth_events_entries = yield self.store.add_event_hashes(
             auth_ids
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 14e6dca522..2ad1beb8d8 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -18,6 +18,8 @@ import logging
 import os
 import sys
 
+from six import iteritems
+
 from twisted.application import service
 from twisted.internet import defer, reactor
 from twisted.web.resource import EncodingResourceWrapper, NoResource
@@ -442,7 +444,7 @@ def run(hs):
         stats["total_nonbridged_users"] = total_nonbridged_users
 
         daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
-        for name, count in daily_user_type_results.iteritems():
+        for name, count in iteritems(daily_user_type_results):
             stats["daily_user_type_" + name] = count
 
         room_count = yield hs.get_datastore().get_room_count()
@@ -453,7 +455,7 @@ def run(hs):
         stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
 
         r30_results = yield hs.get_datastore().count_r30_users()
-        for name, count in r30_results.iteritems():
+        for name, count in iteritems(r30_results):
             stats["r30_users_" + name] = count
 
         daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 68acc15a9a..d658f967ba 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -25,6 +25,8 @@ import subprocess
 import sys
 import time
 
+from six import iteritems
+
 import yaml
 
 SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
@@ -173,7 +175,7 @@ def main():
         os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
 
     cache_factors = config.get("synctl_cache_factors", {})
-    for cache_name, factor in cache_factors.iteritems():
+    for cache_name, factor in iteritems(cache_factors):
         os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
 
     worker_configfiles = []
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index bcd9bb5946..189212b0fa 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -13,22 +13,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from six import iteritems
+
 from frozendict import frozendict
 
 from twisted.internet import defer
 
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+
 
 class EventContext(object):
     """
     Attributes:
-        current_state_ids (dict[(str, str), str]):
-            The current state map including the current event.
-            (type, state_key) -> event_id
-
-        prev_state_ids (dict[(str, str), str]):
-            The current state map excluding the current event.
-            (type, state_key) -> event_id
-
         state_group (int|None): state group id, if the state has been stored
             as a state group. This is usually only None if e.g. the event is
             an outlier.
@@ -45,38 +41,77 @@ class EventContext(object):
 
         prev_state_events (?): XXX: is this ever set to anything other than
             the empty list?
+
+        _current_state_ids (dict[(str, str), str]|None):
+            The current state map including the current event. None if outlier
+            or we haven't fetched the state from DB yet.
+            (type, state_key) -> event_id
+
+        _prev_state_ids (dict[(str, str), str]|None):
+            The current state map excluding the current event. None if outlier
+            or we haven't fetched the state from DB yet.
+            (type, state_key) -> event_id
+
+        _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have
+            been calculated. None if we haven't started calculating yet
+
+        _event_type (str): The type of the event the context is associated with.
+            Only set when state has not been fetched yet.
+
+        _event_state_key (str|None): The state_key of the event the context is
+            associated with. Only set when state has not been fetched yet.
+
+        _prev_state_id (str|None): If the event associated with the context is
+            a state event, then `_prev_state_id` is the event_id of the state
+            that was replaced.
+            Only set when state has not been fetched yet.
     """
 
     __slots__ = [
-        "current_state_ids",
-        "prev_state_ids",
         "state_group",
         "rejected",
         "prev_group",
         "delta_ids",
         "prev_state_events",
         "app_service",
+        "_current_state_ids",
+        "_prev_state_ids",
+        "_prev_state_id",
+        "_event_type",
+        "_event_state_key",
+        "_fetching_state_deferred",
     ]
 
     def __init__(self):
+        self.prev_state_events = []
+        self.rejected = False
+        self.app_service = None
+
+    @staticmethod
+    def with_state(state_group, current_state_ids, prev_state_ids,
+                   prev_group=None, delta_ids=None):
+        context = EventContext()
+
         # The current state including the current event
-        self.current_state_ids = None
+        context._current_state_ids = current_state_ids
         # The current state excluding the current event
-        self.prev_state_ids = None
-        self.state_group = None
+        context._prev_state_ids = prev_state_ids
+        context.state_group = state_group
 
-        self.rejected = False
+        context._prev_state_id = None
+        context._event_type = None
+        context._event_state_key = None
+        context._fetching_state_deferred = defer.succeed(None)
 
         # A previously persisted state group and a delta between that
         # and this state.
-        self.prev_group = None
-        self.delta_ids = None
+        context.prev_group = prev_group
+        context.delta_ids = delta_ids
 
-        self.prev_state_events = None
-
-        self.app_service = None
+        return context
 
-    def serialize(self, event):
+    @defer.inlineCallbacks
+    def serialize(self, event, store):
         """Converts self to a type that can be serialized as JSON, and then
         deserialized by `deserialize`
 
@@ -92,11 +127,12 @@ class EventContext(object):
         # the prev_state_ids, so if we're a state event we include the event
         # id that we replaced in the state.
         if event.is_state():
-            prev_state_id = self.prev_state_ids.get((event.type, event.state_key))
+            prev_state_ids = yield self.get_prev_state_ids(store)
+            prev_state_id = prev_state_ids.get((event.type, event.state_key))
         else:
             prev_state_id = None
 
-        return {
+        defer.returnValue({
             "prev_state_id": prev_state_id,
             "event_type": event.type,
             "event_state_key": event.state_key if event.is_state() else None,
@@ -106,10 +142,9 @@ class EventContext(object):
             "delta_ids": _encode_state_dict(self.delta_ids),
             "prev_state_events": self.prev_state_events,
             "app_service_id": self.app_service.id if self.app_service else None
-        }
+        })
 
     @staticmethod
-    @defer.inlineCallbacks
     def deserialize(store, input):
         """Converts a dict that was produced by `serialize` back into a
         EventContext.
@@ -122,32 +157,114 @@ class EventContext(object):
             EventContext
         """
         context = EventContext()
+
+        # We use the state_group and prev_state_id stuff to pull the
+        # current_state_ids out of the DB and construct prev_state_ids.
+        context._prev_state_id = input["prev_state_id"]
+        context._event_type = input["event_type"]
+        context._event_state_key = input["event_state_key"]
+
+        context._current_state_ids = None
+        context._prev_state_ids = None
+        context._fetching_state_deferred = None
+
         context.state_group = input["state_group"]
-        context.rejected = input["rejected"]
         context.prev_group = input["prev_group"]
         context.delta_ids = _decode_state_dict(input["delta_ids"])
+
+        context.rejected = input["rejected"]
         context.prev_state_events = input["prev_state_events"]
 
-        # We use the state_group and prev_state_id stuff to pull the
-        # current_state_ids out of the DB and construct prev_state_ids.
-        prev_state_id = input["prev_state_id"]
-        event_type = input["event_type"]
-        event_state_key = input["event_state_key"]
+        app_service_id = input["app_service_id"]
+        if app_service_id:
+            context.app_service = store.get_app_service_by_id(app_service_id)
+
+        return context
+
+    @defer.inlineCallbacks
+    def get_current_state_ids(self, store):
+        """Gets the current state IDs
+
+        Returns:
+            Deferred[dict[(str, str), str]|None]: Returns None if state_group
+            is None, which happens when the associated event is an outlier.
+        """
+
+        if not self._fetching_state_deferred:
+            self._fetching_state_deferred = run_in_background(
+                self._fill_out_state, store,
+            )
+
+        yield make_deferred_yieldable(self._fetching_state_deferred)
+
+        defer.returnValue(self._current_state_ids)
+
+    @defer.inlineCallbacks
+    def get_prev_state_ids(self, store):
+        """Gets the prev state IDs
+
+        Returns:
+            Deferred[dict[(str, str), str]|None]: Returns None if state_group
+            is None, which happens when the associated event is an outlier.
+        """
+
+        if not self._fetching_state_deferred:
+            self._fetching_state_deferred = run_in_background(
+                self._fill_out_state, store,
+            )
+
+        yield make_deferred_yieldable(self._fetching_state_deferred)
 
-        context.current_state_ids = yield store.get_state_ids_for_group(
-            context.state_group,
+        defer.returnValue(self._prev_state_ids)
+
+    def get_cached_current_state_ids(self):
+        """Gets the current state IDs if we have them already cached.
+
+        Returns:
+            dict[(str, str), str]|None: Returns None if we haven't cached the
+            state or if state_group is None, which happens when the associated
+            event is an outlier.
+        """
+
+        return self._current_state_ids
+
+    @defer.inlineCallbacks
+    def _fill_out_state(self, store):
+        """Called to populate the _current_state_ids and _prev_state_ids
+        attributes by loading from the database.
+        """
+        if self.state_group is None:
+            return
+
+        self._current_state_ids = yield store.get_state_ids_for_group(
+            self.state_group,
         )
-        if prev_state_id and event_state_key:
-            context.prev_state_ids = dict(context.current_state_ids)
-            context.prev_state_ids[(event_type, event_state_key)] = prev_state_id
+        if self._prev_state_id and self._event_state_key is not None:
+            self._prev_state_ids = dict(self._current_state_ids)
+
+            key = (self._event_type, self._event_state_key)
+            self._prev_state_ids[key] = self._prev_state_id
         else:
-            context.prev_state_ids = context.current_state_ids
+            self._prev_state_ids = self._current_state_ids
 
-        app_service_id = input["app_service_id"]
-        if app_service_id:
-            context.app_service = store.get_app_service_by_id(app_service_id)
+    @defer.inlineCallbacks
+    def update_state(self, state_group, prev_state_ids, current_state_ids,
+                     delta_ids):
+        """Replace the state in the context
+        """
+
+        # We need to make sure we wait for any ongoing fetching of state
+        # to complete so that the updated state doesn't get clobbered
+        if self._fetching_state_deferred:
+            yield make_deferred_yieldable(self._fetching_state_deferred)
+
+        self.state_group = state_group
+        self._prev_state_ids = prev_state_ids
+        self._current_state_ids = current_state_ids
+        self.delta_ids = delta_ids
 
-        defer.returnValue(context)
+        # We need to ensure that that we've marked as having fetched the state
+        self._fetching_state_deferred = defer.succeed(None)
 
 
 def _encode_state_dict(state_dict):
@@ -159,7 +276,7 @@ def _encode_state_dict(state_dict):
 
     return [
         (etype, state_key, v)
-        for (etype, state_key), v in state_dict.iteritems()
+        for (etype, state_key), v in iteritems(state_dict)
     ]
 
 
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index b6a8b3aa3b..704181d2d3 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -112,8 +112,9 @@ class BaseHandler(object):
             guest_access = event.content.get("guest_access", "forbidden")
             if guest_access != "can_join":
                 if context:
+                    current_state_ids = yield context.get_current_state_ids(self.store)
                     current_state = yield self.store.get_events(
-                        list(context.current_state_ids.values())
+                        list(current_state_ids.values())
                     )
                 else:
                     current_state = yield self.state_handler.get_current_state(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 65f6041b10..14654d59f1 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -21,8 +21,8 @@ import logging
 import sys
 
 import six
-from six import iteritems
-from six.moves import http_client
+from six import iteritems, itervalues
+from six.moves import http_client, zip
 
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
@@ -486,7 +486,10 @@ class FederationHandler(BaseHandler):
                 # joined the room. Don't bother if the user is just
                 # changing their profile info.
                 newly_joined = True
-                prev_state_id = context.prev_state_ids.get(
+
+                prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+                prev_state_id = prev_state_ids.get(
                     (event.type, event.state_key)
                 )
                 if prev_state_id:
@@ -731,7 +734,7 @@ class FederationHandler(BaseHandler):
             """
             joined_users = [
                 (state_key, int(event.depth))
-                for (e_type, state_key), event in state.iteritems()
+                for (e_type, state_key), event in iteritems(state)
                 if e_type == EventTypes.Member
                 and event.membership == Membership.JOIN
             ]
@@ -748,7 +751,7 @@ class FederationHandler(BaseHandler):
                 except Exception:
                     pass
 
-            return sorted(joined_domains.iteritems(), key=lambda d: d[1])
+            return sorted(joined_domains.items(), key=lambda d: d[1])
 
         curr_domains = get_domains_from_state(curr_state)
 
@@ -811,7 +814,7 @@ class FederationHandler(BaseHandler):
         tried_domains = set(likely_domains)
         tried_domains.add(self.server_name)
 
-        event_ids = list(extremities.iterkeys())
+        event_ids = list(extremities.keys())
 
         logger.debug("calling resolve_state_groups in _maybe_backfill")
         resolve = logcontext.preserve_fn(
@@ -827,15 +830,15 @@ class FederationHandler(BaseHandler):
         states = dict(zip(event_ids, [s.state for s in states]))
 
         state_map = yield self.store.get_events(
-            [e_id for ids in states.itervalues() for e_id in ids.itervalues()],
+            [e_id for ids in itervalues(states) for e_id in itervalues(ids)],
             get_prev_content=False
         )
         states = {
             key: {
                 k: state_map[e_id]
-                for k, e_id in state_dict.iteritems()
+                for k, e_id in iteritems(state_dict)
                 if e_id in state_map
-            } for key, state_dict in states.iteritems()
+            } for key, state_dict in iteritems(states)
         }
 
         for e_id, _ in sorted_extremeties_tuple:
@@ -1106,10 +1109,12 @@ class FederationHandler(BaseHandler):
                 user = UserID.from_string(event.state_key)
                 yield user_joined_room(self.distributor, user, event.room_id)
 
-        state_ids = list(context.prev_state_ids.values())
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+        state_ids = list(prev_state_ids.values())
         auth_chain = yield self.store.get_auth_chain(state_ids)
 
-        state = yield self.store.get_events(list(context.prev_state_ids.values()))
+        state = yield self.store.get_events(list(prev_state_ids.values()))
 
         defer.returnValue({
             "state": list(state.values()),
@@ -1515,7 +1520,7 @@ class FederationHandler(BaseHandler):
         yield self.store.persist_events(
             [
                 (ev_info["event"], context)
-                for ev_info, context in itertools.izip(event_infos, contexts)
+                for ev_info, context in zip(event_infos, contexts)
             ],
             backfilled=backfilled,
         )
@@ -1635,8 +1640,9 @@ class FederationHandler(BaseHandler):
         )
 
         if not auth_events:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, context.prev_state_ids, for_verification=True,
+                event, prev_state_ids, for_verification=True,
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
@@ -1876,9 +1882,10 @@ class FederationHandler(BaseHandler):
                         break
 
             if do_resolution:
+                prev_state_ids = yield context.get_prev_state_ids(self.store)
                 # 1. Get what we think is the auth chain.
                 auth_ids = yield self.auth.compute_auth_events(
-                    event, context.prev_state_ids
+                    event, prev_state_ids
                 )
                 local_auth_chain = yield self.store.get_auth_chain(
                     auth_ids, include_given=True
@@ -1968,21 +1975,35 @@ class FederationHandler(BaseHandler):
             k: a.event_id for k, a in iteritems(auth_events)
             if k != event_key
         }
-        context.current_state_ids = dict(context.current_state_ids)
-        context.current_state_ids.update(state_updates)
+        current_state_ids = yield context.get_current_state_ids(self.store)
+        current_state_ids = dict(current_state_ids)
+
+        current_state_ids.update(state_updates)
+
         if context.delta_ids is not None:
-            context.delta_ids = dict(context.delta_ids)
-            context.delta_ids.update(state_updates)
-        context.prev_state_ids = dict(context.prev_state_ids)
-        context.prev_state_ids.update({
+            delta_ids = dict(context.delta_ids)
+            delta_ids.update(state_updates)
+
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        prev_state_ids = dict(prev_state_ids)
+
+        prev_state_ids.update({
             k: a.event_id for k, a in iteritems(auth_events)
         })
-        context.state_group = yield self.store.store_state_group(
+
+        state_group = yield self.store.store_state_group(
             event.event_id,
             event.room_id,
             prev_group=context.prev_group,
-            delta_ids=context.delta_ids,
-            current_state_ids=context.current_state_ids,
+            delta_ids=delta_ids,
+            current_state_ids=current_state_ids,
+        )
+
+        yield context.update_state(
+            state_group=state_group,
+            current_state_ids=current_state_ids,
+            prev_state_ids=prev_state_ids,
+            delta_ids=delta_ids,
         )
 
     @defer.inlineCallbacks
@@ -2222,7 +2243,8 @@ class FederationHandler(BaseHandler):
             event.content["third_party_invite"]["signed"]["token"]
         )
         original_invite = None
-        original_invite_id = context.prev_state_ids.get(key)
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        original_invite_id = prev_state_ids.get(key)
         if original_invite_id:
             original_invite = yield self.store.get_event(
                 original_invite_id, allow_none=True
@@ -2264,7 +2286,8 @@ class FederationHandler(BaseHandler):
         signed = event.content["third_party_invite"]["signed"]
         token = signed["token"]
 
-        invite_event_id = context.prev_state_ids.get(
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        invite_event_id = prev_state_ids.get(
             (EventTypes.ThirdPartyInvite, token,)
         )
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a39b852ceb..7571975c22 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -33,7 +33,7 @@ from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.replication.http.send_event import send_event_to_master
 from synapse.types import RoomAlias, RoomStreamToken, UserID
-from synapse.util.async import Limiter, ReadWriteLock
+from synapse.util.async import Linearizer, ReadWriteLock
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
@@ -427,7 +427,7 @@ class EventCreationHandler(object):
 
         # We arbitrarily limit concurrent event creation for a room to 5.
         # This is to stop us from diverging history *too* much.
-        self.limiter = Limiter(max_count=5)
+        self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
 
         self.action_generator = hs.get_action_generator()
 
@@ -630,7 +630,8 @@ class EventCreationHandler(object):
         If so, returns the version of the event in context.
         Otherwise, returns None.
         """
-        prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        prev_event_id = prev_state_ids.get((event.type, event.state_key))
         prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
         if not prev_event:
             return
@@ -752,8 +753,8 @@ class EventCreationHandler(object):
         event = builder.build()
 
         logger.debug(
-            "Created event %s with state: %s",
-            event.event_id, context.prev_state_ids,
+            "Created event %s",
+            event.event_id,
         )
 
         defer.returnValue(
@@ -806,8 +807,9 @@ class EventCreationHandler(object):
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
                 yield send_event_to_master(
-                    self.hs.get_clock(),
-                    self.http_client,
+                    clock=self.hs.get_clock(),
+                    store=self.store,
+                    client=self.http_client,
                     host=self.config.worker_replication_host,
                     port=self.config.worker_replication_http_port,
                     requester=requester,
@@ -884,9 +886,11 @@ class EventCreationHandler(object):
                         e.sender == event.sender
                     )
 
+                current_state_ids = yield context.get_current_state_ids(self.store)
+
                 state_to_include_ids = [
                     e_id
-                    for k, e_id in iteritems(context.current_state_ids)
+                    for k, e_id in iteritems(current_state_ids)
                     if k[0] in self.hs.config.room_invite_state_types
                     or k == (EventTypes.Member, event.sender)
                 ]
@@ -922,8 +926,9 @@ class EventCreationHandler(object):
                     )
 
         if event.type == EventTypes.Redaction:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, context.prev_state_ids, for_verification=True,
+                event, prev_state_ids, for_verification=True,
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
@@ -943,11 +948,13 @@ class EventCreationHandler(object):
                         "You don't have permission to redact events"
                     )
 
-        if event.type == EventTypes.Create and context.prev_state_ids:
-            raise AuthError(
-                403,
-                "Changing the room create event is forbidden",
-            )
+        if event.type == EventTypes.Create:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
+            if prev_state_ids:
+                raise AuthError(
+                    403,
+                    "Changing the room create event is forbidden",
+                )
 
         (event_stream_id, max_stream_id) = yield self.store.persist_event(
             event, context=context
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 00f2e279bc..a832d91809 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -201,7 +201,9 @@ class RoomMemberHandler(object):
             ratelimit=ratelimit,
         )
 
-        prev_member_event_id = context.prev_state_ids.get(
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+        prev_member_event_id = prev_state_ids.get(
             (EventTypes.Member, target.to_string()),
             None
         )
@@ -496,9 +498,10 @@ class RoomMemberHandler(object):
         if prev_event is not None:
             return
 
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
         if event.membership == Membership.JOIN:
             if requester.is_guest:
-                guest_can_join = yield self._can_guest_join(context.prev_state_ids)
+                guest_can_join = yield self._can_guest_join(prev_state_ids)
                 if not guest_can_join:
                     # This should be an auth check, but guests are a local concept,
                     # so don't really fit into the general auth process.
@@ -517,7 +520,7 @@ class RoomMemberHandler(object):
             ratelimit=ratelimit,
         )
 
-        prev_member_event_id = context.prev_state_ids.get(
+        prev_member_event_id = prev_state_ids.get(
             (EventTypes.Member, event.state_key),
             None
         )
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 51cbd66f06..e650c3e494 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -274,7 +274,7 @@ class Notifier(object):
             logger.exception("Error notifying application services of event")
 
     def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
-        """ Used to inform listeners that something has happend event wise.
+        """ Used to inform listeners that something has happened event wise.
 
         Will wake up all listeners for the given users and rooms.
         """
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index bb181d94ee..1d14d3639c 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -112,7 +112,8 @@ class BulkPushRuleEvaluator(object):
 
     @defer.inlineCallbacks
     def _get_power_levels_and_sender_level(self, event, context):
-        pl_event_id = context.prev_state_ids.get(POWER_KEY)
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        pl_event_id = prev_state_ids.get(POWER_KEY)
         if pl_event_id:
             # fastpath: if there's a power level event, that's all we need, and
             # not having a power level event is an extreme edge case
@@ -120,7 +121,7 @@ class BulkPushRuleEvaluator(object):
             auth_events = {POWER_KEY: pl_event}
         else:
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, context.prev_state_ids, for_verification=False,
+                event, prev_state_ids, for_verification=False,
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
@@ -304,7 +305,7 @@ class RulesForRoom(object):
 
                 push_rules_delta_state_cache_metric.inc_hits()
             else:
-                current_state_ids = context.current_state_ids
+                current_state_ids = yield context.get_current_state_ids(self.store)
                 push_rules_delta_state_cache_metric.inc_misses()
 
             push_rules_state_size_counter.inc(len(current_state_ids))
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 2eede54792..5227bc333d 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -34,12 +34,13 @@ logger = logging.getLogger(__name__)
 
 
 @defer.inlineCallbacks
-def send_event_to_master(clock, client, host, port, requester, event, context,
+def send_event_to_master(clock, store, client, host, port, requester, event, context,
                          ratelimit, extra_users):
     """Send event to be handled on the master
 
     Args:
         clock (synapse.util.Clock)
+        store (DataStore)
         client (SimpleHttpClient)
         host (str): host of master
         port (int): port on master listening for HTTP replication
@@ -53,11 +54,13 @@ def send_event_to_master(clock, client, host, port, requester, event, context,
         host, port, event.event_id,
     )
 
+    serialized_context = yield context.serialize(event, store)
+
     payload = {
         "event": event.get_pdu_json(),
         "internal_metadata": event.internal_metadata.get_dict(),
         "rejected_reason": event.rejected_reason,
-        "context": context.serialize(event),
+        "context": serialized_context,
         "requester": requester.serialize(),
         "ratelimit": ratelimit,
         "extra_users": [u.to_string() for u in extra_users],
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 2dc50e582b..9e9c175970 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -14,6 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import hashlib
+import hmac
 import logging
 
 from six.moves import http_client
@@ -63,6 +65,125 @@ class UsersRestServlet(ClientV1RestServlet):
         defer.returnValue((200, ret))
 
 
+class UserRegisterServlet(ClientV1RestServlet):
+    """
+    Attributes:
+         NONCE_TIMEOUT (int): Seconds until a generated nonce won't be accepted
+         nonces (dict[str, int]): The nonces that we will accept. A dict of
+             nonce to the time it was generated, in int seconds.
+    """
+    PATTERNS = client_path_patterns("/admin/register")
+    NONCE_TIMEOUT = 60
+
+    def __init__(self, hs):
+        super(UserRegisterServlet, self).__init__(hs)
+        self.handlers = hs.get_handlers()
+        self.reactor = hs.get_reactor()
+        self.nonces = {}
+        self.hs = hs
+
+    def _clear_old_nonces(self):
+        """
+        Clear out old nonces that are older than NONCE_TIMEOUT.
+        """
+        now = int(self.reactor.seconds())
+
+        for k, v in list(self.nonces.items()):
+            if now - v > self.NONCE_TIMEOUT:
+                del self.nonces[k]
+
+    def on_GET(self, request):
+        """
+        Generate a new nonce.
+        """
+        self._clear_old_nonces()
+
+        nonce = self.hs.get_secrets().token_hex(64)
+        self.nonces[nonce] = int(self.reactor.seconds())
+        return (200, {"nonce": nonce.encode('ascii')})
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        self._clear_old_nonces()
+
+        if not self.hs.config.registration_shared_secret:
+            raise SynapseError(400, "Shared secret registration is not enabled")
+
+        body = parse_json_object_from_request(request)
+
+        if "nonce" not in body:
+            raise SynapseError(
+                400, "nonce must be specified", errcode=Codes.BAD_JSON,
+            )
+
+        nonce = body["nonce"]
+
+        if nonce not in self.nonces:
+            raise SynapseError(
+                400, "unrecognised nonce",
+            )
+
+        # Delete the nonce, so it can't be reused, even if it's invalid
+        del self.nonces[nonce]
+
+        if "username" not in body:
+            raise SynapseError(
+                400, "username must be specified", errcode=Codes.BAD_JSON,
+            )
+        else:
+            if (not isinstance(body['username'], str) or len(body['username']) > 512):
+                raise SynapseError(400, "Invalid username")
+
+            username = body["username"].encode("utf-8")
+            if b"\x00" in username:
+                raise SynapseError(400, "Invalid username")
+
+        if "password" not in body:
+            raise SynapseError(
+                400, "password must be specified", errcode=Codes.BAD_JSON,
+            )
+        else:
+            if (not isinstance(body['password'], str) or len(body['password']) > 512):
+                raise SynapseError(400, "Invalid password")
+
+            password = body["password"].encode("utf-8")
+            if b"\x00" in password:
+                raise SynapseError(400, "Invalid password")
+
+        admin = body.get("admin", None)
+        got_mac = body["mac"]
+
+        want_mac = hmac.new(
+            key=self.hs.config.registration_shared_secret.encode(),
+            digestmod=hashlib.sha1,
+        )
+        want_mac.update(nonce)
+        want_mac.update(b"\x00")
+        want_mac.update(username)
+        want_mac.update(b"\x00")
+        want_mac.update(password)
+        want_mac.update(b"\x00")
+        want_mac.update(b"admin" if admin else b"notadmin")
+        want_mac = want_mac.hexdigest()
+
+        if not hmac.compare_digest(want_mac, got_mac):
+            raise SynapseError(
+                403, "HMAC incorrect",
+            )
+
+        # Reuse the parts of RegisterRestServlet to reduce code duplication
+        from synapse.rest.client.v2_alpha.register import RegisterRestServlet
+        register = RegisterRestServlet(self.hs)
+
+        (user_id, _) = yield register.registration_handler.register(
+            localpart=username.lower(), password=password, admin=bool(admin),
+            generate_token=False,
+        )
+
+        result = yield register._create_registration_details(user_id, body)
+        defer.returnValue((200, result))
+
+
 class WhoisRestServlet(ClientV1RestServlet):
     PATTERNS = client_path_patterns("/admin/whois/(?P<user_id>[^/]*)")
 
@@ -614,3 +735,4 @@ def register_servlets(hs, http_server):
     ShutdownRoomRestServlet(hs).register(http_server)
     QuarantineMediaInRoom(hs).register(http_server)
     ListMediaInRoom(hs).register(http_server)
+    UserRegisterServlet(hs).register(http_server)
diff --git a/synapse/secrets.py b/synapse/secrets.py
new file mode 100644
index 0000000000..f397daaa5e
--- /dev/null
+++ b/synapse/secrets.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Injectable secrets module for Synapse.
+
+See https://docs.python.org/3/library/secrets.html#module-secrets for the API
+used in Python 3.6, and the API emulated in Python 2.7.
+"""
+
+import six
+
+if six.PY3:
+    import secrets
+
+    def Secrets():
+        return secrets
+
+
+else:
+
+    import os
+    import binascii
+
+    class Secrets(object):
+        def token_bytes(self, nbytes=32):
+            return os.urandom(nbytes)
+
+        def token_hex(self, nbytes=32):
+            return binascii.hexlify(self.token_bytes(nbytes))
diff --git a/synapse/server.py b/synapse/server.py
index 92bea96c5c..fd4f992258 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -74,6 +74,7 @@ from synapse.rest.media.v1.media_repository import (
     MediaRepository,
     MediaRepositoryResource,
 )
+from synapse.secrets import Secrets
 from synapse.server_notices.server_notices_manager import ServerNoticesManager
 from synapse.server_notices.server_notices_sender import ServerNoticesSender
 from synapse.server_notices.worker_server_notices_sender import WorkerServerNoticesSender
@@ -158,6 +159,7 @@ class HomeServer(object):
         'groups_server_handler',
         'groups_attestation_signing',
         'groups_attestation_renewer',
+        'secrets',
         'spam_checker',
         'room_member_handler',
         'federation_registry',
@@ -405,6 +407,9 @@ class HomeServer(object):
     def build_groups_attestation_renewer(self):
         return GroupAttestionRenewer(self)
 
+    def build_secrets(self):
+        return Secrets()
+
     def build_spam_checker(self):
         return SpamChecker(self)
 
diff --git a/synapse/state.py b/synapse/state.py
index 15a593d41c..32125c95df 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -18,7 +18,7 @@ import hashlib
 import logging
 from collections import namedtuple
 
-from six import iteritems, itervalues
+from six import iteritems, iterkeys, itervalues
 
 from frozendict import frozendict
 
@@ -203,25 +203,27 @@ class StateHandler(object):
             # If this is an outlier, then we know it shouldn't have any current
             # state. Certainly store.get_current_state won't return any, and
             # persisting the event won't store the state group.
-            context = EventContext()
             if old_state:
-                context.prev_state_ids = {
+                prev_state_ids = {
                     (s.type, s.state_key): s.event_id for s in old_state
                 }
                 if event.is_state():
-                    context.current_state_ids = dict(context.prev_state_ids)
+                    current_state_ids = dict(prev_state_ids)
                     key = (event.type, event.state_key)
-                    context.current_state_ids[key] = event.event_id
+                    current_state_ids[key] = event.event_id
                 else:
-                    context.current_state_ids = context.prev_state_ids
+                    current_state_ids = prev_state_ids
             else:
-                context.current_state_ids = {}
-                context.prev_state_ids = {}
-            context.prev_state_events = []
+                current_state_ids = {}
+                prev_state_ids = {}
 
             # We don't store state for outliers, so we don't generate a state
-            # froup for it.
-            context.state_group = None
+            # group for it.
+            context = EventContext.with_state(
+                state_group=None,
+                current_state_ids=current_state_ids,
+                prev_state_ids=prev_state_ids,
+            )
 
             defer.returnValue(context)
 
@@ -230,31 +232,35 @@ class StateHandler(object):
             # Let's just correctly fill out the context and create a
             # new state group for it.
 
-            context = EventContext()
-            context.prev_state_ids = {
+            prev_state_ids = {
                 (s.type, s.state_key): s.event_id for s in old_state
             }
 
             if event.is_state():
                 key = (event.type, event.state_key)
-                if key in context.prev_state_ids:
-                    replaces = context.prev_state_ids[key]
+                if key in prev_state_ids:
+                    replaces = prev_state_ids[key]
                     if replaces != event.event_id:  # Paranoia check
                         event.unsigned["replaces_state"] = replaces
-                context.current_state_ids = dict(context.prev_state_ids)
-                context.current_state_ids[key] = event.event_id
+                current_state_ids = dict(prev_state_ids)
+                current_state_ids[key] = event.event_id
             else:
-                context.current_state_ids = context.prev_state_ids
+                current_state_ids = prev_state_ids
 
-            context.state_group = yield self.store.store_state_group(
+            state_group = yield self.store.store_state_group(
                 event.event_id,
                 event.room_id,
                 prev_group=None,
                 delta_ids=None,
-                current_state_ids=context.current_state_ids,
+                current_state_ids=current_state_ids,
+            )
+
+            context = EventContext.with_state(
+                state_group=state_group,
+                current_state_ids=current_state_ids,
+                prev_state_ids=prev_state_ids,
             )
 
-            context.prev_state_events = []
             defer.returnValue(context)
 
         logger.debug("calling resolve_state_groups from compute_event_context")
@@ -262,47 +268,47 @@ class StateHandler(object):
             event.room_id, [e for e, _ in event.prev_events],
         )
 
-        curr_state = entry.state
+        prev_state_ids = entry.state
+        prev_group = None
+        delta_ids = None
 
-        context = EventContext()
-        context.prev_state_ids = curr_state
         if event.is_state():
             # If this is a state event then we need to create a new state
             # group for the state after this event.
 
             key = (event.type, event.state_key)
-            if key in context.prev_state_ids:
-                replaces = context.prev_state_ids[key]
+            if key in prev_state_ids:
+                replaces = prev_state_ids[key]
                 event.unsigned["replaces_state"] = replaces
 
-            context.current_state_ids = dict(context.prev_state_ids)
-            context.current_state_ids[key] = event.event_id
+            current_state_ids = dict(prev_state_ids)
+            current_state_ids[key] = event.event_id
 
             if entry.state_group:
                 # If the state at the event has a state group assigned then
                 # we can use that as the prev group
-                context.prev_group = entry.state_group
-                context.delta_ids = {
+                prev_group = entry.state_group
+                delta_ids = {
                     key: event.event_id
                 }
             elif entry.prev_group:
                 # If the state at the event only has a prev group, then we can
                 # use that as a prev group too.
-                context.prev_group = entry.prev_group
-                context.delta_ids = dict(entry.delta_ids)
-                context.delta_ids[key] = event.event_id
+                prev_group = entry.prev_group
+                delta_ids = dict(entry.delta_ids)
+                delta_ids[key] = event.event_id
 
-            context.state_group = yield self.store.store_state_group(
+            state_group = yield self.store.store_state_group(
                 event.event_id,
                 event.room_id,
-                prev_group=context.prev_group,
-                delta_ids=context.delta_ids,
-                current_state_ids=context.current_state_ids,
+                prev_group=prev_group,
+                delta_ids=delta_ids,
+                current_state_ids=current_state_ids,
             )
         else:
-            context.current_state_ids = context.prev_state_ids
-            context.prev_group = entry.prev_group
-            context.delta_ids = entry.delta_ids
+            current_state_ids = prev_state_ids
+            prev_group = entry.prev_group
+            delta_ids = entry.delta_ids
 
             if entry.state_group is None:
                 entry.state_group = yield self.store.store_state_group(
@@ -310,13 +316,20 @@ class StateHandler(object):
                     event.room_id,
                     prev_group=entry.prev_group,
                     delta_ids=entry.delta_ids,
-                    current_state_ids=context.current_state_ids,
+                    current_state_ids=current_state_ids,
                 )
                 entry.state_id = entry.state_group
 
-            context.state_group = entry.state_group
+            state_group = entry.state_group
+
+        context = EventContext.with_state(
+            state_group=state_group,
+            current_state_ids=current_state_ids,
+            prev_state_ids=prev_state_ids,
+            prev_group=prev_group,
+            delta_ids=delta_ids,
+        )
 
-        context.prev_state_events = []
         defer.returnValue(context)
 
     @defer.inlineCallbacks
@@ -647,7 +660,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
         for event_id in event_ids
     )
     if event_map is not None:
-        needed_events -= set(event_map.iterkeys())
+        needed_events -= set(iterkeys(event_map))
 
     logger.info("Asking for %d conflicted events", len(needed_events))
 
@@ -668,7 +681,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
     new_needed_events = set(itervalues(auth_events))
     new_needed_events -= needed_events
     if event_map is not None:
-        new_needed_events -= set(event_map.iterkeys())
+        new_needed_events -= set(iterkeys(event_map))
 
     logger.info("Asking for %d auth events", len(new_needed_events))
 
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index ec68e39f1e..cc3cdf2ebc 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -248,17 +248,31 @@ class DeviceStore(SQLBaseStore):
 
     def _update_remote_device_list_cache_entry_txn(self, txn, user_id, device_id,
                                                    content, stream_id):
-        self._simple_upsert_txn(
-            txn,
-            table="device_lists_remote_cache",
-            keyvalues={
-                "user_id": user_id,
-                "device_id": device_id,
-            },
-            values={
-                "content": json.dumps(content),
-            }
-        )
+        if content.get("deleted"):
+            self._simple_delete_txn(
+                txn,
+                table="device_lists_remote_cache",
+                keyvalues={
+                    "user_id": user_id,
+                    "device_id": device_id,
+                },
+            )
+
+            txn.call_after(
+                self.device_id_exists_cache.invalidate, (user_id, device_id,)
+            )
+        else:
+            self._simple_upsert_txn(
+                txn,
+                table="device_lists_remote_cache",
+                keyvalues={
+                    "user_id": user_id,
+                    "device_id": device_id,
+                },
+                values={
+                    "content": json.dumps(content),
+                }
+            )
 
         txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,))
         txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
@@ -366,7 +380,7 @@ class DeviceStore(SQLBaseStore):
             now_stream_id = max(stream_id for stream_id in itervalues(query_map))
 
         devices = self._get_e2e_device_keys_txn(
-            txn, query_map.keys(), include_all_devices=True
+            txn, query_map.keys(), include_all_devices=True, include_deleted_devices=True
         )
 
         prev_sent_id_sql = """
@@ -393,12 +407,15 @@ class DeviceStore(SQLBaseStore):
 
                 prev_id = stream_id
 
-                key_json = device.get("key_json", None)
-                if key_json:
-                    result["keys"] = json.loads(key_json)
-                device_display_name = device.get("device_display_name", None)
-                if device_display_name:
-                    result["device_display_name"] = device_display_name
+                if device is not None:
+                    key_json = device.get("key_json", None)
+                    if key_json:
+                        result["keys"] = json.loads(key_json)
+                    device_display_name = device.get("device_display_name", None)
+                    if device_display_name:
+                        result["device_display_name"] = device_display_name
+                else:
+                    result["deleted"] = True
 
                 results.append(result)
 
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 7ae5c65482..523b4360c3 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -64,12 +64,18 @@ class EndToEndKeyStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def get_e2e_device_keys(self, query_list, include_all_devices=False):
+    def get_e2e_device_keys(
+        self, query_list, include_all_devices=False,
+        include_deleted_devices=False,
+    ):
         """Fetch a list of device keys.
         Args:
             query_list(list): List of pairs of user_ids and device_ids.
             include_all_devices (bool): whether to include entries for devices
                 that don't have device keys
+            include_deleted_devices (bool): whether to include null entries for
+                devices which no longer exist (but were in the query_list).
+                This option only takes effect if include_all_devices is true.
         Returns:
             Dict mapping from user-id to dict mapping from device_id to
             dict containing "key_json", "device_display_name".
@@ -79,7 +85,7 @@ class EndToEndKeyStore(SQLBaseStore):
 
         results = yield self.runInteraction(
             "get_e2e_device_keys", self._get_e2e_device_keys_txn,
-            query_list, include_all_devices,
+            query_list, include_all_devices, include_deleted_devices,
         )
 
         for user_id, device_keys in iteritems(results):
@@ -88,10 +94,19 @@ class EndToEndKeyStore(SQLBaseStore):
 
         defer.returnValue(results)
 
-    def _get_e2e_device_keys_txn(self, txn, query_list, include_all_devices):
+    def _get_e2e_device_keys_txn(
+        self, txn, query_list, include_all_devices=False,
+        include_deleted_devices=False,
+    ):
         query_clauses = []
         query_params = []
 
+        if include_all_devices is False:
+            include_deleted_devices = False
+
+        if include_deleted_devices:
+            deleted_devices = set(query_list)
+
         for (user_id, device_id) in query_list:
             query_clause = "user_id = ?"
             query_params.append(user_id)
@@ -119,8 +134,14 @@ class EndToEndKeyStore(SQLBaseStore):
 
         result = {}
         for row in rows:
+            if include_deleted_devices:
+                deleted_devices.remove((row["user_id"], row["device_id"]))
             result.setdefault(row["user_id"], {})[row["device_id"]] = row
 
+        if include_deleted_devices:
+            for user_id, device_id in deleted_devices:
+                result.setdefault(user_id, {})[device_id] = None
+
         return result
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 4ff0fdc4ab..c2910094d0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -549,7 +549,12 @@ class EventsStore(EventsWorkerStore):
             if ctx.state_group in state_groups_map:
                 continue
 
-            state_groups_map[ctx.state_group] = ctx.current_state_ids
+            # We're only interested in pulling out state that has already
+            # been cached in the context. We'll pull stuff out of the DB later
+            # if necessary.
+            current_state_ids = ctx.get_cached_current_state_ids()
+            if current_state_ids is not None:
+                state_groups_map[ctx.state_group] = current_state_ids
 
         # We need to map the event_ids to their state groups. First, let's
         # check if the event is one we're persisting, in which case we can
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index be655d287b..af564b1b4e 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -186,6 +186,7 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
 
         defer.returnValue(results)
 
+    @defer.inlineCallbacks
     def bulk_get_push_rules_for_room(self, event, context):
         state_group = context.state_group
         if not state_group:
@@ -195,9 +196,11 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
             # To do this we set the state_group to a new object as object() != object()
             state_group = object()
 
-        return self._bulk_get_push_rules_for_room(
-            event.room_id, state_group, context.current_state_ids, event=event
+        current_state_ids = yield context.get_current_state_ids(self)
+        result = yield self._bulk_get_push_rules_for_room(
+            event.room_id, state_group, current_state_ids, event=event
         )
+        defer.returnValue(result)
 
     @cachedInlineCallbacks(num_args=2, cache_context=True)
     def _bulk_get_push_rules_for_room(self, room_id, state_group, current_state_ids,
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 02a802bed9..a27702a7a0 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -232,6 +232,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         defer.returnValue(user_who_share_room)
 
+    @defer.inlineCallbacks
     def get_joined_users_from_context(self, event, context):
         state_group = context.state_group
         if not state_group:
@@ -241,11 +242,13 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             # To do this we set the state_group to a new object as object() != object()
             state_group = object()
 
-        return self._get_joined_users_from_context(
-            event.room_id, state_group, context.current_state_ids,
+        current_state_ids = yield context.get_current_state_ids(self)
+        result = yield self._get_joined_users_from_context(
+            event.room_id, state_group, current_state_ids,
             event=event,
             context=context,
         )
+        defer.returnValue(result)
 
     def get_joined_users_from_state(self, room_id, state_entry):
         state_group = state_entry.state_group
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 5d0fb39130..a7094e2fb4 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,7 +13,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import collections
 import logging
 from contextlib import contextmanager
 
@@ -156,54 +157,72 @@ def concurrently_execute(func, args, limit):
 
 
 class Linearizer(object):
-    """Linearizes access to resources based on a key. Useful to ensure only one
-    thing is happening at a time on a given resource.
+    """Limits concurrent access to resources based on a key. Useful to ensure
+    only a few things happen at a time on a given resource.
 
     Example:
 
-        with (yield linearizer.queue("test_key")):
+        with (yield limiter.queue("test_key")):
             # do some work.
 
     """
-    def __init__(self, name=None, clock=None):
+    def __init__(self, name=None, max_count=1, clock=None):
+        """
+        Args:
+            max_count(int): The maximum number of concurrent accesses
+        """
         if name is None:
             self.name = id(self)
         else:
             self.name = name
-        self.key_to_defer = {}
 
         if not clock:
             from twisted.internet import reactor
             clock = Clock(reactor)
         self._clock = clock
+        self.max_count = max_count
+
+        # key_to_defer is a map from the key to a 2 element list where
+        # the first element is the number of things executing, and
+        # the second element is an OrderedDict, where the keys are deferreds for the
+        # things blocked from executing.
+        self.key_to_defer = {}
 
     @defer.inlineCallbacks
     def queue(self, key):
-        # If there is already a deferred in the queue, we pull it out so that
-        # we can wait on it later.
-        # Then we replace it with a deferred that we resolve *after* the
-        # context manager has exited.
-        # We only return the context manager after the previous deferred has
-        # resolved.
-        # This all has the net effect of creating a chain of deferreds that
-        # wait for the previous deferred before starting their work.
-        current_defer = self.key_to_defer.get(key)
+        entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
 
-        new_defer = defer.Deferred()
-        self.key_to_defer[key] = new_defer
+        # If the number of things executing is greater than the maximum
+        # then add a deferred to the list of blocked items
+        # When on of the things currently executing finishes it will callback
+        # this item so that it can continue executing.
+        if entry[0] >= self.max_count:
+            new_defer = defer.Deferred()
+            entry[1][new_defer] = 1
 
-        if current_defer:
             logger.info(
-                "Waiting to acquire linearizer lock %r for key %r", self.name, key
+                "Waiting to acquire linearizer lock %r for key %r", self.name, key,
             )
             try:
-                with PreserveLoggingContext():
-                    yield current_defer
-            except Exception:
-                logger.exception("Unexpected exception in Linearizer")
-
-            logger.info("Acquired linearizer lock %r for key %r", self.name,
-                        key)
+                yield make_deferred_yieldable(new_defer)
+            except Exception as e:
+                if isinstance(e, CancelledError):
+                    logger.info(
+                        "Cancelling wait for linearizer lock %r for key %r",
+                        self.name, key,
+                    )
+                else:
+                    logger.warn(
+                        "Unexpected exception waiting for linearizer lock %r for key %r",
+                        self.name, key,
+                    )
+
+                # we just have to take ourselves back out of the queue.
+                del entry[1][new_defer]
+                raise
+
+            logger.info("Acquired linearizer lock %r for key %r", self.name, key)
+            entry[0] += 1
 
             # if the code holding the lock completes synchronously, then it
             # will recursively run the next claimant on the list. That can
@@ -213,15 +232,15 @@ class Linearizer(object):
             # In order to break the cycle, we add a cheeky sleep(0) here to
             # ensure that we fall back to the reactor between each iteration.
             #
-            # (There's no particular need for it to happen before we return
-            # the context manager, but it needs to happen while we hold the
-            # lock, and the context manager's exit code must be synchronous,
-            # so actually this is the only sensible place.
+            # (This needs to happen while we hold the lock, and the context manager's exit
+            # code must be synchronous, so this is the only sensible place.)
             yield self._clock.sleep(0)
 
         else:
-            logger.info("Acquired uncontended linearizer lock %r for key %r",
-                        self.name, key)
+            logger.info(
+                "Acquired uncontended linearizer lock %r for key %r", self.name, key,
+            )
+            entry[0] += 1
 
         @contextmanager
         def _ctx_manager():
@@ -229,73 +248,15 @@ class Linearizer(object):
                 yield
             finally:
                 logger.info("Releasing linearizer lock %r for key %r", self.name, key)
-                with PreserveLoggingContext():
-                    new_defer.callback(None)
-                current_d = self.key_to_defer.get(key)
-                if current_d is new_defer:
-                    self.key_to_defer.pop(key, None)
-
-        defer.returnValue(_ctx_manager())
-
-
-class Limiter(object):
-    """Limits concurrent access to resources based on a key. Useful to ensure
-    only a few thing happen at a time on a given resource.
-
-    Example:
-
-        with (yield limiter.queue("test_key")):
-            # do some work.
-
-    """
-    def __init__(self, max_count):
-        """
-        Args:
-            max_count(int): The maximum number of concurrent access
-        """
-        self.max_count = max_count
-
-        # key_to_defer is a map from the key to a 2 element list where
-        # the first element is the number of things executing
-        # the second element is a list of deferreds for the things blocked from
-        # executing.
-        self.key_to_defer = {}
-
-    @defer.inlineCallbacks
-    def queue(self, key):
-        entry = self.key_to_defer.setdefault(key, [0, []])
-
-        # If the number of things executing is greater than the maximum
-        # then add a deferred to the list of blocked items
-        # When on of the things currently executing finishes it will callback
-        # this item so that it can continue executing.
-        if entry[0] >= self.max_count:
-            new_defer = defer.Deferred()
-            entry[1].append(new_defer)
-
-            logger.info("Waiting to acquire limiter lock for key %r", key)
-            with PreserveLoggingContext():
-                yield new_defer
-            logger.info("Acquired limiter lock for key %r", key)
-        else:
-            logger.info("Acquired uncontended limiter lock for key %r", key)
-
-        entry[0] += 1
-
-        @contextmanager
-        def _ctx_manager():
-            try:
-                yield
-            finally:
-                logger.info("Releasing limiter lock for key %r", key)
 
                 # We've finished executing so check if there are any things
                 # blocked waiting to execute and start one of them
                 entry[0] -= 1
 
                 if entry[1]:
-                    next_def = entry[1].pop(0)
+                    (next_def, _) = entry[1].popitem(last=False)
 
+                    # we need to run the next thing in the sentinel context.
                     with PreserveLoggingContext():
                         next_def.callback(None)
                 elif entry[0] == 0:
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index f6c7175f74..8dcae50b39 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -99,6 +99,17 @@ class ContextResourceUsage(object):
         self.db_sched_duration_sec = 0
         self.evt_db_fetch_count = 0
 
+    def __repr__(self):
+        return ("<ContextResourceUsage ru_stime='%r', ru_utime='%r', "
+                "db_txn_count='%r', db_txn_duration_sec='%r', "
+                "db_sched_duration_sec='%r', evt_db_fetch_count='%r'>") % (
+                    self.ru_stime,
+                    self.ru_utime,
+                    self.db_txn_count,
+                    self.db_txn_duration_sec,
+                    self.db_sched_duration_sec,
+                    self.evt_db_fetch_count,)
+
     def __iadd__(self, other):
         """Add another ContextResourceUsage's stats to this one's.
 
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 6ba7107896..97f1267380 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -104,12 +104,19 @@ class Measure(object):
             logger.warn("Expected context. (%r)", self.name)
             return
 
-        usage = context.get_resource_usage() - self.start_usage
-        block_ru_utime.labels(self.name).inc(usage.ru_utime)
-        block_ru_stime.labels(self.name).inc(usage.ru_stime)
-        block_db_txn_count.labels(self.name).inc(usage.db_txn_count)
-        block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec)
-        block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec)
+        current = context.get_resource_usage()
+        usage = current - self.start_usage
+        try:
+            block_ru_utime.labels(self.name).inc(usage.ru_utime)
+            block_ru_stime.labels(self.name).inc(usage.ru_stime)
+            block_db_txn_count.labels(self.name).inc(usage.db_txn_count)
+            block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec)
+            block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec)
+        except ValueError:
+            logger.warn(
+                "Failed to save metrics! OLD: %r, NEW: %r",
+                self.start_usage, current
+            )
 
         if self.created_context:
             self.start_context.__exit__(exc_type, exc_val, exc_tb)
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 9b97ea2b83..ba0499a022 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -12,11 +12,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import itertools
+
 import logging
 import operator
 
-import six
+from six import iteritems, itervalues
+from six.moves import map
 
 from twisted.internet import defer
 
@@ -221,7 +222,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
         return event
 
     # check each event: gives an iterable[None|EventBase]
-    filtered_events = itertools.imap(allowed, events)
+    filtered_events = map(allowed, events)
 
     # remove the None entries
     filtered_events = filter(operator.truth, filtered_events)
@@ -261,7 +262,7 @@ def filter_events_for_server(store, server_name, events):
                 # membership states for the requesting server to determine
                 # if the server is either in the room or has been invited
                 # into the room.
-                for ev in state.itervalues():
+                for ev in itervalues(state):
                     if ev.type != EventTypes.Member:
                         continue
                     try:
@@ -295,7 +296,7 @@ def filter_events_for_server(store, server_name, events):
     )
 
     visibility_ids = set()
-    for sids in event_to_state_ids.itervalues():
+    for sids in itervalues(event_to_state_ids):
         hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
         if hist:
             visibility_ids.add(hist)
@@ -308,7 +309,7 @@ def filter_events_for_server(store, server_name, events):
         event_map = yield store.get_events(visibility_ids)
         all_open = all(
             e.content.get("history_visibility") in (None, "shared", "world_readable")
-            for e in event_map.itervalues()
+            for e in itervalues(event_map)
         )
 
     if all_open:
@@ -346,7 +347,7 @@ def filter_events_for_server(store, server_name, events):
     #
     state_key_to_event_id_set = {
         e
-        for key_to_eid in six.itervalues(event_to_state_ids)
+        for key_to_eid in itervalues(event_to_state_ids)
         for e in key_to_eid.items()
     }
 
@@ -369,10 +370,10 @@ def filter_events_for_server(store, server_name, events):
     event_to_state = {
         e_id: {
             key: event_map[inner_e_id]
-            for key, inner_e_id in key_to_eid.iteritems()
+            for key, inner_e_id in iteritems(key_to_eid)
             if inner_e_id in event_map
         }
-        for e_id, key_to_eid in event_to_state_ids.iteritems()
+        for e_id, key_to_eid in iteritems(event_to_state_ids)
     }
 
     defer.returnValue([