diff options
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 94 |
1 files changed, 33 insertions, 61 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 014dab2940..ea9264e751 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -69,7 +69,6 @@ from synapse.replication.http.federation import ( ReplicationFederationSendEventsRestServlet, ReplicationStoreRoomOnInviteRestServlet, ) -from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet from synapse.state import StateResolutionStore, resolve_events_with_store from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.types import ( @@ -80,7 +79,6 @@ from synapse.types import ( get_domain_from_id, ) from synapse.util.async_helpers import Linearizer, concurrently_execute -from synapse.util.distributor import user_joined_room from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr from synapse.visibility import filter_events_for_server @@ -88,7 +86,7 @@ from synapse.visibility import filter_events_for_server logger = logging.getLogger(__name__) -@attr.s +@attr.s(slots=True) class _NewEventInfo: """Holds information about a received event, ready for passing to _handle_new_events @@ -117,7 +115,7 @@ class FederationHandler(BaseHandler): """ def __init__(self, hs): - super(FederationHandler, self).__init__(hs) + super().__init__(hs) self.hs = hs @@ -130,7 +128,6 @@ class FederationHandler(BaseHandler): self.keyring = hs.get_keyring() self.action_generator = hs.get_action_generator() self.is_mine_id = hs.is_mine_id - self.pusher_pool = hs.get_pusherpool() self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() self._message_handler = hs.get_message_handler() @@ -141,9 +138,6 @@ class FederationHandler(BaseHandler): self._replication = hs.get_replication_data_handler() self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs) - self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client( - hs - ) self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client( hs ) @@ -704,31 +698,10 @@ class FederationHandler(BaseHandler): logger.debug("[%s %s] Processing event: %s", room_id, event_id, event) try: - context = await self._handle_new_event(origin, event, state=state) + await self._handle_new_event(origin, event, state=state) except AuthError as e: raise FederationError("ERROR", e.code, e.msg, affected=event.event_id) - if event.type == EventTypes.Member: - if event.membership == Membership.JOIN: - # Only fire user_joined_room if the user has acutally - # joined the room. Don't bother if the user is just - # changing their profile info. - newly_joined = True - - prev_state_ids = await context.get_prev_state_ids() - - prev_state_id = prev_state_ids.get((event.type, event.state_key)) - if prev_state_id: - prev_state = await self.store.get_event( - prev_state_id, allow_none=True - ) - if prev_state and prev_state.membership == Membership.JOIN: - newly_joined = False - - if newly_joined: - user = UserID.from_string(event.state_key) - await self.user_joined_room(user, room_id) - # For encrypted messages we check that we know about the sending device, # if we don't then we mark the device cache for that user as stale. if event.type == EventTypes.Encrypted: @@ -923,7 +896,8 @@ class FederationHandler(BaseHandler): ) ) - await self._handle_new_events(dest, ev_infos, backfilled=True) + if ev_infos: + await self._handle_new_events(dest, room_id, ev_infos, backfilled=True) # Step 2: Persist the rest of the events in the chunk one by one events.sort(key=lambda e: e.depth) @@ -1265,7 +1239,7 @@ class FederationHandler(BaseHandler): event_infos.append(_NewEventInfo(event, None, auth)) await self._handle_new_events( - destination, event_infos, + destination, room_id, event_infos, ) def _sanity_check_event(self, ev): @@ -1412,15 +1386,15 @@ class FederationHandler(BaseHandler): ) max_stream_id = await self._persist_auth_tree( - origin, auth_chain, state, event, room_version_obj + origin, room_id, auth_chain, state, event, room_version_obj ) # We wait here until this instance has seen the events come down # replication (if we're using replication) as the below uses caches. - # - # TODO: Currently the events stream is written to from master await self._replication.wait_for_stream_position( - self.config.worker.writers.events, "events", max_stream_id + self.config.worker.events_shard_config.get_instance(room_id), + "events", + max_stream_id, ) # Check whether this room is the result of an upgrade of a room we already know @@ -1599,11 +1573,6 @@ class FederationHandler(BaseHandler): event.signatures, ) - if event.type == EventTypes.Member: - if event.content["membership"] == Membership.JOIN: - user = UserID.from_string(event.state_key) - await self.user_joined_room(user, event.room_id) - prev_state_ids = await context.get_prev_state_ids() state_ids = list(prev_state_ids.values()) @@ -1674,7 +1643,7 @@ class FederationHandler(BaseHandler): ) context = await self.state_handler.compute_event_context(event) - await self.persist_events_and_notify([(event, context)]) + await self.persist_events_and_notify(event.room_id, [(event, context)]) return event @@ -1701,7 +1670,9 @@ class FederationHandler(BaseHandler): await self.federation_client.send_leave(host_list, event) context = await self.state_handler.compute_event_context(event) - stream_id = await self.persist_events_and_notify([(event, context)]) + stream_id = await self.persist_events_and_notify( + event.room_id, [(event, context)] + ) return event, stream_id @@ -1949,7 +1920,7 @@ class FederationHandler(BaseHandler): ) await self.persist_events_and_notify( - [(event, context)], backfilled=backfilled + event.room_id, [(event, context)], backfilled=backfilled ) except Exception: run_in_background( @@ -1962,6 +1933,7 @@ class FederationHandler(BaseHandler): async def _handle_new_events( self, origin: str, + room_id: str, event_infos: Iterable[_NewEventInfo], backfilled: bool = False, ) -> None: @@ -1993,6 +1965,7 @@ class FederationHandler(BaseHandler): ) await self.persist_events_and_notify( + room_id, [ (ev_info.event, context) for ev_info, context in zip(event_infos, contexts) @@ -2003,6 +1976,7 @@ class FederationHandler(BaseHandler): async def _persist_auth_tree( self, origin: str, + room_id: str, auth_events: List[EventBase], state: List[EventBase], event: EventBase, @@ -2017,6 +1991,7 @@ class FederationHandler(BaseHandler): Args: origin: Where the events came from + room_id, auth_events state event @@ -2091,17 +2066,20 @@ class FederationHandler(BaseHandler): events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR await self.persist_events_and_notify( + room_id, [ (e, events_to_context[e.event_id]) for e in itertools.chain(auth_events, state) - ] + ], ) new_event_context = await self.state_handler.compute_event_context( event, old_state=state ) - return await self.persist_events_and_notify([(event, new_event_context)]) + return await self.persist_events_and_notify( + room_id, [(event, new_event_context)] + ) async def _prep_event( self, @@ -2952,6 +2930,7 @@ class FederationHandler(BaseHandler): async def persist_events_and_notify( self, + room_id: str, event_and_contexts: Sequence[Tuple[EventBase, EventContext]], backfilled: bool = False, ) -> int: @@ -2959,14 +2938,19 @@ class FederationHandler(BaseHandler): necessary. Args: - event_and_contexts: + room_id: The room ID of events being persisted. + event_and_contexts: Sequence of events with their associated + context that should be persisted. All events must belong to + the same room. backfilled: Whether these events are a result of backfilling or not """ - if self.config.worker.writers.events != self._instance_name: + instance = self.config.worker.events_shard_config.get_instance(room_id) + if instance != self._instance_name: result = await self._send_events( - instance_name=self.config.worker.writers.events, + instance_name=instance, store=self.store, + room_id=room_id, event_and_contexts=event_and_contexts, backfilled=backfilled, ) @@ -3019,8 +3003,6 @@ class FederationHandler(BaseHandler): event, event_stream_id, max_stream_id, extra_users=extra_users ) - await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) - async def _clean_room_for_join(self, room_id: str) -> None: """Called to clean up any data in DB for a given room, ready for the server to join the room. @@ -3033,16 +3015,6 @@ class FederationHandler(BaseHandler): else: await self.store.clean_room_for_join(room_id) - async def user_joined_room(self, user: UserID, room_id: str) -> None: - """Called when a new user has joined the room - """ - if self.config.worker_app: - await self._notify_user_membership_change( - room_id=room_id, user_id=user.to_string(), change="joined" - ) - else: - user_joined_room(self.distributor, user, room_id) - async def get_room_complexity( self, remote_room_hosts: List[str], room_id: str ) -> Optional[dict]: |