diff --git a/synapse/storage/schema/delta/53/user_share.sql b/synapse/storage/schema/delta/53/user_share.sql
new file mode 100644
index 0000000000..14424ded0c
--- /dev/null
+++ b/synapse/storage/schema/delta/53/user_share.sql
@@ -0,0 +1,47 @@
+/* Copyright 2017 Vector Creations Ltd, 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Old disused version of the tables below.
+DROP TABLE IF EXISTS users_who_share_rooms;
+
+-- This is no longer used because it's duplicated by the users_who_share_public_rooms
+DROP TABLE IF EXISTS users_in_public_rooms;
+
+-- Tables keeping track of what users share rooms. This is a map of local users
+-- to local or remote users, per room. Remote users cannot be in the user_id
+-- column, only the other_user_id column. There are two tables, one for public
+-- rooms and those for private rooms.
+CREATE TABLE IF NOT EXISTS users_who_share_public_rooms (
+ user_id TEXT NOT NULL,
+ other_user_id TEXT NOT NULL,
+ room_id TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS users_who_share_private_rooms (
+ user_id TEXT NOT NULL,
+ other_user_id TEXT NOT NULL,
+ room_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX users_who_share_public_rooms_u_idx ON users_who_share_public_rooms(user_id, other_user_id, room_id);
+CREATE INDEX users_who_share_public_rooms_r_idx ON users_who_share_public_rooms(room_id);
+CREATE INDEX users_who_share_public_rooms_o_idx ON users_who_share_public_rooms(other_user_id);
+
+CREATE UNIQUE INDEX users_who_share_private_rooms_u_idx ON users_who_share_private_rooms(user_id, other_user_id, room_id);
+CREATE INDEX users_who_share_private_rooms_r_idx ON users_who_share_private_rooms(room_id);
+CREATE INDEX users_who_share_private_rooms_o_idx ON users_who_share_private_rooms(other_user_id);
+
+-- Make sure that we populate the tables initially by resetting the stream ID
+UPDATE user_directory_stream_pos SET stream_id = NULL;
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index d6cfdba519..580fafeb3a 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -191,6 +191,25 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
order='DESC'):
+ """Get new room events in stream ordering since `from_key`.
+
+ Args:
+ room_id (str)
+ from_key (str): Token from which no events are returned before
+ to_key (str): Token from which no events are returned after. (This
+ is typically the current stream token)
+ limit (int): Maximum number of events to return
+ order (str): Either "DESC" or "ASC". Determines which events are
+ returned when the result is limited. If "DESC" then the most
+ recent `limit` events are returned, otherwise returns the
+ oldest `limit` events.
+
+ Returns:
+ Deferred[dict[str,tuple[list[FrozenEvent], str]]]
+ A map from room id to a tuple containing:
+ - list of recent events in the room
+ - stream ordering key for the start of the chunk of events returned.
+ """
from_id = RoomStreamToken.parse_stream_token(from_key).stream
room_ids = yield self._events_stream_cache.get_entities_changed(
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index fea866c043..2317d22ed6 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -63,31 +63,14 @@ class UserDirectoryStore(SQLBaseStore):
defer.returnValue(False)
- @defer.inlineCallbacks
- def add_users_to_public_room(self, room_id, user_ids):
- """Add user to the list of users in public rooms
-
- Args:
- room_id (str): A room_id that all users are in that is world_readable
- or publically joinable
- user_ids (list(str)): Users to add
- """
- yield self._simple_insert_many(
- table="users_in_public_rooms",
- values=[{"user_id": user_id, "room_id": room_id} for user_id in user_ids],
- desc="add_users_to_public_room",
- )
- for user_id in user_ids:
- self.get_user_in_public_room.invalidate((user_id,))
-
- def add_profiles_to_user_dir(self, room_id, users_with_profile):
+ def add_profiles_to_user_dir(self, users_with_profile):
"""Add profiles to the user directory
Args:
- room_id (str): A room_id that all users are joined to
users_with_profile (dict): Users to add to directory in the form of
mapping of user_id -> ProfileInfo
"""
+
if isinstance(self.database_engine, PostgresEngine):
# We weight the loclpart most highly, then display name and finally
# server name
@@ -113,7 +96,7 @@ class UserDirectoryStore(SQLBaseStore):
INSERT INTO user_directory_search(user_id, value)
VALUES (?,?)
"""
- args = (
+ args = tuple(
(
user_id,
"%s %s" % (user_id, p.display_name) if p.display_name else user_id,
@@ -132,7 +115,7 @@ class UserDirectoryStore(SQLBaseStore):
values=[
{
"user_id": user_id,
- "room_id": room_id,
+ "room_id": None,
"display_name": profile.display_name,
"avatar_url": profile.avatar_url,
}
@@ -250,16 +233,6 @@ class UserDirectoryStore(SQLBaseStore):
"update_profile_in_user_dir", _update_profile_in_user_dir_txn
)
- @defer.inlineCallbacks
- def update_user_in_public_user_list(self, user_id, room_id):
- yield self._simple_update_one(
- table="users_in_public_rooms",
- keyvalues={"user_id": user_id},
- updatevalues={"room_id": room_id},
- desc="update_user_in_public_user_list",
- )
- self.get_user_in_public_room.invalidate((user_id,))
-
def remove_from_user_dir(self, user_id):
def _remove_from_user_dir_txn(txn):
self._simple_delete_txn(
@@ -269,62 +242,50 @@ class UserDirectoryStore(SQLBaseStore):
txn, table="user_directory_search", keyvalues={"user_id": user_id}
)
self._simple_delete_txn(
- txn, table="users_in_public_rooms", keyvalues={"user_id": user_id}
+ txn,
+ table="users_who_share_public_rooms",
+ keyvalues={"user_id": user_id},
+ )
+ self._simple_delete_txn(
+ txn,
+ table="users_who_share_public_rooms",
+ keyvalues={"other_user_id": user_id},
+ )
+ self._simple_delete_txn(
+ txn,
+ table="users_who_share_private_rooms",
+ keyvalues={"user_id": user_id},
+ )
+ self._simple_delete_txn(
+ txn,
+ table="users_who_share_private_rooms",
+ keyvalues={"other_user_id": user_id},
)
txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
- txn.call_after(self.get_user_in_public_room.invalidate, (user_id,))
return self.runInteraction("remove_from_user_dir", _remove_from_user_dir_txn)
@defer.inlineCallbacks
- def remove_from_user_in_public_room(self, user_id):
- yield self._simple_delete(
- table="users_in_public_rooms",
- keyvalues={"user_id": user_id},
- desc="remove_from_user_in_public_room",
- )
- self.get_user_in_public_room.invalidate((user_id,))
-
- def get_users_in_public_due_to_room(self, room_id):
- """Get all user_ids that are in the room directory because they're
- in the given room_id
- """
- return self._simple_select_onecol(
- table="users_in_public_rooms",
- keyvalues={"room_id": room_id},
- retcol="user_id",
- desc="get_users_in_public_due_to_room",
- )
-
- @defer.inlineCallbacks
def get_users_in_dir_due_to_room(self, room_id):
"""Get all user_ids that are in the room directory because they're
in the given room_id
"""
- user_ids_dir = yield self._simple_select_onecol(
- table="user_directory",
- keyvalues={"room_id": room_id},
- retcol="user_id",
- desc="get_users_in_dir_due_to_room",
- )
-
- user_ids_pub = yield self._simple_select_onecol(
- table="users_in_public_rooms",
+ user_ids_share_pub = yield self._simple_select_onecol(
+ table="users_who_share_public_rooms",
keyvalues={"room_id": room_id},
- retcol="user_id",
+ retcol="other_user_id",
desc="get_users_in_dir_due_to_room",
)
- user_ids_share = yield self._simple_select_onecol(
- table="users_who_share_rooms",
+ user_ids_share_priv = yield self._simple_select_onecol(
+ table="users_who_share_private_rooms",
keyvalues={"room_id": room_id},
- retcol="user_id",
+ retcol="other_user_id",
desc="get_users_in_dir_due_to_room",
)
- user_ids = set(user_ids_dir)
- user_ids.update(user_ids_pub)
- user_ids.update(user_ids_share)
+ user_ids = set(user_ids_share_pub)
+ user_ids.update(user_ids_share_priv)
defer.returnValue(user_ids)
@@ -351,7 +312,7 @@ class UserDirectoryStore(SQLBaseStore):
defer.returnValue([name for name, in rows])
def add_users_who_share_room(self, room_id, share_private, user_id_tuples):
- """Insert entries into the users_who_share_rooms table. The first
+ """Insert entries into the users_who_share_*_rooms table. The first
user should be a local user.
Args:
@@ -361,109 +322,71 @@ class UserDirectoryStore(SQLBaseStore):
"""
def _add_users_who_share_room_txn(txn):
- self._simple_insert_many_txn(
+
+ if share_private:
+ tbl = "users_who_share_private_rooms"
+ else:
+ tbl = "users_who_share_public_rooms"
+
+ self._simple_upsert_many_txn(
txn,
- table="users_who_share_rooms",
- values=[
- {
- "user_id": user_id,
- "other_user_id": other_user_id,
- "room_id": room_id,
- "share_private": share_private,
- }
+ table=tbl,
+ key_names=["user_id", "other_user_id", "room_id"],
+ key_values=[
+ (user_id, other_user_id, room_id)
for user_id, other_user_id in user_id_tuples
],
+ value_names=(),
+ value_values=None,
)
for user_id, other_user_id in user_id_tuples:
txn.call_after(
self.get_users_who_share_room_from_dir.invalidate, (user_id,)
)
- txn.call_after(
- self.get_if_users_share_a_room.invalidate, (user_id, other_user_id)
- )
return self.runInteraction(
"add_users_who_share_room", _add_users_who_share_room_txn
)
- def update_users_who_share_room(self, room_id, share_private, user_id_sets):
- """Updates entries in the users_who_share_rooms table. The first
- user should be a local user.
-
- Args:
- room_id (str)
- share_private (bool): Is the room private
- user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
+ def remove_user_who_share_room(self, user_id, room_id):
"""
-
- def _update_users_who_share_room_txn(txn):
- sql = """
- UPDATE users_who_share_rooms
- SET room_id = ?, share_private = ?
- WHERE user_id = ? AND other_user_id = ?
- """
- txn.executemany(
- sql, ((room_id, share_private, uid, oid) for uid, oid in user_id_sets)
- )
- for user_id, other_user_id in user_id_sets:
- txn.call_after(
- self.get_users_who_share_room_from_dir.invalidate, (user_id,)
- )
- txn.call_after(
- self.get_if_users_share_a_room.invalidate, (user_id, other_user_id)
- )
-
- return self.runInteraction(
- "update_users_who_share_room", _update_users_who_share_room_txn
- )
-
- def remove_user_who_share_room(self, user_id, other_user_id):
- """Deletes entries in the users_who_share_rooms table. The first
+ Deletes entries in the users_who_share_*_rooms table. The first
user should be a local user.
Args:
+ user_id (str)
room_id (str)
- share_private (bool): Is the room private
- user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
"""
def _remove_user_who_share_room_txn(txn):
self._simple_delete_txn(
txn,
- table="users_who_share_rooms",
- keyvalues={"user_id": user_id, "other_user_id": other_user_id},
+ table="users_who_share_private_rooms",
+ keyvalues={"user_id": user_id, "room_id": room_id},
)
- txn.call_after(
- self.get_users_who_share_room_from_dir.invalidate, (user_id,)
+ self._simple_delete_txn(
+ txn,
+ table="users_who_share_private_rooms",
+ keyvalues={"other_user_id": user_id, "room_id": room_id},
+ )
+ self._simple_delete_txn(
+ txn,
+ table="users_who_share_public_rooms",
+ keyvalues={"user_id": user_id, "room_id": room_id},
+ )
+ self._simple_delete_txn(
+ txn,
+ table="users_who_share_public_rooms",
+ keyvalues={"other_user_id": user_id, "room_id": room_id},
)
txn.call_after(
- self.get_if_users_share_a_room.invalidate, (user_id, other_user_id)
+ self.get_users_who_share_room_from_dir.invalidate, (user_id,)
)
return self.runInteraction(
"remove_user_who_share_room", _remove_user_who_share_room_txn
)
- @cached(max_entries=500000)
- def get_if_users_share_a_room(self, user_id, other_user_id):
- """Gets if users share a room.
-
- Args:
- user_id (str): Must be a local user_id
- other_user_id (str)
-
- Returns:
- bool|None: None if they don't share a room, otherwise whether they
- share a private room or not.
- """
- return self._simple_select_one_onecol(
- table="users_who_share_rooms",
- keyvalues={"user_id": user_id, "other_user_id": other_user_id},
- retcol="share_private",
- allow_none=True,
- desc="get_if_users_share_a_room",
- )
-
@cachedInlineCallbacks(max_entries=500000, iterable=True)
def get_users_who_share_room_from_dir(self, user_id):
"""Returns the set of users who share a room with `user_id`
@@ -472,32 +395,29 @@ class UserDirectoryStore(SQLBaseStore):
user_id(str): Must be a local user
Returns:
- dict: user_id -> share_private mapping
+ list: user_id
"""
- rows = yield self._simple_select_list(
- table="users_who_share_rooms",
+ rows = yield self._simple_select_onecol(
+ table="users_who_share_private_rooms",
+ keyvalues={"user_id": user_id},
+ retcol="other_user_id",
+ desc="get_users_who_share_room_with_user",
+ )
+
+ pub_rows = yield self._simple_select_onecol(
+ table="users_who_share_public_rooms",
keyvalues={"user_id": user_id},
- retcols=("other_user_id", "share_private"),
+ retcol="other_user_id",
desc="get_users_who_share_room_with_user",
)
- defer.returnValue({row["other_user_id"]: row["share_private"] for row in rows})
+ users = set(pub_rows)
+ users.update(rows)
- def get_users_in_share_dir_with_room_id(self, user_id, room_id):
- """Get all user tuples that are in the users_who_share_rooms due to the
- given room_id.
+ # Remove the user themselves from this list.
+ users.discard(user_id)
- Returns:
- [(user_id, other_user_id)]: where one of the two will match the given
- user_id.
- """
- sql = """
- SELECT user_id, other_user_id FROM users_who_share_rooms
- WHERE room_id = ? AND (user_id = ? OR other_user_id = ?)
- """
- return self._execute(
- "get_users_in_share_dir_with_room_id", None, sql, room_id, user_id, user_id
- )
+ defer.returnValue(list(users))
@defer.inlineCallbacks
def get_rooms_in_common_for_users(self, user_id, other_user_id):
@@ -532,12 +452,10 @@ class UserDirectoryStore(SQLBaseStore):
def _delete_all_from_user_dir_txn(txn):
txn.execute("DELETE FROM user_directory")
txn.execute("DELETE FROM user_directory_search")
- txn.execute("DELETE FROM users_in_public_rooms")
- txn.execute("DELETE FROM users_who_share_rooms")
+ txn.execute("DELETE FROM users_who_share_public_rooms")
+ txn.execute("DELETE FROM users_who_share_private_rooms")
txn.call_after(self.get_user_in_directory.invalidate_all)
- txn.call_after(self.get_user_in_public_room.invalidate_all)
txn.call_after(self.get_users_who_share_room_from_dir.invalidate_all)
- txn.call_after(self.get_if_users_share_a_room.invalidate_all)
return self.runInteraction(
"delete_all_from_user_dir", _delete_all_from_user_dir_txn
@@ -548,21 +466,11 @@ class UserDirectoryStore(SQLBaseStore):
return self._simple_select_one(
table="user_directory",
keyvalues={"user_id": user_id},
- retcols=("room_id", "display_name", "avatar_url"),
+ retcols=("display_name", "avatar_url"),
allow_none=True,
desc="get_user_in_directory",
)
- @cached()
- def get_user_in_public_room(self, user_id):
- return self._simple_select_one(
- table="users_in_public_rooms",
- keyvalues={"user_id": user_id},
- retcols=("room_id",),
- allow_none=True,
- desc="get_user_in_public_room",
- )
-
def get_user_directory_stream_pos(self):
return self._simple_select_one_onecol(
table="user_directory_stream_pos",
@@ -660,14 +568,15 @@ class UserDirectoryStore(SQLBaseStore):
where_clause = "1=1"
else:
join_clause = """
- LEFT JOIN users_in_public_rooms AS p USING (user_id)
LEFT JOIN (
- SELECT other_user_id AS user_id FROM users_who_share_rooms
- WHERE user_id = ? AND share_private
- ) AS s USING (user_id)
+ SELECT other_user_id AS user_id FROM users_who_share_public_rooms
+ UNION
+ SELECT other_user_id AS user_id FROM users_who_share_private_rooms
+ WHERE user_id = ?
+ ) AS p USING (user_id)
"""
join_args = (user_id,)
- where_clause = "(s.user_id IS NOT NULL OR p.user_id IS NOT NULL)"
+ where_clause = "p.user_id IS NOT NULL"
if isinstance(self.database_engine, PostgresEngine):
full_query, exact_query, prefix_query = _parse_query_postgres(search_term)
@@ -686,7 +595,7 @@ class UserDirectoryStore(SQLBaseStore):
%s
AND vector @@ to_tsquery('english', ?)
ORDER BY
- (CASE WHEN s.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END)
+ (CASE WHEN d.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END)
* (CASE WHEN display_name IS NOT NULL THEN 1.2 ELSE 1.0 END)
* (CASE WHEN avatar_url IS NOT NULL THEN 1.2 ELSE 1.0 END)
* (
|