diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 6a5c33f212..36652289a4 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -92,6 +92,7 @@ from synapse.types import (
get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer, concurrently_execute
+from synapse.util.iterutils import batch_iter
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server
@@ -1741,28 +1742,17 @@ class FederationHandler(BaseHandler):
# Check if the user is already in the room or invited to the room.
user_id = event.state_key
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
- newly_joined = True
- user_is_invited = False
+ prev_member_event = None
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
- newly_joined = prev_member_event.membership != Membership.JOIN
- user_is_invited = prev_member_event.membership == Membership.INVITE
-
- # If the member is not already in the room, and not invited, check if
- # they should be allowed access via membership in a space.
- if (
- newly_joined
- and not user_is_invited
- and not await self._event_auth_handler.can_join_without_invite(
- prev_state_ids,
- event.room_version,
- user_id,
- )
- ):
- raise AuthError(
- 403,
- "You do not belong to any of the required spaces to join this room.",
- )
+
+ # Check if the member should be allowed access via membership in a space.
+ await self._event_auth_handler.check_restricted_join_rules(
+ prev_state_ids,
+ event.room_version,
+ user_id,
+ prev_member_event,
+ )
# Persist the event.
await self._auth_and_persist_event(origin, event, context)
@@ -3258,13 +3248,15 @@ class FederationHandler(BaseHandler):
"""
instance = self.config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name:
- result = await self._send_events(
- instance_name=instance,
- store=self.store,
- room_id=room_id,
- event_and_contexts=event_and_contexts,
- backfilled=backfilled,
- )
+ # Limit the number of events sent over federation.
+ for batch in batch_iter(event_and_contexts, 1000):
+ result = await self._send_events(
+ instance_name=instance,
+ store=self.store,
+ room_id=room_id,
+ event_and_contexts=batch,
+ backfilled=backfilled,
+ )
return result["max_stream_id"]
else:
assert self.storage.persistence
|