diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 250ba536ea..57863bba4d 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -88,15 +88,6 @@ class DataStore(RoomMemberStore, RoomStore,
self.hs = hs
self.database_engine = hs.database_engine
- cur = db_conn.cursor()
- try:
- cur.execute("SELECT MIN(stream_ordering) FROM events",)
- rows = cur.fetchall()
- self.min_stream_token = rows[0][0] if rows and rows[0] and rows[0][0] else -1
- self.min_stream_token = min(self.min_stream_token, -1)
- finally:
- cur.close()
-
self.client_ip_last_seen = Cache(
name="client_ip_last_seen",
keylen=4,
@@ -105,6 +96,9 @@ class DataStore(RoomMemberStore, RoomStore,
self._stream_id_gen = StreamIdGenerator(
db_conn, "events", "stream_ordering"
)
+ self._backfill_id_gen = StreamIdGenerator(
+ db_conn, "events", "stream_ordering", step=-1
+ )
self._receipts_id_gen = StreamIdGenerator(
db_conn, "receipts_linearized", "stream_id"
)
@@ -116,7 +110,7 @@ class DataStore(RoomMemberStore, RoomStore,
)
self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
- self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
+ self._state_groups_id_gen = StreamIdGenerator(db_conn, "state_groups", "id")
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
@@ -129,7 +123,7 @@ class DataStore(RoomMemberStore, RoomStore,
extra_tables=[("deleted_pushers", "stream_id")],
)
- events_max = self._stream_id_gen.get_max_token()
+ events_max = self._stream_id_gen.get_current_token()
event_cache_prefill, min_event_val = self._get_cache_dict(
db_conn, "events",
entity_column="room_id",
@@ -145,7 +139,7 @@ class DataStore(RoomMemberStore, RoomStore,
"MembershipStreamChangeCache", events_max,
)
- account_max = self._account_data_id_gen.get_max_token()
+ account_max = self._account_data_id_gen.get_current_token()
self._account_data_stream_cache = StreamChangeCache(
"AccountDataAndTagsChangeCache", account_max,
)
@@ -156,7 +150,7 @@ class DataStore(RoomMemberStore, RoomStore,
db_conn, "presence_stream",
entity_column="user_id",
stream_column="stream_id",
- max_value=self._presence_id_gen.get_max_token(),
+ max_value=self._presence_id_gen.get_current_token(),
)
self.presence_stream_cache = StreamChangeCache(
"PresenceStreamChangeCache", min_presence_val,
@@ -167,7 +161,7 @@ class DataStore(RoomMemberStore, RoomStore,
db_conn, "push_rules_stream",
entity_column="user_id",
stream_column="stream_id",
- max_value=self._push_rules_stream_id_gen.get_max_token()[0],
+ max_value=self._push_rules_stream_id_gen.get_current_token()[0],
)
self.push_rules_stream_cache = StreamChangeCache(
|