diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5cbe8c5978..bc1bc97e19 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -271,39 +271,35 @@ class EventsStore(SQLBaseStore):
len(events_and_contexts)
)
- state_group_id_manager = self._state_groups_id_gen.get_next_mult(
- len(events_and_contexts)
- )
with stream_ordering_manager as stream_orderings:
- with state_group_id_manager as state_group_ids:
- for (event, context), stream, state_group_id in zip(
- events_and_contexts, stream_orderings, state_group_ids
- ):
- event.internal_metadata.stream_ordering = stream
- # Assign a state group_id in case a new id is needed for
- # this context. In theory we only need to assign this
- # for contexts that have current_state and aren't outliers
- # but that make the code more complicated. Assigning an ID
- # per event only causes the state_group_ids to grow as fast
- # as the stream_ordering so in practise shouldn't be a problem.
- context.new_state_group_id = state_group_id
-
- chunks = [
- events_and_contexts[x:x + 100]
- for x in xrange(0, len(events_and_contexts), 100)
- ]
+ for (event, context), stream, in zip(
+ events_and_contexts, stream_orderings
+ ):
+ event.internal_metadata.stream_ordering = stream
+ # Assign a state group_id in case a new id is needed for
+ # this context. In theory we only need to assign this
+ # for contexts that have current_state and aren't outliers
+ # but that make the code more complicated. Assigning an ID
+ # per event only causes the state_group_ids to grow as fast
+ # as the stream_ordering so in practise shouldn't be a problem.
+ context.new_state_group_id = self._state_groups_id_gen.get_next()
+
+ chunks = [
+ events_and_contexts[x:x + 100]
+ for x in xrange(0, len(events_and_contexts), 100)
+ ]
- for chunk in chunks:
- # We can't easily parallelize these since different chunks
- # might contain the same event. :(
- yield self.runInteraction(
- "persist_events",
- self._persist_events_txn,
- events_and_contexts=chunk,
- backfilled=backfilled,
- delete_existing=delete_existing,
- )
- persist_event_counter.inc_by(len(chunk))
+ for chunk in chunks:
+ # We can't easily parallelize these since different chunks
+ # might contain the same event. :(
+ yield self.runInteraction(
+ "persist_events",
+ self._persist_events_txn,
+ events_and_contexts=chunk,
+ backfilled=backfilled,
+ delete_existing=delete_existing,
+ )
+ persist_event_counter.inc_by(len(chunk))
@_retry_on_integrity_error
@defer.inlineCallbacks
@@ -312,19 +308,18 @@ class EventsStore(SQLBaseStore):
delete_existing=False):
try:
with self._stream_id_gen.get_next() as stream_ordering:
- with self._state_groups_id_gen.get_next() as state_group_id:
- event.internal_metadata.stream_ordering = stream_ordering
- context.new_state_group_id = state_group_id
- yield self.runInteraction(
- "persist_event",
- self._persist_event_txn,
- event=event,
- context=context,
- current_state=current_state,
- backfilled=backfilled,
- delete_existing=delete_existing,
- )
- persist_event_counter.inc()
+ event.internal_metadata.stream_ordering = stream_ordering
+ context.new_state_group_id = self._state_groups_id_gen.get_next()
+ yield self.runInteraction(
+ "persist_event",
+ self._persist_event_txn,
+ event=event,
+ context=context,
+ current_state=current_state,
+ backfilled=backfilled,
+ delete_existing=delete_existing,
+ )
+ persist_event_counter.inc()
except _RollbackButIsFineException:
pass
|