diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 32cfd010a5..cb88e49b51 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -24,6 +24,8 @@ from canonicaljson import json
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage._base import LoggingTransaction
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer
@@ -53,9 +55,51 @@ ProfileInfo = namedtuple("ProfileInfo", ("avatar_url", "display_name"))
MemberSummary = namedtuple("MemberSummary", ("members", "count"))
_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
+_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
class RoomMemberWorkerStore(EventsWorkerStore):
+ def __init__(self, db_conn, hs):
+ super(RoomMemberWorkerStore, self).__init__(db_conn, hs)
+
+ # Is the current_state_events.membership up to date? Or is the
+ # background update still running?
+ self._current_state_events_membership_up_to_date = False
+
+ txn = LoggingTransaction(
+ db_conn.cursor(),
+ name="_check_safe_current_state_events_membership_updated",
+ database_engine=self.database_engine,
+ )
+ self._check_safe_current_state_events_membership_updated_txn(txn)
+ txn.close()
+
+ def _check_safe_current_state_events_membership_updated_txn(self, txn):
+ """Checks if it is safe to assume the new current_state_events
+ membership column is up to date
+ """
+
+ pending_update = self._simple_select_one_txn(
+ txn,
+ table="background_updates",
+ keyvalues={"update_name": _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME},
+ retcols=["update_name"],
+ allow_none=True,
+ )
+
+ self._current_state_events_membership_up_to_date = not pending_update
+
+ # If the update is still running, reschedule to run.
+ if pending_update:
+ self._clock.call_later(
+ 15.0,
+ run_as_background_process,
+ "_check_safe_current_state_events_membership_updated",
+ self.runInteraction,
+ "_check_safe_current_state_events_membership_updated",
+ self._check_safe_current_state_events_membership_updated_txn,
+ )
+
@cachedInlineCallbacks(max_entries=100000, iterable=True, cache_context=True)
def get_hosts_in_room(self, room_id, cache_context):
"""Returns the set of all hosts currently in the room
@@ -64,19 +108,28 @@ class RoomMemberWorkerStore(EventsWorkerStore):
room_id, on_invalidate=cache_context.invalidate
)
hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
- defer.returnValue(hosts)
+ return hosts
@cached(max_entries=100000, iterable=True)
def get_users_in_room(self, room_id):
def f(txn):
- sql = (
- "SELECT m.user_id FROM room_memberships as m"
- " INNER JOIN current_state_events as c"
- " ON m.event_id = c.event_id "
- " AND m.room_id = c.room_id "
- " AND m.user_id = c.state_key"
- " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
- )
+ # If we can assume current_state_events.membership is up to date
+ # then we can avoid a join, which is a Very Good Thing given how
+ # frequently this function gets called.
+ if self._current_state_events_membership_up_to_date:
+ sql = """
+ SELECT state_key FROM current_state_events
+ WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
+ """
+ else:
+ sql = """
+ SELECT state_key FROM room_memberships as m
+ INNER JOIN current_state_events as c
+ ON m.event_id = c.event_id
+ AND m.room_id = c.room_id
+ AND m.user_id = c.state_key
+ WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
+ """
txn.execute(sql, (room_id, Membership.JOIN))
return [to_ascii(r[0]) for r in txn]
@@ -98,15 +151,26 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# first get counts.
# We do this all in one transaction to keep the cache small.
# FIXME: get rid of this when we have room_stats
- sql = """
- SELECT count(*), m.membership FROM room_memberships as m
- INNER JOIN current_state_events as c
- ON m.event_id = c.event_id
- AND m.room_id = c.room_id
- AND m.user_id = c.state_key
- WHERE c.type = 'm.room.member' AND c.room_id = ?
- GROUP BY m.membership
- """
+
+ # If we can assume current_state_events.membership is up to date
+ # then we can avoid a join, which is a Very Good Thing given how
+ # frequently this function gets called.
+ if self._current_state_events_membership_up_to_date:
+ sql = """
+ SELECT count(*), membership FROM current_state_events
+ WHERE type = 'm.room.member' AND room_id = ?
+ GROUP BY membership
+ """
+ else:
+ sql = """
+ SELECT count(*), m.membership FROM room_memberships as m
+ INNER JOIN current_state_events as c
+ ON m.event_id = c.event_id
+ AND m.room_id = c.room_id
+ AND m.user_id = c.state_key
+ WHERE c.type = 'm.room.member' AND c.room_id = ?
+ GROUP BY m.membership
+ """
txn.execute(sql, (room_id,))
res = {}
@@ -189,8 +253,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
invites = yield self.get_invited_rooms_for_user(user_id)
for invite in invites:
if invite.room_id == room_id:
- defer.returnValue(invite)
- defer.returnValue(None)
+ return invite
+ return None
def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
""" Get all the rooms for this user where the membership for this user
@@ -224,7 +288,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
results = []
if membership_list:
where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
- " OR ".join(["membership = ?" for _ in membership_list]),
+ " OR ".join(["m.membership = ?" for _ in membership_list]),
)
args = [user_id]
@@ -283,11 +347,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
rooms = yield self.get_rooms_for_user_where_membership_is(
user_id, membership_list=[Membership.JOIN]
)
- defer.returnValue(
- frozenset(
- GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering)
- for r in rooms
- )
+ return frozenset(
+ GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering)
+ for r in rooms
)
@defer.inlineCallbacks
@@ -297,7 +359,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
rooms = yield self.get_rooms_for_user_with_stream_ordering(
user_id, on_invalidate=on_invalidate
)
- defer.returnValue(frozenset(r.room_id for r in rooms))
+ return frozenset(r.room_id for r in rooms)
@cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True)
def get_users_who_share_room_with_user(self, user_id, cache_context):
@@ -314,7 +376,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
)
user_who_share_room.update(user_ids)
- defer.returnValue(user_who_share_room)
+ return user_who_share_room
@defer.inlineCallbacks
def get_joined_users_from_context(self, event, context):
@@ -330,7 +392,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
result = yield self._get_joined_users_from_context(
event.room_id, state_group, current_state_ids, event=event, context=context
)
- defer.returnValue(result)
+ return result
def get_joined_users_from_state(self, room_id, state_entry):
state_group = state_entry.state_group
@@ -444,7 +506,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
avatar_url=to_ascii(event.content.get("avatar_url", None)),
)
- defer.returnValue(users_in_room)
+ return users_in_room
@cachedInlineCallbacks(max_entries=10000)
def is_host_joined(self, room_id, host):
@@ -453,8 +515,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
sql = """
SELECT state_key FROM current_state_events AS c
- INNER JOIN room_memberships USING (event_id)
- WHERE membership = 'join'
+ INNER JOIN room_memberships AS m USING (event_id)
+ WHERE m.membership = 'join'
AND type = 'm.room.member'
AND c.room_id = ?
AND state_key LIKE ?
@@ -469,14 +531,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
rows = yield self._execute("is_host_joined", None, sql, room_id, like_clause)
if not rows:
- defer.returnValue(False)
+ return False
user_id = rows[0][0]
if get_domain_from_id(user_id) != host:
# This can only happen if the host name has something funky in it
raise Exception("Invalid host name")
- defer.returnValue(True)
+ return True
@cachedInlineCallbacks()
def was_host_joined(self, room_id, host):
@@ -509,14 +571,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
rows = yield self._execute("was_host_joined", None, sql, room_id, like_clause)
if not rows:
- defer.returnValue(False)
+ return False
user_id = rows[0][0]
if get_domain_from_id(user_id) != host:
# This can only happen if the host name has something funky in it
raise Exception("Invalid host name")
- defer.returnValue(True)
+ return True
def get_joined_hosts(self, room_id, state_entry):
state_group = state_entry.state_group
@@ -543,7 +605,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
cache = self._get_joined_hosts_cache(room_id)
joined_hosts = yield cache.get_destinations(state_entry)
- defer.returnValue(joined_hosts)
+ return joined_hosts
@cached(max_entries=10000)
def _get_joined_hosts_cache(self, room_id):
@@ -573,7 +635,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return rows[0][0]
count = yield self.runInteraction("did_forget_membership", f)
- defer.returnValue(count == 0)
+ return count == 0
@defer.inlineCallbacks
def get_rooms_user_has_been_in(self, user_id):
@@ -602,6 +664,10 @@ class RoomMemberStore(RoomMemberWorkerStore):
self.register_background_update_handler(
_MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
)
+ self.register_background_update_handler(
+ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME,
+ self._background_current_state_membership,
+ )
def _store_room_members_txn(self, txn, events, backfilled):
"""Store a room member in the database.
@@ -779,7 +845,65 @@ class RoomMemberStore(RoomMemberWorkerStore):
if not result:
yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME)
- defer.returnValue(result)
+ return result
+
+ @defer.inlineCallbacks
+ def _background_current_state_membership(self, progress, batch_size):
+ """Update the new membership column on current_state_events.
+
+ This works by iterating over all rooms in alphebetical order.
+ """
+
+ def _background_current_state_membership_txn(txn, last_processed_room):
+ processed = 0
+ while processed < batch_size:
+ txn.execute(
+ """
+ SELECT MIN(room_id) FROM rooms WHERE room_id > ?
+ """,
+ (last_processed_room,),
+ )
+ row = txn.fetchone()
+ if not row or not row[0]:
+ return processed, True
+
+ next_room, = row
+
+ sql = """
+ UPDATE current_state_events AS c
+ SET membership = (
+ SELECT membership FROM room_memberships
+ WHERE event_id = c.event_id
+ )
+ WHERE room_id = ?
+ """
+ txn.execute(sql, (next_room,))
+ processed += txn.rowcount
+
+ last_processed_room = next_room
+
+ self._background_update_progress_txn(
+ txn,
+ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME,
+ {"last_processed_room": last_processed_room},
+ )
+
+ return processed, False
+
+ # If we haven't got a last processed room then just use the empty
+ # string, which will compare before all room IDs correctly.
+ last_processed_room = progress.get("last_processed_room", "")
+
+ row_count, finished = yield self.runInteraction(
+ "_background_current_state_membership_update",
+ _background_current_state_membership_txn,
+ last_processed_room,
+ )
+
+ if finished:
+ yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME)
+
+ return row_count
class _JoinedHostsCache(object):
@@ -807,7 +931,7 @@ class _JoinedHostsCache(object):
state_entry(synapse.state._StateCacheEntry)
"""
if state_entry.state_group == self.state_group:
- defer.returnValue(frozenset(self.hosts_to_joined_users))
+ return frozenset(self.hosts_to_joined_users)
with (yield self.linearizer.queue(())):
if state_entry.state_group == self.state_group:
@@ -844,7 +968,7 @@ class _JoinedHostsCache(object):
else:
self.state_group = object()
self._len = sum(len(v) for v in itervalues(self.hosts_to_joined_users))
- defer.returnValue(frozenset(self.hosts_to_joined_users))
+ return frozenset(self.hosts_to_joined_users)
def __len__(self):
return self._len
|