author     Amber Brown <hawkowl@atleastfornow.net>  2019-03-25 20:37:08 +1100
committer  GitHub <noreply@github.com>              2019-03-25 20:37:08 +1100
commit     ac396a0d325eb7fc1311d5bd31b2693ff10fc53c (patch)
tree       be1cfb6c9857f28bbf02e4331a5f223b8ae24ded /synapse/storage
parent     Merge pull request #4869 from matrix-org/erikj/yaml_load (diff)
download   synapse-ac396a0d325eb7fc1311d5bd31b2693ff10fc53c.tar.xz
Refactor out state delta handling into its own class (#4917)
Diffstat (limited to 'synapse/storage')
-rw-r--r--  synapse/storage/state_deltas.py    74
-rw-r--r--  synapse/storage/user_directory.py  66
2 files changed, 76 insertions, 64 deletions
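
In outline, the change moves get_current_state_deltas and get_max_stream_id_in_current_state_deltas out of UserDirectoryStore into a new StateDeltasStore mixin, which UserDirectoryStore then inherits from alongside BackgroundUpdateStore. A stripped-down sketch of that composition (stand-in classes, not the real synapse stores) and how Python's method resolution order resolves it:

    class SQLBaseStore(object):
        """Stand-in for synapse.storage._base.SQLBaseStore."""


    class BackgroundUpdateStore(SQLBaseStore):
        """Stand-in for synapse.storage.background_updates.BackgroundUpdateStore."""


    class StateDeltasStore(SQLBaseStore):
        """The new home of the state delta helpers."""

        def get_current_state_deltas(self, prev_stream_id):
            return []  # the real method batches rows from current_state_delta_stream

        def get_max_stream_id_in_current_state_deltas(self):
            return -1  # the real method selects COALESCE(MAX(stream_id), -1)


    class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
        """After the refactor the delta helpers are inherited, not redefined."""


    print([cls.__name__ for cls in UserDirectoryStore.__mro__])
    # ['UserDirectoryStore', 'StateDeltasStore', 'BackgroundUpdateStore',
    #  'SQLBaseStore', 'object']
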
diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py
new file mode 100644
index 0000000000..57bc45cdb9
--- /dev/null
+++ b/synapse/storage/state_deltas.py
@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.storage._base import SQLBaseStore
+
+logger = logging.getLogger(__name__)
+
+
+class StateDeltasStore(SQLBaseStore):
+
+    def get_current_state_deltas(self, prev_stream_id):
+        prev_stream_id = int(prev_stream_id)
+        if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id):
+            return []
+
+        def get_current_state_deltas_txn(txn):
+            # First we calculate the max stream id that will give us less than
+            # N results.
+            # We arbitrarily limit to 100 stream_id entries to ensure we
+            # don't select too many.
+            sql = """
+                SELECT stream_id, count(*)
+                FROM current_state_delta_stream
+                WHERE stream_id > ?
+                GROUP BY stream_id
+                ORDER BY stream_id ASC
+                LIMIT 100
+            """
+            txn.execute(sql, (prev_stream_id,))
+
+            total = 0
+            max_stream_id = prev_stream_id
+            for max_stream_id, count in txn:
+                total += count
+                if total > 100:
+                    # We arbitrarily limit to 100 entries to ensure we
+                    # don't select too many.
+                    break
+
+            # Now actually get the deltas
+            sql = """
+                SELECT stream_id, room_id, type, state_key, event_id, prev_event_id
+                FROM current_state_delta_stream
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+            """
+            txn.execute(sql, (prev_stream_id, max_stream_id,))
+            return self.cursor_to_dict(txn)
+
+        return self.runInteraction(
+            "get_current_state_deltas", get_current_state_deltas_txn
+        )
+
+    def get_max_stream_id_in_current_state_deltas(self):
+        return self._simple_select_one_onecol(
+            table="current_state_delta_stream",
+            keyvalues={},
+            retcol="COALESCE(MAX(stream_id), -1)",
+            desc="get_max_stream_id_in_current_state_deltas",
+        )
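
The batching above works in two steps: the first query counts rows per stream_id (capped at 100 distinct ids), the loop stops at the first stream_id whose running total pushes past 100 rows, and the second query fetches every delta up to and including that id, so a single stream_id is never split across batches. A standalone illustration of the same two queries against an in-memory SQLite table (toy schema and data, not synapse's real schema management):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE current_state_delta_stream ("
        " stream_id INTEGER, room_id TEXT, type TEXT,"
        " state_key TEXT, event_id TEXT, prev_event_id TEXT)"
    )
    # 200 stream positions, three deltas each.
    conn.executemany(
        "INSERT INTO current_state_delta_stream VALUES (?, ?, ?, ?, ?, ?)",
        [
            (i // 3, "!room:example.org", "m.room.member",
             "@user%d:example.org" % i, "$event%d" % i, None)
            for i in range(600)
        ],
    )

    prev_stream_id = 0

    # Step 1: find the max_stream_id that keeps the batch to roughly 100 rows.
    txn = conn.execute(
        "SELECT stream_id, count(*) FROM current_state_delta_stream"
        " WHERE stream_id > ? GROUP BY stream_id ORDER BY stream_id ASC LIMIT 100",
        (prev_stream_id,),
    )
    total = 0
    max_stream_id = prev_stream_id
    for max_stream_id, count in txn:
        total += count
        if total > 100:
            break

    # Step 2: fetch every delta up to that position.
    rows = conn.execute(
        "SELECT stream_id, room_id, type, state_key, event_id, prev_event_id"
        " FROM current_state_delta_stream"
        " WHERE ? < stream_id AND stream_id <= ? ORDER BY stream_id ASC",
        (prev_stream_id, max_stream_id),
    ).fetchall()
    print(len(rows), max_stream_id)  # 102 34: the batch ends on a stream_id boundary
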
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index d360e857d1..65bdb1b4a5 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -22,6 +22,7 @@ from synapse.api.constants import EventTypes, JoinRules
 from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.storage.state import StateFilter
+from synapse.storage.state_deltas import StateDeltasStore
 from synapse.types import get_domain_from_id, get_localpart_from_id
 from synapse.util.caches.descriptors import cached
 
@@ -31,7 +32,7 @@ logger = logging.getLogger(__name__)
 TEMP_TABLE = "_temp_populate_user_directory"
 
 
-class UserDirectoryStore(BackgroundUpdateStore):
+class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
 
     # How many records do we calculate before sending it to
     # add_users_who_share_private_rooms?
@@ -488,16 +489,6 @@ class UserDirectoryStore(BackgroundUpdateStore):
 
         defer.returnValue(user_ids)
 
-    @defer.inlineCallbacks
-    def get_all_local_users(self):
-        """Get all local users
-        """
-        sql = """
-            SELECT name FROM users
-        """
-        rows = yield self._execute("get_all_local_users", None, sql)
-        defer.returnValue([name for name, in rows])
-
     def add_users_who_share_private_room(self, room_id, user_id_tuples):
         """Insert entries into the users_who_share_private_rooms table. The first
         user should be a local user.
@@ -675,59 +666,6 @@ class UserDirectoryStore(BackgroundUpdateStore):
             desc="update_user_directory_stream_pos",
         )
 
-    def get_current_state_deltas(self, prev_stream_id):
-        prev_stream_id = int(prev_stream_id)
-        if not self._curr_state_delta_stream_cache.has_any_entity_changed(
-            prev_stream_id
-        ):
-            return []
-
-        def get_current_state_deltas_txn(txn):
-            # First we calculate the max stream id that will give us less than
-            # N results.
-            # We arbitarily limit to 100 stream_id entries to ensure we don't
-            # select toooo many.
-            sql = """
-                SELECT stream_id, count(*)
-                FROM current_state_delta_stream
-                WHERE stream_id > ?
-                GROUP BY stream_id
-                ORDER BY stream_id ASC
-                LIMIT 100
-            """
-            txn.execute(sql, (prev_stream_id,))
-
-            total = 0
-            max_stream_id = prev_stream_id
-            for max_stream_id, count in txn:
-                total += count
-                if total > 100:
-                    # We arbitarily limit to 100 entries to ensure we don't
-                    # select toooo many.
-                    break
-
-            # Now actually get the deltas
-            sql = """
-                SELECT stream_id, room_id, type, state_key, event_id, prev_event_id
-                FROM current_state_delta_stream
-                WHERE ? < stream_id AND stream_id <= ?
-                ORDER BY stream_id ASC
-            """
-            txn.execute(sql, (prev_stream_id, max_stream_id))
-            return self.cursor_to_dict(txn)
-
-        return self.runInteraction(
-            "get_current_state_deltas", get_current_state_deltas_txn
-        )
-
-    def get_max_stream_id_in_current_state_deltas(self):
-        return self._simple_select_one_onecol(
-            table="current_state_delta_stream",
-            keyvalues={},
-            retcol="COALESCE(MAX(stream_id), -1)",
-            desc="get_max_stream_id_in_current_state_deltas",
-        )
-
     @defer.inlineCallbacks
     def search_user_dir(self, user_id, search_term, limit):
         """Searches for users in directory