summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-08-30 16:54:40 +0100
committerErik Johnston <erik@matrix.org>2016-08-30 16:55:11 +0100
commit5dc2a702cf2415a55bab8061053c0b47dc21dc93 (patch)
treefd07b850fa406e63860eddfdb95432bf9f977f9c /synapse
parentRemove state replication stream (diff)
downloadsynapse-5dc2a702cf2415a55bab8061053c0b47dc21dc93.tar.xz
Make _state_groups_id_gen a normal IdGenerator
Diffstat (limited to '')
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/events.py83
-rw-r--r--synapse/storage/state.py3
3 files changed, 40 insertions, 48 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 4f4c723c5b..6c32773f25 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -115,7 +115,7 @@ class DataStore(RoomMemberStore, RoomStore,
         )
 
         self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
-        self._state_groups_id_gen = StreamIdGenerator(db_conn, "state_groups", "id")
+        self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
         self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
         self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
         self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5cbe8c5978..bc1bc97e19 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -271,39 +271,35 @@ class EventsStore(SQLBaseStore):
                 len(events_and_contexts)
             )
 
-        state_group_id_manager = self._state_groups_id_gen.get_next_mult(
-            len(events_and_contexts)
-        )
         with stream_ordering_manager as stream_orderings:
-            with state_group_id_manager as state_group_ids:
-                for (event, context), stream, state_group_id in zip(
-                    events_and_contexts, stream_orderings, state_group_ids
-                ):
-                    event.internal_metadata.stream_ordering = stream
-                    # Assign a state group_id in case a new id is needed for
-                    # this context. In theory we only need to assign this
-                    # for contexts that have current_state and aren't outliers
-                    # but that make the code more complicated. Assigning an ID
-                    # per event only causes the state_group_ids to grow as fast
-                    # as the stream_ordering so in practise shouldn't be a problem.
-                    context.new_state_group_id = state_group_id
-
-                chunks = [
-                    events_and_contexts[x:x + 100]
-                    for x in xrange(0, len(events_and_contexts), 100)
-                ]
+            for (event, context), stream, in zip(
+                events_and_contexts, stream_orderings
+            ):
+                event.internal_metadata.stream_ordering = stream
+                # Assign a state group_id in case a new id is needed for
+                # this context. In theory we only need to assign this
+                # for contexts that have current_state and aren't outliers
+                # but that make the code more complicated. Assigning an ID
+                # per event only causes the state_group_ids to grow as fast
+                # as the stream_ordering so in practise shouldn't be a problem.
+                context.new_state_group_id = self._state_groups_id_gen.get_next()
+
+            chunks = [
+                events_and_contexts[x:x + 100]
+                for x in xrange(0, len(events_and_contexts), 100)
+            ]
 
-                for chunk in chunks:
-                    # We can't easily parallelize these since different chunks
-                    # might contain the same event. :(
-                    yield self.runInteraction(
-                        "persist_events",
-                        self._persist_events_txn,
-                        events_and_contexts=chunk,
-                        backfilled=backfilled,
-                        delete_existing=delete_existing,
-                    )
-                    persist_event_counter.inc_by(len(chunk))
+            for chunk in chunks:
+                # We can't easily parallelize these since different chunks
+                # might contain the same event. :(
+                yield self.runInteraction(
+                    "persist_events",
+                    self._persist_events_txn,
+                    events_and_contexts=chunk,
+                    backfilled=backfilled,
+                    delete_existing=delete_existing,
+                )
+                persist_event_counter.inc_by(len(chunk))
 
     @_retry_on_integrity_error
     @defer.inlineCallbacks
@@ -312,19 +308,18 @@ class EventsStore(SQLBaseStore):
                        delete_existing=False):
         try:
             with self._stream_id_gen.get_next() as stream_ordering:
-                with self._state_groups_id_gen.get_next() as state_group_id:
-                    event.internal_metadata.stream_ordering = stream_ordering
-                    context.new_state_group_id = state_group_id
-                    yield self.runInteraction(
-                        "persist_event",
-                        self._persist_event_txn,
-                        event=event,
-                        context=context,
-                        current_state=current_state,
-                        backfilled=backfilled,
-                        delete_existing=delete_existing,
-                    )
-                    persist_event_counter.inc()
+                event.internal_metadata.stream_ordering = stream_ordering
+                context.new_state_group_id = self._state_groups_id_gen.get_next()
+                yield self.runInteraction(
+                    "persist_event",
+                    self._persist_event_txn,
+                    event=event,
+                    context=context,
+                    current_state=current_state,
+                    backfilled=backfilled,
+                    delete_existing=delete_existing,
+                )
+                persist_event_counter.inc()
         except _RollbackButIsFineException:
             pass
 
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index b1d461fef5..0442353287 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -526,6 +526,3 @@ class StateStore(SQLBaseStore):
         return self.runInteraction(
             "get_all_new_state_groups", get_all_new_state_groups_txn
         )
-
-    def get_state_stream_token(self):
-        return self._state_groups_id_gen.get_current_token()