diff options
author | Matthew Hodgson <matthew@matrix.org> | 2018-07-18 02:07:36 +0100 |
---|---|---|
committer | Matthew Hodgson <matthew@matrix.org> | 2018-07-18 02:07:36 +0100 |
commit | a34061d3329789a4f2e16ec09dbcc9ed5074e346 (patch) | |
tree | 9089d6237de734347a8de8afb0ee32879d49aee0 /synapse | |
parent | Fix develop because I broke it :( (#3535) (diff) | |
download | synapse-a34061d3329789a4f2e16ec09dbcc9ed5074e346.tar.xz |
WIP of tracking per-room and per-user stats
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/api/constants.py | 1 | ||||
-rw-r--r-- | synapse/config/stats.py | 39 | ||||
-rw-r--r-- | synapse/handlers/stats.py | 249 | ||||
-rw-r--r-- | synapse/handlers/user_directory.py | 2 | ||||
-rw-r--r-- | synapse/storage/events.py | 38 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 18 | ||||
-rw-r--r-- | synapse/storage/schema/delta/51/stats.sql | 70 | ||||
-rw-r--r-- | synapse/storage/state_deltas.py | 107 | ||||
-rw-r--r-- | synapse/storage/stats.py | 51 | ||||
-rw-r--r-- | synapse/storage/user_directory.py | 75 |
10 files changed, 575 insertions, 75 deletions
diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 4df930c8d1..78cf429b04 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -68,6 +68,7 @@ class EventTypes(object): RoomHistoryVisibility = "m.room.history_visibility" CanonicalAlias = "m.room.canonical_alias" + Encryption = "m.room.encryption" RoomAvatar = "m.room.avatar" GuestAccess = "m.room.guest_access" diff --git a/synapse/config/stats.py b/synapse/config/stats.py new file mode 100644 index 0000000000..d0f4578a05 --- /dev/null +++ b/synapse/config/stats.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import Config + + +class StatsConfig(Config): + """Stats Configuration + Configuration for the behaviour of synapse's stats engine + """ + + def read_config(self, config): + self.stats_enable = False + self.stats_bucket_size = 86400 + stats_config = config.get("stats", None) + if stats: + self.stats_enable = stats_config.get("enable", self.stats_enable) + self.stats_bucket_size = stats_config.get("enable", self.stats_bucket_size) + + def default_config(self, config_dir_path, server_name, **kwargs): + return """ + # Stats configuration + # + # stats: + # enable: true + # bucket_size: 86400 # 1 day + """ diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py new file mode 100644 index 0000000000..9d4e1b6f6e --- /dev/null +++ b/synapse/handlers/stats.py @@ -0,0 +1,249 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from six import iteritems + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.storage.roommember import ProfileInfo +from synapse.types import get_localpart_from_id +from synapse.util.metrics import Measure + +logger = logging.getLogger(__name__) + + +class StatsHandler(object): + """Handles keeping the *_stats tables updated with a simple time-series of + information about the users, rooms and media on the server, such that admins + have some idea of who is consuming their resouces. + + Heavily derived from UserDirectoryHandler + """ + + INITIAL_ROOM_SLEEP_MS = 50 + INITIAL_ROOM_SLEEP_COUNT = 100 + INITIAL_ROOM_BATCH_SIZE = 100 + INITIAL_USER_SLEEP_MS = 10 + + def __init__(self, hs): + self.store = hs.get_datastore() + self.state = hs.get_state_handler() + self.server_name = hs.hostname + self.clock = hs.get_clock() + self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id + self.stats_enable = hs.config.stats_enable + self.stats_bucket_size = hs.config.stats_bucket_size + + # The current position in the current_state_delta stream + self.pos = None + + # Guard to ensure we only process deltas one at a time + self._is_processing = False + + if self.stats_enable: + self.notifier.add_replication_callback(self.notify_new_event) + + # We kick this off so that we don't have to wait for a change before + # we start populating stats + self.clock.call_later(0, self.notify_new_event) + + @defer.inlineCallbacks + def notify_new_event(self): + """Called when there may be more deltas to process + """ + if not self.stats_enable: + return + + if self._is_processing: + return + + self._is_processing = True + try: + yield self._unsafe_process() + finally: + self._is_processing = False + + @defer.inlineCallbacks + def _unsafe_process(self): + # If self.pos is None then means we haven't fetched it from DB + if self.pos is None: + self.pos = yield self.store.get_stats_stream_pos() + + # If still None then we need to do the initial fill of stats + if self.pos is None: + yield self._do_initial_spam() + self.pos = yield self.store.get_stats_stream_pos() + + # Loop round handling deltas until we're up to date + while True: + with Measure(self.clock, "stats_delta"): + deltas = yield self.store.get_current_state_deltas(self.pos) + if not deltas: + return + + logger.info("Handling %d state deltas", len(deltas)) + yield self._handle_deltas(deltas) + + self.pos = deltas[-1]["stream_id"] + yield self.store.update_stats_stream_pos(self.pos) + + @defer.inlineCallbacks + def _do_initial_spam(self): + """Populates the stats tables from the current state of the DB, used + when synapse first starts with stats support + """ + new_pos = yield self.store.get_max_stream_id_in_current_state_deltas() + + # We process by going through each existing room at a time. + room_ids = yield self.store.get_all_rooms() + + logger.info("Doing initial update of room_stats. %d rooms", len(room_ids)) + num_processed_rooms = 0 + + for room_id in room_ids: + logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids)) + yield self._handle_initial_room(room_id) + num_processed_rooms += 1 + yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) + + logger.info("Processed all rooms.") + + user_ids = yield self.store.get_all_local_users() + logger.info("Doing initial update user_stats. %d users", len(user_ids)) + for user_id in user_ids: + logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids)) + yield self._handle_local_user(user_id) + num_processed_users += 1 + yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.) + + logger.info("Processed all users") + + yield self.store.update_stats_stream_pos(new_pos) + + @defer.inlineCallbacks + def _handle_initial_room(self, room_id): + """Called when we initially fill out stats one room at a time + """ + + current_state_ids = yield self.store.get_current_state_ids(room_id) + + join_rules = yield self.store.get_event( + current_state_ids.get((EventTypes.JoinRules, "")) + ) + history_visibility = yield self.store.get_event( + current_state_ids.get((EventTypes.RoomHistoryVisibility, "")) + ) + encryption = yield self.store.get_event( + current_state_ids.get((EventTypes.RoomEncryption, "")) + ) + name = yield self.store.get_event( + current_state_ids.get((EventTypes.Name, "")) + ) + topic = yield self.store.get_event( + current_state_ids.get((EventTypes.Topic, "")) + ) + avatar = yield self.store.get_event( + current_state_ids.get((EventTypes.RoomAvatar, "")) + ) + canonical_alias = yield self.store.get_event( + current_state_ids.get((EventTypes.CanonicalAlias, "")) + ) + + yield self.store.update_room_state( + room_id, + { + "join_rules": join_rules.content.get("join_rule") + if join_rules else None, + "history_visibility": history_visibility.content.get("history_visibility") + if history_visibility else None, + "encryption": encryption.content.get("algorithm") + if encryption else None, + "name": name.content.get("name") + if name else None, + "topic": name.content.get("topic") + if canonical_alias else None, + "avatar": name.content.get("url") + if avatar else None, + "canonical_alias": name.content.get("alias") + if canonical_alias else None, + } + ) + + now = self.clock.time_msec() + + # quantise time to the nearest bucket + now = int(now / (self.stats_bucket_size * 1000)) * self.stats_bucket_size * 1000 + + current_state_events = len(current_state_ids) + joined_members = yield self.store.get_user_count_in_room(room_id, Membership.JOIN) + invited_members = yield self.store.get_user_count_in_room(room_id, Membership.INVITE) + left_members = yield self.store.get_user_count_in_room(room_id, Membership.LEAVE) + banned_members = yield self.store.get_user_count_in_room(room_id, Membership.BAN) + state_events = yield self.store.get_state_event_counts(room_id) + (local_events, remote_events) = yield self.store.get_event_counts( + room_id, self.server_name + ) + + yield self.store.delete_room_stats(room_id, now) + + self.store.update_room_stats( + room_id, + { + "ts": now, + "bucket_size": self.stats_bucket_size, + "current_state_events": current_state_events, + "joined_members": joined_members, + "invited_members": invited_members, + "left_members": left_members, + "banned_members": banned_members, + "state_events": state_events, + "local_events": local_events, + "remote_events": remote_events, + } + ) + + @defer.inlineCallbacks + def _handle_deltas(self, deltas): + """Called with the state deltas to process + """ + for delta in deltas: + typ = delta["type"] + state_key = delta["state_key"] + room_id = delta["room_id"] + event_id = delta["event_id"] + prev_event_id = delta["prev_event_id"] + + logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + + + + @defer.inlineCallbacks + def _handle_local_user(self, user_id): + """Adds a new local roomless user into the user_directory_search table. + Used to populate up the user index when we have an + user_directory_search_all_users specified. + """ + logger.debug("Adding new local user to dir, %r", user_id) + + profile = yield self.store.get_profileinfo(get_localpart_from_id(user_id)) + + row = yield self.store.get_user_in_directory(user_id) + if not row: + yield self.store.add_profiles_to_user_dir(None, {user_id: profile}) + diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 37dda64587..d4a51281cb 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -531,7 +531,7 @@ class UserDirectoryHandler(object): @defer.inlineCallbacks def _handle_remove_user(self, room_id, user_id): - """Called when we might need to remove user to directory + """Called when we might need to remove user from directory Args: room_id (str): room_id that user left or stopped being public that diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2aaab0d02c..42f3fad613 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1807,6 +1807,44 @@ class EventsStore(EventsWorkerStore): ) return self.runInteraction("get_all_new_events", get_all_new_events_txn) + def get_state_event_counts(self, room_id): + """Gets the total number of state events in the room + """ + + def f(txn): + sql = ( + "SELECT COUNT(*)" + " FROM state_events" + " WHERE room_id=?" + ) + txn.execute(sql, (local_server, room_id,)) + row = txn.fetchone() + return row[0] if row else 0 + + return self.runInteraction("get_state_event_counts", f) + + def get_event_counts(self, room_id, local_server): + """Gets the number of events in the room, split into local versus remote + """ + + def f(txn): + sql = ( + "SELECT sender LIKE '%%:%s' AS local, COUNT(*)" + " FROM events" + " WHERE room_id=?" + " GROUP BY local" + ) + txn.execute(sql, (local_server, room_id,)) + rows = txn.fetchall() + results = { + ("local" if row[0] else "remote") : row[1] + for row in rows + } + return (results.get("local", 0), results.get("remote", 0)) + + return self.runInteraction("get_event_counts", f) + + def purge_history( self, room_id, token, delete_local_events, ): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 02a802bed9..1493a881d9 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -83,6 +83,24 @@ class RoomMemberWorkerStore(EventsWorkerStore): return self.runInteraction("get_users_in_room", f) @cached() + def get_user_count_in_room(self, room_id, membership): + def f(txn): + sql = ( + "SELECT count(*) FROM room_memberships as m" + " INNER JOIN current_state_events as c" + " ON m.event_id = c.event_id " + " AND m.room_id = c.room_id " + " AND m.user_id = c.state_key" + " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?" + ) + + txn.execute(sql, (room_id, membership,)) + row = txn.fetchone() + return row[0] + + return self.runInteraction("get_users_in_room", f) + + @cached() def get_invited_rooms_for_user(self, user_id): """ Get all the rooms the user is invited to Args: diff --git a/synapse/storage/schema/delta/51/stats.sql b/synapse/storage/schema/delta/51/stats.sql new file mode 100644 index 0000000000..82300f60b6 --- /dev/null +++ b/synapse/storage/schema/delta/51/stats.sql @@ -0,0 +1,70 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE stats_stream_pos ( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_id BIGINT, + CHECK (Lock='X') +); + +INSERT INTO stats_stream_pos (stream_id) VALUES (null); + +CREATE TABLE user_stats ( + user_id TEXT NOT NULL, + ts BIGINT NOT NULL, + bucket_size INT NOT NULL, + sent_events INT NOT NULL, + stored_events INT NOT NULL, -- delta or absolute? + public_rooms INT NOT NULL, + private_rooms INT NOT NULL, + sent_file_count INT NOT NULL, + sent_file_size INT NOT NULL, +); + +CREATE TABLE room_stats ( + room_id TEXT NOT NULL, + ts BIGINT NOT NULL, + bucket_size INT NOT NULL, + current_state_events INT NOT NULL, + joined_members INT NOT NULL, + invited_members INT NOT NULL, + left_members INT NOT NULL, + banned_members INT NOT NULL, + state_events INT NOT NULL, + local_events INT NOT NULL, + remote_events INT NOT NULL, + sent_events INT NOT NULL, -- number sent per timeslice +); + +-- cache of current room state; useful for the publicRooms list +CREATE TABLE room_state ( + room_id TEXT NOT NULL, + join_rules TEXT NOT NULL, + history_visibility TEXT NOT NULL, + encrypted BOOLEAN, + name TEXT NOT NULL, + topic TEXT NOT NULL, + canonical_alias TEXT NOT NULL, + -- get aliases straight from the right table +); + +CREATE TABLE media_stats ( + ts BIGINT NOT NULL, + bucket_size INT NOT NULL, + local_media_count INT NOT NULL, + local_media_size INT NOT NULL, + remote_media_count INT NOT NULL, + remote_media_size INT NOT NULL, +); \ No newline at end of file diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py new file mode 100644 index 0000000000..b733e68c45 --- /dev/null +++ b/synapse/storage/state_deltas.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import re + +from six import iteritems + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, JoinRules +from synapse.storage.engines import PostgresEngine, Sqlite3Engine +from synapse.types import get_domain_from_id, get_localpart_from_id +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks + +from ._base import SQLBaseStore + +logger = logging.getLogger(__name__) + + +class StateDeltasStore(SQLBaseStore): + + @defer.inlineCallbacks + def get_all_rooms(self): + """Get all room_ids we've ever known about, in ascending order of "size" + """ + sql = """ + SELECT room_id FROM current_state_events + GROUP BY room_id + ORDER BY count(*) ASC + """ + rows = yield self._execute("get_all_rooms", None, sql) + defer.returnValue([room_id for room_id, in rows]) + + @defer.inlineCallbacks + def get_all_local_users(self): + """Get all local users + """ + sql = """ + SELECT name FROM users + """ + rows = yield self._execute("get_all_local_users", None, sql) + defer.returnValue([name for name, in rows]) + + def get_current_state_deltas(self, prev_stream_id): + prev_stream_id = int(prev_stream_id) + if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id): + return [] + + def get_current_state_deltas_txn(txn): + # First we calculate the max stream id that will give us less than + # N results. + # We arbitarily limit to 100 stream_id entries to ensure we don't + # select toooo many. + sql = """ + SELECT stream_id, count(*) + FROM current_state_delta_stream + WHERE stream_id > ? + GROUP BY stream_id + ORDER BY stream_id ASC + LIMIT 100 + """ + txn.execute(sql, (prev_stream_id,)) + + total = 0 + max_stream_id = prev_stream_id + for max_stream_id, count in txn: + total += count + if total > 100: + # We arbitarily limit to 100 entries to ensure we don't + # select toooo many. + break + + # Now actually get the deltas + sql = """ + SELECT stream_id, room_id, type, state_key, event_id, prev_event_id + FROM current_state_delta_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + """ + txn.execute(sql, (prev_stream_id, max_stream_id,)) + return self.cursor_to_dict(txn) + + return self.runInteraction( + "get_current_state_deltas", get_current_state_deltas_txn + ) + + def get_max_stream_id_in_current_state_deltas(self): + return self._simple_select_one_onecol( + table="current_state_delta_stream", + keyvalues={}, + retcol="COALESCE(MAX(stream_id), -1)", + desc="get_max_stream_id_in_current_state_deltas", + ) + diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py new file mode 100644 index 0000000000..8b40312eb1 --- /dev/null +++ b/synapse/storage/stats.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import re + +from six import iteritems + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, JoinRules +from synapse.storage.engines import PostgresEngine, Sqlite3Engine +from synapse.types import get_domain_from_id, get_localpart_from_id +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks + +from ._base import SQLBaseStore + +logger = logging.getLogger(__name__) + + +class StatsStore(StateDeltasStore): + + def get_stats_stream_pos(self): + return self._simple_select_one_onecol( + table="stats_stream_pos", + keyvalues={}, + retcol="stream_id", + desc="stats_stream_pos", + ) + + def update_stats_stream_pos(self, stream_id): + return self._simple_update_one( + table="stats_stream_pos", + keyvalues={}, + updatevalues={"stream_id": stream_id}, + desc="update_stats_stream_pos", + ) + + def update_room_state(self, ) \ No newline at end of file diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index a8781b0e5d..b3b821fc0f 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -30,7 +30,7 @@ from ._base import SQLBaseStore logger = logging.getLogger(__name__) -class UserDirectoryStore(SQLBaseStore): +class UserDirectoryStore(StateDeltasStore): @cachedInlineCallbacks(cache_context=True) def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): """Check if the room is either world_readable or publically joinable @@ -307,28 +307,6 @@ class UserDirectoryStore(SQLBaseStore): defer.returnValue(user_ids) - @defer.inlineCallbacks - def get_all_rooms(self): - """Get all room_ids we've ever known about, in ascending order of "size" - """ - sql = """ - SELECT room_id FROM current_state_events - GROUP BY room_id - ORDER BY count(*) ASC - """ - rows = yield self._execute("get_all_rooms", None, sql) - defer.returnValue([room_id for room_id, in rows]) - - @defer.inlineCallbacks - def get_all_local_users(self): - """Get all local users - """ - sql = """ - SELECT name FROM users - """ - rows = yield self._execute("get_all_local_users", None, sql) - defer.returnValue([name for name, in rows]) - def add_users_who_share_room(self, room_id, share_private, user_id_tuples): """Insert entries into the users_who_share_rooms table. The first user should be a local user. @@ -572,57 +550,6 @@ class UserDirectoryStore(SQLBaseStore): desc="update_user_directory_stream_pos", ) - def get_current_state_deltas(self, prev_stream_id): - prev_stream_id = int(prev_stream_id) - if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id): - return [] - - def get_current_state_deltas_txn(txn): - # First we calculate the max stream id that will give us less than - # N results. - # We arbitarily limit to 100 stream_id entries to ensure we don't - # select toooo many. - sql = """ - SELECT stream_id, count(*) - FROM current_state_delta_stream - WHERE stream_id > ? - GROUP BY stream_id - ORDER BY stream_id ASC - LIMIT 100 - """ - txn.execute(sql, (prev_stream_id,)) - - total = 0 - max_stream_id = prev_stream_id - for max_stream_id, count in txn: - total += count - if total > 100: - # We arbitarily limit to 100 entries to ensure we don't - # select toooo many. - break - - # Now actually get the deltas - sql = """ - SELECT stream_id, room_id, type, state_key, event_id, prev_event_id - FROM current_state_delta_stream - WHERE ? < stream_id AND stream_id <= ? - ORDER BY stream_id ASC - """ - txn.execute(sql, (prev_stream_id, max_stream_id,)) - return self.cursor_to_dict(txn) - - return self.runInteraction( - "get_current_state_deltas", get_current_state_deltas_txn - ) - - def get_max_stream_id_in_current_state_deltas(self): - return self._simple_select_one_onecol( - table="current_state_delta_stream", - keyvalues={}, - retcol="COALESCE(MAX(stream_id), -1)", - desc="get_max_stream_id_in_current_state_deltas", - ) - @defer.inlineCallbacks def search_user_dir(self, user_id, search_term, limit): """Searches for users in directory |