diff --git a/changelog.d/13745.misc b/changelog.d/13745.misc
new file mode 100644
index 0000000000..e97a789c0e
--- /dev/null
+++ b/changelog.d/13745.misc
@@ -0,0 +1 @@
+Remove old queries to join room memberships to current state events. Contributed by Nick @ Beeper (@fizzadar).
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 6e1ff5626b..fdb4684e12 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -32,10 +32,7 @@ import attr
from synapse.api.constants import EventTypes, Membership
from synapse.metrics import LaterGauge
-from synapse.metrics.background_process_metrics import (
- run_as_background_process,
- wrap_as_background_process,
-)
+from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
@@ -91,16 +88,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# at a time. Keyed by room_id.
self._joined_host_linearizer = Linearizer("_JoinedHostsCache")
- # Is the current_state_events.membership up to date? Or is the
- # background update still running?
- self._current_state_events_membership_up_to_date = False
-
- txn = db_conn.cursor(
- txn_name="_check_safe_current_state_events_membership_updated"
- )
- self._check_safe_current_state_events_membership_updated_txn(txn)
- txn.close()
-
if (
self.hs.config.worker.run_background_tasks
and self.hs.config.metrics.metrics_flags.known_servers
@@ -157,34 +144,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
self._known_servers_count = max([count, 1])
return self._known_servers_count
- def _check_safe_current_state_events_membership_updated_txn(
- self, txn: LoggingTransaction
- ) -> None:
- """Checks if it is safe to assume the new current_state_events
- membership column is up to date
- """
-
- pending_update = self.db_pool.simple_select_one_txn(
- txn,
- table="background_updates",
- keyvalues={"update_name": _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME},
- retcols=["update_name"],
- allow_none=True,
- )
-
- self._current_state_events_membership_up_to_date = not pending_update
-
- # If the update is still running, reschedule to run.
- if pending_update:
- self._clock.call_later(
- 15.0,
- run_as_background_process,
- "_check_safe_current_state_events_membership_updated",
- self.db_pool.runInteraction,
- "_check_safe_current_state_events_membership_updated",
- self._check_safe_current_state_events_membership_updated_txn,
- )
-
@cached(max_entries=100000, iterable=True)
async def get_users_in_room(self, room_id: str) -> List[str]:
"""
@@ -212,31 +171,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
`get_current_hosts_in_room()` and so we can re-use the cache but it's
not horrible to have here either.
"""
- # If we can assume current_state_events.membership is up to date
- # then we can avoid a join, which is a Very Good Thing given how
- # frequently this function gets called.
- if self._current_state_events_membership_up_to_date:
- sql = """
- SELECT c.state_key FROM current_state_events as c
- /* Get the depth of the event from the events table */
- INNER JOIN events AS e USING (event_id)
- WHERE c.type = 'm.room.member' AND c.room_id = ? AND membership = ?
- /* Sorted by lowest depth first */
- ORDER BY e.depth ASC;
- """
- else:
- sql = """
- SELECT c.state_key FROM room_memberships as m
- /* Get the depth of the event from the events table */
- INNER JOIN events AS e USING (event_id)
- INNER JOIN current_state_events as c
- ON m.event_id = c.event_id
- AND m.room_id = c.room_id
- AND m.user_id = c.state_key
- WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
- /* Sorted by lowest depth first */
- ORDER BY e.depth ASC;
- """
+ sql = """
+ SELECT c.state_key FROM current_state_events as c
+ /* Get the depth of the event from the events table */
+ INNER JOIN events AS e USING (event_id)
+ WHERE c.type = 'm.room.member' AND c.room_id = ? AND membership = ?
+ /* Sorted by lowest depth first */
+ ORDER BY e.depth ASC;
+ """
txn.execute(sql, (room_id, Membership.JOIN))
return [r[0] for r in txn]
@@ -353,28 +295,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# We do this all in one transaction to keep the cache small.
# FIXME: get rid of this when we have room_stats
- # If we can assume current_state_events.membership is up to date
- # then we can avoid a join, which is a Very Good Thing given how
- # frequently this function gets called.
- if self._current_state_events_membership_up_to_date:
- # Note, rejected events will have a null membership field, so
- # we we manually filter them out.
- sql = """
- SELECT count(*), membership FROM current_state_events
- WHERE type = 'm.room.member' AND room_id = ?
- AND membership IS NOT NULL
- GROUP BY membership
- """
- else:
- sql = """
- SELECT count(*), m.membership FROM room_memberships as m
- INNER JOIN current_state_events as c
- ON m.event_id = c.event_id
- AND m.room_id = c.room_id
- AND m.user_id = c.state_key
- WHERE c.type = 'm.room.member' AND c.room_id = ?
- GROUP BY m.membership
- """
+ # Note, rejected events will have a null membership field, so
+ # we we manually filter them out.
+ sql = """
+ SELECT count(*), membership FROM current_state_events
+ WHERE type = 'm.room.member' AND room_id = ?
+ AND membership IS NOT NULL
+ GROUP BY membership
+ """
txn.execute(sql, (room_id,))
res: Dict[str, MemberSummary] = {}
@@ -383,30 +311,18 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# we order by membership and then fairly arbitrarily by event_id so
# heroes are consistent
- if self._current_state_events_membership_up_to_date:
- # Note, rejected events will have a null membership field, so
- # we we manually filter them out.
- sql = """
- SELECT state_key, membership, event_id
- FROM current_state_events
- WHERE type = 'm.room.member' AND room_id = ?
- AND membership IS NOT NULL
- ORDER BY
- CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
- event_id ASC
- LIMIT ?
- """
- else:
- sql = """
- SELECT c.state_key, m.membership, c.event_id
- FROM room_memberships as m
- INNER JOIN current_state_events as c USING (room_id, event_id)
- WHERE c.type = 'm.room.member' AND c.room_id = ?
- ORDER BY
- CASE m.membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
- c.event_id ASC
- LIMIT ?
- """
+ # Note, rejected events will have a null membership field, so
+ # we we manually filter them out.
+ sql = """
+ SELECT state_key, membership, event_id
+ FROM current_state_events
+ WHERE type = 'm.room.member' AND room_id = ?
+ AND membership IS NOT NULL
+ ORDER BY
+ CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
+ event_id ASC
+ LIMIT ?
+ """
# 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
txn.execute(sql, (room_id, Membership.JOIN, Membership.INVITE, 6))
@@ -649,27 +565,15 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# We use `current_state_events` here and not `local_current_membership`
# as a) this gets called with remote users and b) this only gets called
# for rooms the server is participating in.
- if self._current_state_events_membership_up_to_date:
- sql = """
- SELECT room_id, e.instance_name, e.stream_ordering
- FROM current_state_events AS c
- INNER JOIN events AS e USING (room_id, event_id)
- WHERE
- c.type = 'm.room.member'
- AND c.state_key = ?
- AND c.membership = ?
- """
- else:
- sql = """
- SELECT room_id, e.instance_name, e.stream_ordering
- FROM current_state_events AS c
- INNER JOIN room_memberships AS m USING (room_id, event_id)
- INNER JOIN events AS e USING (room_id, event_id)
- WHERE
- c.type = 'm.room.member'
- AND c.state_key = ?
- AND m.membership = ?
- """
+ sql = """
+ SELECT room_id, e.instance_name, e.stream_ordering
+ FROM current_state_events AS c
+ INNER JOIN events AS e USING (room_id, event_id)
+ WHERE
+ c.type = 'm.room.member'
+ AND c.state_key = ?
+ AND c.membership = ?
+ """
txn.execute(sql, (user_id, Membership.JOIN))
return frozenset(
@@ -707,27 +611,15 @@ class RoomMemberWorkerStore(EventsWorkerStore):
user_ids,
)
- if self._current_state_events_membership_up_to_date:
- sql = f"""
- SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
- FROM current_state_events AS c
- INNER JOIN events AS e USING (room_id, event_id)
- WHERE
- c.type = 'm.room.member'
- AND c.membership = ?
- AND {clause}
- """
- else:
- sql = f"""
- SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
- FROM current_state_events AS c
- INNER JOIN room_memberships AS m USING (room_id, event_id)
- INNER JOIN events AS e USING (room_id, event_id)
- WHERE
- c.type = 'm.room.member'
- AND m.membership = ?
- AND {clause}
- """
+ sql = f"""
+ SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
+ FROM current_state_events AS c
+ INNER JOIN events AS e USING (room_id, event_id)
+ WHERE
+ c.type = 'm.room.member'
+ AND c.membership = ?
+ AND {clause}
+ """
txn.execute(sql, [Membership.JOIN] + args)
diff --git a/synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py b/synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py
new file mode 100644
index 0000000000..b5853d125c
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py
@@ -0,0 +1,52 @@
+# Copyright 2022 Beeper
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""
+Forces through the `current_state_events_membership` background job so checks
+for its completion can be removed.
+
+Note the background job must still remain defined in the database class.
+"""
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+ cur.execute("SELECT update_name FROM background_updates")
+ rows = cur.fetchall()
+ for row in rows:
+ if row[0] == "current_state_events_membership":
+ break
+ # No pending background job so nothing to do here
+ else:
+ return
+
+ # Populate membership field for all current_state_events, this may take
+ # a while but was originally handled via a background update in 2019.
+ cur.execute(
+ """
+ UPDATE current_state_events
+ SET membership = (
+ SELECT membership FROM room_memberships
+ WHERE event_id = current_state_events.event_id
+ )
+ """
+ )
+
+ # Finally, delete the background job because we've handled it above
+ cur.execute(
+ """
+ DELETE FROM background_updates
+ WHERE update_name = 'current_state_events_membership'
+ """
+ )
|