diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 58f35d7f56..e9fe63037b 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -128,6 +128,7 @@ class EventsStore(
hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000)
self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
+ self.is_mine_id = hs.is_mine_id
@defer.inlineCallbacks
def _read_forward_extremities(self):
@@ -547,6 +548,34 @@ class EventsStore(
],
)
+ # Note: Do we really want to delete rows here (that we do not
+ # subsequently reinsert below)? While technically correct it means
+ # we have no record of the fact the user *was* a member of the
+ # room but got, say, state reset out of it.
+ if to_delete or to_insert:
+ txn.executemany(
+ "DELETE FROM local_current_membership"
+ " WHERE room_id = ? AND user_id = ?",
+ (
+ (room_id, state_key)
+ for etype, state_key in itertools.chain(to_delete, to_insert)
+ if etype == EventTypes.Member and self.is_mine_id(state_key)
+ ),
+ )
+
+ if to_insert:
+ txn.executemany(
+ """INSERT INTO local_current_membership
+ (room_id, user_id, event_id, membership)
+ VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
+ """,
+ [
+ (room_id, key[1], ev_id, ev_id)
+ for key, ev_id in to_insert.items()
+ if key[0] == EventTypes.Member and self.is_mine_id(key[1])
+ ],
+ )
+
txn.call_after(
self._curr_state_delta_stream_cache.entity_has_changed,
room_id,
@@ -1724,6 +1753,7 @@ class EventsStore(
"local_invites",
"room_account_data",
"room_tags",
+ "local_current_membership",
):
logger.info("[purge] removing %s from %s", room_id, table)
txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index 70ff5751b6..9acef7c950 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -297,19 +297,22 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return {row[0]: row[1] for row in txn}
@cached()
- def get_invited_rooms_for_user(self, user_id):
- """ Get all the rooms the user is invited to
+ def get_invited_rooms_for_local_user(self, user_id):
+ """ Get all the rooms the *local* user is invited to
+
Args:
user_id (str): The user ID.
Returns:
A deferred list of RoomsForUser.
"""
- return self.get_rooms_for_user_where_membership_is(user_id, [Membership.INVITE])
+ return self.get_rooms_for_local_user_where_membership_is(
+ user_id, [Membership.INVITE]
+ )
@defer.inlineCallbacks
- def get_invite_for_user_in_room(self, user_id, room_id):
- """Gets the invite for the given user and room
+ def get_invite_for_local_user_in_room(self, user_id, room_id):
+ """Gets the invite for the given *local* user and room
Args:
user_id (str)
@@ -319,15 +322,15 @@ class RoomMemberWorkerStore(EventsWorkerStore):
Deferred: Resolves to either a RoomsForUser or None if no invite was
found.
"""
- invites = yield self.get_invited_rooms_for_user(user_id)
+ invites = yield self.get_invited_rooms_for_local_user(user_id)
for invite in invites:
if invite.room_id == room_id:
return invite
return None
@defer.inlineCallbacks
- def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
- """ Get all the rooms for this user where the membership for this user
+ def get_rooms_for_local_user_where_membership_is(self, user_id, membership_list):
+ """ Get all the rooms for this *local* user where the membership for this user
matches one in the membership list.
Filters out forgotten rooms.
@@ -344,8 +347,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return defer.succeed(None)
rooms = yield self.db.runInteraction(
- "get_rooms_for_user_where_membership_is",
- self._get_rooms_for_user_where_membership_is_txn,
+ "get_rooms_for_local_user_where_membership_is",
+ self._get_rooms_for_local_user_where_membership_is_txn,
user_id,
membership_list,
)
@@ -354,76 +357,42 @@ class RoomMemberWorkerStore(EventsWorkerStore):
forgotten_rooms = yield self.get_forgotten_rooms_for_user(user_id)
return [room for room in rooms if room.room_id not in forgotten_rooms]
- def _get_rooms_for_user_where_membership_is_txn(
+ def _get_rooms_for_local_user_where_membership_is_txn(
self, txn, user_id, membership_list
):
+ # Paranoia check.
+ if not self.hs.is_mine_id(user_id):
+ raise Exception(
+ "Cannot call 'get_rooms_for_local_user_where_membership_is' on non-local user %r"
+ % (user_id,),
+ )
- do_invite = Membership.INVITE in membership_list
- membership_list = [m for m in membership_list if m != Membership.INVITE]
-
- results = []
- if membership_list:
- if self._current_state_events_membership_up_to_date:
- clause, args = make_in_list_sql_clause(
- self.database_engine, "c.membership", membership_list
- )
- sql = """
- SELECT room_id, e.sender, c.membership, event_id, e.stream_ordering
- FROM current_state_events AS c
- INNER JOIN events AS e USING (room_id, event_id)
- WHERE
- c.type = 'm.room.member'
- AND state_key = ?
- AND %s
- """ % (
- clause,
- )
- else:
- clause, args = make_in_list_sql_clause(
- self.database_engine, "m.membership", membership_list
- )
- sql = """
- SELECT room_id, e.sender, m.membership, event_id, e.stream_ordering
- FROM current_state_events AS c
- INNER JOIN room_memberships AS m USING (room_id, event_id)
- INNER JOIN events AS e USING (room_id, event_id)
- WHERE
- c.type = 'm.room.member'
- AND state_key = ?
- AND %s
- """ % (
- clause,
- )
-
- txn.execute(sql, (user_id, *args))
- results = [RoomsForUser(**r) for r in self.db.cursor_to_dict(txn)]
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "c.membership", membership_list
+ )
- if do_invite:
- sql = (
- "SELECT i.room_id, inviter, i.event_id, e.stream_ordering"
- " FROM local_invites as i"
- " INNER JOIN events as e USING (event_id)"
- " WHERE invitee = ? AND locally_rejected is NULL"
- " AND replaced_by is NULL"
- )
+ sql = """
+ SELECT room_id, e.sender, c.membership, event_id, e.stream_ordering
+ FROM local_current_membership AS c
+ INNER JOIN events AS e USING (room_id, event_id)
+ WHERE
+ user_id = ?
+ AND %s
+ """ % (
+ clause,
+ )
- txn.execute(sql, (user_id,))
- results.extend(
- RoomsForUser(
- room_id=r["room_id"],
- sender=r["inviter"],
- event_id=r["event_id"],
- stream_ordering=r["stream_ordering"],
- membership=Membership.INVITE,
- )
- for r in self.db.cursor_to_dict(txn)
- )
+ txn.execute(sql, (user_id, *args))
+ results = [RoomsForUser(**r) for r in self.db.cursor_to_dict(txn)]
return results
- @cachedInlineCallbacks(max_entries=500000, iterable=True)
+ @cached(max_entries=500000, iterable=True)
def get_rooms_for_user_with_stream_ordering(self, user_id):
- """Returns a set of room_ids the user is currently joined to
+ """Returns a set of room_ids the user is currently joined to.
+
+ If a remote user only returns rooms this server is currently
+ participating in.
Args:
user_id (str)
@@ -433,17 +402,49 @@ class RoomMemberWorkerStore(EventsWorkerStore):
the rooms the user is in currently, along with the stream ordering
of the most recent join for that user and room.
"""
- rooms = yield self.get_rooms_for_user_where_membership_is(
- user_id, membership_list=[Membership.JOIN]
- )
- return frozenset(
- GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering)
- for r in rooms
+ return self.db.runInteraction(
+ "get_rooms_for_user_with_stream_ordering",
+ self._get_rooms_for_user_with_stream_ordering_txn,
+ user_id,
)
+ def _get_rooms_for_user_with_stream_ordering_txn(self, txn, user_id):
+ # We use `current_state_events` here and not `local_current_membership`
+ # as a) this gets called with remote users and b) this only gets called
+ # for rooms the server is participating in.
+ if self._current_state_events_membership_up_to_date:
+ sql = """
+ SELECT room_id, e.stream_ordering
+ FROM current_state_events AS c
+ INNER JOIN events AS e USING (room_id, event_id)
+ WHERE
+ c.type = 'm.room.member'
+ AND state_key = ?
+ AND c.membership = ?
+ """
+ else:
+ sql = """
+ SELECT room_id, e.stream_ordering
+ FROM current_state_events AS c
+ INNER JOIN room_memberships AS m USING (room_id, event_id)
+ INNER JOIN events AS e USING (room_id, event_id)
+ WHERE
+ c.type = 'm.room.member'
+ AND state_key = ?
+ AND m.membership = ?
+ """
+
+ txn.execute(sql, (user_id, Membership.JOIN))
+ results = frozenset(GetRoomsForUserWithStreamOrdering(*row) for row in txn)
+
+ return results
+
@defer.inlineCallbacks
def get_rooms_for_user(self, user_id, on_invalidate=None):
- """Returns a set of room_ids the user is currently joined to
+ """Returns a set of room_ids the user is currently joined to.
+
+ If a remote user only returns rooms this server is currently
+ participating in.
"""
rooms = yield self.get_rooms_for_user_with_stream_ordering(
user_id, on_invalidate=on_invalidate
@@ -1022,7 +1023,7 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
event.internal_metadata.stream_ordering,
)
txn.call_after(
- self.get_invited_rooms_for_user.invalidate, (event.state_key,)
+ self.get_invited_rooms_for_local_user.invalidate, (event.state_key,)
)
# We update the local_invites table only if the event is "current",
@@ -1064,6 +1065,27 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
),
)
+ # We also update the `local_current_membership` table with
+ # latest invite info. This will usually get updated by the
+ # `current_state_events` handling, unless its an outlier.
+ if event.internal_metadata.is_outlier():
+ # This should only happen for out of band memberships, so
+ # we add a paranoia check.
+ assert event.internal_metadata.is_out_of_band_membership()
+
+ self.db.simple_upsert_txn(
+ txn,
+ table="local_current_membership",
+ keyvalues={
+ "room_id": event.room_id,
+ "user_id": event.state_key,
+ },
+ values={
+ "event_id": event.event_id,
+ "membership": event.membership,
+ },
+ )
+
@defer.inlineCallbacks
def locally_reject_invite(self, user_id, room_id):
sql = (
@@ -1075,6 +1097,15 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
def f(txn, stream_ordering):
txn.execute(sql, (stream_ordering, True, room_id, user_id))
+ # We also clear this entry from `local_current_membership`.
+ # Ideally we'd point to a leave event, but we don't have one, so
+ # nevermind.
+ self.db.simple_delete_txn(
+ txn,
+ table="local_current_membership",
+ keyvalues={"room_id": room_id, "user_id": user_id},
+ )
+
with self._stream_id_gen.get_next() as stream_ordering:
yield self.db.runInteraction("locally_reject_invite", f, stream_ordering)
diff --git a/synapse/storage/data_stores/main/schema/delta/57/local_current_membership.py b/synapse/storage/data_stores/main/schema/delta/57/local_current_membership.py
new file mode 100644
index 0000000000..601c236c4a
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/57/local_current_membership.py
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# We create a new table called `local_current_membership` that stores the latest
+# membership state of local users in rooms, which helps track leaves/bans/etc
+# even if the server has left the room (and so has deleted the room from
+# `current_state_events`). This will also include outstanding invites for local
+# users for rooms the server isn't in.
+#
+# If the server isn't and hasn't been in the room then it will only include
+# outsstanding invites, and not e.g. pre-emptive bans of local users.
+#
+# If the server later rejoins a room `local_current_membership` can simply be
+# replaced with the new current state of the room (which results in the
+# equivalent behaviour as if the server had remained in the room).
+
+
+def run_upgrade(cur, database_engine, config, *args, **kwargs):
+ # We need to do the insert in `run_upgrade` section as we don't have access
+ # to `config` in `run_create`.
+
+ # This upgrade may take a bit of time for large servers (e.g. one minute for
+ # matrix.org) but means we avoid a lots of book keeping required to do it as
+ # a background update.
+
+ # We check if the `current_state_events.membership` is up to date by
+ # checking if the relevant background update has finished. If it has
+ # finished we can avoid doing a join against `room_memberships`, which
+ # speesd things up.
+ cur.execute(
+ """SELECT 1 FROM background_updates
+ WHERE update_name = 'current_state_events_membership'
+ """
+ )
+ current_state_membership_up_to_date = not bool(cur.fetchone())
+
+ # Cheekily drop and recreate indices, as that is faster.
+ cur.execute("DROP INDEX local_current_membership_idx")
+ cur.execute("DROP INDEX local_current_membership_room_idx")
+
+ if current_state_membership_up_to_date:
+ sql = """
+ INSERT INTO local_current_membership (room_id, user_id, event_id, membership)
+ SELECT c.room_id, state_key AS user_id, event_id, c.membership
+ FROM current_state_events AS c
+ WHERE type = 'm.room.member' AND c.membership IS NOT NULL AND state_key like '%' || ?
+ """
+ else:
+ # We can't rely on the membership column, so we need to join against
+ # `room_memberships`.
+ sql = """
+ INSERT INTO local_current_membership (room_id, user_id, event_id, membership)
+ SELECT c.room_id, state_key AS user_id, event_id, r.membership
+ FROM current_state_events AS c
+ INNER JOIN room_memberships AS r USING (event_id)
+ WHERE type = 'm.room.member' and state_key like '%' || ?
+ """
+ cur.execute(sql, (config.server_name,))
+
+ cur.execute(
+ "CREATE UNIQUE INDEX local_current_membership_idx ON local_current_membership(user_id, room_id)"
+ )
+ cur.execute(
+ "CREATE INDEX local_current_membership_room_idx ON local_current_membership(room_id)"
+ )
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+ cur.execute(
+ """
+ CREATE TABLE local_current_membership (
+ room_id TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ event_id TEXT NOT NULL,
+ membership TEXT NOT NULL
+ )"""
+ )
+
+ cur.execute(
+ "CREATE UNIQUE INDEX local_current_membership_idx ON local_current_membership(user_id, room_id)"
+ )
+ cur.execute(
+ "CREATE INDEX local_current_membership_room_idx ON local_current_membership(room_id)"
+ )
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index e70026b80a..e86984cd50 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -29,7 +29,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 56
+SCHEMA_VERSION = 57
dir_path = os.path.abspath(os.path.dirname(__file__))
|