summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2020-03-16 19:21:18 +0000
committerAndrew Morgan <andrew@amorgan.xyz>2020-03-16 19:21:18 +0000
commit6d29b5342449316944ffd26a5d61f5a7600298b7 (patch)
tree49d72fbfb565287ab59b45ad7b35fb85f417377b /synapse
parentExpose some homeserver functionality to spam checkers (#6259) (diff)
parentMerge pull request #6294 from matrix-org/erikj/add_state_storage (diff)
downloadsynapse-6d29b5342449316944ffd26a5d61f5a7600298b7.tar.xz
Merge pull request #6294 from matrix-org/erikj/add_state_storage
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/admin.py7
-rw-r--r--synapse/handlers/device.py3
-rw-r--r--synapse/handlers/events.py6
-rw-r--r--synapse/handlers/federation.py19
-rw-r--r--synapse/handlers/initial_sync.py14
-rw-r--r--synapse/handlers/message.py10
-rw-r--r--synapse/handlers/pagination.py6
-rw-r--r--synapse/handlers/room.py6
-rw-r--r--synapse/handlers/search.py12
-rw-r--r--synapse/handlers/sync.py20
-rw-r--r--synapse/notifier.py6
-rw-r--r--synapse/push/httppusher.py3
-rw-r--r--synapse/push/mailer.py3
-rw-r--r--synapse/push/push_tools.py9
-rw-r--r--synapse/state/__init__.py13
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/persist_events.py2
-rw-r--r--synapse/storage/state.py233
-rw-r--r--synapse/visibility.py34
19 files changed, 340 insertions, 68 deletions
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 1a87b58838..6407d56f8e 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -30,6 +30,9 @@ class AdminHandler(BaseHandler):
     def __init__(self, hs):
         super(AdminHandler, self).__init__(hs)
 
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
+
     @defer.inlineCallbacks
     def get_whois(self, user):
         connections = []
@@ -205,7 +208,7 @@ class AdminHandler(BaseHandler):
 
                 from_key = events[-1].internal_metadata.after
 
-                events = yield filter_events_for_client(self.store, user_id, events)
+                events = yield filter_events_for_client(self.storage, user_id, events)
 
                 writer.write_events(room_id, events)
 
@@ -241,7 +244,7 @@ class AdminHandler(BaseHandler):
             for event_id in extremities:
                 if not event_to_unseen_prevs[event_id]:
                     continue
-                state = yield self.store.get_state_for_event(event_id)
+                state = yield self.state_store.get_state_for_event(event_id)
                 writer.write_state(room_id, event_id, state)
 
         return writer.finished()
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index befef2cf3d..16b4617f68 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -46,6 +46,7 @@ class DeviceWorkerHandler(BaseHandler):
 
         self.hs = hs
         self.state = hs.get_state_handler()
+        self.state_store = hs.get_storage().state
         self._auth_handler = hs.get_auth_handler()
 
     @trace
@@ -178,7 +179,7 @@ class DeviceWorkerHandler(BaseHandler):
                 continue
 
             # mapping from event_id -> state_dict
-            prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
+            prev_state_ids = yield self.state_store.get_state_ids_for_events(event_ids)
 
             # Check if we've joined the room? If so we just blindly add all the users to
             # the "possibly changed" users.
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 5e748687e3..45fe13c62f 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -147,6 +147,10 @@ class EventStreamHandler(BaseHandler):
 
 
 class EventHandler(BaseHandler):
+    def __init__(self, hs):
+        super(EventHandler, self).__init__(hs)
+        self.storage = hs.get_storage()
+
     @defer.inlineCallbacks
     def get_event(self, user, room_id, event_id):
         """Retrieve a single specified event.
@@ -172,7 +176,7 @@ class EventHandler(BaseHandler):
         is_peeking = user.to_string() not in users
 
         filtered = yield filter_events_for_client(
-            self.store, user.to_string(), [event], is_peeking=is_peeking
+            self.storage, user.to_string(), [event], is_peeking=is_peeking
         )
 
         if not filtered:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0376c36a44..cad334346d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -110,6 +110,7 @@ class FederationHandler(BaseHandler):
 
         self.store = hs.get_datastore()
         self.storage = hs.get_storage()
+        self.state_store = self.storage.state
         self.federation_client = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
@@ -325,7 +326,7 @@ class FederationHandler(BaseHandler):
                 event_map = {event_id: pdu}
                 try:
                     # Get the state of the events we know about
-                    ours = yield self.store.get_state_groups_ids(room_id, seen)
+                    ours = yield self.state_store.get_state_groups_ids(room_id, seen)
 
                     # state_maps is a list of mappings from (type, state_key) to event_id
                     state_maps = list(
@@ -891,7 +892,7 @@ class FederationHandler(BaseHandler):
         # We set `check_history_visibility_only` as we might otherwise get false
         # positives from users having been erased.
         filtered_extremities = yield filter_events_for_server(
-            self.store,
+            self.storage,
             self.server_name,
             list(extremities_events.values()),
             redact=False,
@@ -1559,7 +1560,7 @@ class FederationHandler(BaseHandler):
             event_id, allow_none=False, check_room_id=room_id
         )
 
-        state_groups = yield self.store.get_state_groups(room_id, [event_id])
+        state_groups = yield self.state_store.get_state_groups(room_id, [event_id])
 
         if state_groups:
             _, state = list(iteritems(state_groups)).pop()
@@ -1588,7 +1589,7 @@ class FederationHandler(BaseHandler):
             event_id, allow_none=False, check_room_id=room_id
         )
 
-        state_groups = yield self.store.get_state_groups_ids(room_id, [event_id])
+        state_groups = yield self.state_store.get_state_groups_ids(room_id, [event_id])
 
         if state_groups:
             _, state = list(state_groups.items()).pop()
@@ -1616,7 +1617,7 @@ class FederationHandler(BaseHandler):
 
         events = yield self.store.get_backfill_events(room_id, pdu_list, limit)
 
-        events = yield filter_events_for_server(self.store, origin, events)
+        events = yield filter_events_for_server(self.storage, origin, events)
 
         return events
 
@@ -1646,7 +1647,7 @@ class FederationHandler(BaseHandler):
             if not in_room:
                 raise AuthError(403, "Host not in room.")
 
-            events = yield filter_events_for_server(self.store, origin, [event])
+            events = yield filter_events_for_server(self.storage, origin, [event])
             event = events[0]
             return event
         else:
@@ -1914,7 +1915,7 @@ class FederationHandler(BaseHandler):
                 # given state at the event. This should correctly handle cases
                 # like bans, especially with state res v2.
 
-                state_sets = yield self.store.get_state_groups(
+                state_sets = yield self.state_store.get_state_groups(
                     event.room_id, extrem_ids
                 )
                 state_sets = list(state_sets.values())
@@ -2005,7 +2006,7 @@ class FederationHandler(BaseHandler):
         )
 
         missing_events = yield filter_events_for_server(
-            self.store, origin, missing_events
+            self.storage, origin, missing_events
         )
 
         return missing_events
@@ -2246,7 +2247,7 @@ class FederationHandler(BaseHandler):
 
         # create a new state group as a delta from the existing one.
         prev_group = context.state_group
-        state_group = yield self.store.store_state_group(
+        state_group = yield self.state_store.store_state_group(
             event.event_id,
             event.room_id,
             prev_group=prev_group,
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index f991efeee3..49c9e031f9 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -43,6 +43,8 @@ class InitialSyncHandler(BaseHandler):
         self.validator = EventValidator()
         self.snapshot_cache = SnapshotCache()
         self._event_serializer = hs.get_event_client_serializer()
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
 
     def snapshot_all_rooms(
         self,
@@ -169,7 +171,7 @@ class InitialSyncHandler(BaseHandler):
                 elif event.membership == Membership.LEAVE:
                     room_end_token = "s%d" % (event.stream_ordering,)
                     deferred_room_state = run_in_background(
-                        self.store.get_state_for_events, [event.event_id]
+                        self.state_store.get_state_for_events, [event.event_id]
                     )
                     deferred_room_state.addCallback(
                         lambda states: states[event.event_id]
@@ -189,7 +191,9 @@ class InitialSyncHandler(BaseHandler):
                     )
                 ).addErrback(unwrapFirstError)
 
-                messages = yield filter_events_for_client(self.store, user_id, messages)
+                messages = yield filter_events_for_client(
+                    self.storage, user_id, messages
+                )
 
                 start_token = now_token.copy_and_replace("room_key", token)
                 end_token = now_token.copy_and_replace("room_key", room_end_token)
@@ -307,7 +311,7 @@ class InitialSyncHandler(BaseHandler):
     def _room_initial_sync_parted(
         self, user_id, room_id, pagin_config, membership, member_event_id, is_peeking
     ):
-        room_state = yield self.store.get_state_for_events([member_event_id])
+        room_state = yield self.state_store.get_state_for_events([member_event_id])
 
         room_state = room_state[member_event_id]
 
@@ -322,7 +326,7 @@ class InitialSyncHandler(BaseHandler):
         )
 
         messages = yield filter_events_for_client(
-            self.store, user_id, messages, is_peeking=is_peeking
+            self.storage, user_id, messages, is_peeking=is_peeking
         )
 
         start_token = StreamToken.START.copy_and_replace("room_key", token)
@@ -414,7 +418,7 @@ class InitialSyncHandler(BaseHandler):
         )
 
         messages = yield filter_events_for_client(
-            self.store, user_id, messages, is_peeking=is_peeking
+            self.storage, user_id, messages, is_peeking=is_peeking
         )
 
         start_token = now_token.copy_and_replace("room_key", token)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c2eba81980..f32339a14c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -59,6 +59,8 @@ class MessageHandler(object):
         self.clock = hs.get_clock()
         self.state = hs.get_state_handler()
         self.store = hs.get_datastore()
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
         self._event_serializer = hs.get_event_client_serializer()
 
     @defer.inlineCallbacks
@@ -82,7 +84,7 @@ class MessageHandler(object):
             data = yield self.state.get_current_state(room_id, event_type, state_key)
         elif membership == Membership.LEAVE:
             key = (event_type, state_key)
-            room_state = yield self.store.get_state_for_events(
+            room_state = yield self.state_store.get_state_for_events(
                 [membership_event_id], StateFilter.from_types([key])
             )
             data = room_state[membership_event_id].get(key)
@@ -135,12 +137,12 @@ class MessageHandler(object):
                 raise NotFoundError("Can't find event for token %s" % (at_token,))
 
             visible_events = yield filter_events_for_client(
-                self.store, user_id, last_events, apply_retention_policies=False
+                self.storage, user_id, last_events, apply_retention_policies=False
             )
 
             event = last_events[0]
             if visible_events:
-                room_state = yield self.store.get_state_for_events(
+                room_state = yield self.state_store.get_state_for_events(
                     [event.event_id], state_filter=state_filter
                 )
                 room_state = room_state[event.event_id]
@@ -161,7 +163,7 @@ class MessageHandler(object):
                 )
                 room_state = yield self.store.get_events(state_ids.values())
             elif membership == Membership.LEAVE:
-                room_state = yield self.store.get_state_for_events(
+                room_state = yield self.state_store.get_state_for_events(
                     [membership_event_id], state_filter=state_filter
                 )
                 room_state = room_state[membership_event_id]
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d8c3feff16..f6969e2887 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -72,6 +72,8 @@ class PaginationHandler(object):
         self.hs = hs
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
         self.clock = hs.get_clock()
         self._server_name = hs.hostname
 
@@ -363,7 +365,7 @@ class PaginationHandler(object):
                 events = event_filter.filter(events)
 
             events = yield filter_events_for_client(
-                self.store, user_id, events, is_peeking=(member_event_id is None)
+                self.storage, user_id, events, is_peeking=(member_event_id is None)
             )
 
         if not events:
@@ -382,7 +384,7 @@ class PaginationHandler(object):
                 (EventTypes.Member, event.sender) for event in events
             )
 
-            state_ids = yield self.store.get_state_ids_for_event(
+            state_ids = yield self.state_store.get_state_ids_for_event(
                 events[0].event_id, state_filter=state_filter
             )
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 0dce0109de..1fec1a1f31 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -849,6 +849,8 @@ class RoomContextHandler(object):
     def __init__(self, hs):
         self.hs = hs
         self.store = hs.get_datastore()
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
 
     @defer.inlineCallbacks
     def get_event_context(self, user, room_id, event_id, limit, event_filter):
@@ -875,7 +877,7 @@ class RoomContextHandler(object):
 
         def filter_evts(events):
             return filter_events_for_client(
-                self.store, user.to_string(), events, is_peeking=is_peeking
+                self.storage, user.to_string(), events, is_peeking=is_peeking
             )
 
         event = yield self.store.get_event(
@@ -917,7 +919,7 @@ class RoomContextHandler(object):
         # first? Shouldn't we be consistent with /sync?
         # https://github.com/matrix-org/matrix-doc/issues/687
 
-        state = yield self.store.get_state_for_events(
+        state = yield self.state_store.get_state_for_events(
             [last_event_id], state_filter=state_filter
         )
         results["state"] = list(state[last_event_id].values())
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index cd5e90bacb..f4d8a60774 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -35,6 +35,8 @@ class SearchHandler(BaseHandler):
     def __init__(self, hs):
         super(SearchHandler, self).__init__(hs)
         self._event_serializer = hs.get_event_client_serializer()
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
 
     @defer.inlineCallbacks
     def get_old_rooms_from_upgraded_room(self, room_id):
@@ -221,7 +223,7 @@ class SearchHandler(BaseHandler):
             filtered_events = search_filter.filter([r["event"] for r in results])
 
             events = yield filter_events_for_client(
-                self.store, user.to_string(), filtered_events
+                self.storage, user.to_string(), filtered_events
             )
 
             events.sort(key=lambda e: -rank_map[e.event_id])
@@ -271,7 +273,7 @@ class SearchHandler(BaseHandler):
                 filtered_events = search_filter.filter([r["event"] for r in results])
 
                 events = yield filter_events_for_client(
-                    self.store, user.to_string(), filtered_events
+                    self.storage, user.to_string(), filtered_events
                 )
 
                 room_events.extend(events)
@@ -340,11 +342,11 @@ class SearchHandler(BaseHandler):
                 )
 
                 res["events_before"] = yield filter_events_for_client(
-                    self.store, user.to_string(), res["events_before"]
+                    self.storage, user.to_string(), res["events_before"]
                 )
 
                 res["events_after"] = yield filter_events_for_client(
-                    self.store, user.to_string(), res["events_after"]
+                    self.storage, user.to_string(), res["events_after"]
                 )
 
                 res["start"] = now_token.copy_and_replace(
@@ -372,7 +374,7 @@ class SearchHandler(BaseHandler):
                         [(EventTypes.Member, sender) for sender in senders]
                     )
 
-                    state = yield self.store.get_state_for_event(
+                    state = yield self.state_store.get_state_for_event(
                         last_event_id, state_filter
                     )
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index d99160e9d7..43a082dcda 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -230,6 +230,8 @@ class SyncHandler(object):
         self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
         self.auth = hs.get_auth()
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
 
         # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
         self.lazy_loaded_members_cache = ExpiringCache(
@@ -417,7 +419,7 @@ class SyncHandler(object):
                     current_state_ids = frozenset(itervalues(current_state_ids))
 
                 recents = yield filter_events_for_client(
-                    self.store,
+                    self.storage,
                     sync_config.user.to_string(),
                     recents,
                     always_include_ids=current_state_ids,
@@ -470,7 +472,7 @@ class SyncHandler(object):
                     current_state_ids = frozenset(itervalues(current_state_ids))
 
                 loaded_recents = yield filter_events_for_client(
-                    self.store,
+                    self.storage,
                     sync_config.user.to_string(),
                     loaded_recents,
                     always_include_ids=current_state_ids,
@@ -509,7 +511,7 @@ class SyncHandler(object):
         Returns:
             A Deferred map from ((type, state_key)->Event)
         """
-        state_ids = yield self.store.get_state_ids_for_event(
+        state_ids = yield self.state_store.get_state_ids_for_event(
             event.event_id, state_filter=state_filter
         )
         if event.is_state():
@@ -580,7 +582,7 @@ class SyncHandler(object):
             return None
 
         last_event = last_events[-1]
-        state_ids = yield self.store.get_state_ids_for_event(
+        state_ids = yield self.state_store.get_state_ids_for_event(
             last_event.event_id,
             state_filter=StateFilter.from_types(
                 [(EventTypes.Name, ""), (EventTypes.CanonicalAlias, "")]
@@ -757,11 +759,11 @@ class SyncHandler(object):
 
             if full_state:
                 if batch:
-                    current_state_ids = yield self.store.get_state_ids_for_event(
+                    current_state_ids = yield self.state_store.get_state_ids_for_event(
                         batch.events[-1].event_id, state_filter=state_filter
                     )
 
-                    state_ids = yield self.store.get_state_ids_for_event(
+                    state_ids = yield self.state_store.get_state_ids_for_event(
                         batch.events[0].event_id, state_filter=state_filter
                     )
 
@@ -781,7 +783,7 @@ class SyncHandler(object):
                 )
             elif batch.limited:
                 if batch:
-                    state_at_timeline_start = yield self.store.get_state_ids_for_event(
+                    state_at_timeline_start = yield self.state_store.get_state_ids_for_event(
                         batch.events[0].event_id, state_filter=state_filter
                     )
                 else:
@@ -810,7 +812,7 @@ class SyncHandler(object):
                 )
 
                 if batch:
-                    current_state_ids = yield self.store.get_state_ids_for_event(
+                    current_state_ids = yield self.state_store.get_state_ids_for_event(
                         batch.events[-1].event_id, state_filter=state_filter
                     )
                 else:
@@ -841,7 +843,7 @@ class SyncHandler(object):
                         # So we fish out all the member events corresponding to the
                         # timeline here, and then dedupe any redundant ones below.
 
-                        state_ids = yield self.store.get_state_ids_for_event(
+                        state_ids = yield self.state_store.get_state_ids_for_event(
                             batch.events[0].event_id,
                             # we only want members!
                             state_filter=StateFilter.from_types(
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 4e091314e6..af161a81d7 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -159,6 +159,7 @@ class Notifier(object):
         self.room_to_user_streams = {}
 
         self.hs = hs
+        self.storage = hs.get_storage()
         self.event_sources = hs.get_event_sources()
         self.store = hs.get_datastore()
         self.pending_new_room_events = []
@@ -425,7 +426,10 @@ class Notifier(object):
 
                 if name == "room":
                     new_events = yield filter_events_for_client(
-                        self.store, user.to_string(), new_events, is_peeking=is_peeking
+                        self.storage,
+                        user.to_string(),
+                        new_events,
+                        is_peeking=is_peeking,
                     )
                 elif name == "presence":
                     now = self.clock.time_msec()
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index d64b774a49..a4c16b2608 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -64,6 +64,7 @@ class HttpPusher(object):
     def __init__(self, hs, pusherdict):
         self.hs = hs
         self.store = self.hs.get_datastore()
+        self.storage = self.hs.get_storage()
         self.clock = self.hs.get_clock()
         self.state_handler = self.hs.get_state_handler()
         self.user_id = pusherdict["user_name"]
@@ -329,7 +330,7 @@ class HttpPusher(object):
             return d
 
         ctx = yield push_tools.get_context_for_event(
-            self.store, self.state_handler, event, self.user_id
+            self.storage, self.state_handler, event, self.user_id
         )
 
         d = {
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 5b16ab4ae8..1d15a06a58 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -119,6 +119,7 @@ class Mailer(object):
         self.store = self.hs.get_datastore()
         self.macaroon_gen = self.hs.get_macaroon_generator()
         self.state_handler = self.hs.get_state_handler()
+        self.storage = hs.get_storage()
         self.app_name = app_name
 
         logger.info("Created Mailer for app_name %s" % app_name)
@@ -389,7 +390,7 @@ class Mailer(object):
         }
 
         the_events = yield filter_events_for_client(
-            self.store, user_id, results["events_before"]
+            self.storage, user_id, results["events_before"]
         )
         the_events.append(notif_event)
 
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index a54051a726..de5c101a58 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -16,6 +16,7 @@
 from twisted.internet import defer
 
 from synapse.push.presentable_names import calculate_room_name, name_from_member_event
+from synapse.storage import Storage
 
 
 @defer.inlineCallbacks
@@ -43,22 +44,22 @@ def get_badge_count(store, user_id):
 
 
 @defer.inlineCallbacks
-def get_context_for_event(store, state_handler, ev, user_id):
+def get_context_for_event(storage: Storage, state_handler, ev, user_id):
     ctx = {}
 
-    room_state_ids = yield store.get_state_ids_for_event(ev.event_id)
+    room_state_ids = yield storage.state.get_state_ids_for_event(ev.event_id)
 
     # we no longer bother setting room_alias, and make room_name the
     # human-readable name instead, be that m.room.name, an alias or
     # a list of people in the room
     name = yield calculate_room_name(
-        store, room_state_ids, user_id, fallback_to_single_member=False
+        storage.main, room_state_ids, user_id, fallback_to_single_member=False
     )
     if name:
         ctx["name"] = name
 
     sender_state_event_id = room_state_ids[("m.room.member", ev.sender)]
-    sender_state_event = yield store.get_event(sender_state_event_id)
+    sender_state_event = yield storage.main.get_event(sender_state_event_id)
     ctx["sender_display_name"] = name_from_member_event(sender_state_event)
 
     return ctx
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index dc9f5a9008..4e91eb66fe 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -103,6 +103,7 @@ class StateHandler(object):
     def __init__(self, hs):
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
+        self.state_store = hs.get_storage().state
         self.hs = hs
         self._state_resolution_handler = hs.get_state_resolution_handler()
 
@@ -271,7 +272,7 @@ class StateHandler(object):
             else:
                 current_state_ids = prev_state_ids
 
-            state_group = yield self.store.store_state_group(
+            state_group = yield self.state_store.store_state_group(
                 event.event_id,
                 event.room_id,
                 prev_group=None,
@@ -321,7 +322,7 @@ class StateHandler(object):
                 delta_ids = dict(entry.delta_ids)
                 delta_ids[key] = event.event_id
 
-            state_group = yield self.store.store_state_group(
+            state_group = yield self.state_store.store_state_group(
                 event.event_id,
                 event.room_id,
                 prev_group=prev_group,
@@ -334,7 +335,7 @@ class StateHandler(object):
             delta_ids = entry.delta_ids
 
             if entry.state_group is None:
-                entry.state_group = yield self.store.store_state_group(
+                entry.state_group = yield self.state_store.store_state_group(
                     event.event_id,
                     event.room_id,
                     prev_group=entry.prev_group,
@@ -376,14 +377,16 @@ class StateHandler(object):
         # map from state group id to the state in that state group (where
         # 'state' is a map from state key to event id)
         # dict[int, dict[(str, str), str]]
-        state_groups_ids = yield self.store.get_state_groups_ids(room_id, event_ids)
+        state_groups_ids = yield self.state_store.get_state_groups_ids(
+            room_id, event_ids
+        )
 
         if len(state_groups_ids) == 0:
             return _StateCacheEntry(state={}, state_group=None)
         elif len(state_groups_ids) == 1:
             name, state_list = list(state_groups_ids.items()).pop()
 
-            prev_group, delta_ids = yield self.store.get_state_group_delta(name)
+            prev_group, delta_ids = yield self.state_store.get_state_group_delta(name)
 
             return _StateCacheEntry(
                 state=state_list,
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index a6429d17ed..0a1a8cc1e5 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -30,6 +30,7 @@ stored in `synapse.storage.schema`.
 from synapse.storage.data_stores import DataStores
 from synapse.storage.data_stores.main import DataStore
 from synapse.storage.persist_events import EventsPersistenceStorage
+from synapse.storage.state import StateGroupStorage
 
 __all__ = ["DataStores", "DataStore"]
 
@@ -45,6 +46,7 @@ class Storage(object):
         self.main = stores.main
 
         self.persistence = EventsPersistenceStorage(hs, stores)
+        self.state = StateGroupStorage(hs, stores)
 
 
 def are_all_users_on_domain(txn, database_engine, domain):
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 931dcb6558..fa03ca9ff7 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -547,7 +547,7 @@ class EventsPersistenceStorage(object):
 
         if missing_event_ids:
             # Now pull out the state groups for any missing events from DB
-            event_to_groups = yield self.state_store._get_state_group_for_events(
+            event_to_groups = yield self.main_store._get_state_group_for_events(
                 missing_event_ids
             )
             event_id_to_state_group.update(event_to_groups)
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index a2df8fa827..3735846899 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -19,6 +19,8 @@ from six import iteritems, itervalues
 
 import attr
 
+from twisted.internet import defer
+
 from synapse.api.constants import EventTypes
 
 logger = logging.getLogger(__name__)
@@ -322,3 +324,234 @@ class StateFilter(object):
         )
 
         return member_filter, non_member_filter
+
+
+class StateGroupStorage(object):
+    """High level interface to fetching state for event.
+    """
+
+    def __init__(self, hs, stores):
+        self.stores = stores
+
+    def get_state_group_delta(self, state_group):
+        """Given a state group try to return a previous group and a delta between
+        the old and the new.
+
+        Returns:
+            Deferred[Tuple[Optional[int], Optional[list[dict[tuple[str, str], str]]]]]):
+                (prev_group, delta_ids)
+        """
+
+        return self.stores.main.get_state_group_delta(state_group)
+
+    @defer.inlineCallbacks
+    def get_state_groups_ids(self, _room_id, event_ids):
+        """Get the event IDs of all the state for the state groups for the given events
+
+        Args:
+            _room_id (str): id of the room for these events
+            event_ids (iterable[str]): ids of the events
+
+        Returns:
+            Deferred[dict[int, dict[tuple[str, str], str]]]:
+                dict of state_group_id -> (dict of (type, state_key) -> event id)
+        """
+        if not event_ids:
+            return {}
+
+        event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
+
+        groups = set(itervalues(event_to_groups))
+        group_to_state = yield self.stores.main._get_state_for_groups(groups)
+
+        return group_to_state
+
+    @defer.inlineCallbacks
+    def get_state_ids_for_group(self, state_group):
+        """Get the event IDs of all the state in the given state group
+
+        Args:
+            state_group (int)
+
+        Returns:
+            Deferred[dict]: Resolves to a map of (type, state_key) -> event_id
+        """
+        group_to_state = yield self._get_state_for_groups((state_group,))
+
+        return group_to_state[state_group]
+
+    @defer.inlineCallbacks
+    def get_state_groups(self, room_id, event_ids):
+        """ Get the state groups for the given list of event_ids
+        Returns:
+            Deferred[dict[int, list[EventBase]]]:
+                dict of state_group_id -> list of state events.
+        """
+        if not event_ids:
+            return {}
+
+        group_to_ids = yield self.get_state_groups_ids(room_id, event_ids)
+
+        state_event_map = yield self.stores.main.get_events(
+            [
+                ev_id
+                for group_ids in itervalues(group_to_ids)
+                for ev_id in itervalues(group_ids)
+            ],
+            get_prev_content=False,
+        )
+
+        return {
+            group: [
+                state_event_map[v]
+                for v in itervalues(event_id_map)
+                if v in state_event_map
+            ]
+            for group, event_id_map in iteritems(group_to_ids)
+        }
+
+    def _get_state_groups_from_groups(self, groups, state_filter):
+        """Returns the state groups for a given set of groups, filtering on
+        types of state events.
+
+        Args:
+            groups(list[int]): list of state group IDs to query
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
+        Returns:
+            Deferred[dict[int, dict[tuple[str, str], str]]]:
+                dict of state_group_id -> (dict of (type, state_key) -> event id)
+        """
+
+        return self.stores.main._get_state_groups_from_groups(groups, state_filter)
+
+    @defer.inlineCallbacks
+    def get_state_for_events(self, event_ids, state_filter=StateFilter.all()):
+        """Given a list of event_ids and type tuples, return a list of state
+        dicts for each event.
+        Args:
+            event_ids (list[string])
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
+        Returns:
+            deferred: A dict of (event_id) -> (type, state_key) -> [state_events]
+        """
+        event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
+
+        groups = set(itervalues(event_to_groups))
+        group_to_state = yield self.stores.main._get_state_for_groups(
+            groups, state_filter
+        )
+
+        state_event_map = yield self.stores.main.get_events(
+            [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)],
+            get_prev_content=False,
+        )
+
+        event_to_state = {
+            event_id: {
+                k: state_event_map[v]
+                for k, v in iteritems(group_to_state[group])
+                if v in state_event_map
+            }
+            for event_id, group in iteritems(event_to_groups)
+        }
+
+        return {event: event_to_state[event] for event in event_ids}
+
+    @defer.inlineCallbacks
+    def get_state_ids_for_events(self, event_ids, state_filter=StateFilter.all()):
+        """
+        Get the state dicts corresponding to a list of events, containing the event_ids
+        of the state events (as opposed to the events themselves)
+
+        Args:
+            event_ids(list(str)): events whose state should be returned
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
+
+        Returns:
+            A deferred dict from event_id -> (type, state_key) -> event_id
+        """
+        event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
+
+        groups = set(itervalues(event_to_groups))
+        group_to_state = yield self.stores.main._get_state_for_groups(
+            groups, state_filter
+        )
+
+        event_to_state = {
+            event_id: group_to_state[group]
+            for event_id, group in iteritems(event_to_groups)
+        }
+
+        return {event: event_to_state[event] for event in event_ids}
+
+    @defer.inlineCallbacks
+    def get_state_for_event(self, event_id, state_filter=StateFilter.all()):
+        """
+        Get the state dict corresponding to a particular event
+
+        Args:
+            event_id(str): event whose state should be returned
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
+
+        Returns:
+            A deferred dict from (type, state_key) -> state_event
+        """
+        state_map = yield self.get_state_for_events([event_id], state_filter)
+        return state_map[event_id]
+
+    @defer.inlineCallbacks
+    def get_state_ids_for_event(self, event_id, state_filter=StateFilter.all()):
+        """
+        Get the state dict corresponding to a particular event
+
+        Args:
+            event_id(str): event whose state should be returned
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
+
+        Returns:
+            A deferred dict from (type, state_key) -> state_event
+        """
+        state_map = yield self.get_state_ids_for_events([event_id], state_filter)
+        return state_map[event_id]
+
+    def _get_state_for_groups(self, groups, state_filter=StateFilter.all()):
+        """Gets the state at each of a list of state groups, optionally
+        filtering by type/state_key
+
+        Args:
+            groups (iterable[int]): list of state groups for which we want
+                to get the state.
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
+        Returns:
+            Deferred[dict[int, dict[tuple[str, str], str]]]:
+                dict of state_group_id -> (dict of (type, state_key) -> event id)
+        """
+        return self.stores.main._get_state_for_groups(groups, state_filter)
+
+    def store_state_group(
+        self, event_id, room_id, prev_group, delta_ids, current_state_ids
+    ):
+        """Store a new set of state, returning a newly assigned state group.
+
+        Args:
+            event_id (str): The event ID for which the state was calculated
+            room_id (str)
+            prev_group (int|None): A previous state group for the room, optional.
+            delta_ids (dict|None): The delta between state at `prev_group` and
+                `current_state_ids`, if `prev_group` was given. Same format as
+                `current_state_ids`.
+            current_state_ids (dict): The state to store. Map of (type, state_key)
+                to event_id.
+
+        Returns:
+            Deferred[int]: The state group ID
+        """
+        return self.stores.main.store_state_group(
+            event_id, room_id, prev_group, delta_ids, current_state_ids
+        )
diff --git a/synapse/visibility.py b/synapse/visibility.py
index a19011b793..c0e39f5be6 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -23,6 +23,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.events.utils import prune_event
+from synapse.storage import Storage
 from synapse.storage.state import StateFilter
 from synapse.types import get_domain_from_id
 
@@ -43,7 +44,7 @@ MEMBERSHIP_PRIORITY = (
 
 @defer.inlineCallbacks
 def filter_events_for_client(
-    store,
+    storage: Storage,
     user_id,
     events,
     is_peeking=False,
@@ -54,8 +55,7 @@ def filter_events_for_client(
     Check which events a user is allowed to see
 
     Args:
-        store (synapse.storage.DataStore): our datastore (can also be a worker
-            store)
+        storage
         user_id(str): user id to be checked
         events(list[synapse.events.EventBase]): sequence of events to be checked
         is_peeking(bool): should be True if:
@@ -77,12 +77,12 @@ def filter_events_for_client(
     events = list(e for e in events if not e.internal_metadata.is_soft_failed())
 
     types = ((EventTypes.RoomHistoryVisibility, ""), (EventTypes.Member, user_id))
-    event_id_to_state = yield store.get_state_for_events(
+    event_id_to_state = yield storage.state.get_state_for_events(
         frozenset(e.event_id for e in events),
         state_filter=StateFilter.from_types(types),
     )
 
-    ignore_dict_content = yield store.get_global_account_data_by_type_for_user(
+    ignore_dict_content = yield storage.main.get_global_account_data_by_type_for_user(
         "m.ignored_user_list", user_id
     )
 
@@ -93,7 +93,7 @@ def filter_events_for_client(
         else []
     )
 
-    erased_senders = yield store.are_users_erased((e.sender for e in events))
+    erased_senders = yield storage.main.are_users_erased((e.sender for e in events))
 
     if apply_retention_policies:
         room_ids = set(e.room_id for e in events)
@@ -101,7 +101,7 @@ def filter_events_for_client(
 
         for room_id in room_ids:
             retention_policies[room_id] = (
-                yield store.get_retention_policy_for_room(room_id)
+                yield storage.main.get_retention_policy_for_room(room_id)
             )
 
     def allowed(event):
@@ -128,7 +128,7 @@ def filter_events_for_client(
             max_lifetime = retention_policy.get("max_lifetime")
 
             if max_lifetime is not None:
-                oldest_allowed_ts = store.clock.time_msec() - max_lifetime
+                oldest_allowed_ts = storage.main.clock.time_msec() - max_lifetime
 
                 if event.origin_server_ts < oldest_allowed_ts:
                     return None
@@ -243,13 +243,17 @@ def filter_events_for_client(
 
 @defer.inlineCallbacks
 def filter_events_for_server(
-    store, server_name, events, redact=True, check_history_visibility_only=False
+    storage: Storage,
+    server_name,
+    events,
+    redact=True,
+    check_history_visibility_only=False,
 ):
     """Filter a list of events based on whether given server is allowed to
     see them.
 
     Args:
-        store (DataStore)
+        storage
         server_name (str)
         events (iterable[FrozenEvent])
         redact (bool): Whether to return a redacted version of the event, or
@@ -304,7 +308,7 @@ def filter_events_for_server(
     # Lets check to see if all the events have a history visibility
     # of "shared" or "world_readable". If thats the case then we don't
     # need to check membership (as we know the server is in the room).
-    event_to_state_ids = yield store.get_state_ids_for_events(
+    event_to_state_ids = yield storage.state.get_state_ids_for_events(
         frozenset(e.event_id for e in events),
         state_filter=StateFilter.from_types(
             types=((EventTypes.RoomHistoryVisibility, ""),)
@@ -322,14 +326,14 @@ def filter_events_for_server(
     if not visibility_ids:
         all_open = True
     else:
-        event_map = yield store.get_events(visibility_ids)
+        event_map = yield storage.main.get_events(visibility_ids)
         all_open = all(
             e.content.get("history_visibility") in (None, "shared", "world_readable")
             for e in itervalues(event_map)
         )
 
     if not check_history_visibility_only:
-        erased_senders = yield store.are_users_erased((e.sender for e in events))
+        erased_senders = yield storage.main.are_users_erased((e.sender for e in events))
     else:
         # We don't want to check whether users are erased, which is equivalent
         # to no users having been erased.
@@ -358,7 +362,7 @@ def filter_events_for_server(
 
     # first, for each event we're wanting to return, get the event_ids
     # of the history vis and membership state at those events.
-    event_to_state_ids = yield store.get_state_ids_for_events(
+    event_to_state_ids = yield storage.state.get_state_ids_for_events(
         frozenset(e.event_id for e in events),
         state_filter=StateFilter.from_types(
             types=((EventTypes.RoomHistoryVisibility, ""), (EventTypes.Member, None))
@@ -388,7 +392,7 @@ def filter_events_for_server(
             return False
         return state_key[idx + 1 :] == server_name
 
-    event_map = yield store.get_events(
+    event_map = yield storage.main.get_events(
         [
             e_id
             for e_id, key in iteritems(event_id_to_state_key)