diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 7a3f6c4662..c8cab45f77 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -46,6 +46,9 @@ from .tags import TagsStore
from .account_data import AccountDataStore
+from util.id_generators import IdGenerator, StreamIdGenerator
+
+
import logging
@@ -58,6 +61,22 @@ logger = logging.getLogger(__name__)
LAST_SEEN_GRANULARITY = 120*1000
+def get_datastore(hs):
+ logger.info("getting called!")
+
+ conn = hs.get_db_conn()
+ try:
+ cur = conn.cursor()
+ cur.execute("SELECT MIN(stream_ordering) FROM events",)
+ rows = cur.fetchall()
+ min_token = rows[0][0] if rows and rows[0] and rows[0][0] else -1
+ min_token = min(min_token, -1)
+
+ return DataStore(conn, hs, min_token)
+ finally:
+ conn.close()
+
+
class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore,
PresenceStore, TransactionStore,
@@ -79,18 +98,36 @@ class DataStore(RoomMemberStore, RoomStore,
EventPushActionsStore
):
- def __init__(self, hs):
- super(DataStore, self).__init__(hs)
+ def __init__(self, db_conn, hs, min_stream_token):
self.hs = hs
- self.min_token_deferred = self._get_min_token()
- self.min_token = None
+ self.min_stream_token = min_stream_token
self.client_ip_last_seen = Cache(
name="client_ip_last_seen",
keylen=4,
)
+ self._stream_id_gen = StreamIdGenerator(
+ db_conn, "events", "stream_ordering"
+ )
+ self._receipts_id_gen = StreamIdGenerator(
+ db_conn, "receipts_linearized", "stream_id"
+ )
+ self._account_data_id_gen = StreamIdGenerator(
+ db_conn, "account_data_max_stream_id", "stream_id"
+ )
+
+ self._transaction_id_gen = IdGenerator("sent_transactions", "id", self)
+ self._state_groups_id_gen = IdGenerator("state_groups", "id", self)
+ self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self)
+ self._refresh_tokens_id_gen = IdGenerator("refresh_tokens", "id", self)
+ self._pushers_id_gen = IdGenerator("pushers", "id", self)
+ self._push_rule_id_gen = IdGenerator("push_rules", "id", self)
+ self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self)
+
+ super(DataStore, self).__init__(hs)
+
@defer.inlineCallbacks
def insert_client_ip(self, user, access_token, ip, user_agent):
now = int(self._clock.time_msec())
|