summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorAmber Brown <hawkowl@atleastfornow.net>2018-11-01 21:31:46 +1100
committerAmber Brown <hawkowl@atleastfornow.net>2018-11-01 21:31:46 +1100
commit907e6da5be20523bd1d0a14c196289a73182aa65 (patch)
tree30e700dadabd415c169c3850aaf73aef817ef7e2 /synapse/storage
parentMerge pull request #4072 from steamp0rt/patch-1 (diff)
parentchangelog (diff)
downloadsynapse-907e6da5be20523bd1d0a14c196289a73182aa65.tar.xz
Merge branch 'release-v0.33.8'
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py4
-rw-r--r--synapse/storage/directory.py2
-rw-r--r--synapse/storage/events.py47
-rw-r--r--synapse/storage/keys.py2
-rw-r--r--synapse/storage/monthly_active_users.py71
-rw-r--r--synapse/storage/pusher.py2
-rw-r--r--synapse/storage/registration.py37
-rw-r--r--synapse/storage/signatures.py2
-rw-r--r--synapse/storage/state.py845
-rw-r--r--synapse/storage/transactions.py2
10 files changed, 635 insertions, 379 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index be61147b9b..d9d0255d0b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -18,7 +18,7 @@ import threading
 import time
 
 from six import PY2, iteritems, iterkeys, itervalues
-from six.moves import intern, range
+from six.moves import builtins, intern, range
 
 from canonicaljson import json
 from prometheus_client import Histogram
@@ -1233,7 +1233,7 @@ def db_to_json(db_content):
 
     # psycopg2 on Python 2 returns buffer objects, which we need to cast to
     # bytes to decode
-    if PY2 and isinstance(db_content, buffer):
+    if PY2 and isinstance(db_content, builtins.buffer):
         db_content = bytes(db_content)
 
     # Decode it to a Unicode string before feeding it to json.loads, so we
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index cfb687cb53..61a029a53c 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -90,7 +90,7 @@ class DirectoryWorkerStore(SQLBaseStore):
 class DirectoryStore(DirectoryWorkerStore):
     @defer.inlineCallbacks
     def create_room_alias_association(self, room_alias, room_id, servers, creator=None):
-        """ Creates an associatin between  a room alias and room_id/servers
+        """ Creates an association between a room alias and room_id/servers
 
         Args:
             room_alias (RoomAlias)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 03cedf3a75..8881b009df 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -34,6 +34,7 @@ from synapse.api.errors import SynapseError
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.state import StateResolutionStore
 from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.storage.event_federation import EventFederationStore
 from synapse.storage.events_worker import EventsWorkerStore
@@ -731,11 +732,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
 
         # Ok, we need to defer to the state handler to resolve our state sets.
 
-        def get_events(ev_ids):
-            return self.get_events(
-                ev_ids, get_prev_content=False, check_redacted=False,
-            )
-
         state_groups = {
             sg: state_groups_map[sg] for sg in new_state_groups
         }
@@ -745,7 +741,8 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
 
         logger.debug("calling resolve_state_groups from preserve_events")
         res = yield self._state_resolution_handler.resolve_state_groups(
-            room_id, room_version, state_groups, events_map, get_events
+            room_id, room_version, state_groups, events_map,
+            state_res_store=StateResolutionStore(self)
         )
 
         defer.returnValue((res.state, None))
@@ -854,6 +851,27 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
         # Insert into event_to_state_groups.
         self._store_event_state_mappings_txn(txn, events_and_contexts)
 
+        # We want to store event_auth mappings for rejected events, as they're
+        # used in state res v2.
+        # This is only necessary if the rejected event appears in an accepted
+        # event's auth chain, but its easier for now just to store them (and
+        # it doesn't take much storage compared to storing the entire event
+        # anyway).
+        self._simple_insert_many_txn(
+            txn,
+            table="event_auth",
+            values=[
+                {
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "auth_id": auth_id,
+                }
+                for event, _ in events_and_contexts
+                for auth_id, _ in event.auth_events
+                if event.is_state()
+            ],
+        )
+
         # _store_rejected_events_txn filters out any events which were
         # rejected, and returns the filtered list.
         events_and_contexts = self._store_rejected_events_txn(
@@ -1329,21 +1347,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                     txn, event.room_id, event.redacts
                 )
 
-        self._simple_insert_many_txn(
-            txn,
-            table="event_auth",
-            values=[
-                {
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "auth_id": auth_id,
-                }
-                for event, _ in events_and_contexts
-                for auth_id, _ in event.auth_events
-                if event.is_state()
-            ],
-        )
-
         # Update the event_forward_extremities, event_backward_extremities and
         # event_edges tables.
         self._handle_mult_prev_events(
@@ -2086,7 +2089,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
         for sg in remaining_state_groups:
             logger.info("[purge] de-delta-ing remaining state group %s", sg)
             curr_state = self._get_state_groups_from_groups_txn(
-                txn, [sg], types=None
+                txn, [sg],
             )
             curr_state = curr_state[sg]
 
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index a1331c1a61..8af17921e3 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)
 # py2 sqlite has buffer hardcoded as only binary type, so we must use it,
 # despite being deprecated and removed in favor of memoryview
 if six.PY2:
-    db_binary_type = buffer
+    db_binary_type = six.moves.builtins.buffer
 else:
     db_binary_type = memoryview
 
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 0fe8c8e24c..cf4104dc2e 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -33,19 +33,29 @@ class MonthlyActiveUsersStore(SQLBaseStore):
         self._clock = hs.get_clock()
         self.hs = hs
         self.reserved_users = ()
+        # Do not add more reserved users than the total allowable number
+        self._initialise_reserved_users(
+            dbconn.cursor(),
+            hs.config.mau_limits_reserved_threepids[:self.hs.config.max_mau_value],
+        )
 
-    @defer.inlineCallbacks
-    def initialise_reserved_users(self, threepids):
-        store = self.hs.get_datastore()
+    def _initialise_reserved_users(self, txn, threepids):
+        """Ensures that reserved threepids are accounted for in the MAU table, should
+        be called on start up.
+
+        Args:
+            txn (cursor):
+            threepids (list[dict]): List of threepid dicts to reserve
+        """
         reserved_user_list = []
 
-        # Do not add more reserved users than the total allowable number
-        for tp in threepids[:self.hs.config.max_mau_value]:
-            user_id = yield store.get_user_id_by_threepid(
+        for tp in threepids:
+            user_id = self.get_user_id_by_threepid_txn(
+                txn,
                 tp["medium"], tp["address"]
             )
             if user_id:
-                yield self.upsert_monthly_active_user(user_id)
+                self.upsert_monthly_active_user_txn(txn, user_id)
                 reserved_user_list.append(user_id)
             else:
                 logger.warning(
@@ -55,8 +65,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def reap_monthly_active_users(self):
-        """
-        Cleans out monthly active user table to ensure that no stale
+        """Cleans out monthly active user table to ensure that no stale
         entries exist.
 
         Returns:
@@ -165,19 +174,44 @@ class MonthlyActiveUsersStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def upsert_monthly_active_user(self, user_id):
+        """Updates or inserts the user into the monthly active user table, which
+        is used to track the current MAU usage of the server
+
+        Args:
+            user_id (str): user to add/update
         """
-            Updates or inserts monthly active user member
-            Arguments:
-                user_id (str): user to add/update
-            Deferred[bool]: True if a new entry was created, False if an
-                existing one was updated.
+        is_insert = yield self.runInteraction(
+            "upsert_monthly_active_user", self.upsert_monthly_active_user_txn,
+            user_id
+        )
+
+        if is_insert:
+            self.user_last_seen_monthly_active.invalidate((user_id,))
+            self.get_monthly_active_count.invalidate(())
+
+    def upsert_monthly_active_user_txn(self, txn, user_id):
+        """Updates or inserts monthly active user member
+
+        Note that, after calling this method, it will generally be necessary
+        to invalidate the caches on user_last_seen_monthly_active and
+        get_monthly_active_count. We can't do that here, because we are running
+        in a database thread rather than the main thread, and we can't call
+        txn.call_after because txn may not be a LoggingTransaction.
+
+        Args:
+            txn (cursor):
+            user_id (str): user to add/update
+
+        Returns:
+            bool: True if a new entry was created, False if an
+            existing one was updated.
         """
         # Am consciously deciding to lock the table on the basis that is ought
         # never be a big table and alternative approaches (batching multiple
         # upserts into a single txn) introduced a lot of extra complexity.
         # See https://github.com/matrix-org/synapse/issues/3854 for more
-        is_insert = yield self._simple_upsert(
-            desc="upsert_monthly_active_user",
+        is_insert = self._simple_upsert_txn(
+            txn,
             table="monthly_active_users",
             keyvalues={
                 "user_id": user_id,
@@ -186,9 +220,8 @@ class MonthlyActiveUsersStore(SQLBaseStore):
                 "timestamp": int(self._clock.time_msec()),
             },
         )
-        if is_insert:
-            self.user_last_seen_monthly_active.invalidate((user_id,))
-            self.get_monthly_active_count.invalidate(())
+
+        return is_insert
 
     @cached(num_args=1)
     def user_last_seen_monthly_active(self, user_id):
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index c7987bfcdd..2743b52bad 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -29,7 +29,7 @@ from ._base import SQLBaseStore
 logger = logging.getLogger(__name__)
 
 if six.PY2:
-    db_binary_type = buffer
+    db_binary_type = six.moves.builtins.buffer
 else:
     db_binary_type = memoryview
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 26b429e307..80d76bf9d7 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -474,17 +474,44 @@ class RegistrationStore(RegistrationWorkerStore,
 
     @defer.inlineCallbacks
     def get_user_id_by_threepid(self, medium, address):
-        ret = yield self._simple_select_one(
+        """Returns user id from threepid
+
+        Args:
+            medium (str): threepid medium e.g. email
+            address (str): threepid address e.g. me@example.com
+
+        Returns:
+            Deferred[str|None]: user id or None if no user id/threepid mapping exists
+        """
+        user_id = yield self.runInteraction(
+            "get_user_id_by_threepid", self.get_user_id_by_threepid_txn,
+            medium, address
+        )
+        defer.returnValue(user_id)
+
+    def get_user_id_by_threepid_txn(self, txn, medium, address):
+        """Returns user id from threepid
+
+        Args:
+            txn (cursor):
+            medium (str): threepid medium e.g. email
+            address (str): threepid address e.g. me@example.com
+
+        Returns:
+            str|None: user id or None if no user id/threepid mapping exists
+        """
+        ret = self._simple_select_one_txn(
+            txn,
             "user_threepids",
             {
                 "medium": medium,
                 "address": address
             },
-            ['user_id'], True, 'get_user_id_by_threepid'
+            ['user_id'], True
         )
         if ret:
-            defer.returnValue(ret['user_id'])
-        defer.returnValue(None)
+            return ret['user_id']
+        return None
 
     def user_delete_threepid(self, user_id, medium, address):
         return self._simple_delete(
@@ -567,7 +594,7 @@ class RegistrationStore(RegistrationWorkerStore,
         def _find_next_generated_user_id(txn):
             txn.execute("SELECT name FROM users")
 
-            regex = re.compile("^@(\d+):")
+            regex = re.compile(r"^@(\d+):")
 
             found = set()
 
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 5623391f6e..158e9dbe7b 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -27,7 +27,7 @@ from ._base import SQLBaseStore
 # py2 sqlite has buffer hardcoded as only binary type, so we must use it,
 # despite being deprecated and removed in favor of memoryview
 if six.PY2:
-    db_binary_type = buffer
+    db_binary_type = six.moves.builtins.buffer
 else:
     db_binary_type = memoryview
 
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 3f4cbd61c4..ef65929bb2 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -19,6 +19,8 @@ from collections import namedtuple
 from six import iteritems, itervalues
 from six.moves import range
 
+import attr
+
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
@@ -48,6 +50,318 @@ class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delt
         return len(self.delta_ids) if self.delta_ids else 0
 
 
+@attr.s(slots=True)
+class StateFilter(object):
+    """A filter used when querying for state.
+
+    Attributes:
+        types (dict[str, set[str]|None]): Map from type to set of state keys (or
+            None). This specifies which state_keys for the given type to fetch
+            from the DB. If None then all events with that type are fetched. If
+            the set is empty then no events with that type are fetched.
+        include_others (bool): Whether to fetch events with types that do not
+            appear in `types`.
+    """
+
+    types = attr.ib()
+    include_others = attr.ib(default=False)
+
+    def __attrs_post_init__(self):
+        # If `include_others` is set we canonicalise the filter by removing
+        # wildcards from the types dictionary
+        if self.include_others:
+            self.types = {
+                k: v for k, v in iteritems(self.types)
+                if v is not None
+            }
+
+    @staticmethod
+    def all():
+        """Creates a filter that fetches everything.
+
+        Returns:
+            StateFilter
+        """
+        return StateFilter(types={}, include_others=True)
+
+    @staticmethod
+    def none():
+        """Creates a filter that fetches nothing.
+
+        Returns:
+            StateFilter
+        """
+        return StateFilter(types={}, include_others=False)
+
+    @staticmethod
+    def from_types(types):
+        """Creates a filter that only fetches the given types
+
+        Args:
+            types (Iterable[tuple[str, str|None]]): A list of type and state
+                keys to fetch. A state_key of None fetches everything for
+                that type
+
+        Returns:
+            StateFilter
+        """
+        type_dict = {}
+        for typ, s in types:
+            if typ in type_dict:
+                if type_dict[typ] is None:
+                    continue
+
+            if s is None:
+                type_dict[typ] = None
+                continue
+
+            type_dict.setdefault(typ, set()).add(s)
+
+        return StateFilter(types=type_dict)
+
+    @staticmethod
+    def from_lazy_load_member_list(members):
+        """Creates a filter that returns all non-member events, plus the member
+        events for the given users
+
+        Args:
+            members (iterable[str]): Set of user IDs
+
+        Returns:
+            StateFilter
+        """
+        return StateFilter(
+            types={EventTypes.Member: set(members)},
+            include_others=True,
+        )
+
+    def return_expanded(self):
+        """Creates a new StateFilter where type wild cards have been removed
+        (except for memberships). The returned filter is a superset of the
+        current one, i.e. anything that passes the current filter will pass
+        the returned filter.
+
+        This helps the caching as the DictionaryCache knows if it has *all* the
+        state, but does not know if it has all of the keys of a particular type,
+        which makes wildcard lookups expensive unless we have a complete cache.
+        Hence, if we are doing a wildcard lookup, populate the cache fully so
+        that we can do an efficient lookup next time.
+
+        Note that since we have two caches, one for membership events and one for
+        other events, we can be a bit more clever than simply returning
+        `StateFilter.all()` if `has_wildcards()` is True.
+
+        We return a StateFilter where:
+            1. the list of membership events to return is the same
+            2. if there is a wildcard that matches non-member events we
+               return all non-member events
+
+        Returns:
+            StateFilter
+        """
+
+        if self.is_full():
+            # If we're going to return everything then there's nothing to do
+            return self
+
+        if not self.has_wildcards():
+            # If there are no wild cards, there's nothing to do
+            return self
+
+        if EventTypes.Member in self.types:
+            get_all_members = self.types[EventTypes.Member] is None
+        else:
+            get_all_members = self.include_others
+
+        has_non_member_wildcard = self.include_others or any(
+            state_keys is None
+            for t, state_keys in iteritems(self.types)
+            if t != EventTypes.Member
+        )
+
+        if not has_non_member_wildcard:
+            # If there are no non-member wild cards we can just return ourselves
+            return self
+
+        if get_all_members:
+            # We want to return everything.
+            return StateFilter.all()
+        else:
+            # We want to return all non-members, but only particular
+            # memberships
+            return StateFilter(
+                types={EventTypes.Member: self.types[EventTypes.Member]},
+                include_others=True,
+            )
+
+    def make_sql_filter_clause(self):
+        """Converts the filter to an SQL clause.
+
+        For example:
+
+            f = StateFilter.from_types([("m.room.create", "")])
+            clause, args = f.make_sql_filter_clause()
+            clause == "(type = ? AND state_key = ?)"
+            args == ['m.room.create', '']
+
+
+        Returns:
+            tuple[str, list]: The SQL string (may be empty) and arguments. An
+            empty SQL string is returned when the filter matches everything
+            (i.e. is "full").
+        """
+
+        where_clause = ""
+        where_args = []
+
+        if self.is_full():
+            return where_clause, where_args
+
+        if not self.include_others and not self.types:
+            # i.e. this is an empty filter, so we need to return a clause that
+            # will match nothing
+            return "1 = 2", []
+
+        # First we build up a lost of clauses for each type/state_key combo
+        clauses = []
+        for etype, state_keys in iteritems(self.types):
+            if state_keys is None:
+                clauses.append("(type = ?)")
+                where_args.append(etype)
+                continue
+
+            for state_key in state_keys:
+                clauses.append("(type = ? AND state_key = ?)")
+                where_args.extend((etype, state_key))
+
+        # This will match anything that appears in `self.types`
+        where_clause = " OR ".join(clauses)
+
+        # If we want to include stuff that's not in the types dict then we add
+        # a `OR type NOT IN (...)` clause to the end.
+        if self.include_others:
+            if where_clause:
+                where_clause += " OR "
+
+            where_clause += "type NOT IN (%s)" % (
+                ",".join(["?"] * len(self.types)),
+            )
+            where_args.extend(self.types)
+
+        return where_clause, where_args
+
+    def max_entries_returned(self):
+        """Returns the maximum number of entries this filter will return if
+        known, otherwise returns None.
+
+        For example a simple state filter asking for `("m.room.create", "")`
+        will return 1, whereas the default state filter will return None.
+
+        This is used to bail out early if the right number of entries have been
+        fetched.
+        """
+        if self.has_wildcards():
+            return None
+
+        return len(self.concrete_types())
+
+    def filter_state(self, state_dict):
+        """Returns the state filtered with by this StateFilter
+
+        Args:
+            state (dict[tuple[str, str], Any]): The state map to filter
+
+        Returns:
+            dict[tuple[str, str], Any]: The filtered state map
+        """
+        if self.is_full():
+            return dict(state_dict)
+
+        filtered_state = {}
+        for k, v in iteritems(state_dict):
+            typ, state_key = k
+            if typ in self.types:
+                state_keys = self.types[typ]
+                if state_keys is None or state_key in state_keys:
+                    filtered_state[k] = v
+            elif self.include_others:
+                filtered_state[k] = v
+
+        return filtered_state
+
+    def is_full(self):
+        """Whether this filter fetches everything or not
+
+        Returns:
+            bool
+        """
+        return self.include_others and not self.types
+
+    def has_wildcards(self):
+        """Whether the filter includes wildcards or is attempting to fetch
+        specific state.
+
+        Returns:
+            bool
+        """
+
+        return (
+            self.include_others
+            or any(
+                state_keys is None
+                for state_keys in itervalues(self.types)
+            )
+        )
+
+    def concrete_types(self):
+        """Returns a list of concrete type/state_keys (i.e. not None) that
+        will be fetched. This will be a complete list if `has_wildcards`
+        returns False, but otherwise will be a subset (or even empty).
+
+        Returns:
+            list[tuple[str,str]]
+        """
+        return [
+            (t, s)
+            for t, state_keys in iteritems(self.types)
+            if state_keys is not None
+            for s in state_keys
+        ]
+
+    def get_member_split(self):
+        """Return the filter split into two: one which assumes it's exclusively
+        matching against member state, and one which assumes it's matching
+        against non member state.
+
+        This is useful due to the returned filters giving correct results for
+        `is_full()`, `has_wildcards()`, etc, when operating against maps that
+        either exclusively contain member events or only contain non-member
+        events. (Which is the case when dealing with the member vs non-member
+        state caches).
+
+        Returns:
+            tuple[StateFilter, StateFilter]: The member and non member filters
+        """
+
+        if EventTypes.Member in self.types:
+            state_keys = self.types[EventTypes.Member]
+            if state_keys is None:
+                member_filter = StateFilter.all()
+            else:
+                member_filter = StateFilter({EventTypes.Member: state_keys})
+        elif self.include_others:
+            member_filter = StateFilter.all()
+        else:
+            member_filter = StateFilter.none()
+
+        non_member_filter = StateFilter(
+            types={k: v for k, v in iteritems(self.types) if k != EventTypes.Member},
+            include_others=self.include_others,
+        )
+
+        return member_filter, non_member_filter
+
+
 # this inherits from EventsWorkerStore because it calls self.get_events
 class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
     """The parts of StateGroupStore that can be called from workers.
@@ -152,61 +466,41 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         )
 
     # FIXME: how should this be cached?
-    def get_filtered_current_state_ids(self, room_id, types, filtered_types=None):
+    def get_filtered_current_state_ids(self, room_id, state_filter=StateFilter.all()):
         """Get the current state event of a given type for a room based on the
         current_state_events table.  This may not be as up-to-date as the result
         of doing a fresh state resolution as per state_handler.get_current_state
+
         Args:
             room_id (str)
-            types (list[(Str, (Str|None))]): List of (type, state_key) tuples
-                which are used to filter the state fetched. `state_key` may be
-                None, which matches any `state_key`
-            filtered_types (list[Str]|None): List of types to apply the above filter to.
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
+
         Returns:
-            deferred: dict of (type, state_key) -> event
+            Deferred[dict[tuple[str, str], str]]: Map from type/state_key to
+            event ID.
         """
 
-        include_other_types = False if filtered_types is None else True
-
         def _get_filtered_current_state_ids_txn(txn):
             results = {}
-            sql = """SELECT type, state_key, event_id FROM current_state_events
-                     WHERE room_id = ? %s"""
-            # Turns out that postgres doesn't like doing a list of OR's and
-            # is about 1000x slower, so we just issue a query for each specific
-            # type seperately.
-            if types:
-                clause_to_args = [
-                    (
-                        "AND type = ? AND state_key = ?",
-                        (etype, state_key)
-                    ) if state_key is not None else (
-                        "AND type = ?",
-                        (etype,)
-                    )
-                    for etype, state_key in types
-                ]
-
-                if include_other_types:
-                    unique_types = set(filtered_types)
-                    clause_to_args.append(
-                        (
-                            "AND type <> ? " * len(unique_types),
-                            list(unique_types)
-                        )
-                    )
-            else:
-                # If types is None we fetch all the state, and so just use an
-                # empty where clause with no extra args.
-                clause_to_args = [("", [])]
-            for where_clause, where_args in clause_to_args:
-                args = [room_id]
-                args.extend(where_args)
-                txn.execute(sql % (where_clause,), args)
-                for row in txn:
-                    typ, state_key, event_id = row
-                    key = (intern_string(typ), intern_string(state_key))
-                    results[key] = event_id
+            sql = """
+                SELECT type, state_key, event_id FROM current_state_events
+                WHERE room_id = ?
+            """
+
+            where_clause, where_args = state_filter.make_sql_filter_clause()
+
+            if where_clause:
+                sql += " AND (%s)" % (where_clause,)
+
+            args = [room_id]
+            args.extend(where_args)
+            txn.execute(sql, args)
+            for row in txn:
+                typ, state_key, event_id = row
+                key = (intern_string(typ), intern_string(state_key))
+                results[key] = event_id
+
             return results
 
         return self.runInteraction(
@@ -322,20 +616,14 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         })
 
     @defer.inlineCallbacks
-    def _get_state_groups_from_groups(self, groups, types, members=None):
+    def _get_state_groups_from_groups(self, groups, state_filter):
         """Returns the state groups for a given set of groups, filtering on
         types of state events.
 
         Args:
             groups(list[int]): list of state group IDs to query
-            types (Iterable[str, str|None]|None): list of 2-tuples of the form
-                (`type`, `state_key`), where a `state_key` of `None` matches all
-                state_keys for the `type`. If None, all types are returned.
-            members (bool|None): If not None, then, in addition to any filtering
-                implied by types, the results are also filtered to only include
-                member events (if True), or to exclude member events (if False)
-
-        Returns:
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
         Returns:
             Deferred[dict[int, dict[tuple[str, str], str]]]:
                 dict of state_group_id -> (dict of (type, state_key) -> event id)
@@ -346,19 +634,23 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         for chunk in chunks:
             res = yield self.runInteraction(
                 "_get_state_groups_from_groups",
-                self._get_state_groups_from_groups_txn, chunk, types, members,
+                self._get_state_groups_from_groups_txn, chunk, state_filter,
             )
             results.update(res)
 
         defer.returnValue(results)
 
     def _get_state_groups_from_groups_txn(
-        self, txn, groups, types=None, members=None,
+        self, txn, groups, state_filter=StateFilter.all(),
     ):
         results = {group: {} for group in groups}
 
-        if types is not None:
-            types = list(set(types))  # deduplicate types list
+        where_clause, where_args = state_filter.make_sql_filter_clause()
+
+        # Unless the filter clause is empty, we're going to append it after an
+        # existing where clause
+        if where_clause:
+            where_clause = " AND (%s)" % (where_clause,)
 
         if isinstance(self.database_engine, PostgresEngine):
             # Temporarily disable sequential scans in this transaction. This is
@@ -374,79 +666,33 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
             # group for the given type, state_key.
             # This may return multiple rows per (type, state_key), but last_value
             # should be the same.
-            sql = ("""
+            sql = """
                 WITH RECURSIVE state(state_group) AS (
                     VALUES(?::bigint)
                     UNION ALL
                     SELECT prev_state_group FROM state_group_edges e, state s
                     WHERE s.state_group = e.state_group
                 )
-                SELECT type, state_key, last_value(event_id) OVER (
+                SELECT DISTINCT type, state_key, last_value(event_id) OVER (
                     PARTITION BY type, state_key ORDER BY state_group ASC
                     ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
                 ) AS event_id FROM state_groups_state
                 WHERE state_group IN (
                     SELECT state_group FROM state
                 )
-                %s
-            """)
+            """
 
-            if members is True:
-                sql += " AND type = '%s'" % (EventTypes.Member,)
-            elif members is False:
-                sql += " AND type <> '%s'" % (EventTypes.Member,)
-
-            # Turns out that postgres doesn't like doing a list of OR's and
-            # is about 1000x slower, so we just issue a query for each specific
-            # type seperately.
-            if types is not None:
-                clause_to_args = [
-                    (
-                        "AND type = ? AND state_key = ?",
-                        (etype, state_key)
-                    ) if state_key is not None else (
-                        "AND type = ?",
-                        (etype,)
-                    )
-                    for etype, state_key in types
-                ]
-            else:
-                # If types is None we fetch all the state, and so just use an
-                # empty where clause with no extra args.
-                clause_to_args = [("", [])]
-
-            for where_clause, where_args in clause_to_args:
-                for group in groups:
-                    args = [group]
-                    args.extend(where_args)
+            for group in groups:
+                args = [group]
+                args.extend(where_args)
 
-                    txn.execute(sql % (where_clause,), args)
-                    for row in txn:
-                        typ, state_key, event_id = row
-                        key = (typ, state_key)
-                        results[group][key] = event_id
+                txn.execute(sql + where_clause, args)
+                for row in txn:
+                    typ, state_key, event_id = row
+                    key = (typ, state_key)
+                    results[group][key] = event_id
         else:
-            where_args = []
-            where_clauses = []
-            wildcard_types = False
-            if types is not None:
-                for typ in types:
-                    if typ[1] is None:
-                        where_clauses.append("(type = ?)")
-                        where_args.append(typ[0])
-                        wildcard_types = True
-                    else:
-                        where_clauses.append("(type = ? AND state_key = ?)")
-                        where_args.extend([typ[0], typ[1]])
-
-                where_clause = "AND (%s)" % (" OR ".join(where_clauses))
-            else:
-                where_clause = ""
-
-            if members is True:
-                where_clause += " AND type = '%s'" % EventTypes.Member
-            elif members is False:
-                where_clause += " AND type <> '%s'" % EventTypes.Member
+            max_entries_returned = state_filter.max_entries_returned()
 
             # We don't use WITH RECURSIVE on sqlite3 as there are distributions
             # that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
@@ -460,12 +706,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
                     # without the right indices (which we can't add until
                     # after we finish deduping state, which requires this func)
                     args = [next_group]
-                    if types:
-                        args.extend(where_args)
+                    args.extend(where_args)
 
                     txn.execute(
                         "SELECT type, state_key, event_id FROM state_groups_state"
-                        " WHERE state_group = ? %s" % (where_clause,),
+                        " WHERE state_group = ? " + where_clause,
                         args
                     )
                     results[group].update(
@@ -481,9 +726,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
                     # wildcards (i.e. Nones) in which case we have to do an exhaustive
                     # search
                     if (
-                        types is not None and
-                        not wildcard_types and
-                        len(results[group]) == len(types)
+                        max_entries_returned is not None and
+                        len(results[group]) == max_entries_returned
                     ):
                         break
 
@@ -498,20 +742,14 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         return results
 
     @defer.inlineCallbacks
-    def get_state_for_events(self, event_ids, types, filtered_types=None):
+    def get_state_for_events(self, event_ids, state_filter=StateFilter.all()):
         """Given a list of event_ids and type tuples, return a list of state
-        dicts for each event. The state dicts will only have the type/state_keys
-        that are in the `types` list.
+        dicts for each event.
 
         Args:
             event_ids (list[string])
-            types (list[(str, str|None)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. If `state_key` is None,
-                all events are returned of the given type.
-                May be None, which matches any key.
-            filtered_types(list[str]|None): Only apply filtering via `types` to this
-                list of event types.  Other types of events are returned unfiltered.
-                If None, `types` filtering is applied to all events.
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
 
         Returns:
             deferred: A dict of (event_id) -> (type, state_key) -> [state_events]
@@ -521,7 +759,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         )
 
         groups = set(itervalues(event_to_groups))
-        group_to_state = yield self._get_state_for_groups(groups, types, filtered_types)
+        group_to_state = yield self._get_state_for_groups(groups, state_filter)
 
         state_event_map = yield self.get_events(
             [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)],
@@ -540,20 +778,15 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         defer.returnValue({event: event_to_state[event] for event in event_ids})
 
     @defer.inlineCallbacks
-    def get_state_ids_for_events(self, event_ids, types=None, filtered_types=None):
+    def get_state_ids_for_events(self, event_ids, state_filter=StateFilter.all()):
         """
         Get the state dicts corresponding to a list of events, containing the event_ids
         of the state events (as opposed to the events themselves)
 
         Args:
             event_ids(list(str)): events whose state should be returned
-            types(list[(str, str|None)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. If `state_key` is None,
-                all events are returned of the given type.
-                May be None, which matches any key.
-            filtered_types(list[str]|None): Only apply filtering via `types` to this
-                list of event types.  Other types of events are returned unfiltered.
-                If None, `types` filtering is applied to all events.
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
 
         Returns:
             A deferred dict from event_id -> (type, state_key) -> event_id
@@ -563,7 +796,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         )
 
         groups = set(itervalues(event_to_groups))
-        group_to_state = yield self._get_state_for_groups(groups, types, filtered_types)
+        group_to_state = yield self._get_state_for_groups(groups, state_filter)
 
         event_to_state = {
             event_id: group_to_state[group]
@@ -573,45 +806,35 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         defer.returnValue({event: event_to_state[event] for event in event_ids})
 
     @defer.inlineCallbacks
-    def get_state_for_event(self, event_id, types=None, filtered_types=None):
+    def get_state_for_event(self, event_id, state_filter=StateFilter.all()):
         """
         Get the state dict corresponding to a particular event
 
         Args:
             event_id(str): event whose state should be returned
-            types(list[(str, str|None)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. If `state_key` is None,
-                all events are returned of the given type.
-                May be None, which matches any key.
-            filtered_types(list[str]|None): Only apply filtering via `types` to this
-                list of event types.  Other types of events are returned unfiltered.
-                If None, `types` filtering is applied to all events.
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
 
         Returns:
             A deferred dict from (type, state_key) -> state_event
         """
-        state_map = yield self.get_state_for_events([event_id], types, filtered_types)
+        state_map = yield self.get_state_for_events([event_id], state_filter)
         defer.returnValue(state_map[event_id])
 
     @defer.inlineCallbacks
-    def get_state_ids_for_event(self, event_id, types=None, filtered_types=None):
+    def get_state_ids_for_event(self, event_id, state_filter=StateFilter.all()):
         """
         Get the state dict corresponding to a particular event
 
         Args:
             event_id(str): event whose state should be returned
-            types(list[(str, str|None)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. If `state_key` is None,
-                all events are returned of the given type.
-                May be None, which matches any key.
-            filtered_types(list[str]|None): Only apply filtering via `types` to this
-                list of event types.  Other types of events are returned unfiltered.
-                If None, `types` filtering is applied to all events.
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
 
         Returns:
             A deferred dict from (type, state_key) -> state_event
         """
-        state_map = yield self.get_state_ids_for_events([event_id], types, filtered_types)
+        state_map = yield self.get_state_ids_for_events([event_id], state_filter)
         defer.returnValue(state_map[event_id])
 
     @cached(max_entries=50000)
@@ -642,18 +865,14 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         defer.returnValue({row["event_id"]: row["state_group"] for row in rows})
 
-    def _get_some_state_from_cache(self, cache, group, types, filtered_types=None):
+    def _get_state_for_group_using_cache(self, cache, group, state_filter):
         """Checks if group is in cache. See `_get_state_for_groups`
 
         Args:
             cache(DictionaryCache): the state group cache to use
             group(int): The state group to lookup
-            types(list[str, str|None]): List of 2-tuples of the form
-                (`type`, `state_key`), where a `state_key` of `None` matches all
-                state_keys for the `type`.
-            filtered_types(list[str]|None): Only apply filtering via `types` to this
-                list of event types.  Other types of events are returned unfiltered.
-                If None, `types` filtering is applied to all events.
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
 
         Returns 2-tuple (`state_dict`, `got_all`).
         `got_all` is a bool indicating if we successfully retrieved all
@@ -662,124 +881,102 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         """
         is_all, known_absent, state_dict_ids = cache.get(group)
 
-        type_to_key = {}
+        if is_all or state_filter.is_full():
+            # Either we have everything or want everything, either way
+            # `is_all` tells us whether we've gotten everything.
+            return state_filter.filter_state(state_dict_ids), is_all
 
         # tracks whether any of our requested types are missing from the cache
         missing_types = False
 
-        for typ, state_key in types:
-            key = (typ, state_key)
-
-            if (
-                state_key is None or
-                (filtered_types is not None and typ not in filtered_types)
-            ):
-                type_to_key[typ] = None
-                # we mark the type as missing from the cache because
-                # when the cache was populated it might have been done with a
-                # restricted set of state_keys, so the wildcard will not work
-                # and the cache may be incomplete.
-                missing_types = True
-            else:
-                if type_to_key.get(typ, object()) is not None:
-                    type_to_key.setdefault(typ, set()).add(state_key)
-
+        if state_filter.has_wildcards():
+            # We don't know if we fetched all the state keys for the types in
+            # the filter that are wildcards, so we have to assume that we may
+            # have missed some.
+            missing_types = True
+        else:
+            # There aren't any wild cards, so `concrete_types()` returns the
+            # complete list of event types we're wanting.
+            for key in state_filter.concrete_types():
                 if key not in state_dict_ids and key not in known_absent:
                     missing_types = True
+                    break
 
-        sentinel = object()
-
-        def include(typ, state_key):
-            valid_state_keys = type_to_key.get(typ, sentinel)
-            if valid_state_keys is sentinel:
-                return filtered_types is not None and typ not in filtered_types
-            if valid_state_keys is None:
-                return True
-            if state_key in valid_state_keys:
-                return True
-            return False
-
-        got_all = is_all
-        if not got_all:
-            # the cache is incomplete. We may still have got all the results we need, if
-            # we don't have any wildcards in the match list.
-            if not missing_types and filtered_types is None:
-                got_all = True
-
-        return {
-            k: v for k, v in iteritems(state_dict_ids)
-            if include(k[0], k[1])
-        }, got_all
-
-    def _get_all_state_from_cache(self, cache, group):
-        """Checks if group is in cache. See `_get_state_for_groups`
-
-        Returns 2-tuple (`state_dict`, `got_all`). `got_all` is a bool
-        indicating if we successfully retrieved all requests state from the
-        cache, if False we need to query the DB for the missing state.
-
-        Args:
-            cache(DictionaryCache): the state group cache to use
-            group: The state group to lookup
-        """
-        is_all, _, state_dict_ids = cache.get(group)
-
-        return state_dict_ids, is_all
+        return state_filter.filter_state(state_dict_ids), not missing_types
 
     @defer.inlineCallbacks
-    def _get_state_for_groups(self, groups, types=None, filtered_types=None):
+    def _get_state_for_groups(self, groups, state_filter=StateFilter.all()):
         """Gets the state at each of a list of state groups, optionally
         filtering by type/state_key
 
         Args:
             groups (iterable[int]): list of state groups for which we want
                 to get the state.
-            types (None|iterable[(str, None|str)]):
-                indicates the state type/keys required. If None, the whole
-                state is fetched and returned.
-
-                Otherwise, each entry should be a `(type, state_key)` tuple to
-                include in the response. A `state_key` of None is a wildcard
-                meaning that we require all state with that type.
-            filtered_types(list[str]|None): Only apply filtering via `types` to this
-                list of event types.  Other types of events are returned unfiltered.
-                If None, `types` filtering is applied to all events.
-
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
         Returns:
             Deferred[dict[int, dict[tuple[str, str], str]]]:
                 dict of state_group_id -> (dict of (type, state_key) -> event id)
         """
-        if types is not None:
-            non_member_types = [t for t in types if t[0] != EventTypes.Member]
 
-            if filtered_types is not None and EventTypes.Member not in filtered_types:
-                # we want all of the membership events
-                member_types = None
-            else:
-                member_types = [t for t in types if t[0] == EventTypes.Member]
-
-        else:
-            non_member_types = None
-            member_types = None
+        member_filter, non_member_filter = state_filter.get_member_split()
 
-        non_member_state = yield self._get_state_for_groups_using_cache(
-            groups, self._state_group_cache, non_member_types, filtered_types,
+        # Now we look them up in the member and non-member caches
+        non_member_state, incomplete_groups_nm, = (
+            yield self._get_state_for_groups_using_cache(
+                groups, self._state_group_cache,
+                state_filter=non_member_filter,
+            )
         )
-        # XXX: we could skip this entirely if member_types is []
-        member_state = yield self._get_state_for_groups_using_cache(
-            # we set filtered_types=None as member_state only ever contain members.
-            groups, self._state_group_members_cache, member_types, None,
+
+        member_state, incomplete_groups_m, = (
+            yield self._get_state_for_groups_using_cache(
+                groups, self._state_group_members_cache,
+                state_filter=member_filter,
+            )
         )
 
-        state = non_member_state
+        state = dict(non_member_state)
         for group in groups:
             state[group].update(member_state[group])
 
+        # Now fetch any missing groups from the database
+
+        incomplete_groups = incomplete_groups_m | incomplete_groups_nm
+
+        if not incomplete_groups:
+            defer.returnValue(state)
+
+        cache_sequence_nm = self._state_group_cache.sequence
+        cache_sequence_m = self._state_group_members_cache.sequence
+
+        # Help the cache hit ratio by expanding the filter a bit
+        db_state_filter = state_filter.return_expanded()
+
+        group_to_state_dict = yield self._get_state_groups_from_groups(
+            list(incomplete_groups),
+            state_filter=db_state_filter,
+        )
+
+        # Now lets update the caches
+        self._insert_into_cache(
+            group_to_state_dict,
+            db_state_filter,
+            cache_seq_num_members=cache_sequence_m,
+            cache_seq_num_non_members=cache_sequence_nm,
+        )
+
+        # And finally update the result dict, by filtering out any extra
+        # stuff we pulled out of the database.
+        for group, group_state_dict in iteritems(group_to_state_dict):
+            # We just replace any existing entries, as we will have loaded
+            # everything we need from the database anyway.
+            state[group] = state_filter.filter_state(group_state_dict)
+
         defer.returnValue(state)
 
-    @defer.inlineCallbacks
     def _get_state_for_groups_using_cache(
-        self, groups, cache, types=None, filtered_types=None
+        self, groups, cache, state_filter,
     ):
         """Gets the state at each of a list of state groups, optionally
         filtering by type/state_key, querying from a specific cache.
@@ -790,89 +987,85 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
             cache (DictionaryCache): the cache of group ids to state dicts which
                 we will pass through - either the normal state cache or the specific
                 members state cache.
-            types (None|iterable[(str, None|str)]):
-                indicates the state type/keys required. If None, the whole
-                state is fetched and returned.
-
-                Otherwise, each entry should be a `(type, state_key)` tuple to
-                include in the response. A `state_key` of None is a wildcard
-                meaning that we require all state with that type.
-            filtered_types(list[str]|None): Only apply filtering via `types` to this
-                list of event types.  Other types of events are returned unfiltered.
-                If None, `types` filtering is applied to all events.
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
 
         Returns:
-            Deferred[dict[int, dict[tuple[str, str], str]]]:
-                dict of state_group_id -> (dict of (type, state_key) -> event id)
+            tuple[dict[int, dict[tuple[str, str], str]], set[int]]: Tuple of
+            dict of state_group_id -> (dict of (type, state_key) -> event id)
+            of entries in the cache, and the state group ids either missing
+            from the cache or incomplete.
         """
-        if types:
-            types = frozenset(types)
         results = {}
-        missing_groups = []
-        if types is not None:
-            for group in set(groups):
-                state_dict_ids, got_all = self._get_some_state_from_cache(
-                    cache, group, types, filtered_types
-                )
-                results[group] = state_dict_ids
+        incomplete_groups = set()
+        for group in set(groups):
+            state_dict_ids, got_all = self._get_state_for_group_using_cache(
+                cache, group, state_filter
+            )
+            results[group] = state_dict_ids
 
-                if not got_all:
-                    missing_groups.append(group)
-        else:
-            for group in set(groups):
-                state_dict_ids, got_all = self._get_all_state_from_cache(
-                    cache, group
-                )
+            if not got_all:
+                incomplete_groups.add(group)
 
-                results[group] = state_dict_ids
+        return results, incomplete_groups
 
-                if not got_all:
-                    missing_groups.append(group)
+    def _insert_into_cache(self, group_to_state_dict, state_filter,
+                           cache_seq_num_members, cache_seq_num_non_members):
+        """Inserts results from querying the database into the relevant cache.
 
-        if missing_groups:
-            # Okay, so we have some missing_types, let's fetch them.
-            cache_seq_num = cache.sequence
+        Args:
+            group_to_state_dict (dict): The new entries pulled from database.
+                Map from state group to state dict
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
+            cache_seq_num_members (int): Sequence number of member cache since
+                last lookup in cache
+            cache_seq_num_non_members (int): Sequence number of member cache since
+                last lookup in cache
+        """
 
-            # the DictionaryCache knows if it has *all* the state, but
-            # does not know if it has all of the keys of a particular type,
-            # which makes wildcard lookups expensive unless we have a complete
-            # cache. Hence, if we are doing a wildcard lookup, populate the
-            # cache fully so that we can do an efficient lookup next time.
+        # We need to work out which types we've fetched from the DB for the
+        # member vs non-member caches. This should be as accurate as possible,
+        # but can be an underestimate (e.g. when we have wild cards)
 
-            if filtered_types or (types and any(k is None for (t, k) in types)):
-                types_to_fetch = None
-            else:
-                types_to_fetch = types
+        member_filter, non_member_filter = state_filter.get_member_split()
+        if member_filter.is_full():
+            # We fetched all member events
+            member_types = None
+        else:
+            # `concrete_types()` will only return a subset when there are wild
+            # cards in the filter, but that's fine.
+            member_types = member_filter.concrete_types()
 
-            group_to_state_dict = yield self._get_state_groups_from_groups(
-                missing_groups, types_to_fetch, cache == self._state_group_members_cache,
-            )
+        if non_member_filter.is_full():
+            # We fetched all non member events
+            non_member_types = None
+        else:
+            non_member_types = non_member_filter.concrete_types()
+
+        for group, group_state_dict in iteritems(group_to_state_dict):
+            state_dict_members = {}
+            state_dict_non_members = {}
 
-            for group, group_state_dict in iteritems(group_to_state_dict):
-                state_dict = results[group]
-
-                # update the result, filtering by `types`.
-                if types:
-                    for k, v in iteritems(group_state_dict):
-                        (typ, _) = k
-                        if (
-                            (k in types or (typ, None) in types) or
-                            (filtered_types and typ not in filtered_types)
-                        ):
-                            state_dict[k] = v
+            for k, v in iteritems(group_state_dict):
+                if k[0] == EventTypes.Member:
+                    state_dict_members[k] = v
                 else:
-                    state_dict.update(group_state_dict)
-
-                # update the cache with all the things we fetched from the
-                # database.
-                cache.update(
-                    cache_seq_num,
-                    key=group,
-                    value=group_state_dict,
-                    fetched_keys=types_to_fetch,
-                )
+                    state_dict_non_members[k] = v
 
-        defer.returnValue(results)
+            self._state_group_members_cache.update(
+                cache_seq_num_members,
+                key=group,
+                value=state_dict_members,
+                fetched_keys=member_types,
+            )
+
+            self._state_group_cache.update(
+                cache_seq_num_non_members,
+                key=group,
+                value=state_dict_non_members,
+                fetched_keys=non_member_types,
+            )
 
     def store_state_group(self, event_id, room_id, prev_group, delta_ids,
                           current_state_ids):
@@ -1181,12 +1374,12 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
                         continue
 
                     prev_state = self._get_state_groups_from_groups_txn(
-                        txn, [prev_group], types=None
+                        txn, [prev_group],
                     )
                     prev_state = prev_state[prev_group]
 
                     curr_state = self._get_state_groups_from_groups_txn(
-                        txn, [state_group], types=None
+                        txn, [state_group],
                     )
                     curr_state = curr_state[state_group]
 
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index a3032cdce9..d8bf953ec0 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -30,7 +30,7 @@ from ._base import SQLBaseStore, db_to_json
 # py2 sqlite has buffer hardcoded as only binary type, so we must use it,
 # despite being deprecated and removed in favor of memoryview
 if six.PY2:
-    db_binary_type = buffer
+    db_binary_type = six.moves.builtins.buffer
 else:
     db_binary_type = memoryview