summary refs log tree commit diff
path: root/synapse/storage/roommember.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-04-17 19:44:40 +0100
committerErik Johnston <erik@matrix.org>2019-04-17 19:44:40 +0100
commitca90336a6935b36b5761244005b0f68b496d5d79 (patch)
tree6bbce5eafc0db3b24ccc3b59b051da850382ae09 /synapse/storage/roommember.py
parentAdd management endpoints for account validity (diff)
parentMerge pull request #5047 from matrix-org/babolivier/account_expiration (diff)
downloadsynapse-ca90336a6935b36b5761244005b0f68b496d5d79.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into babolivier/account_expiration
Diffstat (limited to 'synapse/storage/roommember.py')
-rw-r--r--synapse/storage/roommember.py173
1 files changed, 87 insertions, 86 deletions
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 592c1bcd33..57df17bcc2 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -35,28 +35,22 @@ logger = logging.getLogger(__name__)
 
 
 RoomsForUser = namedtuple(
-    "RoomsForUser",
-    ("room_id", "sender", "membership", "event_id", "stream_ordering")
+    "RoomsForUser", ("room_id", "sender", "membership", "event_id", "stream_ordering")
 )
 
 GetRoomsForUserWithStreamOrdering = namedtuple(
-    "_GetRoomsForUserWithStreamOrdering",
-    ("room_id", "stream_ordering",)
+    "_GetRoomsForUserWithStreamOrdering", ("room_id", "stream_ordering")
 )
 
 
 # We store this using a namedtuple so that we save about 3x space over using a
 # dict.
-ProfileInfo = namedtuple(
-    "ProfileInfo", ("avatar_url", "display_name")
-)
+ProfileInfo = namedtuple("ProfileInfo", ("avatar_url", "display_name"))
 
 # "members" points to a truncated list of (user_id, event_id) tuples for users of
 # a given membership type, suitable for use in calculating heroes for a room.
 # "count" points to the total numberr of users of a given membership type.
-MemberSummary = namedtuple(
-    "MemberSummary", ("members", "count")
-)
+MemberSummary = namedtuple("MemberSummary", ("members", "count"))
 
 _MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
 
@@ -67,7 +61,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         """Returns the set of all hosts currently in the room
         """
         user_ids = yield self.get_users_in_room(
-            room_id, on_invalidate=cache_context.invalidate,
+            room_id, on_invalidate=cache_context.invalidate
         )
         hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
         defer.returnValue(hosts)
@@ -84,8 +78,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                 " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
             )
 
-            txn.execute(sql, (room_id, Membership.JOIN,))
+            txn.execute(sql, (room_id, Membership.JOIN))
             return [to_ascii(r[0]) for r in txn]
+
         return self.runInteraction("get_users_in_room", f)
 
     @cached(max_entries=100000)
@@ -156,9 +151,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             A deferred list of RoomsForUser.
         """
 
-        return self.get_rooms_for_user_where_membership_is(
-            user_id, [Membership.INVITE]
-        )
+        return self.get_rooms_for_user_where_membership_is(user_id, [Membership.INVITE])
 
     @defer.inlineCallbacks
     def get_invite_for_user_in_room(self, user_id, room_id):
@@ -196,11 +189,13 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         return self.runInteraction(
             "get_rooms_for_user_where_membership_is",
             self._get_rooms_for_user_where_membership_is_txn,
-            user_id, membership_list
+            user_id,
+            membership_list,
         )
 
-    def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id,
-                                                    membership_list):
+    def _get_rooms_for_user_where_membership_is_txn(
+        self, txn, user_id, membership_list
+    ):
 
         do_invite = Membership.INVITE in membership_list
         membership_list = [m for m in membership_list if m != Membership.INVITE]
@@ -227,9 +222,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             ) % (where_clause,)
 
             txn.execute(sql, args)
-            results = [
-                RoomsForUser(**r) for r in self.cursor_to_dict(txn)
-            ]
+            results = [RoomsForUser(**r) for r in self.cursor_to_dict(txn)]
 
         if do_invite:
             sql = (
@@ -241,13 +234,16 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             )
 
             txn.execute(sql, (user_id,))
-            results.extend(RoomsForUser(
-                room_id=r["room_id"],
-                sender=r["inviter"],
-                event_id=r["event_id"],
-                stream_ordering=r["stream_ordering"],
-                membership=Membership.INVITE,
-            ) for r in self.cursor_to_dict(txn))
+            results.extend(
+                RoomsForUser(
+                    room_id=r["room_id"],
+                    sender=r["inviter"],
+                    event_id=r["event_id"],
+                    stream_ordering=r["stream_ordering"],
+                    membership=Membership.INVITE,
+                )
+                for r in self.cursor_to_dict(txn)
+            )
 
         return results
 
@@ -264,19 +260,21 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             of the most recent join for that user and room.
         """
         rooms = yield self.get_rooms_for_user_where_membership_is(
-            user_id, membership_list=[Membership.JOIN],
+            user_id, membership_list=[Membership.JOIN]
+        )
+        defer.returnValue(
+            frozenset(
+                GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering)
+                for r in rooms
+            )
         )
-        defer.returnValue(frozenset(
-            GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering)
-            for r in rooms
-        ))
 
     @defer.inlineCallbacks
     def get_rooms_for_user(self, user_id, on_invalidate=None):
         """Returns a set of room_ids the user is currently joined to
         """
         rooms = yield self.get_rooms_for_user_with_stream_ordering(
-            user_id, on_invalidate=on_invalidate,
+            user_id, on_invalidate=on_invalidate
         )
         defer.returnValue(frozenset(r.room_id for r in rooms))
 
@@ -285,13 +283,13 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         """Returns the set of users who share a room with `user_id`
         """
         room_ids = yield self.get_rooms_for_user(
-            user_id, on_invalidate=cache_context.invalidate,
+            user_id, on_invalidate=cache_context.invalidate
         )
 
         user_who_share_room = set()
         for room_id in room_ids:
             user_ids = yield self.get_users_in_room(
-                room_id, on_invalidate=cache_context.invalidate,
+                room_id, on_invalidate=cache_context.invalidate
             )
             user_who_share_room.update(user_ids)
 
@@ -309,9 +307,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         current_state_ids = yield context.get_current_state_ids(self)
         result = yield self._get_joined_users_from_context(
-            event.room_id, state_group, current_state_ids,
-            event=event,
-            context=context,
+            event.room_id, state_group, current_state_ids, event=event, context=context
         )
         defer.returnValue(result)
 
@@ -325,13 +321,21 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             state_group = object()
 
         return self._get_joined_users_from_context(
-            room_id, state_group, state_entry.state, context=state_entry,
+            room_id, state_group, state_entry.state, context=state_entry
         )
 
-    @cachedInlineCallbacks(num_args=2, cache_context=True, iterable=True,
-                           max_entries=100000)
-    def _get_joined_users_from_context(self, room_id, state_group, current_state_ids,
-                                       cache_context, event=None, context=None):
+    @cachedInlineCallbacks(
+        num_args=2, cache_context=True, iterable=True, max_entries=100000
+    )
+    def _get_joined_users_from_context(
+        self,
+        room_id,
+        state_group,
+        current_state_ids,
+        cache_context,
+        event=None,
+        context=None,
+    ):
         # We don't use `state_group`, it's there so that we can cache based
         # on it. However, it's important that it's never None, since two current_states
         # with a state_group of None are likely to be different.
@@ -371,9 +375,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         # the hit ratio counts. After all, we don't populate the cache if we
         # miss it here
         event_map = self._get_events_from_cache(
-            member_event_ids,
-            allow_rejected=False,
-            update_metrics=False,
+            member_event_ids, allow_rejected=False, update_metrics=False
         )
 
         missing_member_event_ids = []
@@ -397,21 +399,21 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                 table="room_memberships",
                 column="event_id",
                 iterable=missing_member_event_ids,
-                retcols=('user_id', 'display_name', 'avatar_url',),
-                keyvalues={
-                    "membership": Membership.JOIN,
-                },
+                retcols=('user_id', 'display_name', 'avatar_url'),
+                keyvalues={"membership": Membership.JOIN},
                 batch_size=500,
                 desc="_get_joined_users_from_context",
             )
 
-            users_in_room.update({
-                to_ascii(row["user_id"]): ProfileInfo(
-                    avatar_url=to_ascii(row["avatar_url"]),
-                    display_name=to_ascii(row["display_name"]),
-                )
-                for row in rows
-            })
+            users_in_room.update(
+                {
+                    to_ascii(row["user_id"]): ProfileInfo(
+                        avatar_url=to_ascii(row["avatar_url"]),
+                        display_name=to_ascii(row["display_name"]),
+                    )
+                    for row in rows
+                }
+            )
 
         if event is not None and event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
@@ -505,7 +507,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             state_group = object()
 
         return self._get_joined_hosts(
-            room_id, state_group, state_entry.state, state_entry=state_entry,
+            room_id, state_group, state_entry.state, state_entry=state_entry
         )
 
     @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
@@ -531,6 +533,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         """Returns whether user_id has elected to discard history for room_id.
 
         Returns False if they have since re-joined."""
+
         def f(txn):
             sql = (
                 "SELECT"
@@ -547,6 +550,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             txn.execute(sql, (user_id, room_id))
             rows = txn.fetchall()
             return rows[0][0]
+
         count = yield self.runInteraction("did_forget_membership", f)
         defer.returnValue(count == 0)
 
@@ -575,13 +579,14 @@ class RoomMemberStore(RoomMemberWorkerStore):
                     "avatar_url": event.content.get("avatar_url", None),
                 }
                 for event in events
-            ]
+            ],
         )
 
         for event in events:
             txn.call_after(
                 self._membership_stream_cache.entity_has_changed,
-                event.state_key, event.internal_metadata.stream_ordering
+                event.state_key,
+                event.internal_metadata.stream_ordering,
             )
             txn.call_after(
                 self.get_invited_rooms_for_user.invalidate, (event.state_key,)
@@ -607,7 +612,7 @@ class RoomMemberStore(RoomMemberWorkerStore):
                             "inviter": event.sender,
                             "room_id": event.room_id,
                             "stream_id": event.internal_metadata.stream_ordering,
-                        }
+                        },
                     )
                 else:
                     sql = (
@@ -616,12 +621,15 @@ class RoomMemberStore(RoomMemberWorkerStore):
                         " AND replaced_by is NULL"
                     )
 
-                    txn.execute(sql, (
-                        event.internal_metadata.stream_ordering,
-                        event.event_id,
-                        event.room_id,
-                        event.state_key,
-                    ))
+                    txn.execute(
+                        sql,
+                        (
+                            event.internal_metadata.stream_ordering,
+                            event.event_id,
+                            event.room_id,
+                            event.state_key,
+                        ),
+                    )
 
     @defer.inlineCallbacks
     def locally_reject_invite(self, user_id, room_id):
@@ -632,18 +640,14 @@ class RoomMemberStore(RoomMemberWorkerStore):
         )
 
         def f(txn, stream_ordering):
-            txn.execute(sql, (
-                stream_ordering,
-                True,
-                room_id,
-                user_id,
-            ))
+            txn.execute(sql, (stream_ordering, True, room_id, user_id))
 
         with self._stream_id_gen.get_next() as stream_ordering:
             yield self.runInteraction("locally_reject_invite", f, stream_ordering)
 
     def forget(self, user_id, room_id):
         """Indicate that user_id wishes to discard history for room_id."""
+
         def f(txn):
             sql = (
                 "UPDATE"
@@ -657,9 +661,8 @@ class RoomMemberStore(RoomMemberWorkerStore):
             )
             txn.execute(sql, (user_id, room_id))
 
-            self._invalidate_cache_and_stream(
-                txn, self.did_forget, (user_id, room_id,),
-            )
+            self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
+
         return self.runInteraction("forget_membership", f)
 
     @defer.inlineCallbacks
@@ -674,7 +677,7 @@ class RoomMemberStore(RoomMemberWorkerStore):
         INSERT_CLUMP_SIZE = 1000
 
         def add_membership_profile_txn(txn):
-            sql = ("""
+            sql = """
                 SELECT stream_ordering, event_id, events.room_id, event_json.json
                 FROM events
                 INNER JOIN event_json USING (event_id)
@@ -683,7 +686,7 @@ class RoomMemberStore(RoomMemberWorkerStore):
                 AND type = 'm.room.member'
                 ORDER BY stream_ordering DESC
                 LIMIT ?
-            """)
+            """
 
             txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
 
@@ -707,16 +710,14 @@ class RoomMemberStore(RoomMemberWorkerStore):
                 avatar_url = content.get("avatar_url", None)
 
                 if display_name or avatar_url:
-                    to_update.append((
-                        display_name, avatar_url, event_id, room_id
-                    ))
+                    to_update.append((display_name, avatar_url, event_id, room_id))
 
-            to_update_sql = ("""
+            to_update_sql = """
                 UPDATE room_memberships SET display_name = ?, avatar_url = ?
                 WHERE event_id = ? AND room_id = ?
-            """)
+            """
             for index in range(0, len(to_update), INSERT_CLUMP_SIZE):
-                clump = to_update[index:index + INSERT_CLUMP_SIZE]
+                clump = to_update[index : index + INSERT_CLUMP_SIZE]
                 txn.executemany(to_update_sql, clump)
 
             progress = {
@@ -789,7 +790,7 @@ class _JoinedHostsCache(object):
                             self.hosts_to_joined_users.pop(host, None)
             else:
                 joined_users = yield self.store.get_joined_users_from_state(
-                    self.room_id, state_entry,
+                    self.room_id, state_entry
                 )
 
                 self.hosts_to_joined_users = {}