summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py12
-rw-r--r--synapse/rest/client/v1/room.py36
-rw-r--r--synapse/storage/__init__.py1
-rw-r--r--synapse/storage/roommember.py102
-rw-r--r--synapse/storage/schema/delta/39/membership_profile.sql20
-rw-r--r--synapse/storage/stream.py3
6 files changed, 165 insertions, 9 deletions
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index be55598c43..78b095c903 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -87,12 +87,12 @@ class BulkPushRuleEvaluator:
         condition_cache = {}
 
         for uid, rules in self.rules_by_user.items():
-            display_name = None
-            member_ev_id = context.current_state_ids.get((EventTypes.Member, uid))
-            if member_ev_id:
-                member_ev = yield self.store.get_event(member_ev_id, allow_none=True)
-                if member_ev:
-                    display_name = member_ev.content.get("displayname", None)
+            display_name = room_members.get(uid, {}).get("display_name", None)
+            if not display_name:
+                # Handle the case where we are pushing a membership event to
+                # that user, as they might not be already joined.
+                if event.type == EventTypes.Member and event.state_key == uid:
+                    display_name = event.content.get("displayname", None)
 
             filtered = filtered_by_user[uid]
             if len(filtered) == 0:
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 3fb1f2deb3..a0bba1fa3b 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -369,6 +369,24 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
         }))
 
 
+class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
+    PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/joined_members$")
+
+    def __init__(self, hs):
+        super(JoinedRoomMemberListRestServlet, self).__init__(hs)
+        self.state = hs.get_state_handler()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, room_id):
+        yield self.auth.get_user_by_req(request)
+
+        users_with_profile = yield self.state.get_current_user_in_room(room_id)
+
+        defer.returnValue((200, {
+            "joined": users_with_profile
+        }))
+
+
 # TODO: Needs better unit testing
 class RoomMessageListRestServlet(ClientV1RestServlet):
     PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/messages$")
@@ -692,6 +710,22 @@ class SearchRestServlet(ClientV1RestServlet):
         defer.returnValue((200, results))
 
 
+class JoinedRoomsRestServlet(ClientV1RestServlet):
+    PATTERNS = client_path_patterns("/joined_rooms$")
+
+    def __init__(self, hs):
+        super(JoinedRoomsRestServlet, self).__init__(hs)
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request):
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
+
+        rooms = yield self.store.get_rooms_for_user(requester.user.to_string())
+        room_ids = set(r.room_id for r in rooms)  # Ensure they're unique.
+        defer.returnValue((200, {"joined_rooms": list(room_ids)}))
+
+
 def register_txn_path(servlet, regex_string, http_server, with_get=False):
     """Registers a transaction-based path.
 
@@ -727,6 +761,7 @@ def register_servlets(hs, http_server):
     RoomStateEventRestServlet(hs).register(http_server)
     RoomCreateRestServlet(hs).register(http_server)
     RoomMemberListRestServlet(hs).register(http_server)
+    JoinedRoomMemberListRestServlet(hs).register(http_server)
     RoomMessageListRestServlet(hs).register(http_server)
     JoinRoomAliasServlet(hs).register(http_server)
     RoomForgetRestServlet(hs).register(http_server)
@@ -738,4 +773,5 @@ def register_servlets(hs, http_server):
     RoomRedactEventRestServlet(hs).register(http_server)
     RoomTypingRestServlet(hs).register(http_server)
     SearchRestServlet(hs).register(http_server)
+    JoinedRoomsRestServlet(hs).register(http_server)
     RoomEventContext(hs).register(http_server)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index db146ed348..fe936b3e62 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -222,6 +222,7 @@ class DataStore(RoomMemberStore, RoomStore,
         )
 
         self._stream_order_on_start = self.get_room_max_stream_ordering()
+        self._min_stream_order_on_start = self.get_room_min_stream_ordering()
 
         super(DataStore, self).__init__(hs)
 
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 866d64e679..b2a45a38c1 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -24,6 +24,7 @@ from synapse.api.constants import Membership, EventTypes
 from synapse.types import get_domain_from_id
 
 import logging
+import ujson as json
 
 logger = logging.getLogger(__name__)
 
@@ -34,7 +35,15 @@ RoomsForUser = namedtuple(
 )
 
 
+_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
+
+
 class RoomMemberStore(SQLBaseStore):
+    def __init__(self, hs):
+        super(RoomMemberStore, self).__init__(hs)
+        self.register_background_update_handler(
+            _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
+        )
 
     def _store_room_members_txn(self, txn, events, backfilled):
         """Store a room member in the database.
@@ -49,6 +58,8 @@ class RoomMemberStore(SQLBaseStore):
                     "sender": event.user_id,
                     "room_id": event.room_id,
                     "membership": event.membership,
+                    "display_name": event.content.get("displayname", None),
+                    "avatar_url": event.content.get("avatar_url", None),
                 }
                 for event in events
             ]
@@ -398,7 +409,7 @@ class RoomMemberStore(SQLBaseStore):
             table="room_memberships",
             column="event_id",
             iterable=member_event_ids,
-            retcols=['user_id'],
+            retcols=['user_id', 'display_name', 'avatar_url'],
             keyvalues={
                 "membership": Membership.JOIN,
             },
@@ -406,11 +417,21 @@ class RoomMemberStore(SQLBaseStore):
             desc="_get_joined_users_from_context",
         )
 
-        users_in_room = set(row["user_id"] for row in rows)
+        users_in_room = {
+            row["user_id"]: {
+                "display_name": row["display_name"],
+                "avatar_url": row["avatar_url"],
+            }
+            for row in rows
+        }
+
         if event is not None and event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
                 if event.event_id in member_event_ids:
-                    users_in_room.add(event.state_key)
+                    users_in_room[event.state_key] = {
+                        "display_name": event.content.get("displayname", None),
+                        "avatar_url": event.content.get("avatar_url", None),
+                    }
 
         defer.returnValue(users_in_room)
 
@@ -448,3 +469,78 @@ class RoomMemberStore(SQLBaseStore):
                     defer.returnValue(True)
 
         defer.returnValue(False)
+
+    @defer.inlineCallbacks
+    def _background_add_membership_profile(self, progress, batch_size):
+        target_min_stream_id = progress.get(
+            "target_min_stream_id_inclusive", self._min_stream_order_on_start
+        )
+        max_stream_id = progress.get(
+            "max_stream_id_exclusive", self._stream_order_on_start + 1
+        )
+
+        INSERT_CLUMP_SIZE = 1000
+
+        def add_membership_profile_txn(txn):
+            sql = ("""
+                SELECT stream_ordering, event_id, room_id, content
+                FROM events
+                INNER JOIN room_memberships USING (room_id, event_id)
+                WHERE ? <= stream_ordering AND stream_ordering < ?
+                AND type = 'm.room.member'
+                ORDER BY stream_ordering DESC
+                LIMIT ?
+            """)
+
+            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+            rows = self.cursor_to_dict(txn)
+            if not rows:
+                return 0
+
+            min_stream_id = rows[-1]["stream_ordering"]
+
+            to_update = []
+            for row in rows:
+                event_id = row["event_id"]
+                room_id = row["room_id"]
+                try:
+                    content = json.loads(row["content"])
+                except:
+                    continue
+
+                display_name = content.get("displayname", None)
+                avatar_url = content.get("avatar_url", None)
+
+                if display_name or avatar_url:
+                    to_update.append((
+                        display_name, avatar_url, event_id, room_id
+                    ))
+
+            to_update_sql = ("""
+                UPDATE room_memberships SET display_name = ?, avatar_url = ?
+                WHERE event_id = ? AND room_id = ?
+            """)
+            for index in range(0, len(to_update), INSERT_CLUMP_SIZE):
+                clump = to_update[index:index + INSERT_CLUMP_SIZE]
+                txn.executemany(to_update_sql, clump)
+
+            progress = {
+                "target_min_stream_id_inclusive": target_min_stream_id,
+                "max_stream_id_exclusive": min_stream_id,
+            }
+
+            self._background_update_progress_txn(
+                txn, _MEMBERSHIP_PROFILE_UPDATE_NAME, progress
+            )
+
+            return len(to_update)
+
+        result = yield self.runInteraction(
+            _MEMBERSHIP_PROFILE_UPDATE_NAME, add_membership_profile_txn
+        )
+
+        if not result:
+            yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME)
+
+        defer.returnValue(result)
diff --git a/synapse/storage/schema/delta/39/membership_profile.sql b/synapse/storage/schema/delta/39/membership_profile.sql
new file mode 100644
index 0000000000..1bf911c8ab
--- /dev/null
+++ b/synapse/storage/schema/delta/39/membership_profile.sql
@@ -0,0 +1,20 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE room_memberships ADD COLUMN display_name TEXT;
+ALTER TABLE room_memberships ADD COLUMN avatar_url TEXT;
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('room_membership_profile_update', '{}');
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 7fa63b58a7..2dc24951c4 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -541,6 +541,9 @@ class StreamStore(SQLBaseStore):
     def get_room_max_stream_ordering(self):
         return self._stream_id_gen.get_current_token()
 
+    def get_room_min_stream_ordering(self):
+        return self._backfill_id_gen.get_current_token()
+
     def get_stream_token_for_event(self, event_id):
         """The stream token for an event
         Args: