diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py
new file mode 100644
index 0000000000..57bc45cdb9
--- /dev/null
+++ b/synapse/storage/state_deltas.py
@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.storage._base import SQLBaseStore
+
+logger = logging.getLogger(__name__)
+
+
+class StateDeltasStore(SQLBaseStore):
+
+ def get_current_state_deltas(self, prev_stream_id):
+ prev_stream_id = int(prev_stream_id)
+ if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id):
+ return []
+
+ def get_current_state_deltas_txn(txn):
+ # First we calculate the max stream id that will give us less than
+ # N results.
+ # We arbitarily limit to 100 stream_id entries to ensure we don't
+ # select toooo many.
+ sql = """
+ SELECT stream_id, count(*)
+ FROM current_state_delta_stream
+ WHERE stream_id > ?
+ GROUP BY stream_id
+ ORDER BY stream_id ASC
+ LIMIT 100
+ """
+ txn.execute(sql, (prev_stream_id,))
+
+ total = 0
+ max_stream_id = prev_stream_id
+ for max_stream_id, count in txn:
+ total += count
+ if total > 100:
+ # We arbitarily limit to 100 entries to ensure we don't
+ # select toooo many.
+ break
+
+ # Now actually get the deltas
+ sql = """
+ SELECT stream_id, room_id, type, state_key, event_id, prev_event_id
+ FROM current_state_delta_stream
+ WHERE ? < stream_id AND stream_id <= ?
+ ORDER BY stream_id ASC
+ """
+ txn.execute(sql, (prev_stream_id, max_stream_id,))
+ return self.cursor_to_dict(txn)
+
+ return self.runInteraction(
+ "get_current_state_deltas", get_current_state_deltas_txn
+ )
+
+ def get_max_stream_id_in_current_state_deltas(self):
+ return self._simple_select_one_onecol(
+ table="current_state_delta_stream",
+ keyvalues={},
+ retcol="COALESCE(MAX(stream_id), -1)",
+ desc="get_max_stream_id_in_current_state_deltas",
+ )
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index d360e857d1..65bdb1b4a5 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -22,6 +22,7 @@ from synapse.api.constants import EventTypes, JoinRules
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.state import StateFilter
+from synapse.storage.state_deltas import StateDeltasStore
from synapse.types import get_domain_from_id, get_localpart_from_id
from synapse.util.caches.descriptors import cached
@@ -31,7 +32,7 @@ logger = logging.getLogger(__name__)
TEMP_TABLE = "_temp_populate_user_directory"
-class UserDirectoryStore(BackgroundUpdateStore):
+class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
# How many records do we calculate before sending it to
# add_users_who_share_private_rooms?
@@ -488,16 +489,6 @@ class UserDirectoryStore(BackgroundUpdateStore):
defer.returnValue(user_ids)
- @defer.inlineCallbacks
- def get_all_local_users(self):
- """Get all local users
- """
- sql = """
- SELECT name FROM users
- """
- rows = yield self._execute("get_all_local_users", None, sql)
- defer.returnValue([name for name, in rows])
-
def add_users_who_share_private_room(self, room_id, user_id_tuples):
"""Insert entries into the users_who_share_private_rooms table. The first
user should be a local user.
@@ -675,59 +666,6 @@ class UserDirectoryStore(BackgroundUpdateStore):
desc="update_user_directory_stream_pos",
)
- def get_current_state_deltas(self, prev_stream_id):
- prev_stream_id = int(prev_stream_id)
- if not self._curr_state_delta_stream_cache.has_any_entity_changed(
- prev_stream_id
- ):
- return []
-
- def get_current_state_deltas_txn(txn):
- # First we calculate the max stream id that will give us less than
- # N results.
- # We arbitarily limit to 100 stream_id entries to ensure we don't
- # select toooo many.
- sql = """
- SELECT stream_id, count(*)
- FROM current_state_delta_stream
- WHERE stream_id > ?
- GROUP BY stream_id
- ORDER BY stream_id ASC
- LIMIT 100
- """
- txn.execute(sql, (prev_stream_id,))
-
- total = 0
- max_stream_id = prev_stream_id
- for max_stream_id, count in txn:
- total += count
- if total > 100:
- # We arbitarily limit to 100 entries to ensure we don't
- # select toooo many.
- break
-
- # Now actually get the deltas
- sql = """
- SELECT stream_id, room_id, type, state_key, event_id, prev_event_id
- FROM current_state_delta_stream
- WHERE ? < stream_id AND stream_id <= ?
- ORDER BY stream_id ASC
- """
- txn.execute(sql, (prev_stream_id, max_stream_id))
- return self.cursor_to_dict(txn)
-
- return self.runInteraction(
- "get_current_state_deltas", get_current_state_deltas_txn
- )
-
- def get_max_stream_id_in_current_state_deltas(self):
- return self._simple_select_one_onecol(
- table="current_state_delta_stream",
- keyvalues={},
- retcol="COALESCE(MAX(stream_id), -1)",
- desc="get_max_stream_id_in_current_state_deltas",
- )
-
@defer.inlineCallbacks
def search_user_dir(self, user_id, search_term, limit):
"""Searches for users in directory
|