author     Matthew Hodgson <matthew@matrix.org>  2016-04-04 00:38:21 +0100
committer  Matthew Hodgson <matthew@matrix.org>  2016-04-04 00:38:21 +0100
commit     9f7dc2bef7cd39645ae74d96f2919a3f9fdb65ac (patch)
tree       b818eb0b962de7e860584bb5286f98e60e12ac75 /synapse/storage
parent     report image size (bytewise) in OG meta (diff)
parent     Merge pull request #686 from matrix-org/markjh/doc_strings (diff)
download   synapse-9f7dc2bef7cd39645ae74d96f2919a3f9fdb65ac.tar.xz
Merge branch 'develop' into matthew/preview_urls
Diffstat (limited to 'synapse/storage')
-rw-r--r--  synapse/storage/__init__.py                        22
-rw-r--r--  synapse/storage/account_data.py                     4
-rw-r--r--  synapse/storage/event_push_actions.py               5
-rw-r--r--  synapse/storage/events.py                         278
-rw-r--r--  synapse/storage/presence.py                         6
-rw-r--r--  synapse/storage/push_rule.py                        2
-rw-r--r--  synapse/storage/pusher.py                           2
-rw-r--r--  synapse/storage/receipts.py                         6
-rw-r--r--  synapse/storage/registration.py                    15
-rw-r--r--  synapse/storage/schema/delta/30/state_stream.sql   38
-rw-r--r--  synapse/storage/state.py                           59
-rw-r--r--  synapse/storage/stream.py                           2
-rw-r--r--  synapse/storage/tags.py                             6
-rw-r--r--  synapse/storage/util/id_generators.py              63
14 files changed, 346 insertions, 162 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 250ba536ea..57863bba4d 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -88,15 +88,6 @@ class DataStore(RoomMemberStore, RoomStore,
         self.hs = hs
         self.database_engine = hs.database_engine
 
-        cur = db_conn.cursor()
-        try:
-            cur.execute("SELECT MIN(stream_ordering) FROM events",)
-            rows = cur.fetchall()
-            self.min_stream_token = rows[0][0] if rows and rows[0] and rows[0][0] else -1
-            self.min_stream_token = min(self.min_stream_token, -1)
-        finally:
-            cur.close()
-
         self.client_ip_last_seen = Cache(
             name="client_ip_last_seen",
             keylen=4,
@@ -105,6 +96,9 @@ class DataStore(RoomMemberStore, RoomStore,
         self._stream_id_gen = StreamIdGenerator(
             db_conn, "events", "stream_ordering"
         )
+        self._backfill_id_gen = StreamIdGenerator(
+            db_conn, "events", "stream_ordering", step=-1
+        )
         self._receipts_id_gen = StreamIdGenerator(
             db_conn, "receipts_linearized", "stream_id"
         )
@@ -116,7 +110,7 @@ class DataStore(RoomMemberStore, RoomStore,
         )
 
         self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
-        self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
+        self._state_groups_id_gen = StreamIdGenerator(db_conn, "state_groups", "id")
         self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
         self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
         self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
@@ -129,7 +123,7 @@ class DataStore(RoomMemberStore, RoomStore,
             extra_tables=[("deleted_pushers", "stream_id")],
         )
 
-        events_max = self._stream_id_gen.get_max_token()
+        events_max = self._stream_id_gen.get_current_token()
         event_cache_prefill, min_event_val = self._get_cache_dict(
             db_conn, "events",
             entity_column="room_id",
@@ -145,7 +139,7 @@ class DataStore(RoomMemberStore, RoomStore,
             "MembershipStreamChangeCache", events_max,
         )
 
-        account_max = self._account_data_id_gen.get_max_token()
+        account_max = self._account_data_id_gen.get_current_token()
         self._account_data_stream_cache = StreamChangeCache(
             "AccountDataAndTagsChangeCache", account_max,
         )
@@ -156,7 +150,7 @@ class DataStore(RoomMemberStore, RoomStore,
             db_conn, "presence_stream",
             entity_column="user_id",
             stream_column="stream_id",
-            max_value=self._presence_id_gen.get_max_token(),
+            max_value=self._presence_id_gen.get_current_token(),
         )
         self.presence_stream_cache = StreamChangeCache(
             "PresenceStreamChangeCache", min_presence_val,
@@ -167,7 +161,7 @@ class DataStore(RoomMemberStore, RoomStore,
             db_conn, "push_rules_stream",
             entity_column="user_id",
             stream_column="stream_id",
-            max_value=self._push_rules_stream_id_gen.get_max_token()[0],
+            max_value=self._push_rules_stream_id_gen.get_current_token()[0],
         )
 
         self.push_rules_stream_cache = StreamChangeCache(
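
The __init__.py changes above swap the hand-rolled "SELECT MIN(stream_ordering) FROM events" bootstrap for a second StreamIdGenerator constructed with step=-1, so backfill allocations now go through the same generator machinery as forward ones. A minimal sketch of the idea, assuming a hypothetical TinyIdGen (not Synapse's class) and omitting the unfinished-id bookkeeping the real generator does:

import threading
from contextlib import contextmanager

class TinyIdGen(object):
    """Illustrative only: shows why step=-1 yields backfill ids."""
    def __init__(self, current, step=1):
        self._lock = threading.Lock()
        self._current = current
        self._step = step

    @contextmanager
    def get_next(self):
        with self._lock:
            self._current += self._step
            next_id = self._current
        yield next_id

forward = TinyIdGen(0, step=1)    # live events: 1, 2, 3, ...
backfill = TinyIdGen(0, step=-1)  # backfilled events: -1, -2, -3, ...

with backfill.get_next() as stream_id:
    assert stream_id == -1        # sorts before every live event
with forward.get_next() as stream_id:
    assert stream_id == 1
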
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index faddefe219..7a7fbf1e52 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -200,7 +200,7 @@ class AccountDataStore(SQLBaseStore):
                 "add_room_account_data", add_account_data_txn, next_id
             )
 
-        result = self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -239,7 +239,7 @@ class AccountDataStore(SQLBaseStore):
                 "add_user_account_data", add_account_data_txn, next_id
             )
 
-        result = self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
     def _update_max_stream_id(self, txn, next_id):
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index dc5830450a..3933b6e2c5 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -26,8 +26,9 @@ logger = logging.getLogger(__name__)
 class EventPushActionsStore(SQLBaseStore):
     def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
         """
-        :param event: the event set actions for
-        :param tuples: list of tuples of (user_id, actions)
+        Args:
+            event: the event to set actions for
+            tuples: list of tuples of (user_id, actions)
         """
         values = []
         for uid, actions in tuples:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5233430028..c4dc3b3d51 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -24,7 +24,7 @@ from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
 
 from canonicaljson import encode_canonical_json
-from contextlib import contextmanager
+from collections import namedtuple
 
 import logging
 import math
@@ -60,64 +60,71 @@ class EventsStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def persist_events(self, events_and_contexts, backfilled=False,
-                       is_new_state=True):
+    def persist_events(self, events_and_contexts, backfilled=False):
         if not events_and_contexts:
             return
 
         if backfilled:
-            start = self.min_stream_token - 1
-            self.min_stream_token -= len(events_and_contexts) + 1
-            stream_orderings = range(start, self.min_stream_token, -1)
-
-            @contextmanager
-            def stream_ordering_manager():
-                yield stream_orderings
-            stream_ordering_manager = stream_ordering_manager()
+            stream_ordering_manager = self._backfill_id_gen.get_next_mult(
+                len(events_and_contexts)
+            )
         else:
             stream_ordering_manager = self._stream_id_gen.get_next_mult(
                 len(events_and_contexts)
             )
 
+        state_group_id_manager = self._state_groups_id_gen.get_next_mult(
+            len(events_and_contexts)
+        )
         with stream_ordering_manager as stream_orderings:
-            for (event, _), stream in zip(events_and_contexts, stream_orderings):
-                event.internal_metadata.stream_ordering = stream
-
-            chunks = [
-                events_and_contexts[x:x + 100]
-                for x in xrange(0, len(events_and_contexts), 100)
-            ]
+            with state_group_id_manager as state_group_ids:
+                for (event, context), stream, state_group_id in zip(
+                    events_and_contexts, stream_orderings, state_group_ids
+                ):
+                    event.internal_metadata.stream_ordering = stream
+                    # Assign a state group_id in case a new id is needed for
+                    # this context. In theory we only need to assign this
+                    # for contexts that have current_state and aren't outliers
+                    # but that makes the code more complicated. Assigning an ID
+                    # per event only causes the state_group_ids to grow as fast
+                    # as the stream_ordering so in practice shouldn't be a problem.
+                    context.new_state_group_id = state_group_id
+
+                chunks = [
+                    events_and_contexts[x:x + 100]
+                    for x in xrange(0, len(events_and_contexts), 100)
+                ]
 
-            for chunk in chunks:
-                # We can't easily parallelize these since different chunks
-                # might contain the same event. :(
-                yield self.runInteraction(
-                    "persist_events",
-                    self._persist_events_txn,
-                    events_and_contexts=chunk,
-                    backfilled=backfilled,
-                    is_new_state=is_new_state,
-                )
+                for chunk in chunks:
+                    # We can't easily parallelize these since different chunks
+                    # might contain the same event. :(
+                    yield self.runInteraction(
+                        "persist_events",
+                        self._persist_events_txn,
+                        events_and_contexts=chunk,
+                        backfilled=backfilled,
+                    )
 
     @defer.inlineCallbacks
     @log_function
-    def persist_event(self, event, context,
-                      is_new_state=True, current_state=None):
+    def persist_event(self, event, context, current_state=None):
+
         try:
             with self._stream_id_gen.get_next() as stream_ordering:
-                event.internal_metadata.stream_ordering = stream_ordering
-                yield self.runInteraction(
-                    "persist_event",
-                    self._persist_event_txn,
-                    event=event,
-                    context=context,
-                    is_new_state=is_new_state,
-                    current_state=current_state,
-                )
+                with self._state_groups_id_gen.get_next() as state_group_id:
+                    event.internal_metadata.stream_ordering = stream_ordering
+                    context.new_state_group_id = state_group_id
+                    yield self.runInteraction(
+                        "persist_event",
+                        self._persist_event_txn,
+                        event=event,
+                        context=context,
+                        current_state=current_state,
+                    )
         except _RollbackButIsFineException:
             pass
 
-        max_persisted_id = yield self._stream_id_gen.get_max_token()
+        max_persisted_id = yield self._stream_id_gen.get_current_token()
         defer.returnValue((stream_ordering, max_persisted_id))
 
     @defer.inlineCallbacks
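
The persist_events/persist_event rewrite above reserves ids in bulk and pairs them with events via zip(): one stream ordering and one candidate state group id per event, as the inline comment explains. A hedged sketch of that allocation pattern, using an illustrative FakeGen rather than Synapse's StreamIdGenerator:

from contextlib import contextmanager

class FakeGen(object):
    """Illustrative stand-in for StreamIdGenerator; not Synapse's class."""
    def __init__(self, start, step=1):
        self._current = start
        self._step = step

    @contextmanager
    def get_next_mult(self, n):
        # Reserve n consecutive ids in one go.
        ids = [self._current + self._step * (i + 1) for i in range(n)]
        self._current += self._step * n
        yield ids

events = ["$a:hs", "$b:hs", "$c:hs"]
stream_gen = FakeGen(10)   # forward stream orderings
group_gen = FakeGen(100)   # state group ids

with stream_gen.get_next_mult(len(events)) as streams:
    with group_gen.get_next_mult(len(events)) as group_ids:
        for ev, stream, group_id in zip(events, streams, group_ids):
            # Every event gets a stream_ordering and a (possibly unused)
            # state group id, mirroring the zip() in persist_events above.
            print(ev, stream, group_id)
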
@@ -177,8 +184,7 @@ class EventsStore(SQLBaseStore):
         defer.returnValue({e.event_id: e for e in events})
 
     @log_function
-    def _persist_event_txn(self, txn, event, context,
-                           is_new_state=True, current_state=None):
+    def _persist_event_txn(self, txn, event, context, current_state):
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
@@ -186,7 +192,16 @@ class EventsStore(SQLBaseStore):
             txn.call_after(self.get_rooms_for_user.invalidate_all)
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
             txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
-            txn.call_after(self.get_room_name_and_aliases, event.room_id)
+            txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
+
+            # Add an entry to the current_state_resets table to record the point
+            # where we clobbered the current state
+            stream_order = event.internal_metadata.stream_ordering
+            self._simple_insert_txn(
+                txn,
+                table="current_state_resets",
+                values={"event_stream_ordering": stream_order}
+            )
 
             self._simple_delete_txn(
                 txn,
@@ -210,12 +225,10 @@ class EventsStore(SQLBaseStore):
             txn,
             [(event, context)],
             backfilled=False,
-            is_new_state=is_new_state,
         )
 
     @log_function
-    def _persist_events_txn(self, txn, events_and_contexts, backfilled,
-                            is_new_state=True):
+    def _persist_events_txn(self, txn, events_and_contexts, backfilled):
         depth_updates = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
@@ -282,9 +295,7 @@ class EventsStore(SQLBaseStore):
 
             outlier_persisted = have_persisted[event.event_id]
             if not event.internal_metadata.is_outlier() and outlier_persisted:
-                self._store_state_groups_txn(
-                    txn, event, context,
-                )
+                self._store_mult_state_groups_txn(txn, ((event, context),))
 
                 metadata_json = encode_json(
                     event.internal_metadata.get_dict()
@@ -299,6 +310,18 @@ class EventsStore(SQLBaseStore):
                     (metadata_json, event.event_id,)
                 )
 
+                stream_order = event.internal_metadata.stream_ordering
+                state_group_id = context.state_group or context.new_state_group_id
+                self._simple_insert_txn(
+                    txn,
+                    table="ex_outlier_stream",
+                    values={
+                        "event_stream_ordering": stream_order,
+                        "event_id": event.event_id,
+                        "state_group": state_group_id,
+                    }
+                )
+
                 sql = (
                     "UPDATE events SET outlier = ?"
                     " WHERE event_id = ?"
@@ -310,19 +333,14 @@ class EventsStore(SQLBaseStore):
 
                 self._update_extremeties(txn, [event])
 
-        events_and_contexts = filter(
-            lambda ec: ec[0] not in to_remove,
-            events_and_contexts
-        )
+        events_and_contexts = [
+            ec for ec in events_and_contexts if ec[0] not in to_remove
+        ]
 
         if not events_and_contexts:
             return
 
-        self._store_mult_state_groups_txn(txn, [
-            (event, context)
-            for event, context in events_and_contexts
-            if not event.internal_metadata.is_outlier()
-        ])
+        self._store_mult_state_groups_txn(txn, events_and_contexts)
 
         self._handle_mult_prev_events(
             txn,
@@ -421,10 +439,9 @@ class EventsStore(SQLBaseStore):
             txn, [event for event, _ in events_and_contexts]
         )
 
-        state_events_and_contexts = filter(
-            lambda i: i[0].is_state(),
-            events_and_contexts,
-        )
+        state_events_and_contexts = [
+            ec for ec in events_and_contexts if ec[0].is_state()
+        ]
 
         state_values = []
         for event, context in state_events_and_contexts:
@@ -462,32 +479,50 @@ class EventsStore(SQLBaseStore):
             ],
         )
 
-        if is_new_state:
-            for event, _ in state_events_and_contexts:
-                if not context.rejected:
-                    txn.call_after(
-                        self._get_current_state_for_key.invalidate,
-                        (event.room_id, event.type, event.state_key,)
-                    )
+        if backfilled:
+            # Backfilled events come before the current state so we don't need
+            # to update the current state table
+            return
 
-                    if event.type in [EventTypes.Name, EventTypes.Aliases]:
-                        txn.call_after(
-                            self.get_room_name_and_aliases.invalidate,
-                            (event.room_id,)
-                        )
-
-                    self._simple_upsert_txn(
-                        txn,
-                        "current_state_events",
-                        keyvalues={
-                            "room_id": event.room_id,
-                            "type": event.type,
-                            "state_key": event.state_key,
-                        },
-                        values={
-                            "event_id": event.event_id,
-                        }
-                    )
+        for event, _ in state_events_and_contexts:
+            if (not event.internal_metadata.is_invite_from_remote()
+                    and event.internal_metadata.is_outlier()):
+                # Outlier events generally shouldn't clobber the current state.
+                # However invites from remote servers for rooms we aren't in
+                # are a bit special: they don't come with any associated
+                # state, so they are technically outliers; however, all the
+                # client-facing code assumes that they are in the current
+                # state table, so we insert the event anyway.
+                continue
+
+            if context.rejected:
+                # If the event failed its auth checks then it shouldn't
+                # clobber the current state.
+                continue
+
+            txn.call_after(
+                self._get_current_state_for_key.invalidate,
+                (event.room_id, event.type, event.state_key,)
+            )
+
+            if event.type in [EventTypes.Name, EventTypes.Aliases]:
+                txn.call_after(
+                    self.get_room_name_and_aliases.invalidate,
+                    (event.room_id,)
+                )
+
+            self._simple_upsert_txn(
+                txn,
+                "current_state_events",
+                keyvalues={
+                    "room_id": event.room_id,
+                    "type": event.type,
+                    "state_key": event.state_key,
+                },
+                values={
+                    "event_id": event.event_id,
+                }
+            )
 
         return
 
@@ -1076,10 +1111,7 @@ class EventsStore(SQLBaseStore):
 
     def get_current_backfill_token(self):
         """The current minimum token that backfilled events have reached"""
-
-        # TODO: Fix race with the persit_event txn by using one of the
-        # stream id managers
-        return -self.min_stream_token
+        return -self._backfill_id_gen.get_current_token()
 
     def get_all_new_events(self, last_backfill_id, last_forward_id,
                            current_backfill_id, current_forward_id, limit):
@@ -1087,10 +1119,12 @@ class EventsStore(SQLBaseStore):
         new events or as backfilled events"""
         def get_all_new_events_txn(txn):
             sql = (
-                "SELECT e.stream_ordering, ej.internal_metadata, ej.json"
+                "SELECT e.stream_ordering, ej.internal_metadata, ej.json, eg.state_group"
                 " FROM events as e"
                 " JOIN event_json as ej"
                 " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
+                " LEFT JOIN event_to_state_groups as eg"
+                " ON e.event_id = eg.event_id"
                 " WHERE ? < e.stream_ordering AND e.stream_ordering <= ?"
                 " ORDER BY e.stream_ordering ASC"
                 " LIMIT ?"
@@ -1098,14 +1132,43 @@ class EventsStore(SQLBaseStore):
             if last_forward_id != current_forward_id:
                 txn.execute(sql, (last_forward_id, current_forward_id, limit))
                 new_forward_events = txn.fetchall()
+
+                if len(new_forward_events) == limit:
+                    upper_bound = new_forward_events[-1][0]
+                else:
+                    upper_bound = current_forward_id
+
+                sql = (
+                    "SELECT -event_stream_ordering FROM current_state_resets"
+                    " WHERE ? < event_stream_ordering"
+                    " AND event_stream_ordering <= ?"
+                    " ORDER BY event_stream_ordering ASC"
+                )
+                txn.execute(sql, (last_forward_id, upper_bound))
+                state_resets = txn.fetchall()
+
+                sql = (
+                    "SELECT -event_stream_ordering, event_id, state_group"
+                    " FROM ex_outlier_stream"
+                    " WHERE ? > event_stream_ordering"
+                    " AND event_stream_ordering >= ?"
+                    " ORDER BY event_stream_ordering DESC"
+                )
+                txn.execute(sql, (last_forward_id, upper_bound))
+                forward_ex_outliers = txn.fetchall()
             else:
                 new_forward_events = []
+                state_resets = []
+                forward_ex_outliers = []
 
             sql = (
-                "SELECT -e.stream_ordering, ej.internal_metadata, ej.json"
+                "SELECT -e.stream_ordering, ej.internal_metadata, ej.json,"
+                " eg.state_group"
                 " FROM events as e"
                 " JOIN event_json as ej"
                 " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
+                " LEFT JOIN event_to_state_groups as eg"
+                " ON e.event_id = eg.event_id"
                 " WHERE ? > e.stream_ordering AND e.stream_ordering >= ?"
                 " ORDER BY e.stream_ordering DESC"
                 " LIMIT ?"
@@ -1113,8 +1176,35 @@ class EventsStore(SQLBaseStore):
             if last_backfill_id != current_backfill_id:
                 txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit))
                 new_backfill_events = txn.fetchall()
+
+                if len(new_backfill_events) == limit:
+                    upper_bound = new_backfill_events[-1][0]
+                else:
+                    upper_bound = current_backfill_id
+
+                sql = (
+                    "SELECT -event_stream_ordering, event_id, state_group"
+                    " FROM ex_outlier_stream"
+                    " WHERE ? > event_stream_ordering"
+                    " AND event_stream_ordering >= ?"
+                    " ORDER BY event_stream_ordering DESC"
+                )
+                txn.execute(sql, (-last_backfill_id, -upper_bound))
+                backward_ex_outliers = txn.fetchall()
             else:
                 new_backfill_events = []
+                backward_ex_outliers = []
 
-            return (new_forward_events, new_backfill_events)
+            return AllNewEventsResult(
+                new_forward_events, new_backfill_events,
+                forward_ex_outliers, backward_ex_outliers,
+                state_resets,
+            )
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
+
+
+AllNewEventsResult = namedtuple("AllNewEventsResult", [
+    "new_forward_events", "new_backfill_events",
+    "forward_ex_outliers", "backward_ex_outliers",
+    "state_resets"
+])
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 4cec31e316..59b4ef5ce6 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -68,7 +68,9 @@ class PresenceStore(SQLBaseStore):
                 self._update_presence_txn, stream_orderings, presence_states,
             )
 
-        defer.returnValue((stream_orderings[-1], self._presence_id_gen.get_max_token()))
+        defer.returnValue((
+            stream_orderings[-1], self._presence_id_gen.get_current_token()
+        ))
 
     def _update_presence_txn(self, txn, stream_orderings, presence_states):
         for stream_id, state in zip(stream_orderings, presence_states):
@@ -155,7 +157,7 @@ class PresenceStore(SQLBaseStore):
         defer.returnValue([UserPresenceState(**row) for row in rows])
 
     def get_current_presence_token(self):
-        return self._presence_id_gen.get_max_token()
+        return self._presence_id_gen.get_current_token()
 
     def allow_presence_visible(self, observed_localpart, observer_userid):
         return self._simple_insert(
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 9dbad2fd5f..d2bf7f2aec 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -392,7 +392,7 @@ class PushRuleStore(SQLBaseStore):
         """Get the position of the push rules stream.
         Returns a pair of a stream id for the push_rules stream and the
         room stream ordering it corresponds to."""
-        return self._push_rules_stream_id_gen.get_max_token()
+        return self._push_rules_stream_id_gen.get_current_token()
 
     def have_push_rules_changed_for_user(self, user_id, last_id):
         if not self.push_rules_stream_cache.has_entity_changed(user_id, last_id):
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 87b2ac5773..d1669c778a 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -78,7 +78,7 @@ class PusherStore(SQLBaseStore):
         defer.returnValue(rows)
 
     def get_pushers_stream_token(self):
-        return self._pushers_id_gen.get_max_token()
+        return self._pushers_id_gen.get_current_token()
 
     def get_all_updated_pushers(self, last_id, current_id, limit):
         def get_all_updated_pushers_txn(txn):
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 6b9d848eaa..4befebc8e2 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -31,7 +31,7 @@ class ReceiptsStore(SQLBaseStore):
         super(ReceiptsStore, self).__init__(hs)
 
         self._receipts_stream_cache = StreamChangeCache(
-            "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token()
+            "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
         )
 
     @cached(num_args=2)
@@ -221,7 +221,7 @@ class ReceiptsStore(SQLBaseStore):
         defer.returnValue(results)
 
     def get_max_receipt_stream_id(self):
-        return self._receipts_id_gen.get_max_token()
+        return self._receipts_id_gen.get_current_token()
 
     def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
                                       user_id, event_id, data, stream_id):
@@ -346,7 +346,7 @@ class ReceiptsStore(SQLBaseStore):
             room_id, receipt_type, user_id, event_ids, data
         )
 
-        max_persisted_id = self._stream_id_gen.get_max_token()
+        max_persisted_id = self._stream_id_gen.get_current_token()
 
         defer.returnValue((stream_id, max_persisted_id))
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index bd4eb88a92..d46a963bb8 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -458,12 +458,15 @@ class RegistrationStore(SQLBaseStore):
         """
         Gets the 3pid's guest access token if exists, else saves access_token.
 
-        :param medium (str): Medium of the 3pid. Must be "email".
-        :param address (str): 3pid address.
-        :param access_token (str): The access token to persist if none is
-            already persisted.
-        :param inviter_user_id (str): User ID of the inviter.
-        :return (deferred str): Whichever access token is persisted at the end
+        Args:
+            medium (str): Medium of the 3pid. Must be "email".
+            address (str): 3pid address.
+            access_token (str): The access token to persist if none is
+                already persisted.
+            inviter_user_id (str): User ID of the inviter.
+
+        Returns:
+            deferred str: Whichever access token is persisted at the end
             of this function call.
         """
         def insert(txn):
diff --git a/synapse/storage/schema/delta/30/state_stream.sql b/synapse/storage/schema/delta/30/state_stream.sql
new file mode 100644
index 0000000000..706fe1dcf4
--- /dev/null
+++ b/synapse/storage/schema/delta/30/state_stream.sql
@@ -0,0 +1,38 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * The positions in the event stream_ordering when the current_state was
+ * replaced by the state at the event.
+ */
+
+CREATE TABLE IF NOT EXISTS current_state_resets(
+    event_stream_ordering BIGINT PRIMARY KEY NOT NULL
+);
+
+/* The outlier events that have acquired a state group, typically through
+ * backfill. This is tracked separately from the events table, as assigning a
+ * state group changes the position of the existing event in the stream
+ * ordering.
+ * However since a stream_ordering is assigned in persist_event for the
+ * (event, state) pair, we can use that stream_ordering to identify when
+ * the new state was assigned for the event.
+ */
+CREATE TABLE IF NOT EXISTS ex_outlier_stream(
+    event_stream_ordering BIGINT PRIMARY KEY NOT NULL,
+    event_id TEXT NOT NULL,
+    state_group BIGINT NOT NULL
+);
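
The delta above records both kinds of state discontinuity against a position in the event stream. A self-contained sketch (sqlite3 in memory, fabricated values) of how rows in the two tables can be replayed up to a replication token:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE current_state_resets(
    event_stream_ordering BIGINT PRIMARY KEY NOT NULL
);
CREATE TABLE ex_outlier_stream(
    event_stream_ordering BIGINT PRIMARY KEY NOT NULL,
    event_id TEXT NOT NULL,
    state_group BIGINT NOT NULL
);
""")
conn.execute("INSERT INTO current_state_resets VALUES (17)")
conn.execute(
    "INSERT INTO ex_outlier_stream VALUES (?, ?, ?)",
    (18, "$event:example.com", 3),
)

# Everything at or below a replication token can be replayed in order.
rows = conn.execute(
    "SELECT event_stream_ordering, event_id, state_group"
    " FROM ex_outlier_stream WHERE event_stream_ordering <= ?",
    (20,),
).fetchall()
print(rows)  # [(18, '$event:example.com', 3)]
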
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 02cefdff26..e9f9406014 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -64,12 +64,12 @@ class StateStore(SQLBaseStore):
             for group, state_map in group_to_state.items()
         })
 
-    def _store_state_groups_txn(self, txn, event, context):
-        return self._store_mult_state_groups_txn(txn, [(event, context)])
-
     def _store_mult_state_groups_txn(self, txn, events_and_contexts):
         state_groups = {}
         for event, context in events_and_contexts:
+            if event.internal_metadata.is_outlier():
+                continue
+
             if context.current_state is None:
                 continue
 
@@ -82,7 +82,8 @@ class StateStore(SQLBaseStore):
             if event.is_state():
                 state_events[(event.type, event.state_key)] = event
 
-            state_group = self._state_groups_id_gen.get_next()
+            state_group = context.new_state_group_id
+
             self._simple_insert_txn(
                 txn,
                 table="state_groups",
@@ -114,11 +115,10 @@ class StateStore(SQLBaseStore):
             table="event_to_state_groups",
             values=[
                 {
-                    "state_group": state_groups[event.event_id],
-                    "event_id": event.event_id,
+                    "state_group": state_group_id,
+                    "event_id": event_id,
                 }
-                for event, context in events_and_contexts
-                if context.current_state is not None
+                for event_id, state_group_id in state_groups.items()
             ],
         )
 
@@ -249,11 +249,14 @@ class StateStore(SQLBaseStore):
         """
         Get the state dict corresponding to a particular event
 
-        :param str event_id: event whose state should be returned
-        :param list[(str, str)]|None types: List of (type, state_key) tuples
-            which are used to filter the state fetched. May be None, which
-            matches any key
-        :return: a deferred dict from (type, state_key) -> state_event
+        Args:
+            event_id(str): event whose state should be returned
+            types(list[(str, str)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. May be None, which
+                matches any key
+
+        Returns:
+            A deferred dict from (type, state_key) -> state_event
         """
         state_map = yield self.get_state_for_events([event_id], types)
         defer.returnValue(state_map[event_id])
@@ -429,3 +432,33 @@ class StateStore(SQLBaseStore):
             }
 
         defer.returnValue(results)
+
+    def get_all_new_state_groups(self, last_id, current_id, limit):
+        def get_all_new_state_groups_txn(txn):
+            sql = (
+                "SELECT id, room_id, event_id FROM state_groups"
+                " WHERE ? < id AND id <= ? ORDER BY id LIMIT ?"
+            )
+            txn.execute(sql, (last_id, current_id, limit))
+            groups = txn.fetchall()
+
+            if not groups:
+                return ([], [])
+
+            lower_bound = groups[0][0]
+            upper_bound = groups[-1][0]
+            sql = (
+                "SELECT state_group, type, state_key, event_id"
+                " FROM state_groups_state"
+                " WHERE ? <= state_group AND state_group <= ?"
+            )
+
+            txn.execute(sql, (lower_bound, upper_bound))
+            state_group_state = txn.fetchall()
+            return (groups, state_group_state)
+        return self.runInteraction(
+            "get_all_new_state_groups", get_all_new_state_groups_txn
+        )
+
+    def get_state_stream_token(self):
+        return self._state_groups_id_gen.get_current_token()
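
get_all_new_state_groups returns two flat result sets: the new rows from state_groups and the matching rows from state_groups_state. A hedged sketch, with fabricated rows, of how a consumer might stitch them back into per-group state dicts:

# (id, room_id, event_id) rows from state_groups, and flattened
# (state_group, type, state_key, event_id) rows from state_groups_state.
groups = [(5, "!room:hs", "$create:hs")]
state_group_state = [
    (5, "m.room.create", "", "$create:hs"),
    (5, "m.room.member", "@u:hs", "$join:hs"),
]

by_group = {group_id: {} for group_id, _room, _ev in groups}
for group_id, etype, state_key, event_id in state_group_state:
    by_group[group_id][(etype, state_key)] = event_id

print(by_group[5][("m.room.member", "@u:hs")])  # $join:hs
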
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index cf84938be5..76bcd9cd00 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -539,7 +539,7 @@ class StreamStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_room_events_max_id(self, direction='f'):
-        token = yield self._stream_id_gen.get_max_token()
+        token = yield self._stream_id_gen.get_current_token()
         if direction != 'b':
             defer.returnValue("s%d" % (token,))
         else:
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index a0e6b42b30..9da23f34cb 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -30,7 +30,7 @@ class TagsStore(SQLBaseStore):
         Returns:
             A deferred int.
         """
-        return self._account_data_id_gen.get_max_token()
+        return self._account_data_id_gen.get_current_token()
 
     @cached()
     def get_tags_for_user(self, user_id):
@@ -200,7 +200,7 @@ class TagsStore(SQLBaseStore):
 
         self.get_tags_for_user.invalidate((user_id,))
 
-        result = self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -222,7 +222,7 @@ class TagsStore(SQLBaseStore):
 
         self.get_tags_for_user.invalidate((user_id,))
 
-        result = self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
     def _update_revision_txn(self, txn, user_id, room_id, next_id):
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index a02dfc7d58..f69f1cdad4 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -21,7 +21,7 @@ import threading
 class IdGenerator(object):
     def __init__(self, db_conn, table, column):
         self._lock = threading.Lock()
-        self._next_id = _load_max_id(db_conn, table, column)
+        self._next_id = _load_current_id(db_conn, table, column)
 
     def get_next(self):
         with self._lock:
@@ -29,12 +29,16 @@ class IdGenerator(object):
             return self._next_id
 
 
-def _load_max_id(db_conn, table, column):
+def _load_current_id(db_conn, table, column, step=1):
     cur = db_conn.cursor()
-    cur.execute("SELECT MAX(%s) FROM %s" % (column, table,))
+    if step == 1:
+        cur.execute("SELECT MAX(%s) FROM %s" % (column, table,))
+    else:
+        cur.execute("SELECT MIN(%s) FROM %s" % (column, table,))
     val, = cur.fetchone()
     cur.close()
-    return int(val) if val else 1
+    current_id = int(val) if val else step
+    return (max if step > 0 else min)(current_id, step)
 
 
 class StreamIdGenerator(object):
@@ -45,17 +49,32 @@ class StreamIdGenerator(object):
     all ids less than or equal to it have completed. This handles the fact that
     persistence of events can complete out of order.
 
+    Args:
+        db_conn(connection): A database connection used to fetch the
+            initial value of the generator.
+        table(str): The database table to read the initial value of the
+            id generator from.
+        column(str): The column of the database table to read the initial
+            value of the id generator from.
+        extra_tables(list): List of (table, column) pairs giving extra
+            sources for the initial value of the generator. The value
+            with the largest magnitude is used.
+        step(int): Which direction the stream ids grow in: +1 to grow
+            upwards, -1 to grow downwards.
+
     Usage:
         with stream_id_gen.get_next() as stream_id:
             # ... persist event ...
     """
-    def __init__(self, db_conn, table, column, extra_tables=[]):
+    def __init__(self, db_conn, table, column, extra_tables=[], step=1):
+        assert step != 0
         self._lock = threading.Lock()
-        self._current_max = _load_max_id(db_conn, table, column)
+        self._step = step
+        self._current = _load_current_id(db_conn, table, column, step)
         for table, column in extra_tables:
-            self._current_max = max(
-                self._current_max,
-                _load_max_id(db_conn, table, column)
+            self._current = (max if step > 0 else min)(
+                self._current,
+                _load_current_id(db_conn, table, column, step)
             )
         self._unfinished_ids = deque()
 
@@ -66,8 +85,8 @@ class StreamIdGenerator(object):
                 # ... persist event ...
         """
         with self._lock:
-            self._current_max += 1
-            next_id = self._current_max
+            self._current += self._step
+            next_id = self._current
 
             self._unfinished_ids.append(next_id)
 
@@ -88,8 +107,12 @@ class StreamIdGenerator(object):
                 # ... persist events ...
         """
         with self._lock:
-            next_ids = range(self._current_max + 1, self._current_max + n + 1)
-            self._current_max += n
+            next_ids = range(
+                self._current + self._step,
+                self._current + self._step * (n + 1),
+                self._step
+            )
+            self._current += n
 
             for next_id in next_ids:
                 self._unfinished_ids.append(next_id)
@@ -105,15 +128,15 @@ class StreamIdGenerator(object):
 
         return manager()
 
-    def get_max_token(self):
+    def get_current_token(self):
         """Returns the maximum stream id such that all stream ids less than or
         equal to it have been successfully persisted.
         """
         with self._lock:
             if self._unfinished_ids:
-                return self._unfinished_ids[0] - 1
+                return self._unfinished_ids[0] - self._step
 
-            return self._current_max
+            return self._current
 
 
 class ChainedIdGenerator(object):
@@ -125,7 +148,7 @@ class ChainedIdGenerator(object):
     def __init__(self, chained_generator, db_conn, table, column):
         self.chained_generator = chained_generator
         self._lock = threading.Lock()
-        self._current_max = _load_max_id(db_conn, table, column)
+        self._current_max = _load_current_id(db_conn, table, column)
         self._unfinished_ids = deque()
 
     def get_next(self):
@@ -137,7 +160,7 @@ class ChainedIdGenerator(object):
         with self._lock:
             self._current_max += 1
             next_id = self._current_max
-            chained_id = self.chained_generator.get_max_token()
+            chained_id = self.chained_generator.get_current_token()
 
             self._unfinished_ids.append((next_id, chained_id))
 
@@ -151,7 +174,7 @@ class ChainedIdGenerator(object):
 
         return manager()
 
-    def get_max_token(self):
+    def get_current_token(self):
         """Returns the maximum stream id such that all stream ids less than or
         equal to it have been successfully persisted.
         """
@@ -160,4 +183,4 @@ class ChainedIdGenerator(object):
                 stream_id, chained_id = self._unfinished_ids[0]
                 return (stream_id - 1, chained_id)
 
-            return (self._current_max, self.chained_generator.get_max_token())
+            return (self._current_max, self.chained_generator.get_current_token())
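
The get_max_token to get_current_token rename threaded through this commit reflects the generator's actual contract: the token is the newest id such that everything at or before it has finished persisting. A minimal runnable sketch of that contract (simplified from StreamIdGenerator above; locking and the step parameter omitted):

from collections import deque

class Sketch(object):
    def __init__(self):
        self._current = 0
        self._unfinished = deque()

    def begin(self):
        # Allocate an id; it stays "in flight" until finish() is called.
        self._current += 1
        self._unfinished.append(self._current)
        return self._current

    def finish(self, stream_id):
        self._unfinished.remove(stream_id)

    def get_current_token(self):
        # The token can't advance past the oldest unfinished id.
        if self._unfinished:
            return self._unfinished[0] - 1
        return self._current

gen = Sketch()
a, b = gen.begin(), gen.begin()
gen.finish(b)                        # id 2 done, id 1 still in flight
assert gen.get_current_token() == 0  # can't advance past the gap
gen.finish(a)
assert gen.get_current_token() == 2
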