summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
author: Matthew Hodgson <matthew@matrix.org> 2018-07-18 02:07:36 +0100
committer: Matthew Hodgson <matthew@matrix.org> 2018-07-18 02:07:36 +0100
commita34061d3329789a4f2e16ec09dbcc9ed5074e346 (patch)
tree9089d6237de734347a8de8afb0ee32879d49aee0 /synapse
parentFix develop because I broke it :( (#3535) (diff)
downloadsynapse-a34061d3329789a4f2e16ec09dbcc9ed5074e346.tar.xz
WIP of tracking per-room and per-user stats
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/constants.py1
-rw-r--r--synapse/config/stats.py39
-rw-r--r--synapse/handlers/stats.py249
-rw-r--r--synapse/handlers/user_directory.py2
-rw-r--r--synapse/storage/events.py38
-rw-r--r--synapse/storage/roommember.py18
-rw-r--r--synapse/storage/schema/delta/51/stats.sql70
-rw-r--r--synapse/storage/state_deltas.py107
-rw-r--r--synapse/storage/stats.py51
-rw-r--r--synapse/storage/user_directory.py75
10 files changed, 575 insertions, 75 deletions
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 4df930c8d1..78cf429b04 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -68,6 +68,7 @@ class EventTypes(object):
 
     RoomHistoryVisibility = "m.room.history_visibility"
     CanonicalAlias = "m.room.canonical_alias"
+    Encryption = "m.room.encryption"
     RoomAvatar = "m.room.avatar"
     GuestAccess = "m.room.guest_access"
 
diff --git a/synapse/config/stats.py b/synapse/config/stats.py
new file mode 100644
index 0000000000..d0f4578a05
--- /dev/null
+++ b/synapse/config/stats.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import Config
+
+
class StatsConfig(Config):
    """Stats Configuration
    Configuration for the behaviour of synapse's stats engine
    """

    def read_config(self, config):
        """Read the optional `stats` section of the homeserver config.

        Defaults: stats disabled, bucket size of one day (in seconds).
        """
        self.stats_enable = False
        self.stats_bucket_size = 86400
        stats_config = config.get("stats", None)
        # NB: must test the local we just read, not an undefined name
        if stats_config:
            self.stats_enable = stats_config.get("enable", self.stats_enable)
            # bucket size comes from the "bucket_size" key, not "enable"
            self.stats_bucket_size = stats_config.get(
                "bucket_size", self.stats_bucket_size
            )

    def default_config(self, config_dir_path, server_name, **kwargs):
        return """
        # Stats configuration
        #
        # stats:
        #    enable: true
        #    bucket_size: 86400 # 1 day
        """
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
new file mode 100644
index 0000000000..9d4e1b6f6e
--- /dev/null
+++ b/synapse/handlers/stats.py
@@ -0,0 +1,249 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from six import iteritems
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.storage.roommember import ProfileInfo
+from synapse.types import get_localpart_from_id
+from synapse.util.metrics import Measure
+
+logger = logging.getLogger(__name__)
+
+
class StatsHandler(object):
    """Handles keeping the *_stats tables updated with a simple time-series of
    information about the users, rooms and media on the server, such that admins
    have some idea of who is consuming their resources.

    Heavily derived from UserDirectoryHandler
    """

    INITIAL_ROOM_SLEEP_MS = 50
    INITIAL_ROOM_SLEEP_COUNT = 100
    INITIAL_ROOM_BATCH_SIZE = 100
    INITIAL_USER_SLEEP_MS = 10

    def __init__(self, hs):
        self.store = hs.get_datastore()
        self.state = hs.get_state_handler()
        self.server_name = hs.hostname
        self.clock = hs.get_clock()
        self.notifier = hs.get_notifier()
        self.is_mine_id = hs.is_mine_id
        self.stats_enable = hs.config.stats_enable
        self.stats_bucket_size = hs.config.stats_bucket_size

        # The current position in the current_state_delta stream
        self.pos = None

        # Guard to ensure we only process deltas one at a time
        self._is_processing = False

        if self.stats_enable:
            self.notifier.add_replication_callback(self.notify_new_event)

            # We kick this off so that we don't have to wait for a change before
            # we start populating stats
            self.clock.call_later(0, self.notify_new_event)

    @defer.inlineCallbacks
    def notify_new_event(self):
        """Called when there may be more deltas to process
        """
        if not self.stats_enable:
            return

        if self._is_processing:
            return

        self._is_processing = True
        try:
            yield self._unsafe_process()
        finally:
            self._is_processing = False

    @defer.inlineCallbacks
    def _unsafe_process(self):
        # If self.pos is None then means we haven't fetched it from DB
        if self.pos is None:
            self.pos = yield self.store.get_stats_stream_pos()

        # If still None then we need to do the initial fill of stats
        if self.pos is None:
            yield self._do_initial_spam()
            self.pos = yield self.store.get_stats_stream_pos()

        # Loop round handling deltas until we're up to date
        while True:
            with Measure(self.clock, "stats_delta"):
                deltas = yield self.store.get_current_state_deltas(self.pos)
                if not deltas:
                    return

                logger.info("Handling %d state deltas", len(deltas))
                yield self._handle_deltas(deltas)

                self.pos = deltas[-1]["stream_id"]
                yield self.store.update_stats_stream_pos(self.pos)

    @defer.inlineCallbacks
    def _do_initial_spam(self):
        """Populates the stats tables from the current state of the DB, used
        when synapse first starts with stats support
        """
        new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()

        # We process by going through each existing room at a time.
        room_ids = yield self.store.get_all_rooms()

        logger.info("Doing initial update of room_stats. %d rooms", len(room_ids))
        num_processed_rooms = 0

        for room_id in room_ids:
            logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
            yield self._handle_initial_room(room_id)
            num_processed_rooms += 1
            yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)

        logger.info("Processed all rooms.")

        user_ids = yield self.store.get_all_local_users()
        logger.info("Doing initial update user_stats. %d users", len(user_ids))
        # counter was previously used uninitialised (UnboundLocalError)
        num_processed_users = 0
        for user_id in user_ids:
            logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
            yield self._handle_local_user(user_id)
            num_processed_users += 1
            yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)

        logger.info("Processed all users")

        yield self.store.update_stats_stream_pos(new_pos)

    @defer.inlineCallbacks
    def _get_current_state_event(self, current_state_ids, etype, state_key=""):
        """Fetch the event for (etype, state_key) from the room's current
        state, or None if the room has no such state event.

        Guards against passing a None event_id to store.get_event.
        """
        event_id = current_state_ids.get((etype, state_key))
        if event_id is None:
            defer.returnValue(None)
        event = yield self.store.get_event(event_id)
        defer.returnValue(event)

    @defer.inlineCallbacks
    def _handle_initial_room(self, room_id):
        """Called when we initially fill out stats one room at a time
        """

        current_state_ids = yield self.store.get_current_state_ids(room_id)

        join_rules = yield self._get_current_state_event(
            current_state_ids, EventTypes.JoinRules
        )
        history_visibility = yield self._get_current_state_event(
            current_state_ids, EventTypes.RoomHistoryVisibility
        )
        # NB: the constant added by this commit is EventTypes.Encryption
        encryption = yield self._get_current_state_event(
            current_state_ids, EventTypes.Encryption
        )
        name = yield self._get_current_state_event(
            current_state_ids, EventTypes.Name
        )
        topic = yield self._get_current_state_event(
            current_state_ids, EventTypes.Topic
        )
        avatar = yield self._get_current_state_event(
            current_state_ids, EventTypes.RoomAvatar
        )
        canonical_alias = yield self._get_current_state_event(
            current_state_ids, EventTypes.CanonicalAlias
        )

        # each value is read from its own state event (the WIP read topic,
        # avatar and canonical_alias out of the `name` event)
        yield self.store.update_room_state(
            room_id,
            {
                "join_rules": join_rules.content.get("join_rule")
                              if join_rules else None,
                "history_visibility": history_visibility.content.get("history_visibility")
                                      if history_visibility else None,
                "encryption": encryption.content.get("algorithm")
                              if encryption else None,
                "name": name.content.get("name")
                        if name else None,
                "topic": topic.content.get("topic")
                         if topic else None,
                "avatar": avatar.content.get("url")
                          if avatar else None,
                "canonical_alias": canonical_alias.content.get("alias")
                                   if canonical_alias else None,
            }
        )

        now = self.clock.time_msec()

        # quantise time to the nearest bucket
        now = int(now / (self.stats_bucket_size * 1000)) * self.stats_bucket_size * 1000

        current_state_events = len(current_state_ids)
        joined_members = yield self.store.get_user_count_in_room(room_id, Membership.JOIN)
        invited_members = yield self.store.get_user_count_in_room(room_id, Membership.INVITE)
        left_members = yield self.store.get_user_count_in_room(room_id, Membership.LEAVE)
        banned_members = yield self.store.get_user_count_in_room(room_id, Membership.BAN)
        state_events = yield self.store.get_state_event_counts(room_id)
        (local_events, remote_events) = yield self.store.get_event_counts(
            room_id, self.server_name
        )

        yield self.store.delete_room_stats(room_id, now)

        # must be yielded so errors propagate and the write completes before
        # we move on to the next room
        yield self.store.update_room_stats(
            room_id,
            {
                "ts": now,
                "bucket_size": self.stats_bucket_size,
                "current_state_events": current_state_events,
                "joined_members": joined_members,
                "invited_members": invited_members,
                "left_members": left_members,
                "banned_members": banned_members,
                "state_events": state_events,
                "local_events": local_events,
                "remote_events": remote_events,
            }
        )

    def _handle_deltas(self, deltas):
        """Called with the state deltas to process.

        NOTE(review): WIP stub -- it only logs each delta. The
        @defer.inlineCallbacks decorator is omitted until the body does
        asynchronous work: inlineCallbacks raises TypeError when applied to a
        function that contains no yield (it is not a generator). The caller's
        `yield self._handle_deltas(...)` works either way.
        """
        for delta in deltas:
            typ = delta["type"]
            state_key = delta["state_key"]
            room_id = delta["room_id"]
            event_id = delta["event_id"]
            prev_event_id = delta["prev_event_id"]

            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)

    @defer.inlineCallbacks
    def _handle_local_user(self, user_id):
        """Adds a new local roomless user into the user_directory_search table.
        Used to populate up the user index when we have an
        user_directory_search_all_users specified.

        NOTE(review): copied from UserDirectoryHandler -- this writes to the
        user directory, not the stats tables; presumably a WIP placeholder.
        Confirm before shipping.
        """
        logger.debug("Adding new local user to dir, %r", user_id)

        profile = yield self.store.get_profileinfo(get_localpart_from_id(user_id))

        row = yield self.store.get_user_in_directory(user_id)
        if not row:
            yield self.store.add_profiles_to_user_dir(None, {user_id: profile})
+
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 37dda64587..d4a51281cb 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -531,7 +531,7 @@ class UserDirectoryHandler(object):
 
     @defer.inlineCallbacks
     def _handle_remove_user(self, room_id, user_id):
-        """Called when we might need to remove user to directory
+        """Called when we might need to remove user from directory
 
         Args:
             room_id (str): room_id that user left or stopped being public that
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2aaab0d02c..42f3fad613 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1807,6 +1807,44 @@ class EventsStore(EventsWorkerStore):
             )
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
 
+    def get_state_event_counts(self, room_id):
+        """Gets the total number of state events in the room
+        """
+
+        def f(txn):
+            sql = (
+                "SELECT COUNT(*)"
+                " FROM state_events"
+                " WHERE room_id=?"
+            )
+            txn.execute(sql, (local_server, room_id,))
+            row = txn.fetchone()
+            return row[0] if row else 0
+
+        return self.runInteraction("get_state_event_counts", f)
+
+    def get_event_counts(self, room_id, local_server):
+        """Gets the number of events in the room, split into local versus remote
+        """
+
+        def f(txn):
+            sql = (
+                "SELECT sender LIKE '%%:%s' AS local, COUNT(*)"
+                " FROM events"
+                " WHERE room_id=?"
+                " GROUP BY local"
+            )
+            txn.execute(sql, (local_server, room_id,))
+            rows = txn.fetchall()
+            results = {
+                ("local" if row[0] else "remote") : row[1]
+                for row in rows
+            }
+            return (results.get("local", 0), results.get("remote", 0))
+
+        return self.runInteraction("get_event_counts", f)
+
+
     def purge_history(
         self, room_id, token, delete_local_events,
     ):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 02a802bed9..1493a881d9 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -83,6 +83,24 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         return self.runInteraction("get_users_in_room", f)
 
     @cached()
+    def get_user_count_in_room(self, room_id, membership):
+        def f(txn):
+            sql = (
+                "SELECT count(*) FROM room_memberships as m"
+                " INNER JOIN current_state_events as c"
+                " ON m.event_id = c.event_id "
+                " AND m.room_id = c.room_id "
+                " AND m.user_id = c.state_key"
+                " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
+            )
+
+            txn.execute(sql, (room_id, membership,))
+            row = txn.fetchone()
+            return row[0]
+
+        return self.runInteraction("get_users_in_room", f)
+
+    @cached()
     def get_invited_rooms_for_user(self, user_id):
         """ Get all the rooms the user is invited to
         Args:
diff --git a/synapse/storage/schema/delta/51/stats.sql b/synapse/storage/schema/delta/51/stats.sql
new file mode 100644
index 0000000000..82300f60b6
--- /dev/null
+++ b/synapse/storage/schema/delta/51/stats.sql
@@ -0,0 +1,70 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
CREATE TABLE stats_stream_pos (
    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
    stream_id BIGINT,
    CHECK (Lock='X')
);

INSERT INTO stats_stream_pos (stream_id) VALUES (null);

-- Trailing commas before ")" removed throughout: they are a syntax error in
-- both SQLite and Postgres.

CREATE TABLE user_stats (
    user_id TEXT NOT NULL,
    ts BIGINT NOT NULL,
    bucket_size INT NOT NULL,
    sent_events INT NOT NULL,
    stored_events INT NOT NULL, -- delta or absolute?
    public_rooms INT NOT NULL,
    private_rooms INT NOT NULL,
    sent_file_count INT NOT NULL,
    sent_file_size INT NOT NULL
);

CREATE INDEX user_stats_user_ts ON user_stats(user_id, ts);

CREATE TABLE room_stats (
    room_id TEXT NOT NULL,
    ts BIGINT NOT NULL,
    bucket_size INT NOT NULL,
    current_state_events INT NOT NULL,
    joined_members INT NOT NULL,
    invited_members INT NOT NULL,
    left_members INT NOT NULL,
    banned_members INT NOT NULL,
    state_events INT NOT NULL,
    local_events INT NOT NULL,
    remote_events INT NOT NULL,
    sent_events INT NOT NULL -- number sent per timeslice
);

CREATE INDEX room_stats_room_ts ON room_stats(room_id, ts);

-- cache of current room state; useful for the publicRooms list
-- Columns match the dict written by StatsHandler._handle_initial_room: they
-- are nullable (a room may lack any given state event) and "encryption"
-- stores the algorithm name as text, with an "avatar" URL column added.
CREATE TABLE room_state (
    room_id TEXT NOT NULL,
    join_rules TEXT,
    history_visibility TEXT,
    encryption TEXT,
    name TEXT,
    topic TEXT,
    avatar TEXT,
    canonical_alias TEXT
    -- get aliases straight from the right table
);

CREATE UNIQUE INDEX room_state_room ON room_state(room_id);

CREATE TABLE media_stats (
    ts BIGINT NOT NULL,
    bucket_size INT NOT NULL,
    local_media_count INT NOT NULL,
    local_media_size INT NOT NULL,
    remote_media_count INT NOT NULL,
    remote_media_size INT NOT NULL
);
\ No newline at end of file
diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py
new file mode 100644
index 0000000000..b733e68c45
--- /dev/null
+++ b/synapse/storage/state_deltas.py
@@ -0,0 +1,107 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import re
+
+from six import iteritems
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, JoinRules
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.types import get_domain_from_id, get_localpart_from_id
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+
+from ._base import SQLBaseStore
+
+logger = logging.getLogger(__name__)
+
+
class StateDeltasStore(SQLBaseStore):
    """Storage methods for reading the current_state_delta_stream, factored
    out of UserDirectoryStore so that the stats store can share them.
    """

    @defer.inlineCallbacks
    def get_all_rooms(self):
        """Get all room_ids we've ever known about, in ascending order of "size"

        Returns:
            Deferred[list[str]]: room IDs, ordered by their count of
            current-state events, smallest first.
        """
        sql = """
            SELECT room_id FROM current_state_events
            GROUP BY room_id
            ORDER BY count(*) ASC
        """
        rows = yield self._execute("get_all_rooms", None, sql)
        defer.returnValue([room_id for room_id, in rows])

    @defer.inlineCallbacks
    def get_all_local_users(self):
        """Get all local users

        Returns:
            Deferred[list[str]]: the `name` column of every row in `users`
        """
        sql = """
            SELECT name FROM users
        """
        rows = yield self._execute("get_all_local_users", None, sql)
        defer.returnValue([name for name, in rows])

    def get_current_state_deltas(self, prev_stream_id):
        """Fetch a bounded batch of rows from current_state_delta_stream with
        stream_id greater than prev_stream_id.

        Args:
            prev_stream_id (int|str): the position we have already processed
                up to (exclusive)

        Returns:
            Deferred[list[dict]]|list: delta rows in ascending stream_id
            order, complete up to the last stream_id returned; or a plain []
            (returned synchronously) when the stream cache says nothing has
            changed since prev_stream_id.
        """
        prev_stream_id = int(prev_stream_id)
        if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id):
            return []

        def get_current_state_deltas_txn(txn):
            # First we calculate the max stream id that will give us less than
            # N results.
            # We arbitarily limit to 100 stream_id entries to ensure we don't
            # select toooo many.
            sql = """
                SELECT stream_id, count(*)
                FROM current_state_delta_stream
                WHERE stream_id > ?
                GROUP BY stream_id
                ORDER BY stream_id ASC
                LIMIT 100
            """
            txn.execute(sql, (prev_stream_id,))

            total = 0
            max_stream_id = prev_stream_id
            for max_stream_id, count in txn:
                total += count
                if total > 100:
                    # We arbitarily limit to 100 entries to ensure we don't
                    # select toooo many.
                    break

            # Now actually get the deltas
            # NB: we cut at a stream_id boundary (not LIMIT) so that the batch
            # never contains a partial set of deltas for one stream_id.
            sql = """
                SELECT stream_id, room_id, type, state_key, event_id, prev_event_id
                FROM current_state_delta_stream
                WHERE ? < stream_id AND stream_id <= ?
                ORDER BY stream_id ASC
            """
            txn.execute(sql, (prev_stream_id, max_stream_id,))
            return self.cursor_to_dict(txn)

        return self.runInteraction(
            "get_current_state_deltas", get_current_state_deltas_txn
        )

    def get_max_stream_id_in_current_state_deltas(self):
        """Return the highest stream_id in current_state_delta_stream, or -1
        if the table is empty.
        """
        return self._simple_select_one_onecol(
            table="current_state_delta_stream",
            keyvalues={},
            retcol="COALESCE(MAX(stream_id), -1)",
            desc="get_max_stream_id_in_current_state_deltas",
        )
+
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
new file mode 100644
index 0000000000..8b40312eb1
--- /dev/null
+++ b/synapse/storage/stats.py
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
import logging
import re

from six import iteritems

from twisted.internet import defer

from synapse.api.constants import EventTypes, JoinRules
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.state_deltas import StateDeltasStore
from synapse.types import get_domain_from_id, get_localpart_from_id
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks

from ._base import SQLBaseStore
+
+logger = logging.getLogger(__name__)
+
+
class StatsStore(StateDeltasStore):
    """Storage for the user_stats / room_stats / room_state tables."""

    def get_stats_stream_pos(self):
        """Get the stream_id up to which stats have been processed, or None
        if the initial population has not yet run.

        Returns:
            Deferred[int|None]
        """
        return self._simple_select_one_onecol(
            table="stats_stream_pos",
            keyvalues={},
            retcol="stream_id",
            desc="get_stats_stream_pos",
        )

    def update_stats_stream_pos(self, stream_id):
        """Record that stats have been processed up to stream_id."""
        return self._simple_update_one(
            table="stats_stream_pos",
            keyvalues={},
            updatevalues={"stream_id": stream_id},
            desc="update_stats_stream_pos",
        )

    def update_room_state(self, room_id, fields):
        """Upsert the cached current-state row for a room.

        Args:
            room_id (str)
            fields (dict): column -> value for the room_state table
                (join_rules, history_visibility, encryption, name, topic,
                avatar, canonical_alias); values may be None
        """
        return self._simple_upsert(
            table="room_state",
            keyvalues={"room_id": room_id},
            values=fields,
            desc="update_room_state",
        )

    def delete_room_stats(self, room_id, ts):
        """Delete any existing stats row for this room and time bucket, so a
        fresh row can be inserted (used by the initial population).
        """
        return self._simple_delete(
            table="room_stats",
            keyvalues={"room_id": room_id, "ts": ts},
            desc="delete_room_stats",
        )

    def update_room_stats(self, room_id, fields):
        """Insert a stats row for the room.

        Args:
            room_id (str)
            fields (dict): column -> value for room_stats (must include "ts"
                and "bucket_size")
        """
        # copy rather than mutate the caller's dict
        return self._simple_insert(
            table="room_stats",
            values=dict(fields, room_id=room_id),
            desc="update_room_stats",
        )
\ No newline at end of file
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index a8781b0e5d..b3b821fc0f 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -30,7 +30,7 @@ from ._base import SQLBaseStore
 logger = logging.getLogger(__name__)
 
 
-class UserDirectoryStore(SQLBaseStore):
+class UserDirectoryStore(StateDeltasStore):
     @cachedInlineCallbacks(cache_context=True)
     def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context):
         """Check if the room is either world_readable or publically joinable
@@ -307,28 +307,6 @@ class UserDirectoryStore(SQLBaseStore):
 
         defer.returnValue(user_ids)
 
-    @defer.inlineCallbacks
-    def get_all_rooms(self):
-        """Get all room_ids we've ever known about, in ascending order of "size"
-        """
-        sql = """
-            SELECT room_id FROM current_state_events
-            GROUP BY room_id
-            ORDER BY count(*) ASC
-        """
-        rows = yield self._execute("get_all_rooms", None, sql)
-        defer.returnValue([room_id for room_id, in rows])
-
-    @defer.inlineCallbacks
-    def get_all_local_users(self):
-        """Get all local users
-        """
-        sql = """
-            SELECT name FROM users
-        """
-        rows = yield self._execute("get_all_local_users", None, sql)
-        defer.returnValue([name for name, in rows])
-
     def add_users_who_share_room(self, room_id, share_private, user_id_tuples):
         """Insert entries into the users_who_share_rooms table. The first
         user should be a local user.
@@ -572,57 +550,6 @@ class UserDirectoryStore(SQLBaseStore):
             desc="update_user_directory_stream_pos",
         )
 
-    def get_current_state_deltas(self, prev_stream_id):
-        prev_stream_id = int(prev_stream_id)
-        if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id):
-            return []
-
-        def get_current_state_deltas_txn(txn):
-            # First we calculate the max stream id that will give us less than
-            # N results.
-            # We arbitarily limit to 100 stream_id entries to ensure we don't
-            # select toooo many.
-            sql = """
-                SELECT stream_id, count(*)
-                FROM current_state_delta_stream
-                WHERE stream_id > ?
-                GROUP BY stream_id
-                ORDER BY stream_id ASC
-                LIMIT 100
-            """
-            txn.execute(sql, (prev_stream_id,))
-
-            total = 0
-            max_stream_id = prev_stream_id
-            for max_stream_id, count in txn:
-                total += count
-                if total > 100:
-                    # We arbitarily limit to 100 entries to ensure we don't
-                    # select toooo many.
-                    break
-
-            # Now actually get the deltas
-            sql = """
-                SELECT stream_id, room_id, type, state_key, event_id, prev_event_id
-                FROM current_state_delta_stream
-                WHERE ? < stream_id AND stream_id <= ?
-                ORDER BY stream_id ASC
-            """
-            txn.execute(sql, (prev_stream_id, max_stream_id,))
-            return self.cursor_to_dict(txn)
-
-        return self.runInteraction(
-            "get_current_state_deltas", get_current_state_deltas_txn
-        )
-
-    def get_max_stream_id_in_current_state_deltas(self):
-        return self._simple_select_one_onecol(
-            table="current_state_delta_stream",
-            keyvalues={},
-            retcol="COALESCE(MAX(stream_id), -1)",
-            desc="get_max_stream_id_in_current_state_deltas",
-        )
-
     @defer.inlineCallbacks
     def search_user_dir(self, user_id, search_term, limit):
         """Searches for users in directory