diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index b05e32f457..fdce54c5c3 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -39,10 +39,6 @@ class EventStreamHandler(BaseHandler):
def __init__(self, hs: "HomeServer"):
super(EventStreamHandler, self).__init__(hs)
- self.distributor = hs.get_distributor()
- self.distributor.declare("started_user_eventstream")
- self.distributor.declare("stopped_user_eventstream")
-
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 74d7ac8a67..be9b0701a0 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -69,7 +69,6 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
ReplicationStoreRoomOnInviteRestServlet,
)
-from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
@@ -80,7 +79,6 @@ from synapse.types import (
get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.distributor import user_joined_room
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server
@@ -141,9 +139,6 @@ class FederationHandler(BaseHandler):
self._replication = hs.get_replication_data_handler()
self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
- self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client(
- hs
- )
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
)
@@ -704,31 +699,10 @@ class FederationHandler(BaseHandler):
logger.debug("[%s %s] Processing event: %s", room_id, event_id, event)
try:
- context = await self._handle_new_event(origin, event, state=state)
+ await self._handle_new_event(origin, event, state=state)
except AuthError as e:
raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
- if event.type == EventTypes.Member:
- if event.membership == Membership.JOIN:
- # Only fire user_joined_room if the user has acutally
- # joined the room. Don't bother if the user is just
- # changing their profile info.
- newly_joined = True
-
- prev_state_ids = await context.get_prev_state_ids()
-
- prev_state_id = prev_state_ids.get((event.type, event.state_key))
- if prev_state_id:
- prev_state = await self.store.get_event(
- prev_state_id, allow_none=True
- )
- if prev_state and prev_state.membership == Membership.JOIN:
- newly_joined = False
-
- if newly_joined:
- user = UserID.from_string(event.state_key)
- await self.user_joined_room(user, room_id)
-
# For encrypted messages we check that we know about the sending device,
# if we don't then we mark the device cache for that user as stale.
if event.type == EventTypes.Encrypted:
@@ -1550,11 +1524,6 @@ class FederationHandler(BaseHandler):
event.signatures,
)
- if event.type == EventTypes.Member:
- if event.content["membership"] == Membership.JOIN:
- user = UserID.from_string(event.state_key)
- await self.user_joined_room(user, event.room_id)
-
prev_state_ids = await context.get_prev_state_ids()
state_ids = list(prev_state_ids.values())
@@ -2984,16 +2953,6 @@ class FederationHandler(BaseHandler):
else:
await self.store.clean_room_for_join(room_id)
- async def user_joined_room(self, user: UserID, room_id: str) -> None:
- """Called when a new user has joined the room
- """
- if self.config.worker_app:
- await self._notify_user_membership_change(
- room_id=room_id, user_id=user.to_string(), change="joined"
- )
- else:
- user_joined_room(self.distributor, user, room_id)
-
async def get_room_complexity(
self, remote_room_hosts: List[str], room_id: str
) -> Optional[dict]:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 32b7e323fa..100f335b80 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -40,7 +40,7 @@ from synapse.events.validator import EventValidator
from synapse.storage.roommember import RoomsForUser
from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
from synapse.util.async_helpers import Linearizer
-from synapse.util.distributor import user_joined_room, user_left_room
+from synapse.util.distributor import user_left_room
from ._base import BaseHandler
@@ -149,17 +149,6 @@ class RoomMemberHandler:
raise NotImplementedError()
@abc.abstractmethod
- async def _user_joined_room(self, target: UserID, room_id: str) -> None:
- """Notifies distributor on master process that the user has joined the
- room.
-
- Args:
- target
- room_id
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has left the
room.
@@ -221,7 +210,6 @@ class RoomMemberHandler:
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
- newly_joined = False
if event.membership == Membership.JOIN:
newly_joined = True
if prev_member_event_id:
@@ -246,12 +234,7 @@ class RoomMemberHandler:
requester, event, context, extra_users=[target], ratelimit=ratelimit,
)
- if event.membership == Membership.JOIN and newly_joined:
- # Only fire user_joined_room if the user has actually joined the
- # room. Don't bother if the user is just changing their profile
- # info.
- await self._user_joined_room(target, room_id)
- elif event.membership == Membership.LEAVE:
+ if event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN:
@@ -726,17 +709,7 @@ class RoomMemberHandler:
(EventTypes.Member, event.state_key), None
)
- if event.membership == Membership.JOIN:
- # Only fire user_joined_room if the user has actually joined the
- # room. Don't bother if the user is just changing their profile
- # info.
- newly_joined = True
- if prev_member_event_id:
- prev_member_event = await self.store.get_event(prev_member_event_id)
- newly_joined = prev_member_event.membership != Membership.JOIN
- if newly_joined:
- await self._user_joined_room(target_user, room_id)
- elif event.membership == Membership.LEAVE:
+ if event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN:
@@ -1002,10 +975,9 @@ class RoomMemberHandler:
class RoomMemberMasterHandler(RoomMemberHandler):
def __init__(self, hs):
- super(RoomMemberMasterHandler, self).__init__(hs)
+ super().__init__(hs)
self.distributor = hs.get_distributor()
- self.distributor.declare("user_joined_room")
self.distributor.declare("user_left_room")
async def _is_remote_room_too_complex(
@@ -1085,7 +1057,6 @@ class RoomMemberMasterHandler(RoomMemberHandler):
event_id, stream_id = await self.federation_handler.do_invite_join(
remote_room_hosts, room_id, user.to_string(), content
)
- await self._user_joined_room(user, room_id)
# Check the room we just joined wasn't too large, if we didn't fetch the
# complexity of it before.
@@ -1228,11 +1199,6 @@ class RoomMemberMasterHandler(RoomMemberHandler):
)
return event.event_id, stream_id
- async def _user_joined_room(self, target: UserID, room_id: str) -> None:
- """Implements RoomMemberHandler._user_joined_room
- """
- user_joined_room(self.distributor, target, room_id)
-
async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_left_room
"""
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 897338fd54..e7f34737c6 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -57,8 +57,6 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
content=content,
)
- await self._user_joined_room(user, room_id)
-
return ret["event_id"], ret["stream_id"]
async def remote_reject_invite(
@@ -81,13 +79,6 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
)
return ret["event_id"], ret["stream_id"]
- async def _user_joined_room(self, target: UserID, room_id: str) -> None:
- """Implements RoomMemberHandler._user_joined_room
- """
- await self._notify_change_client(
- user_id=target.to_string(), room_id=room_id, change="joined"
- )
-
async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_left_room
"""
|