summary refs log tree commit diff
path: root/synapse/handlers/room_member.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-22 14:21:54 +0100
committerGitHub <noreply@github.com>2020-05-22 14:21:54 +0100
commit1531b214fc57714c14046a8f66c7b5fe5ec5dcdd (patch)
treefd150f21a14dcc6f5cf3f373984478dd12d43c95 /synapse/handlers/room_member.py
parentConvert sending mail to async/await. (#7557) (diff)
downloadsynapse-1531b214fc57714c14046a8f66c7b5fe5ec5dcdd.tar.xz
Add ability to wait for replication streams (#7542)
The idea here is that if an instance persists an event via the replication HTTP API it can return before we receive that event over replication, which can lead to races where code assumes that persisting an event immediately updates various caches (e.g. current state of the room).

Most of Synapse doesn't hit such races, so we don't do the waiting automagically, instead we do so where necessary to avoid unnecessary delays. We may decide to change our minds here if it turns out there are a lot of subtle races going on.

People probably want to look at this commit by commit.
Diffstat (limited to 'synapse/handlers/room_member.py')
-rw-r--r--synapse/handlers/room_member.py65
1 files changed, 41 insertions, 24 deletions
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index e51e1c32fe..691b6705b2 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -17,7 +17,7 @@
 
 import abc
 import logging
-from typing import Dict, Iterable, List, Optional, Tuple, Union
+from typing import Dict, Iterable, List, Optional, Tuple
 
 from six.moves import http_client
 
@@ -84,7 +84,7 @@ class RoomMemberHandler(object):
         room_id: str,
         user: UserID,
         content: dict,
-    ) -> Optional[dict]:
+    ) -> Tuple[str, int]:
         """Try and join a room that this server is not in
 
         Args:
@@ -104,7 +104,7 @@ class RoomMemberHandler(object):
         room_id: str,
         target: UserID,
         content: dict,
-    ) -> dict:
+    ) -> Tuple[Optional[str], int]:
         """Attempt to reject an invite for a room this server is not in. If we
         fail to do so we locally mark the invite as rejected.
 
@@ -154,7 +154,7 @@ class RoomMemberHandler(object):
         ratelimit: bool = True,
         content: Optional[dict] = None,
         require_consent: bool = True,
-    ) -> EventBase:
+    ) -> Tuple[str, int]:
         user_id = target.to_string()
 
         if content is None:
@@ -187,9 +187,10 @@ class RoomMemberHandler(object):
         )
         if duplicate is not None:
             # Discard the new event since this membership change is a no-op.
-            return duplicate
+            _, stream_id = await self.store.get_event_ordering(duplicate.event_id)
+            return duplicate.event_id, stream_id
 
-        await self.event_creation_handler.handle_new_client_event(
+        stream_id = await self.event_creation_handler.handle_new_client_event(
             requester, event, context, extra_users=[target], ratelimit=ratelimit
         )
 
@@ -213,7 +214,7 @@ class RoomMemberHandler(object):
                 if prev_member_event.membership == Membership.JOIN:
                     await self._user_left_room(target, room_id)
 
-        return event
+        return event.event_id, stream_id
 
     async def copy_room_tags_and_direct_to_room(
         self, old_room_id, new_room_id, user_id
@@ -263,7 +264,7 @@ class RoomMemberHandler(object):
         ratelimit: bool = True,
         content: Optional[dict] = None,
         require_consent: bool = True,
-    ) -> Union[EventBase, Optional[dict]]:
+    ) -> Tuple[Optional[str], int]:
         key = (room_id,)
 
         with (await self.member_linearizer.queue(key)):
@@ -294,7 +295,7 @@ class RoomMemberHandler(object):
         ratelimit: bool = True,
         content: Optional[dict] = None,
         require_consent: bool = True,
-    ) -> Union[EventBase, Optional[dict]]:
+    ) -> Tuple[Optional[str], int]:
         content_specified = bool(content)
         if content is None:
             content = {}
@@ -398,7 +399,13 @@ class RoomMemberHandler(object):
                 same_membership = old_membership == effective_membership_state
                 same_sender = requester.user.to_string() == old_state.sender
                 if same_sender and same_membership and same_content:
-                    return old_state
+                    _, stream_id = await self.store.get_event_ordering(
+                        old_state.event_id
+                    )
+                    return (
+                        old_state.event_id,
+                        stream_id,
+                    )
 
             if old_membership in ["ban", "leave"] and action == "kick":
                 raise AuthError(403, "The target user is not in the room")
@@ -705,7 +712,7 @@ class RoomMemberHandler(object):
         requester: Requester,
         txn_id: Optional[str],
         id_access_token: Optional[str] = None,
-    ) -> None:
+    ) -> int:
         if self.config.block_non_admin_invites:
             is_requester_admin = await self.auth.is_server_admin(requester.user)
             if not is_requester_admin:
@@ -737,11 +744,11 @@ class RoomMemberHandler(object):
         )
 
         if invitee:
-            await self.update_membership(
+            _, stream_id = await self.update_membership(
                 requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id
             )
         else:
-            await self._make_and_store_3pid_invite(
+            stream_id = await self._make_and_store_3pid_invite(
                 requester,
                 id_server,
                 medium,
@@ -752,6 +759,8 @@ class RoomMemberHandler(object):
                 id_access_token=id_access_token,
             )
 
+        return stream_id
+
     async def _make_and_store_3pid_invite(
         self,
         requester: Requester,
@@ -762,7 +771,7 @@ class RoomMemberHandler(object):
         user: UserID,
         txn_id: Optional[str],
         id_access_token: Optional[str] = None,
-    ) -> None:
+    ) -> int:
         room_state = await self.state_handler.get_current_state(room_id)
 
         inviter_display_name = ""
@@ -817,7 +826,10 @@ class RoomMemberHandler(object):
             id_access_token=id_access_token,
         )
 
-        await self.event_creation_handler.create_and_send_nonmember_event(
+        (
+            event,
+            stream_id,
+        ) = await self.event_creation_handler.create_and_send_nonmember_event(
             requester,
             {
                 "type": EventTypes.ThirdPartyInvite,
@@ -835,6 +847,7 @@ class RoomMemberHandler(object):
             ratelimit=False,
             txn_id=txn_id,
         )
+        return stream_id
 
     async def _is_host_in_room(
         self, current_state_ids: Dict[Tuple[str, str], str]
@@ -916,7 +929,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         room_id: str,
         user: UserID,
         content: dict,
-    ) -> None:
+    ) -> Tuple[str, int]:
         """Implements RoomMemberHandler._remote_join
         """
         # filter ourselves out of remote_room_hosts: do_invite_join ignores it
@@ -945,7 +958,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         # join dance for now, since we're kinda implicitly checking
         # that we are allowed to join when we decide whether or not we
         # need to do the invite/join dance.
-        await self.federation_handler.do_invite_join(
+        event_id, stream_id = await self.federation_handler.do_invite_join(
             remote_room_hosts, room_id, user.to_string(), content
         )
         await self._user_joined_room(user, room_id)
@@ -955,14 +968,14 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         if self.hs.config.limit_remote_rooms.enabled:
             if too_complex is False:
                 # We checked, and we're under the limit.
-                return
+                return event_id, stream_id
 
             # Check again, but with the local state events
             too_complex = await self._is_local_room_too_complex(room_id)
 
             if too_complex is False:
                 # We're under the limit.
-                return
+                return event_id, stream_id
 
             # The room is too large. Leave.
             requester = types.create_requester(user, None, False, None)
@@ -975,6 +988,8 @@ class RoomMemberMasterHandler(RoomMemberHandler):
                 errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
             )
 
+        return event_id, stream_id
+
     async def _remote_reject_invite(
         self,
         requester: Requester,
@@ -982,15 +997,15 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         room_id: str,
         target: UserID,
         content: dict,
-    ) -> dict:
+    ) -> Tuple[Optional[str], int]:
         """Implements RoomMemberHandler._remote_reject_invite
         """
         fed_handler = self.federation_handler
         try:
-            ret = await fed_handler.do_remotely_reject_invite(
+            event, stream_id = await fed_handler.do_remotely_reject_invite(
                 remote_room_hosts, room_id, target.to_string(), content=content,
             )
-            return ret
+            return event.event_id, stream_id
         except Exception as e:
             # if we were unable to reject the exception, just mark
             # it as rejected on our end and plough ahead.
@@ -1000,8 +1015,10 @@ class RoomMemberMasterHandler(RoomMemberHandler):
             #
             logger.warning("Failed to reject invite: %s", e)
 
-            await self.store.locally_reject_invite(target.to_string(), room_id)
-            return {}
+            stream_id = await self.store.locally_reject_invite(
+                target.to_string(), room_id
+            )
+            return None, stream_id
 
     async def _user_joined_room(self, target: UserID, room_id: str) -> None:
         """Implements RoomMemberHandler._user_joined_room