Merge pull request #684 from matrix-org/markjh/backfill_id_gen
Use a stream id generator for backfilled ids
1 files changed, 5 insertions, 14 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 7468e6e00c..c4dc3b3d51 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -24,7 +24,6 @@ from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
from canonicaljson import encode_canonical_json
-from contextlib import contextmanager
from collections import namedtuple
import logging
@@ -66,14 +65,9 @@ class EventsStore(SQLBaseStore):
return
if backfilled:
- start = self.min_stream_token - 1
- self.min_stream_token -= len(events_and_contexts) + 1
- stream_orderings = range(start, self.min_stream_token, -1)
-
- @contextmanager
- def stream_ordering_manager():
- yield stream_orderings
- stream_ordering_manager = stream_ordering_manager()
+ stream_ordering_manager = self._backfill_id_gen.get_next_mult(
+ len(events_and_contexts)
+ )
else:
stream_ordering_manager = self._stream_id_gen.get_next_mult(
len(events_and_contexts)
@@ -130,7 +124,7 @@ class EventsStore(SQLBaseStore):
except _RollbackButIsFineException:
pass
- max_persisted_id = yield self._stream_id_gen.get_max_token()
+ max_persisted_id = yield self._stream_id_gen.get_current_token()
defer.returnValue((stream_ordering, max_persisted_id))
@defer.inlineCallbacks
@@ -1117,10 +1111,7 @@ class EventsStore(SQLBaseStore):
def get_current_backfill_token(self):
"""The current minimum token that backfilled events have reached"""
-
- # TODO: Fix race with the persit_event txn by using one of the
- # stream id managers
- return -self.min_stream_token
+ return -self._backfill_id_gen.get_current_token()
def get_all_new_events(self, last_backfill_id, last_forward_id,
current_backfill_id, current_forward_id, limit):
|