diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 43f2986f89..8008fb8a51 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -69,7 +69,6 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
ReplicationStoreRoomOnInviteRestServlet,
)
-from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
@@ -80,7 +79,6 @@ from synapse.types import (
get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.distributor import user_joined_room
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server
@@ -141,9 +139,6 @@ class FederationHandler(BaseHandler):
self._replication = hs.get_replication_data_handler()
self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
- self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client(
- hs
- )
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
)
@@ -182,7 +177,7 @@ class FederationHandler(BaseHandler):
room_id = pdu.room_id
event_id = pdu.event_id
- logger.info("handling received PDU: %s", pdu)
+ logger.info("[%s %s] handling received PDU: %s", room_id, event_id, pdu)
# We reprocess pdus when we have seen them only as outliers
existing = await self.store.get_event(
@@ -297,6 +292,14 @@ class FederationHandler(BaseHandler):
room_id,
event_id,
)
+ elif missing_prevs:
+ logger.info(
+ "[%s %s] Not recursively fetching %d missing prev_events: %s",
+ room_id,
+ event_id,
+ len(missing_prevs),
+ shortstr(missing_prevs),
+ )
if prevs - seen:
# We've still not been able to get all of the prev_events for this event.
@@ -341,12 +344,6 @@ class FederationHandler(BaseHandler):
affected=pdu.event_id,
)
- logger.info(
- "Event %s is missing prev_events: calculating state for a "
- "backwards extremity",
- event_id,
- )
-
# Calculate the state after each of the previous events, and
# resolve them to find the correct state at the current event.
event_map = {event_id: pdu}
@@ -364,7 +361,10 @@ class FederationHandler(BaseHandler):
# know about
for p in prevs - seen:
logger.info(
- "Requesting state at missing prev_event %s", event_id,
+ "[%s %s] Requesting state at missing prev_event %s",
+ room_id,
+ event_id,
+ p,
)
with nested_logging_context(p):
@@ -399,9 +399,7 @@ class FederationHandler(BaseHandler):
# First though we need to fetch all the events that are in
# state_map, so we can build up the state below.
evs = await self.store.get_events(
- list(state_map.values()),
- get_prev_content=False,
- redact_behaviour=EventRedactBehaviour.AS_IS,
+ list(state_map.values()), get_prev_content=False,
)
event_map.update(evs)
@@ -704,31 +702,10 @@ class FederationHandler(BaseHandler):
logger.debug("[%s %s] Processing event: %s", room_id, event_id, event)
try:
- context = await self._handle_new_event(origin, event, state=state)
+ await self._handle_new_event(origin, event, state=state)
except AuthError as e:
raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
- if event.type == EventTypes.Member:
- if event.membership == Membership.JOIN:
- # Only fire user_joined_room if the user has acutally
- # joined the room. Don't bother if the user is just
- # changing their profile info.
- newly_joined = True
-
- prev_state_ids = await context.get_prev_state_ids()
-
- prev_state_id = prev_state_ids.get((event.type, event.state_key))
- if prev_state_id:
- prev_state = await self.store.get_event(
- prev_state_id, allow_none=True
- )
- if prev_state and prev_state.membership == Membership.JOIN:
- newly_joined = False
-
- if newly_joined:
- user = UserID.from_string(event.state_key)
- await self.user_joined_room(user, room_id)
-
# For encrypted messages we check that we know about the sending device,
# if we don't then we mark the device cache for that user as stale.
if event.type == EventTypes.Encrypted:
@@ -1550,11 +1527,6 @@ class FederationHandler(BaseHandler):
event.signatures,
)
- if event.type == EventTypes.Member:
- if event.content["membership"] == Membership.JOIN:
- user = UserID.from_string(event.state_key)
- await self.user_joined_room(user, event.room_id)
-
prev_state_ids = await context.get_prev_state_ids()
state_ids = list(prev_state_ids.values())
@@ -1581,8 +1553,15 @@ class FederationHandler(BaseHandler):
if self.hs.config.block_non_admin_invites:
raise SynapseError(403, "This server does not accept room invites")
+ is_published = await self.store.is_room_published(event.room_id)
+
if not self.spam_checker.user_may_invite(
- event.sender, event.state_key, event.room_id
+ event.sender,
+ event.state_key,
+ None,
+ room_id=event.room_id,
+ new_room=False,
+ published_room=is_published,
):
raise SynapseError(
403, "This user is not permitted to send invites to this server/user"
@@ -2970,7 +2949,7 @@ class FederationHandler(BaseHandler):
event, event_stream_id, max_stream_id, extra_users=extra_users
)
- await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
+ await self.pusher_pool.on_new_notifications(max_stream_id)
async def _clean_room_for_join(self, room_id: str) -> None:
"""Called to clean up any data in DB for a given room, ready for the
@@ -2984,16 +2963,6 @@ class FederationHandler(BaseHandler):
else:
await self.store.clean_room_for_join(room_id)
- async def user_joined_room(self, user: UserID, room_id: str) -> None:
- """Called when a new user has joined the room
- """
- if self.config.worker_app:
- await self._notify_user_membership_change(
- room_id=room_id, user_id=user.to_string(), change="joined"
- )
- else:
- user_joined_room(self.distributor, user, room_id)
-
async def get_room_complexity(
self, remote_room_hosts: List[str], room_id: str
) -> Optional[dict]:
|