diff options
author | Erik Johnston <erik@matrix.org> | 2016-12-08 13:02:58 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-12-08 13:08:41 +0000 |
commit | c45d8e9ba20bd14228e95a1f21021dfb38240daf (patch) | |
tree | 290a2896587bbbbc10dac36b5c2eeda4abbb7b82 /synapse/storage/roommember.py | |
parent | Add joined_rooms servlet (diff) | |
download | synapse-c45d8e9ba20bd14228e95a1f21021dfb38240daf.tar.xz |
Add profile data to the room_membership table for joins
Diffstat (limited to '')
-rw-r--r-- | synapse/storage/roommember.py | 86 |
1 files changed, 86 insertions, 0 deletions
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 866d64e679..7bf9040c0a 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -24,6 +24,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.types import get_domain_from_id import logging +import ujson as json logger = logging.getLogger(__name__) @@ -34,7 +35,15 @@ RoomsForUser = namedtuple( ) +_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update" + + class RoomMemberStore(SQLBaseStore): + def __init__(self, hs): + super(RoomMemberStore, self).__init__(hs) + self.register_background_update_handler( + _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile + ) def _store_room_members_txn(self, txn, events, backfilled): """Store a room member in the database. @@ -49,6 +58,8 @@ class RoomMemberStore(SQLBaseStore): "sender": event.user_id, "room_id": event.room_id, "membership": event.membership, + "display_name": event.content.get("displayname", None), + "avatar_url": event.content.get("avatar_url", None), } for event in events ] @@ -448,3 +459,78 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(True) defer.returnValue(False) + + @defer.inlineCallbacks + def _background_add_membership_profile(self, progress, batch_size): + target_min_stream_id = progress.get( + "target_min_stream_id_inclusive", self._min_stream_order_on_start + ) + max_stream_id = progress.get( + "max_stream_id_exclusive", self._stream_order_on_start + 1 + ) + + INSERT_CLUMP_SIZE = 1000 + + def add_membership_profile_txn(txn): + sql = (""" + SELECT stream_ordering, event_id, room_id, content + FROM events + INNER JOIN room_memberships USING (room_id, event_id) + WHERE ? <= stream_ordering AND stream_ordering < ? + AND type = 'm.room.member' + ORDER BY stream_ordering DESC + LIMIT ? + """) + + txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) + + rows = self.cursor_to_dict(txn) + if not rows: + return 0 + + min_stream_id = rows[-1]["stream_ordering"] + + to_update = [] + for row in rows: + event_id = row["event_id"] + room_id = row["room_id"] + try: + content = json.loads(row["content"]) + except: + continue + + display_name = content.get("displayname", None) + avatar_url = content.get("avatar_url", None) + + if display_name or avatar_url: + to_update.append(( + display_name, avatar_url, event_id, room_id + )) + + to_update_sql = (""" + UPDATE room_memberships SET display_name = ?, avatar_url = ? + WHERE event_id = ? AND room_id = ? + """) + for index in range(0, len(to_update), INSERT_CLUMP_SIZE): + clump = to_update[index:index + INSERT_CLUMP_SIZE] + txn.executemany(to_update_sql, clump) + + progress = { + "target_min_stream_id_inclusive": target_min_stream_id, + "max_stream_id_exclusive": min_stream_id, + } + + self._background_update_progress_txn( + txn, _MEMBERSHIP_PROFILE_UPDATE_NAME, progress + ) + + return len(to_update) + + result = yield self.runInteraction( + _MEMBERSHIP_PROFILE_UPDATE_NAME, add_membership_profile_txn + ) + + if not result: + yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME) + + defer.returnValue(result) |