summary refs log tree commit diff
path: root/synapse/storage/roommember.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-12-08 13:02:58 +0000
committerErik Johnston <erik@matrix.org>2016-12-08 13:08:41 +0000
commitc45d8e9ba20bd14228e95a1f21021dfb38240daf (patch)
tree290a2896587bbbbc10dac36b5c2eeda4abbb7b82 /synapse/storage/roommember.py
parentAdd joined_rooms servlet (diff)
downloadsynapse-c45d8e9ba20bd14228e95a1f21021dfb38240daf.tar.xz
Add profile data to the room_membership table for joins
Diffstat (limited to '')
-rw-r--r--synapse/storage/roommember.py86
1 files changed, 86 insertions, 0 deletions
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 866d64e679..7bf9040c0a 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -24,6 +24,7 @@ from synapse.api.constants import Membership, EventTypes
 from synapse.types import get_domain_from_id
 
 import logging
+import ujson as json
 
 logger = logging.getLogger(__name__)
 
@@ -34,7 +35,15 @@ RoomsForUser = namedtuple(
 )
 
 
+_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
+
+
 class RoomMemberStore(SQLBaseStore):
+    def __init__(self, hs):
+        super(RoomMemberStore, self).__init__(hs)
+        self.register_background_update_handler(
+            _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
+        )
 
     def _store_room_members_txn(self, txn, events, backfilled):
         """Store a room member in the database.
@@ -49,6 +58,8 @@ class RoomMemberStore(SQLBaseStore):
                     "sender": event.user_id,
                     "room_id": event.room_id,
                     "membership": event.membership,
+                    "display_name": event.content.get("displayname", None),
+                    "avatar_url": event.content.get("avatar_url", None),
                 }
                 for event in events
             ]
@@ -448,3 +459,78 @@ class RoomMemberStore(SQLBaseStore):
                     defer.returnValue(True)
 
         defer.returnValue(False)
+
+    @defer.inlineCallbacks
+    def _background_add_membership_profile(self, progress, batch_size):
+        target_min_stream_id = progress.get(
+            "target_min_stream_id_inclusive", self._min_stream_order_on_start
+        )
+        max_stream_id = progress.get(
+            "max_stream_id_exclusive", self._stream_order_on_start + 1
+        )
+
+        INSERT_CLUMP_SIZE = 1000
+
+        def add_membership_profile_txn(txn):
+            sql = ("""
+                SELECT stream_ordering, event_id, room_id, content
+                FROM events
+                INNER JOIN room_memberships USING (room_id, event_id)
+                WHERE ? <= stream_ordering AND stream_ordering < ?
+                AND type = 'm.room.member'
+                ORDER BY stream_ordering DESC
+                LIMIT ?
+            """)
+
+            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+            rows = self.cursor_to_dict(txn)
+            if not rows:
+                return 0
+
+            min_stream_id = rows[-1]["stream_ordering"]
+
+            to_update = []
+            for row in rows:
+                event_id = row["event_id"]
+                room_id = row["room_id"]
+                try:
+                    content = json.loads(row["content"])
+                except:
+                    continue
+
+                display_name = content.get("displayname", None)
+                avatar_url = content.get("avatar_url", None)
+
+                if display_name or avatar_url:
+                    to_update.append((
+                        display_name, avatar_url, event_id, room_id
+                    ))
+
+            to_update_sql = ("""
+                UPDATE room_memberships SET display_name = ?, avatar_url = ?
+                WHERE event_id = ? AND room_id = ?
+            """)
+            for index in range(0, len(to_update), INSERT_CLUMP_SIZE):
+                clump = to_update[index:index + INSERT_CLUMP_SIZE]
+                txn.executemany(to_update_sql, clump)
+
+            progress = {
+                "target_min_stream_id_inclusive": target_min_stream_id,
+                "max_stream_id_exclusive": min_stream_id,
+            }
+
+            self._background_update_progress_txn(
+                txn, _MEMBERSHIP_PROFILE_UPDATE_NAME, progress
+            )
+
+            return len(to_update)
+
+        result = yield self.runInteraction(
+            _MEMBERSHIP_PROFILE_UPDATE_NAME, add_membership_profile_txn
+        )
+
+        if not result:
+            yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME)
+
+        defer.returnValue(result)