summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py8
-rw-r--r--synapse/storage/account_data.py13
-rw-r--r--synapse/storage/background_updates.py21
-rw-r--r--synapse/storage/client_ips.py1
-rw-r--r--synapse/storage/devices.py25
-rw-r--r--synapse/storage/end_to_end_keys.py56
-rw-r--r--synapse/storage/events.py123
-rw-r--r--synapse/storage/push_rule.py18
-rw-r--r--synapse/storage/room.py36
-rw-r--r--synapse/storage/roommember.py7
-rw-r--r--synapse/storage/schema/delta/37/remove_auth_idx.py4
-rw-r--r--synapse/storage/schema/delta/41/event_search_event_id_idx.sql17
-rw-r--r--synapse/storage/schema/delta/41/ratelimit.sql22
-rw-r--r--synapse/storage/state.py12
14 files changed, 282 insertions, 81 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index c659004e8d..58b73af7d2 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -60,12 +60,12 @@ class LoggingTransaction(object):
         object.__setattr__(self, "database_engine", database_engine)
         object.__setattr__(self, "after_callbacks", after_callbacks)
 
-    def call_after(self, callback, *args):
+    def call_after(self, callback, *args, **kwargs):
         """Call the given callback on the main twisted thread after the
         transaction has finished. Used to invalidate the caches on the
         correct thread.
         """
-        self.after_callbacks.append((callback, args))
+        self.after_callbacks.append((callback, args, kwargs))
 
     def __getattr__(self, name):
         return getattr(self.txn, name)
@@ -319,8 +319,8 @@ class SQLBaseStore(object):
                     inner_func, *args, **kwargs
                 )
         finally:
-            for after_callback, after_args in after_callbacks:
-                after_callback(*after_args)
+            for after_callback, after_args, after_kwargs in after_callbacks:
+                after_callback(*after_args, **after_kwargs)
         defer.returnValue(result)
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index ff14e54c11..aa84ffc2b0 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -308,16 +308,3 @@ class AccountDataStore(SQLBaseStore):
             " WHERE stream_id < ?"
         )
         txn.execute(update_max_id_sql, (next_id, next_id))
-
-    @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000)
-    def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):
-        ignored_account_data = yield self.get_global_account_data_by_type_for_user(
-            "m.ignored_user_list", ignorer_user_id,
-            on_invalidate=cache_context.invalidate,
-        )
-        if not ignored_account_data:
-            defer.returnValue(False)
-
-        defer.returnValue(
-            ignored_user_id in ignored_account_data.get("ignored_users", {})
-        )
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index d4cf0fc59b..7157fb1dfb 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -210,7 +210,9 @@ class BackgroundUpdateStore(SQLBaseStore):
         self._background_update_handlers[update_name] = update_handler
 
     def register_background_index_update(self, update_name, index_name,
-                                         table, columns, where_clause=None):
+                                         table, columns, where_clause=None,
+                                         unique=False,
+                                         psql_only=False):
         """Helper for store classes to do a background index addition
 
         To use:
@@ -226,6 +228,9 @@ class BackgroundUpdateStore(SQLBaseStore):
             index_name (str): name of index to add
             table (str): table to add index to
             columns (list[str]): columns/expressions to include in index
+            unique (bool): true to make a UNIQUE index
+            psql_only: true to only create this index on psql databases (useful
+                for virtual sqlite tables)
         """
 
         def create_index_psql(conn):
@@ -245,9 +250,11 @@ class BackgroundUpdateStore(SQLBaseStore):
                 c.execute(sql)
 
                 sql = (
-                    "CREATE INDEX CONCURRENTLY %(name)s ON %(table)s"
+                    "CREATE %(unique)s INDEX CONCURRENTLY %(name)s"
+                    " ON %(table)s"
                     " (%(columns)s) %(where_clause)s"
                 ) % {
+                    "unique": "UNIQUE" if unique else "",
                     "name": index_name,
                     "table": table,
                     "columns": ", ".join(columns),
@@ -270,9 +277,10 @@ class BackgroundUpdateStore(SQLBaseStore):
             # down at the wrong moment - hance we use IF NOT EXISTS. (SQLite
             # has supported CREATE TABLE|INDEX IF NOT EXISTS since 3.3.0.)
             sql = (
-                "CREATE INDEX IF NOT EXISTS %(name)s ON %(table)s"
+                "CREATE %(unique)s INDEX IF NOT EXISTS %(name)s ON %(table)s"
                 " (%(columns)s)"
             ) % {
+                "unique": "UNIQUE" if unique else "",
                 "name": index_name,
                 "table": table,
                 "columns": ", ".join(columns),
@@ -284,13 +292,16 @@ class BackgroundUpdateStore(SQLBaseStore):
 
         if isinstance(self.database_engine, engines.PostgresEngine):
             runner = create_index_psql
+        elif psql_only:
+            runner = None
         else:
             runner = create_index_sqlite
 
         @defer.inlineCallbacks
         def updater(progress, batch_size):
-            logger.info("Adding index %s to %s", index_name, table)
-            yield self.runWithConnection(runner)
+            if runner is not None:
+                logger.info("Adding index %s to %s", index_name, table)
+                yield self.runWithConnection(runner)
             yield self._end_background_update(update_name)
             defer.returnValue(1)
 
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index b01f0046e9..747d2df622 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -33,6 +33,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         self.client_ip_last_seen = Cache(
             name="client_ip_last_seen",
             keylen=4,
+            max_entries=5000,
         )
 
         super(ClientIpStore, self).__init__(hs)
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index c8d5f5ba8b..d9936c88bb 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -18,7 +18,7 @@ import ujson as json
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
-from ._base import SQLBaseStore
+from ._base import SQLBaseStore, Cache
 from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
 
 
@@ -29,6 +29,14 @@ class DeviceStore(SQLBaseStore):
     def __init__(self, hs):
         super(DeviceStore, self).__init__(hs)
 
+        # Map of (user_id, device_id) -> bool. If there is an entry that implies
+        # the device exists.
+        self.device_id_exists_cache = Cache(
+            name="device_id_exists",
+            keylen=2,
+            max_entries=10000,
+        )
+
         self._clock.looping_call(
             self._prune_old_outbound_device_pokes, 60 * 60 * 1000
         )
@@ -54,6 +62,10 @@ class DeviceStore(SQLBaseStore):
             defer.Deferred: boolean whether the device was inserted or an
                 existing device existed with that ID.
         """
+        key = (user_id, device_id)
+        if self.device_id_exists_cache.get(key, None):
+            defer.returnValue(False)
+
         try:
             inserted = yield self._simple_insert(
                 "devices",
@@ -65,6 +77,7 @@ class DeviceStore(SQLBaseStore):
                 desc="store_device",
                 or_ignore=True,
             )
+            self.device_id_exists_cache.prefill(key, True)
             defer.returnValue(inserted)
         except Exception as e:
             logger.error("store_device with device_id=%s(%r) user_id=%s(%r)"
@@ -93,6 +106,7 @@ class DeviceStore(SQLBaseStore):
             desc="get_device",
         )
 
+    @defer.inlineCallbacks
     def delete_device(self, user_id, device_id):
         """Delete a device.
 
@@ -102,12 +116,15 @@ class DeviceStore(SQLBaseStore):
         Returns:
             defer.Deferred
         """
-        return self._simple_delete_one(
+        yield self._simple_delete_one(
             table="devices",
             keyvalues={"user_id": user_id, "device_id": device_id},
             desc="delete_device",
         )
 
+        self.device_id_exists_cache.invalidate((user_id, device_id))
+
+    @defer.inlineCallbacks
     def delete_devices(self, user_id, device_ids):
         """Deletes several devices.
 
@@ -117,13 +134,15 @@ class DeviceStore(SQLBaseStore):
         Returns:
             defer.Deferred
         """
-        return self._simple_delete_many(
+        yield self._simple_delete_many(
             table="devices",
             column="device_id",
             iterable=device_ids,
             keyvalues={"user_id": user_id},
             desc="delete_devices",
         )
+        for device_id in device_ids:
+            self.device_id_exists_cache.invalidate((user_id, device_id))
 
     def update_device(self, user_id, device_id, new_display_name=None):
         """Update a device.
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 7cbc1470fd..e00f31da2b 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError
+from synapse.util.caches.descriptors import cached
 
 from canonicaljson import encode_canonical_json
 import ujson as json
@@ -123,18 +123,24 @@ class EndToEndKeyStore(SQLBaseStore):
         return result
 
     @defer.inlineCallbacks
-    def add_e2e_one_time_keys(self, user_id, device_id, time_now, key_list):
-        """Insert some new one time keys for a device.
+    def get_e2e_one_time_keys(self, user_id, device_id, key_ids):
+        """Retrieve a number of one-time keys for a user
 
-        Checks if any of the keys are already inserted, if they are then check
-        if they match. If they don't then we raise an error.
+        Args:
+            user_id(str): id of user to get keys for
+            device_id(str): id of device to get keys for
+            key_ids(list[str]): list of key ids (excluding algorithm) to
+                retrieve
+
+        Returns:
+            deferred resolving to Dict[(str, str), str]: map from (algorithm,
+            key_id) to json string for key
         """
 
-        # First we check if we have already persisted any of the keys.
         rows = yield self._simple_select_many_batch(
             table="e2e_one_time_keys_json",
             column="key_id",
-            iterable=[key_id for _, key_id, _ in key_list],
+            iterable=key_ids,
             retcols=("algorithm", "key_id", "key_json",),
             keyvalues={
                 "user_id": user_id,
@@ -143,20 +149,22 @@ class EndToEndKeyStore(SQLBaseStore):
             desc="add_e2e_one_time_keys_check",
         )
 
-        existing_key_map = {
+        defer.returnValue({
             (row["algorithm"], row["key_id"]): row["key_json"] for row in rows
-        }
-
-        new_keys = []  # Keys that we need to insert
-        for algorithm, key_id, json_bytes in key_list:
-            ex_bytes = existing_key_map.get((algorithm, key_id), None)
-            if ex_bytes:
-                if json_bytes != ex_bytes:
-                    raise SynapseError(
-                        400, "One time key with key_id %r already exists" % (key_id,)
-                    )
-            else:
-                new_keys.append((algorithm, key_id, json_bytes))
+        })
+
+    @defer.inlineCallbacks
+    def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
+        """Insert some new one time keys for a device. Errors if any of the
+        keys already exist.
+
+        Args:
+            user_id(str): id of user to get keys for
+            device_id(str): id of device to get keys for
+            time_now(long): insertion time to record (ms since epoch)
+            new_keys(iterable[(str, str, str)]: keys to add - each a tuple of
+                (algorithm, key_id, key json)
+        """
 
         def _add_e2e_one_time_keys(txn):
             # We are protected from race between lookup and insertion due to
@@ -177,10 +185,14 @@ class EndToEndKeyStore(SQLBaseStore):
                     for algorithm, key_id, json_bytes in new_keys
                 ],
             )
+            txn.call_after(
+                self.count_e2e_one_time_keys.invalidate, (user_id, device_id,)
+            )
         yield self.runInteraction(
             "add_e2e_one_time_keys_insert", _add_e2e_one_time_keys
         )
 
+    @cached(max_entries=10000)
     def count_e2e_one_time_keys(self, user_id, device_id):
         """ Count the number of one time keys the server has for a device
         Returns:
@@ -225,6 +237,9 @@ class EndToEndKeyStore(SQLBaseStore):
             )
             for user_id, device_id, algorithm, key_id in delete:
                 txn.execute(sql, (user_id, device_id, algorithm, key_id))
+                txn.call_after(
+                    self.count_e2e_one_time_keys.invalidate, (user_id, device_id,)
+                )
             return result
         return self.runInteraction(
             "claim_e2e_one_time_keys", _claim_e2e_one_time_keys
@@ -242,3 +257,4 @@ class EndToEndKeyStore(SQLBaseStore):
             keyvalues={"user_id": user_id, "device_id": device_id},
             desc="delete_e2e_one_time_keys_by_device"
         )
+        self.count_e2e_one_time_keys.invalidate((user_id, device_id,))
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 98707d40ee..c4aeb48800 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -207,6 +207,18 @@ class EventsStore(SQLBaseStore):
             where_clause="contains_url = true AND outlier = false",
         )
 
+        # an event_id index on event_search is useful for the purge_history
+        # api. Plus it means we get to enforce some integrity with a UNIQUE
+        # clause
+        self.register_background_index_update(
+            "event_search_event_id_idx",
+            index_name="event_search_event_id_idx",
+            table="event_search",
+            columns=["event_id"],
+            unique=True,
+            psql_only=True,
+        )
+
         self._event_persist_queue = _EventPeristenceQueue()
 
     def persist_events(self, events_and_contexts, backfilled=False):
@@ -387,6 +399,11 @@ class EventsStore(SQLBaseStore):
 
                     event_counter.inc(event.type, origin_type, origin_entity)
 
+                for room_id, (_, _, new_state) in current_state_for_room.iteritems():
+                    self.get_current_state_ids.prefill(
+                        (room_id, ), new_state
+                    )
+
     @defer.inlineCallbacks
     def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
         """Calculates the new forward extremeties for a room given events to
@@ -435,10 +452,10 @@ class EventsStore(SQLBaseStore):
         Assumes that we are only persisting events for one room at a time.
 
         Returns:
-            2-tuple (to_delete, to_insert) where both are state dicts, i.e.
-            (type, state_key) -> event_id. `to_delete` are the entries to
+            3-tuple (to_delete, to_insert, new_state) where both are state dicts,
+            i.e. (type, state_key) -> event_id. `to_delete` are the entries to
             first be deleted from current_state_events, `to_insert` are entries
-            to insert.
+            to insert. `new_state` is the full set of state.
             May return None if there are no changes to be applied.
         """
         # Now we need to work out the different state sets for
@@ -545,7 +562,7 @@ class EventsStore(SQLBaseStore):
             if ev_id in events_to_insert
         }
 
-        defer.returnValue((to_delete, to_insert))
+        defer.returnValue((to_delete, to_insert, current_state))
 
     @defer.inlineCallbacks
     def get_event(self, event_id, check_redacted=True,
@@ -698,7 +715,7 @@ class EventsStore(SQLBaseStore):
 
     def _update_current_state_txn(self, txn, state_delta_by_room):
         for room_id, current_state_tuple in state_delta_by_room.iteritems():
-                to_delete, to_insert = current_state_tuple
+                to_delete, to_insert, _ = current_state_tuple
                 txn.executemany(
                     "DELETE FROM current_state_events WHERE event_id = ?",
                     [(ev_id,) for ev_id in to_delete.itervalues()],
@@ -1343,11 +1360,26 @@ class EventsStore(SQLBaseStore):
     def _invalidate_get_event_cache(self, event_id):
             self._get_event_cache.invalidate((event_id,))
 
-    def _get_events_from_cache(self, events, allow_rejected):
+    def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
+        """Fetch events from the caches
+
+        Args:
+            events (list(str)): list of event_ids to fetch
+            allow_rejected (bool): Whether to teturn events that were rejected
+            update_metrics (bool): Whether to update the cache hit ratio metrics
+
+        Returns:
+            dict of event_id -> _EventCacheEntry for each event_id in cache. If
+            allow_rejected is `False` then there will still be an entry but it
+            will be `None`
+        """
         event_map = {}
 
         for event_id in events:
-            ret = self._get_event_cache.get((event_id,), None)
+            ret = self._get_event_cache.get(
+                (event_id,), None,
+                update_metrics=update_metrics,
+            )
             if not ret:
                 continue
 
@@ -2007,6 +2039,8 @@ class EventsStore(SQLBaseStore):
                 400, "topological_ordering is greater than forward extremeties"
             )
 
+        logger.debug("[purge] looking for events to delete")
+
         txn.execute(
             "SELECT event_id, state_key FROM events"
             " LEFT JOIN state_events USING (room_id, event_id)"
@@ -2015,9 +2049,19 @@ class EventsStore(SQLBaseStore):
         )
         event_rows = txn.fetchall()
 
+        to_delete = [
+            (event_id,) for event_id, state_key in event_rows
+            if state_key is None and not self.hs.is_mine_id(event_id)
+        ]
+        logger.info(
+            "[purge] found %i events before cutoff, of which %i are remote"
+            " non-state events to delete", len(event_rows), len(to_delete))
+
         for event_id, state_key in event_rows:
             txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
 
+        logger.debug("[purge] Finding new backward extremities")
+
         # We calculate the new entries for the backward extremeties by finding
         # all events that point to events that are to be purged
         txn.execute(
@@ -2030,6 +2074,8 @@ class EventsStore(SQLBaseStore):
         )
         new_backwards_extrems = txn.fetchall()
 
+        logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems)
+
         txn.execute(
             "DELETE FROM event_backward_extremities WHERE room_id = ?",
             (room_id,)
@@ -2044,6 +2090,8 @@ class EventsStore(SQLBaseStore):
             ]
         )
 
+        logger.debug("[purge] finding redundant state groups")
+
         # Get all state groups that are only referenced by events that are
         # to be deleted.
         txn.execute(
@@ -2059,15 +2107,20 @@ class EventsStore(SQLBaseStore):
         )
 
         state_rows = txn.fetchall()
-        state_groups_to_delete = [sg for sg, in state_rows]
+        logger.debug("[purge] found %i redundant state groups", len(state_rows))
+
+        # make a set of the redundant state groups, so that we can look them up
+        # efficiently
+        state_groups_to_delete = set([sg for sg, in state_rows])
 
         # Now we get all the state groups that rely on these state groups
-        new_state_edges = []
-        chunks = [
-            state_groups_to_delete[i:i + 100]
-            for i in xrange(0, len(state_groups_to_delete), 100)
-        ]
-        for chunk in chunks:
+        logger.debug("[purge] finding state groups which depend on redundant"
+                     " state groups")
+        remaining_state_groups = []
+        for i in xrange(0, len(state_rows), 100):
+            chunk = [sg for sg, in state_rows[i:i + 100]]
+            # look for state groups whose prev_state_group is one we are about
+            # to delete
             rows = self._simple_select_many_txn(
                 txn,
                 table="state_group_edges",
@@ -2076,21 +2129,28 @@ class EventsStore(SQLBaseStore):
                 retcols=["state_group"],
                 keyvalues={},
             )
-            new_state_edges.extend(row["state_group"] for row in rows)
+            remaining_state_groups.extend(
+                row["state_group"] for row in rows
+
+                # exclude state groups we are about to delete: no point in
+                # updating them
+                if row["state_group"] not in state_groups_to_delete
+            )
 
-        # Now we turn the state groups that reference to-be-deleted state groups
-        # to non delta versions.
-        for new_state_edge in new_state_edges:
+        # Now we turn the state groups that reference to-be-deleted state
+        # groups to non delta versions.
+        for sg in remaining_state_groups:
+            logger.debug("[purge] de-delta-ing remaining state group %s", sg)
             curr_state = self._get_state_groups_from_groups_txn(
-                txn, [new_state_edge], types=None
+                txn, [sg], types=None
             )
-            curr_state = curr_state[new_state_edge]
+            curr_state = curr_state[sg]
 
             self._simple_delete_txn(
                 txn,
                 table="state_groups_state",
                 keyvalues={
-                    "state_group": new_state_edge,
+                    "state_group": sg,
                 }
             )
 
@@ -2098,7 +2158,7 @@ class EventsStore(SQLBaseStore):
                 txn,
                 table="state_group_edges",
                 keyvalues={
-                    "state_group": new_state_edge,
+                    "state_group": sg,
                 }
             )
 
@@ -2107,7 +2167,7 @@ class EventsStore(SQLBaseStore):
                 table="state_groups_state",
                 values=[
                     {
-                        "state_group": new_state_edge,
+                        "state_group": sg,
                         "room_id": room_id,
                         "type": key[0],
                         "state_key": key[1],
@@ -2117,6 +2177,7 @@ class EventsStore(SQLBaseStore):
                 ],
             )
 
+        logger.debug("[purge] removing redundant state groups")
         txn.executemany(
             "DELETE FROM state_groups_state WHERE state_group = ?",
             state_rows
@@ -2125,22 +2186,21 @@ class EventsStore(SQLBaseStore):
             "DELETE FROM state_groups WHERE id = ?",
             state_rows
         )
+
         # Delete all non-state
+        logger.debug("[purge] removing events from event_to_state_groups")
         txn.executemany(
             "DELETE FROM event_to_state_groups WHERE event_id = ?",
             [(event_id,) for event_id, _ in event_rows]
         )
 
+        logger.debug("[purge] updating room_depth")
         txn.execute(
             "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
             (topological_ordering, room_id,)
         )
 
         # Delete all remote non-state events
-        to_delete = [
-            (event_id,) for event_id, state_key in event_rows
-            if state_key is None and not self.hs.is_mine_id(event_id)
-        ]
         for table in (
             "events",
             "event_json",
@@ -2156,16 +2216,15 @@ class EventsStore(SQLBaseStore):
             "event_signatures",
             "rejections",
         ):
+            logger.debug("[purge] removing remote non-state events from %s", table)
+
             txn.executemany(
                 "DELETE FROM %s WHERE event_id = ?" % (table,),
                 to_delete
             )
 
-        txn.executemany(
-            "DELETE FROM events WHERE event_id = ?",
-            to_delete
-        )
         # Mark all state and own events as outliers
+        logger.debug("[purge] marking remaining events as outliers")
         txn.executemany(
             "UPDATE events SET outlier = ?"
             " WHERE event_id = ?",
@@ -2175,6 +2234,8 @@ class EventsStore(SQLBaseStore):
             ]
         )
 
+        logger.info("[purge] done")
+
     @defer.inlineCallbacks
     def is_event_after(self, event_id1, event_id2):
         """Returns True if event_id1 is after event_id2 in the stream
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 353a135c4e..0a819d32c5 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -16,6 +16,7 @@
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
 from synapse.push.baserules import list_with_base_rules
+from synapse.api.constants import EventTypes
 from twisted.internet import defer
 
 import logging
@@ -184,11 +185,23 @@ class PushRuleStore(SQLBaseStore):
             if uid in local_users_in_room:
                 user_ids.add(uid)
 
+        forgotten = yield self.who_forgot_in_room(
+            event.room_id, on_invalidate=cache_context.invalidate,
+        )
+
+        for row in forgotten:
+            user_id = row["user_id"]
+            event_id = row["event_id"]
+
+            mem_id = current_state_ids.get((EventTypes.Member, user_id), None)
+            if event_id == mem_id:
+                user_ids.discard(user_id)
+
         rules_by_user = yield self.bulk_get_push_rules(
             user_ids, on_invalidate=cache_context.invalidate,
         )
 
-        rules_by_user = {k: v for k, v in rules_by_user.iteritems() if v is not None}
+        rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None}
 
         defer.returnValue(rules_by_user)
 
@@ -398,8 +411,7 @@ class PushRuleStore(SQLBaseStore):
         with self._push_rules_stream_id_gen.get_next() as ids:
             stream_id, event_stream_ordering = ids
             yield self.runInteraction(
-                "delete_push_rule", delete_push_rule_txn, stream_id,
-                event_stream_ordering,
+                "delete_push_rule", delete_push_rule_txn, stream_id, event_stream_ordering
             )
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index e4c56cc175..5d543652bb 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -16,7 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
 from ._base import SQLBaseStore
 from .engines import PostgresEngine, Sqlite3Engine
@@ -33,6 +33,11 @@ OpsLevel = collections.namedtuple(
     ("ban_level", "kick_level", "redact_level",)
 )
 
+RatelimitOverride = collections.namedtuple(
+    "RatelimitOverride",
+    ("messages_per_second", "burst_count",)
+)
+
 
 class RoomStore(SQLBaseStore):
 
@@ -473,3 +478,32 @@ class RoomStore(SQLBaseStore):
         return self.runInteraction(
             "get_all_new_public_rooms", get_all_new_public_rooms
         )
+
+    @cachedInlineCallbacks(max_entries=10000)
+    def get_ratelimit_for_user(self, user_id):
+        """Check if there are any overrides for ratelimiting for the given
+        user
+
+        Args:
+            user_id (str)
+
+        Returns:
+            RatelimitOverride if there is an override, else None. If the contents
+            of RatelimitOverride are None or 0 then ratelimitng has been
+            disabled for that user entirely.
+        """
+        row = yield self._simple_select_one(
+            table="ratelimit_override",
+            keyvalues={"user_id": user_id},
+            retcols=("messages_per_second", "burst_count"),
+            allow_none=True,
+            desc="get_ratelimit_for_user",
+        )
+
+        if row:
+            defer.returnValue(RatelimitOverride(
+                messages_per_second=row["messages_per_second"],
+                burst_count=row["burst_count"],
+            ))
+        else:
+            defer.returnValue(None)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index ad3c9b06d9..404f3583eb 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -421,9 +421,13 @@ class RoomMemberStore(SQLBaseStore):
         # We check if we have any of the member event ids in the event cache
         # before we ask the DB
 
+        # We don't update the event cache hit ratio as it completely throws off
+        # the hit ratio counts. After all, we don't populate the cache if we
+        # miss it here
         event_map = self._get_events_from_cache(
             member_event_ids,
             allow_rejected=False,
+            update_metrics=False,
         )
 
         missing_member_event_ids = []
@@ -530,7 +534,7 @@ class RoomMemberStore(SQLBaseStore):
         assert state_group is not None
 
         joined_hosts = set()
-        for (etype, state_key), event_id in current_state_ids.items():
+        for etype, state_key in current_state_ids:
             if etype == EventTypes.Member:
                 try:
                     host = get_domain_from_id(state_key)
@@ -541,6 +545,7 @@ class RoomMemberStore(SQLBaseStore):
                 if host in joined_hosts:
                     continue
 
+                event_id = current_state_ids[(etype, state_key)]
                 event = yield self.get_event(event_id, allow_none=True)
                 if event and event.content["membership"] == Membership.JOIN:
                     joined_hosts.add(intern_string(host))
diff --git a/synapse/storage/schema/delta/37/remove_auth_idx.py b/synapse/storage/schema/delta/37/remove_auth_idx.py
index 784f3b348f..20ad8bd5a6 100644
--- a/synapse/storage/schema/delta/37/remove_auth_idx.py
+++ b/synapse/storage/schema/delta/37/remove_auth_idx.py
@@ -36,6 +36,10 @@ DROP INDEX IF EXISTS transactions_have_ref;
 -- and is used incredibly rarely.
 DROP INDEX IF EXISTS events_order_topo_stream_room;
 
+-- an equivalent index to this actually gets re-created in delta 41, because it
+-- turned out that deleting it wasn't a great plan :/. In any case, let's
+-- delete it here, and delta 41 will create a new one with an added UNIQUE
+-- constraint
 DROP INDEX IF EXISTS event_search_ev_idx;
 """
 
diff --git a/synapse/storage/schema/delta/41/event_search_event_id_idx.sql b/synapse/storage/schema/delta/41/event_search_event_id_idx.sql
new file mode 100644
index 0000000000..5d9cfecf36
--- /dev/null
+++ b/synapse/storage/schema/delta/41/event_search_event_id_idx.sql
@@ -0,0 +1,17 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('event_search_event_id_idx', '{}');
diff --git a/synapse/storage/schema/delta/41/ratelimit.sql b/synapse/storage/schema/delta/41/ratelimit.sql
new file mode 100644
index 0000000000..a194bf0238
--- /dev/null
+++ b/synapse/storage/schema/delta/41/ratelimit.sql
@@ -0,0 +1,22 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE ratelimit_override (
+    user_id TEXT NOT NULL,
+    messages_per_second BIGINT,
+    burst_count BIGINT
+);
+
+CREATE UNIQUE INDEX ratelimit_override_idx ON ratelimit_override(user_id);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index a16afa8df5..85acf2ad1e 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -227,6 +227,18 @@ class StateStore(SQLBaseStore):
                     ],
                 )
 
+            # Prefill the state group cache with this group.
+            # It's fine to use the sequence like this as the state group map
+            # is immutable. (If the map wasn't immutable then this prefill could
+            # race with another update)
+            txn.call_after(
+                self._state_group_cache.update,
+                self._state_group_cache.sequence,
+                key=context.state_group,
+                value=dict(context.current_state_ids),
+                full=True,
+            )
+
         self._simple_insert_many_txn(
             txn,
             table="event_to_state_groups",