summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py6
-rw-r--r--synapse/storage/client_ips.py2
-rw-r--r--synapse/storage/devices.py8
-rw-r--r--synapse/storage/event_federation.py22
-rw-r--r--synapse/storage/event_push_actions.py13
-rw-r--r--synapse/storage/events.py27
-rw-r--r--synapse/storage/roommember.py52
-rw-r--r--synapse/storage/schema/delta/50/make_event_content_nullable.py92
-rw-r--r--synapse/storage/schema/full_schemas/16/event_edges.sql3
-rw-r--r--synapse/storage/schema/full_schemas/16/im.sql7
-rw-r--r--synapse/storage/state.py147
-rw-r--r--synapse/storage/transactions.py8
12 files changed, 281 insertions, 106 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 1d41d8d445..44f37b4c1e 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -311,6 +311,12 @@ class SQLBaseStore(object):
         after_callbacks = []
         exception_callbacks = []
 
+        if LoggingContext.current_context() == LoggingContext.sentinel:
+            logger.warn(
+                "Starting db txn '%s' from sentinel context",
+                desc,
+            )
+
         try:
             result = yield self.runWithConnection(
                 self._new_transaction,
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 77ae10da3d..b8cefd43d6 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -102,7 +102,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
                 to_update,
             )
 
-        run_as_background_process(
+        return run_as_background_process(
             "update_client_ips", update,
         )
 
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index cc3cdf2ebc..c0943ecf91 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -21,6 +21,7 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 
 from ._base import Cache, SQLBaseStore
@@ -711,6 +712,9 @@ class DeviceStore(SQLBaseStore):
 
             logger.info("Pruned %d device list outbound pokes", txn.rowcount)
 
-        return self.runInteraction(
-            "_prune_old_outbound_device_pokes", _prune_txn
+        return run_as_background_process(
+            "prune_old_outbound_device_pokes",
+            self.runInteraction,
+            "_prune_old_outbound_device_pokes",
+            _prune_txn,
         )
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 8d366d1b91..5d3ee90017 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -23,6 +23,7 @@ from unpaddedbase64 import encode_base64
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.events import EventsWorkerStore
 from synapse.storage.signatures import SignatureWorkerStore
@@ -113,9 +114,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
         sql = (
             "SELECT b.event_id, MAX(e.depth) FROM events as e"
             " INNER JOIN event_edges as g"
-            " ON g.event_id = e.event_id AND g.room_id = e.room_id"
+            " ON g.event_id = e.event_id"
             " INNER JOIN event_backward_extremities as b"
-            " ON g.prev_event_id = b.event_id AND g.room_id = b.room_id"
+            " ON g.prev_event_id = b.event_id"
             " WHERE b.room_id = ? AND g.is_state is ?"
             " GROUP BY b.event_id"
         )
@@ -329,8 +330,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             "SELECT depth, prev_event_id FROM event_edges"
             " INNER JOIN events"
             " ON prev_event_id = events.event_id"
-            " AND event_edges.room_id = events.room_id"
-            " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+            " WHERE event_edges.event_id = ?"
             " AND event_edges.is_state = ?"
             " LIMIT ?"
         )
@@ -364,7 +364,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
 
             txn.execute(
                 query,
-                (room_id, event_id, False, limit - len(event_results))
+                (event_id, False, limit - len(event_results))
             )
 
             for row in txn:
@@ -401,7 +401,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
 
         query = (
             "SELECT prev_event_id FROM event_edges "
-            "WHERE room_id = ? AND event_id = ? AND is_state = ? "
+            "WHERE event_id = ? AND is_state = ? "
             "LIMIT ?"
         )
 
@@ -410,7 +410,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             for event_id in front:
                 txn.execute(
                     query,
-                    (room_id, event_id, False, limit - len(event_results))
+                    (event_id, False, limit - len(event_results))
                 )
 
                 for e_id, in txn:
@@ -446,7 +446,7 @@ class EventFederationStore(EventFederationWorkerStore):
         )
 
         hs.get_clock().looping_call(
-            self._delete_old_forward_extrem_cache, 60 * 60 * 1000
+            self._delete_old_forward_extrem_cache, 60 * 60 * 1000,
         )
 
     def _update_min_depth_for_room_txn(self, txn, room_id, depth):
@@ -548,9 +548,11 @@ class EventFederationStore(EventFederationWorkerStore):
                 sql,
                 (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
             )
-        return self.runInteraction(
+        return run_as_background_process(
+            "delete_old_forward_extrem_cache",
+            self.runInteraction,
             "_delete_old_forward_extrem_cache",
-            _delete_old_forward_extrem_cache_txn
+            _delete_old_forward_extrem_cache_txn,
         )
 
     def clean_room_for_join(self, room_id):
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 29b511ae5e..6840320641 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -22,6 +22,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import LoggingTransaction, SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 
@@ -458,11 +459,12 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 "Error removing push actions after event persistence failure",
             )
 
-    @defer.inlineCallbacks
     def _find_stream_orderings_for_times(self):
-        yield self.runInteraction(
+        return run_as_background_process(
+            "event_push_action_stream_orderings",
+            self.runInteraction,
             "_find_stream_orderings_for_times",
-            self._find_stream_orderings_for_times_txn
+            self._find_stream_orderings_for_times_txn,
         )
 
     def _find_stream_orderings_for_times_txn(self, txn):
@@ -604,7 +606,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
 
         self._doing_notif_rotation = False
         self._rotate_notif_loop = self._clock.looping_call(
-            self._rotate_notifs, 30 * 60 * 1000
+            self._start_rotate_notifs, 30 * 60 * 1000,
         )
 
     def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
@@ -787,6 +789,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
         """, (room_id, user_id, stream_ordering))
 
+    def _start_rotate_notifs(self):
+        return run_as_background_process("rotate_notifs", self._rotate_notifs)
+
     @defer.inlineCallbacks
     def _rotate_notifs(self):
         if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 906a405031..2f482af3a1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -142,15 +142,14 @@ class _EventPeristenceQueue(object):
             try:
                 queue = self._get_drainining_queue(room_id)
                 for item in queue:
-                    # handle_queue_loop runs in the sentinel logcontext, so
-                    # there is no need to preserve_fn when running the
-                    # callbacks on the deferred.
                     try:
                         ret = yield per_item_callback(item)
+                    except Exception:
+                        with PreserveLoggingContext():
+                            item.deferred.errback()
+                    else:
                         with PreserveLoggingContext():
                             item.deferred.callback(ret)
-                    except Exception:
-                        item.deferred.errback()
             finally:
                 queue = self._event_persist_queues.pop(room_id, None)
                 if queue:
@@ -521,7 +520,6 @@ class EventsStore(EventsWorkerStore):
             iterable=list(new_latest_event_ids),
             retcols=["prev_event_id"],
             keyvalues={
-                "room_id": room_id,
                 "is_state": False,
             },
             desc="_calculate_new_extremeties",
@@ -575,11 +573,13 @@ class EventsStore(EventsWorkerStore):
 
         for ev, ctx in events_context:
             if ctx.state_group is None:
-                # I don't think this can happen, but let's double-check
-                raise Exception(
-                    "Context for new extremity event %s has no state "
-                    "group" % (ev.event_id, ),
-                )
+                # This should only happen for outlier events.
+                if not ev.internal_metadata.is_outlier():
+                    raise Exception(
+                        "Context for new event %s has no state "
+                        "group" % (ev.event_id, ),
+                    )
+                continue
 
             if ctx.state_group in state_groups_map:
                 continue
@@ -607,7 +607,7 @@ class EventsStore(EventsWorkerStore):
         for event_id in new_latest_event_ids:
             # First search in the list of new events we're adding.
             for ev, ctx in events_context:
-                if event_id == ev.event_id:
+                if event_id == ev.event_id and ctx.state_group is not None:
                     event_id_to_state_group[event_id] = ctx.state_group
                     break
             else:
@@ -1137,7 +1137,7 @@ class EventsStore(EventsWorkerStore):
         ):
             txn.executemany(
                 "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
-                [(ev.event_id,) for ev, _ in events_and_contexts]
+                [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts]
             )
 
     def _store_event_txn(self, txn, events_and_contexts):
@@ -1188,7 +1188,6 @@ class EventsStore(EventsWorkerStore):
                     "type": event.type,
                     "processed": True,
                     "outlier": event.internal_metadata.is_outlier(),
-                    "content": encode_json(event.content).decode("UTF-8"),
                     "origin_server_ts": int(event.origin_server_ts),
                     "received_ts": self._clock.time_msec(),
                     "sender": event.sender,
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 01697ab2c9..027bf8c85e 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -461,6 +461,30 @@ class RoomMemberWorkerStore(EventsWorkerStore):
     def _get_joined_hosts_cache(self, room_id):
         return _JoinedHostsCache(self, room_id)
 
+    @cachedInlineCallbacks(num_args=2)
+    def did_forget(self, user_id, room_id):
+        """Returns whether user_id has elected to discard history for room_id.
+
+        Returns False if they have since re-joined."""
+        def f(txn):
+            sql = (
+                "SELECT"
+                "  COUNT(*)"
+                " FROM"
+                "  room_memberships"
+                " WHERE"
+                "  user_id = ?"
+                " AND"
+                "  room_id = ?"
+                " AND"
+                "  forgotten = 0"
+            )
+            txn.execute(sql, (user_id, room_id))
+            rows = txn.fetchall()
+            return rows[0][0]
+        count = yield self.runInteraction("did_forget_membership", f)
+        defer.returnValue(count == 0)
+
 
 class RoomMemberStore(RoomMemberWorkerStore):
     def __init__(self, db_conn, hs):
@@ -568,32 +592,10 @@ class RoomMemberStore(RoomMemberWorkerStore):
             )
             txn.execute(sql, (user_id, room_id))
 
-            txn.call_after(self.did_forget.invalidate, (user_id, room_id))
-        return self.runInteraction("forget_membership", f)
-
-    @cachedInlineCallbacks(num_args=2)
-    def did_forget(self, user_id, room_id):
-        """Returns whether user_id has elected to discard history for room_id.
-
-        Returns False if they have since re-joined."""
-        def f(txn):
-            sql = (
-                "SELECT"
-                "  COUNT(*)"
-                " FROM"
-                "  room_memberships"
-                " WHERE"
-                "  user_id = ?"
-                " AND"
-                "  room_id = ?"
-                " AND"
-                "  forgotten = 0"
+            self._invalidate_cache_and_stream(
+                txn, self.did_forget, (user_id, room_id,),
             )
-            txn.execute(sql, (user_id, room_id))
-            rows = txn.fetchall()
-            return rows[0][0]
-        count = yield self.runInteraction("did_forget_membership", f)
-        defer.returnValue(count == 0)
+        return self.runInteraction("forget_membership", f)
 
     @defer.inlineCallbacks
     def _background_add_membership_profile(self, progress, batch_size):
diff --git a/synapse/storage/schema/delta/50/make_event_content_nullable.py b/synapse/storage/schema/delta/50/make_event_content_nullable.py
new file mode 100644
index 0000000000..7d27342e39
--- /dev/null
+++ b/synapse/storage/schema/delta/50/make_event_content_nullable.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+We want to stop populating 'event.content', so we need to make it nullable.
+
+If this has to be rolled back, then the following should populate the missing data:
+
+Postgres:
+
+    UPDATE events SET content=(ej.json::json)->'content' FROM event_json ej
+    WHERE ej.event_id = events.event_id AND
+        stream_ordering < (
+            SELECT stream_ordering FROM events WHERE content IS NOT NULL
+            ORDER BY stream_ordering LIMIT 1
+        );
+
+    UPDATE events SET content=(ej.json::json)->'content' FROM event_json ej
+    WHERE ej.event_id = events.event_id AND
+        stream_ordering > (
+            SELECT stream_ordering FROM events WHERE content IS NOT NULL
+            ORDER BY stream_ordering DESC LIMIT 1
+        );
+
+SQLite:
+
+    UPDATE events SET content=(
+        SELECT json_extract(json,'$.content') FROM event_json ej
+        WHERE ej.event_id = events.event_id
+    )
+    WHERE
+        stream_ordering < (
+            SELECT stream_ordering FROM events WHERE content IS NOT NULL
+            ORDER BY stream_ordering LIMIT 1
+        )
+        OR stream_ordering > (
+            SELECT stream_ordering FROM events WHERE content IS NOT NULL
+            ORDER BY stream_ordering DESC LIMIT 1
+        );
+
+"""
+
+import logging
+
+from synapse.storage.engines import PostgresEngine
+
+logger = logging.getLogger(__name__)
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    pass
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    if isinstance(database_engine, PostgresEngine):
+        cur.execute("""
+            ALTER TABLE events ALTER COLUMN content DROP NOT NULL;
+        """)
+        return
+
+    # sqlite is an arse about this. ref: https://www.sqlite.org/lang_altertable.html
+
+    cur.execute("SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'")
+    (oldsql,) = cur.fetchone()
+
+    sql = oldsql.replace("content TEXT NOT NULL", "content TEXT")
+    if sql == oldsql:
+        raise Exception("Couldn't find null constraint to drop in %s" % oldsql)
+
+    logger.info("Replacing definition of 'events' with: %s", sql)
+
+    cur.execute("PRAGMA schema_version")
+    (oldver,) = cur.fetchone()
+    cur.execute("PRAGMA writable_schema=ON")
+    cur.execute(
+        "UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'",
+        (sql, ),
+    )
+    cur.execute("PRAGMA schema_version=%i" % (oldver+1,))
+    cur.execute("PRAGMA writable_schema=OFF")
diff --git a/synapse/storage/schema/full_schemas/16/event_edges.sql b/synapse/storage/schema/full_schemas/16/event_edges.sql
index 52eec88357..6b5a5a88fa 100644
--- a/synapse/storage/schema/full_schemas/16/event_edges.sql
+++ b/synapse/storage/schema/full_schemas/16/event_edges.sql
@@ -37,7 +37,8 @@ CREATE TABLE IF NOT EXISTS event_edges(
     event_id TEXT NOT NULL,
     prev_event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    is_state BOOL NOT NULL,
+    is_state BOOL NOT NULL,  -- true if this is a prev_state edge rather than a regular
+                             -- event dag edge.
     UNIQUE (event_id, prev_event_id, room_id, is_state)
 );
 
diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql
index ba5346806e..5f5cb8d01d 100644
--- a/synapse/storage/schema/full_schemas/16/im.sql
+++ b/synapse/storage/schema/full_schemas/16/im.sql
@@ -19,7 +19,12 @@ CREATE TABLE IF NOT EXISTS events(
     event_id TEXT NOT NULL,
     type TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    content TEXT NOT NULL,
+
+    -- 'content' used to be created NULLable, but as of delta 50 we drop that constraint.
+    -- the hack we use to drop the constraint doesn't work for an in-memory sqlite
+    -- database, which breaks the sytests. Hence, we no longer make it nullable.
+    content TEXT,
+
     unrecognized_keys TEXT,
     processed BOOL NOT NULL,
     outlier BOOL NOT NULL,
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 89a05c4618..b27b3ae144 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -186,7 +186,17 @@ class StateGroupWorkerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def _get_state_groups_from_groups(self, groups, types):
-        """Returns dictionary state_group -> (dict of (type, state_key) -> event id)
+        """Returns the state groups for a given set of groups, filtering on
+        types of state events.
+
+        Args:
+            groups(list[int]): list of state group IDs to query
+            types (Iterable[str, str|None]|None): list of 2-tuples of the form
+                (`type`, `state_key`), where a `state_key` of `None` matches all
+                state_keys for the `type`. If None, all types are returned.
+
+        Returns:
+            dictionary state_group -> (dict of (type, state_key) -> event id)
         """
         results = {}
 
@@ -200,8 +210,11 @@ class StateGroupWorkerStore(SQLBaseStore):
 
         defer.returnValue(results)
 
-    def _get_state_groups_from_groups_txn(self, txn, groups, types=None):
+    def _get_state_groups_from_groups_txn(
+        self, txn, groups, types=None,
+    ):
         results = {group: {} for group in groups}
+
         if types is not None:
             types = list(set(types))  # deduplicate types list
 
@@ -239,7 +252,7 @@ class StateGroupWorkerStore(SQLBaseStore):
             # Turns out that postgres doesn't like doing a list of OR's and
             # is about 1000x slower, so we just issue a query for each specific
             # type seperately.
-            if types:
+            if types is not None:
                 clause_to_args = [
                     (
                         "AND type = ? AND state_key = ?",
@@ -278,6 +291,7 @@ class StateGroupWorkerStore(SQLBaseStore):
                     else:
                         where_clauses.append("(type = ? AND state_key = ?)")
                         where_args.extend([typ[0], typ[1]])
+
                 where_clause = "AND (%s)" % (" OR ".join(where_clauses))
             else:
                 where_clause = ""
@@ -332,16 +346,20 @@ class StateGroupWorkerStore(SQLBaseStore):
         return results
 
     @defer.inlineCallbacks
-    def get_state_for_events(self, event_ids, types):
+    def get_state_for_events(self, event_ids, types, filtered_types=None):
         """Given a list of event_ids and type tuples, return a list of state
         dicts for each event. The state dicts will only have the type/state_keys
         that are in the `types` list.
 
         Args:
-            event_ids (list)
-            types (list): List of (type, state_key) tuples which are used to
-                filter the state fetched. `state_key` may be None, which matches
-                any `state_key`
+            event_ids (list[string])
+            types (list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.
+                May be None, which matches any key.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             deferred: A list of dicts corresponding to the event_ids given.
@@ -352,7 +370,7 @@ class StateGroupWorkerStore(SQLBaseStore):
         )
 
         groups = set(itervalues(event_to_groups))
-        group_to_state = yield self._get_state_for_groups(groups, types)
+        group_to_state = yield self._get_state_for_groups(groups, types, filtered_types)
 
         state_event_map = yield self.get_events(
             [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)],
@@ -371,15 +389,19 @@ class StateGroupWorkerStore(SQLBaseStore):
         defer.returnValue({event: event_to_state[event] for event in event_ids})
 
     @defer.inlineCallbacks
-    def get_state_ids_for_events(self, event_ids, types=None):
+    def get_state_ids_for_events(self, event_ids, types=None, filtered_types=None):
         """
         Get the state dicts corresponding to a list of events
 
         Args:
             event_ids(list(str)): events whose state should be returned
-            types(list[(str, str)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. May be None, which
-                matches any key
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.
+                May be None, which matches any key.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             A deferred dict from event_id -> (type, state_key) -> state_event
@@ -389,7 +411,7 @@ class StateGroupWorkerStore(SQLBaseStore):
         )
 
         groups = set(itervalues(event_to_groups))
-        group_to_state = yield self._get_state_for_groups(groups, types)
+        group_to_state = yield self._get_state_for_groups(groups, types, filtered_types)
 
         event_to_state = {
             event_id: group_to_state[group]
@@ -399,37 +421,45 @@ class StateGroupWorkerStore(SQLBaseStore):
         defer.returnValue({event: event_to_state[event] for event in event_ids})
 
     @defer.inlineCallbacks
-    def get_state_for_event(self, event_id, types=None):
+    def get_state_for_event(self, event_id, types=None, filtered_types=None):
         """
         Get the state dict corresponding to a particular event
 
         Args:
             event_id(str): event whose state should be returned
-            types(list[(str, str)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. May be None, which
-                matches any key
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.
+                May be None, which matches any key.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             A deferred dict from (type, state_key) -> state_event
         """
-        state_map = yield self.get_state_for_events([event_id], types)
+        state_map = yield self.get_state_for_events([event_id], types, filtered_types)
         defer.returnValue(state_map[event_id])
 
     @defer.inlineCallbacks
-    def get_state_ids_for_event(self, event_id, types=None):
+    def get_state_ids_for_event(self, event_id, types=None, filtered_types=None):
         """
         Get the state dict corresponding to a particular event
 
         Args:
             event_id(str): event whose state should be returned
-            types(list[(str, str)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. May be None, which
-                matches any key
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.
+                May be None, which matches any key.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             A deferred dict from (type, state_key) -> state_event
         """
-        state_map = yield self.get_state_ids_for_events([event_id], types)
+        state_map = yield self.get_state_ids_for_events([event_id], types, filtered_types)
         defer.returnValue(state_map[event_id])
 
     @cached(max_entries=50000)
@@ -460,56 +490,73 @@ class StateGroupWorkerStore(SQLBaseStore):
 
         defer.returnValue({row["event_id"]: row["state_group"] for row in rows})
 
-    def _get_some_state_from_cache(self, group, types):
+    def _get_some_state_from_cache(self, group, types, filtered_types=None):
         """Checks if group is in cache. See `_get_state_for_groups`
 
-        Returns 3-tuple (`state_dict`, `missing_types`, `got_all`).
-        `missing_types` is the list of types that aren't in the cache for that
-        group. `got_all` is a bool indicating if we successfully retrieved all
+        Args:
+            group(int): The state group to lookup
+            types(list[str, str|None]): List of 2-tuples of the form
+                (`type`, `state_key`), where a `state_key` of `None` matches all
+                state_keys for the `type`.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
+
+        Returns 2-tuple (`state_dict`, `got_all`).
+        `got_all` is a bool indicating if we successfully retrieved all
         requests state from the cache, if False we need to query the DB for the
         missing state.
-
-        Args:
-            group: The state group to lookup
-            types (list): List of 2-tuples of the form (`type`, `state_key`),
-                where a `state_key` of `None` matches all state_keys for the
-                `type`.
         """
         is_all, known_absent, state_dict_ids = self._state_group_cache.get(group)
 
         type_to_key = {}
-        missing_types = set()
+
+        # tracks whether any of ourrequested types are missing from the cache
+        missing_types = False
 
         for typ, state_key in types:
             key = (typ, state_key)
-            if state_key is None:
+
+            if (
+                state_key is None or
+                (filtered_types is not None and typ not in filtered_types)
+            ):
                 type_to_key[typ] = None
-                missing_types.add(key)
+                # we mark the type as missing from the cache because
+                # when the cache was populated it might have been done with a
+                # restricted set of state_keys, so the wildcard will not work
+                # and the cache may be incomplete.
+                missing_types = True
             else:
                 if type_to_key.get(typ, object()) is not None:
                     type_to_key.setdefault(typ, set()).add(state_key)
 
                 if key not in state_dict_ids and key not in known_absent:
-                    missing_types.add(key)
+                    missing_types = True
 
         sentinel = object()
 
         def include(typ, state_key):
             valid_state_keys = type_to_key.get(typ, sentinel)
             if valid_state_keys is sentinel:
-                return False
+                return filtered_types is not None and typ not in filtered_types
             if valid_state_keys is None:
                 return True
             if state_key in valid_state_keys:
                 return True
             return False
 
-        got_all = is_all or not missing_types
+        got_all = is_all
+        if not got_all:
+            # the cache is incomplete. We may still have got all the results we need, if
+            # we don't have any wildcards in the match list.
+            if not missing_types and filtered_types is None:
+                got_all = True
 
         return {
             k: v for k, v in iteritems(state_dict_ids)
             if include(k[0], k[1])
-        }, missing_types, got_all
+        }, got_all
 
     def _get_all_state_from_cache(self, group):
         """Checks if group is in cache. See `_get_state_for_groups`
@@ -526,7 +573,7 @@ class StateGroupWorkerStore(SQLBaseStore):
         return state_dict_ids, is_all
 
     @defer.inlineCallbacks
-    def _get_state_for_groups(self, groups, types=None):
+    def _get_state_for_groups(self, groups, types=None, filtered_types=None):
         """Gets the state at each of a list of state groups, optionally
         filtering by type/state_key
 
@@ -540,6 +587,9 @@ class StateGroupWorkerStore(SQLBaseStore):
                 Otherwise, each entry should be a `(type, state_key)` tuple to
                 include in the response. A `state_key` of None is a wildcard
                 meaning that we require all state with that type.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             Deferred[dict[int, dict[(type, state_key), EventBase]]]
@@ -551,8 +601,8 @@ class StateGroupWorkerStore(SQLBaseStore):
         missing_groups = []
         if types is not None:
             for group in set(groups):
-                state_dict_ids, _, got_all = self._get_some_state_from_cache(
-                    group, types,
+                state_dict_ids, got_all = self._get_some_state_from_cache(
+                    group, types, filtered_types
                 )
                 results[group] = state_dict_ids
 
@@ -579,13 +629,13 @@ class StateGroupWorkerStore(SQLBaseStore):
             # cache. Hence, if we are doing a wildcard lookup, populate the
             # cache fully so that we can do an efficient lookup next time.
 
-            if types and any(k is None for (t, k) in types):
+            if filtered_types or (types and any(k is None for (t, k) in types)):
                 types_to_fetch = None
             else:
                 types_to_fetch = types
 
             group_to_state_dict = yield self._get_state_groups_from_groups(
-                missing_groups, types_to_fetch,
+                missing_groups, types_to_fetch
             )
 
             for group, group_state_dict in iteritems(group_to_state_dict):
@@ -595,7 +645,10 @@ class StateGroupWorkerStore(SQLBaseStore):
                 if types:
                     for k, v in iteritems(group_state_dict):
                         (typ, _) = k
-                        if k in types or (typ, None) in types:
+                        if (
+                            (k in types or (typ, None) in types) or
+                            (filtered_types and typ not in filtered_types)
+                        ):
                             state_dict[k] = v
                 else:
                     state_dict.update(group_state_dict)
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index c3bc94f56d..428e7fa36e 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -22,6 +22,7 @@ from canonicaljson import encode_canonical_json, json
 
 from twisted.internet import defer
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches.descriptors import cached
 
 from ._base import SQLBaseStore
@@ -57,7 +58,7 @@ class TransactionStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
         super(TransactionStore, self).__init__(db_conn, hs)
 
-        self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000)
+        self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
 
     def get_received_txn_response(self, transaction_id, origin):
         """For an incoming transaction from a given origin, check if we have
@@ -271,6 +272,11 @@ class TransactionStore(SQLBaseStore):
         txn.execute(query, (self._clock.time_msec(),))
         return self.cursor_to_dict(txn)
 
+    def _start_cleanup_transactions(self):
+        return run_as_background_process(
+            "cleanup_transactions", self._cleanup_transactions,
+        )
+
     def _cleanup_transactions(self):
         now = self._clock.time_msec()
         month_ago = now - 30 * 24 * 60 * 60 * 1000