diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index b1dab57447..766d9849f5 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -1980,6 +1980,10 @@ class FederationEventHandler:
event, event_pos, max_stream_token, extra_users=extra_users
)
+ if event.type == EventTypes.Member and event.membership == Membership.JOIN:
+ # TODO retrieve the previous state, and exclude join -> join transitions
+ self._notifier.notify_user_joined_room(event.event_id, event.room_id)
+
def _sanity_check_event(self, ev: EventBase) -> None:
"""
Do some early sanity checks of a received event
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 85abe71ea8..bd7baef051 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -463,6 +463,7 @@ class EventCreationHandler:
)
self._events_shard_config = self.config.worker.events_shard_config
self._instance_name = hs.get_instance_name()
+ self._notifier = hs.get_notifier()
self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state
@@ -1550,6 +1551,16 @@ class EventCreationHandler:
requester, is_admin_redaction=is_admin_redaction
)
+ if event.type == EventTypes.Member and event.membership == Membership.JOIN:
+ (
+ current_membership,
+ _,
+ ) = await self.store.get_local_current_membership_for_user_in_room(
+ event.state_key, event.room_id
+ )
+ if current_membership != Membership.JOIN:
+ self._notifier.notify_user_joined_room(event.event_id, event.room_id)
+
await self._maybe_kick_guest_users(event, context)
if event.type == EventTypes.CanonicalAlias:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index a5b9ac904e..30b4cb23df 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -94,12 +94,29 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
burst_count=hs.config.ratelimiting.rc_joins_local.burst_count,
)
+ # Tracks joins from local users to rooms this server isn't a member of.
+ # I.e. joins this server makes by requesting /make_join /send_join from
+ # another server.
self._join_rate_limiter_remote = Ratelimiter(
store=self.store,
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second,
burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count,
)
+ # TODO: find a better place to keep this Ratelimiter.
+ # It needs to be
+ # - written to by event persistence code
+ # - written to by something which can snoop on replication streams
+ # - read by the RoomMemberHandler to rate limit joins from local users
+ # - read by the FederationServer to rate limit make_joins and send_joins from
+ # other homeservers
+ # I wonder if a homeserver-wide collection of rate limiters might be cleaner?
+ self._join_rate_per_room_limiter = Ratelimiter(
+ store=self.store,
+ clock=self.clock,
+ rate_hz=hs.config.ratelimiting.rc_joins_per_room.per_second,
+ burst_count=hs.config.ratelimiting.rc_joins_per_room.burst_count,
+ )
# Ratelimiter for invites, keyed by room (across all issuers, all
# recipients).
@@ -136,6 +153,18 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
)
self.request_ratelimiter = hs.get_request_ratelimiter()
+ hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room)
+
+ def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
+ """Notify the rate limiter that a room join has occurred.
+
+ Use this to inform the RoomMemberHandler about joins that have either
+ - taken place on another homeserver, or
+ - on another worker in this homeserver.
+ Joins actioned by this worker should use the usual `ratelimit` method, which
+ checks the limit and increments the counter in one go.
+ """
+ self._join_rate_per_room_limiter.record_action(requester=None, key=room_id)
@abc.abstractmethod
async def _remote_join(
@@ -396,6 +425,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# up blocking profile updates.
if newly_joined and ratelimit:
await self._join_rate_limiter_local.ratelimit(requester)
+ await self._join_rate_per_room_limiter.ratelimit(
+ requester, key=room_id, update=False
+ )
result_event = await self.event_creation_handler.handle_new_client_event(
requester,
@@ -867,6 +899,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
await self._join_rate_limiter_remote.ratelimit(
requester,
)
+ await self._join_rate_per_room_limiter.ratelimit(
+ requester,
+ key=room_id,
+ update=False,
+ )
inviter = await self._get_inviter(target.to_string(), room_id)
if inviter and not self.hs.is_mine(inviter):
|