diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 619252b761..cfd6efd1fc 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -17,7 +17,7 @@
import abc
import logging
-from typing import Dict, Iterable, List, Optional, Tuple, Union
+from typing import Dict, Iterable, List, Optional, Tuple
from six.moves import http_client
@@ -26,6 +26,9 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
+from synapse.replication.http.membership import (
+ ReplicationLocallyRejectInviteRestServlet,
+)
from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
@@ -44,11 +47,6 @@ class RoomMemberHandler(object):
__metaclass__ = abc.ABCMeta
def __init__(self, hs):
- """
-
- Args:
- hs (synapse.server.HomeServer):
- """
self.hs = hs
self.store = hs.get_datastore()
self.auth = hs.get_auth()
@@ -72,6 +70,17 @@ class RoomMemberHandler(object):
self._enable_lookup = hs.config.enable_3pid_lookup
self.allow_per_room_profiles = self.config.allow_per_room_profiles
+ self._event_stream_writer_instance = hs.config.worker.writers.events
+ self._is_on_event_persistence_instance = (
+ self._event_stream_writer_instance == hs.get_instance_name()
+ )
+ if self._is_on_event_persistence_instance:
+ self.persist_event_storage = hs.get_storage().persistence
+ else:
+ self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client(
+ hs
+ )
+
# This is only used to get at ratelimit function, and
# maybe_kick_guest_users. It's fine there are multiple of these as
# it doesn't store state.
@@ -85,7 +94,7 @@ class RoomMemberHandler(object):
room_id: str,
user: UserID,
content: dict,
- ) -> Optional[dict]:
+ ) -> Tuple[str, int]:
"""Try and join a room that this server is not in
Args:
@@ -105,7 +114,7 @@ class RoomMemberHandler(object):
room_id: str,
target: UserID,
content: dict,
- ) -> dict:
+ ) -> Tuple[Optional[str], int]:
"""Attempt to reject an invite for a room this server is not in. If we
fail to do so we locally mark the invite as rejected.
@@ -122,6 +131,22 @@ class RoomMemberHandler(object):
"""
raise NotImplementedError()
+ async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
+ """Mark the invite has having been rejected even though we failed to
+ create a leave event for it.
+ """
+ if self._is_on_event_persistence_instance:
+ return await self.persist_event_storage.locally_reject_invite(
+ user_id, room_id
+ )
+ else:
+ result = await self._locally_reject_client(
+ instance_name=self._event_stream_writer_instance,
+ user_id=user_id,
+ room_id=room_id,
+ )
+ return result["stream_id"]
+
@abc.abstractmethod
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has joined the
@@ -155,7 +180,7 @@ class RoomMemberHandler(object):
ratelimit: bool = True,
content: Optional[dict] = None,
require_consent: bool = True,
- ) -> EventBase:
+ ) -> Tuple[str, int]:
user_id = target.to_string()
if content is None:
@@ -188,9 +213,10 @@ class RoomMemberHandler(object):
)
if duplicate is not None:
# Discard the new event since this membership change is a no-op.
- return duplicate
+ _, stream_id = await self.store.get_event_ordering(duplicate.event_id)
+ return duplicate.event_id, stream_id
- await self.event_creation_handler.handle_new_client_event(
+ stream_id = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[target], ratelimit=ratelimit
)
@@ -214,7 +240,7 @@ class RoomMemberHandler(object):
if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target, room_id)
- return event
+ return event.event_id, stream_id
async def copy_room_tags_and_direct_to_room(
self, old_room_id, new_room_id, user_id
@@ -264,7 +290,7 @@ class RoomMemberHandler(object):
ratelimit: bool = True,
content: Optional[dict] = None,
require_consent: bool = True,
- ) -> Union[EventBase, Optional[dict]]:
+ ) -> Tuple[Optional[str], int]:
key = (room_id,)
as_id = object()
@@ -314,7 +340,7 @@ class RoomMemberHandler(object):
ratelimit: bool = True,
content: Optional[dict] = None,
require_consent: bool = True,
- ) -> Union[EventBase, Optional[dict]]:
+ ) -> Tuple[Optional[str], int]:
content_specified = bool(content)
if content is None:
content = {}
@@ -418,7 +444,13 @@ class RoomMemberHandler(object):
same_membership = old_membership == effective_membership_state
same_sender = requester.user.to_string() == old_state.sender
if same_sender and same_membership and same_content:
- return old_state
+ _, stream_id = await self.store.get_event_ordering(
+ old_state.event_id
+ )
+ return (
+ old_state.event_id,
+ stream_id,
+ )
if old_membership in ["ban", "leave"] and action == "kick":
raise AuthError(403, "The target user is not in the room")
@@ -725,7 +757,7 @@ class RoomMemberHandler(object):
requester: Requester,
txn_id: Optional[str],
id_access_token: Optional[str] = None,
- ) -> None:
+ ) -> int:
if self.config.block_non_admin_invites:
is_requester_admin = await self.auth.is_server_admin(requester.user)
if not is_requester_admin:
@@ -757,11 +789,11 @@ class RoomMemberHandler(object):
)
if invitee:
- await self.update_membership(
+ _, stream_id = await self.update_membership(
requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id
)
else:
- await self._make_and_store_3pid_invite(
+ stream_id = await self._make_and_store_3pid_invite(
requester,
id_server,
medium,
@@ -772,6 +804,8 @@ class RoomMemberHandler(object):
id_access_token=id_access_token,
)
+ return stream_id
+
async def _make_and_store_3pid_invite(
self,
requester: Requester,
@@ -782,7 +816,7 @@ class RoomMemberHandler(object):
user: UserID,
txn_id: Optional[str],
id_access_token: Optional[str] = None,
- ) -> None:
+ ) -> int:
room_state = await self.state_handler.get_current_state(room_id)
inviter_display_name = ""
@@ -837,7 +871,10 @@ class RoomMemberHandler(object):
id_access_token=id_access_token,
)
- await self.event_creation_handler.create_and_send_nonmember_event(
+ (
+ event,
+ stream_id,
+ ) = await self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.ThirdPartyInvite,
@@ -855,6 +892,7 @@ class RoomMemberHandler(object):
ratelimit=False,
txn_id=txn_id,
)
+ return stream_id
async def _is_host_in_room(
self, current_state_ids: Dict[Tuple[str, str], str]
@@ -936,7 +974,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
room_id: str,
user: UserID,
content: dict,
- ) -> None:
+ ) -> Tuple[str, int]:
"""Implements RoomMemberHandler._remote_join
"""
# filter ourselves out of remote_room_hosts: do_invite_join ignores it
@@ -965,7 +1003,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
# join dance for now, since we're kinda implicitly checking
# that we are allowed to join when we decide whether or not we
# need to do the invite/join dance.
- await self.federation_handler.do_invite_join(
+ event_id, stream_id = await self.federation_handler.do_invite_join(
remote_room_hosts, room_id, user.to_string(), content
)
await self._user_joined_room(user, room_id)
@@ -975,14 +1013,14 @@ class RoomMemberMasterHandler(RoomMemberHandler):
if self.hs.config.limit_remote_rooms.enabled:
if too_complex is False:
# We checked, and we're under the limit.
- return
+ return event_id, stream_id
# Check again, but with the local state events
too_complex = await self._is_local_room_too_complex(room_id)
if too_complex is False:
# We're under the limit.
- return
+ return event_id, stream_id
# The room is too large. Leave.
requester = types.create_requester(user, None, False, None)
@@ -995,6 +1033,8 @@ class RoomMemberMasterHandler(RoomMemberHandler):
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
)
+ return event_id, stream_id
+
async def _remote_reject_invite(
self,
requester: Requester,
@@ -1002,15 +1042,15 @@ class RoomMemberMasterHandler(RoomMemberHandler):
room_id: str,
target: UserID,
content: dict,
- ) -> dict:
+ ) -> Tuple[Optional[str], int]:
"""Implements RoomMemberHandler._remote_reject_invite
"""
fed_handler = self.federation_handler
try:
- ret = await fed_handler.do_remotely_reject_invite(
+ event, stream_id = await fed_handler.do_remotely_reject_invite(
remote_room_hosts, room_id, target.to_string(), content=content,
)
- return ret
+ return event.event_id, stream_id
except Exception as e:
# if we were unable to reject the exception, just mark
# it as rejected on our end and plough ahead.
@@ -1020,8 +1060,8 @@ class RoomMemberMasterHandler(RoomMemberHandler):
#
logger.warning("Failed to reject invite: %s", e)
- await self.store.locally_reject_invite(target.to_string(), room_id)
- return {}
+ stream_id = await self.locally_reject_invite(target.to_string(), room_id)
+ return None, stream_id
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_joined_room
|