summary refs log tree commit diff
path: root/synapse/storage/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/__init__.py')
-rw-r--r--synapse/storage/__init__.py45
1 files changed, 41 insertions, 4 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 7a3f6c4662..c8cab45f77 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -46,6 +46,9 @@ from .tags import TagsStore
 from .account_data import AccountDataStore
 
 
+from util.id_generators import IdGenerator, StreamIdGenerator
+
+
 import logging
 
 
@@ -58,6 +61,22 @@ logger = logging.getLogger(__name__)
 LAST_SEEN_GRANULARITY = 120*1000
 
 
+def get_datastore(hs):
+    logger.info("getting called!")
+
+    conn = hs.get_db_conn()
+    try:
+        cur = conn.cursor()
+        cur.execute("SELECT MIN(stream_ordering) FROM events",)
+        rows = cur.fetchall()
+        min_token = rows[0][0] if rows and rows[0] and rows[0][0] else -1
+        min_token = min(min_token, -1)
+
+        return DataStore(conn, hs, min_token)
+    finally:
+        conn.close()
+
+
 class DataStore(RoomMemberStore, RoomStore,
                 RegistrationStore, StreamStore, ProfileStore,
                 PresenceStore, TransactionStore,
@@ -79,18 +98,36 @@ class DataStore(RoomMemberStore, RoomStore,
                 EventPushActionsStore
                 ):
 
-    def __init__(self, hs):
-        super(DataStore, self).__init__(hs)
+    def __init__(self, db_conn, hs, min_stream_token):
         self.hs = hs
 
-        self.min_token_deferred = self._get_min_token()
-        self.min_token = None
+        self.min_stream_token = min_stream_token
 
         self.client_ip_last_seen = Cache(
             name="client_ip_last_seen",
             keylen=4,
         )
 
+        self._stream_id_gen = StreamIdGenerator(
+            db_conn, "events", "stream_ordering"
+        )
+        self._receipts_id_gen = StreamIdGenerator(
+            db_conn, "receipts_linearized", "stream_id"
+        )
+        self._account_data_id_gen = StreamIdGenerator(
+            db_conn, "account_data_max_stream_id", "stream_id"
+        )
+
+        self._transaction_id_gen = IdGenerator("sent_transactions", "id", self)
+        self._state_groups_id_gen = IdGenerator("state_groups", "id", self)
+        self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self)
+        self._refresh_tokens_id_gen = IdGenerator("refresh_tokens", "id", self)
+        self._pushers_id_gen = IdGenerator("pushers", "id", self)
+        self._push_rule_id_gen = IdGenerator("push_rules", "id", self)
+        self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self)
+
+        super(DataStore, self).__init__(hs)
+
     @defer.inlineCallbacks
     def insert_client_ip(self, user, access_token, ip, user_agent):
         now = int(self._clock.time_msec())