diff options
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 137 |
1 files changed, 100 insertions, 37 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index adafd06b24..d95e0b23b1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -26,7 +26,7 @@ from synapse.api.errors import ( from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import PreserveLoggingContext, preserve_fn from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.util.frozenutils import unfreeze @@ -40,6 +40,7 @@ from synapse.events.utils import prune_event from synapse.util.retryutils import NotRetryingDestination from synapse.push.action_generator import ActionGenerator +from synapse.util.distributor import user_joined_room from twisted.internet import defer @@ -49,10 +50,6 @@ import logging logger = logging.getLogger(__name__) -def user_joined_room(distributor, user, room_id): - return distributor.fire("user_joined_room", user, room_id) - - class FederationHandler(BaseHandler): """Handles events that originated from federation. Responsible for: @@ -283,7 +280,14 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def backfill(self, dest, room_id, limit, extremities=[]): """ Trigger a backfill request to `dest` for the given `room_id` + + This will attempt to get more events from the remote. This may return + be successfull and still return no events if the other side has no new + events to offer. """ + if dest == self.server_name: + raise SynapseError(400, "Can't backfill from self.") + if not extremities: extremities = yield self.store.get_oldest_events_in_room(room_id) @@ -294,6 +298,16 @@ class FederationHandler(BaseHandler): extremities=extremities, ) + # Don't bother processing events we already have. + seen_events = yield self.store.have_events_in_timeline( + set(e.event_id for e in events) + ) + + events = [e for e in events if e.event_id not in seen_events] + + if not events: + defer.returnValue([]) + event_map = {e.event_id: e for e in events} event_ids = set(e.event_id for e in events) @@ -353,6 +367,7 @@ class FederationHandler(BaseHandler): for a in auth_events.values(): if a.event_id in seen_events: continue + a.internal_metadata.outlier = True ev_infos.append({ "event": a, "auth_events": { @@ -373,20 +388,23 @@ class FederationHandler(BaseHandler): } }) + yield self._handle_new_events( + dest, ev_infos, + backfilled=True, + ) + events.sort(key=lambda e: e.depth) for event in events: if event in events_to_state: continue - ev_infos.append({ - "event": event, - }) - - yield self._handle_new_events( - dest, ev_infos, - backfilled=True, - ) + # We store these one at a time since each event depends on the + # previous to work out the state. + # TODO: We can probably do something more clever here. + yield self._handle_new_event( + dest, event + ) defer.returnValue(events) @@ -450,7 +468,7 @@ class FederationHandler(BaseHandler): likely_domains = [ domain for domain, depth in curr_domains - if domain is not self.server_name + if domain != self.server_name ] @defer.inlineCallbacks @@ -458,11 +476,15 @@ class FederationHandler(BaseHandler): # TODO: Should we try multiple of these at a time? for dom in domains: try: - events = yield self.backfill( + yield self.backfill( dom, room_id, limit=100, extremities=[e for e in extremities.keys()] ) + # If this succeeded then we probably already have the + # appropriate stuff. + # TODO: We can probably do something more intelligent here. + defer.returnValue(True) except SynapseError as e: logger.info( "Failed to backfill from %s because %s", @@ -488,8 +510,6 @@ class FederationHandler(BaseHandler): ) continue - if events: - defer.returnValue(True) defer.returnValue(False) success = yield try_backfill(likely_domains) @@ -661,9 +681,13 @@ class FederationHandler(BaseHandler): "state_key": user_id, }) - event, context = yield self._create_new_client_event( - builder=builder, - ) + try: + event, context = yield self._create_new_client_event( + builder=builder, + ) + except AuthError as e: + logger.warn("Failed to create join %r because %s", event, e) + raise e self.auth.check(event, auth_events=context.current_state) @@ -784,13 +808,19 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def do_remotely_reject_invite(self, target_hosts, room_id, user_id): - origin, event = yield self._make_and_verify_event( - target_hosts, - room_id, - user_id, - "leave" - ) - signed_event = self._sign_event(event) + try: + origin, event = yield self._make_and_verify_event( + target_hosts, + room_id, + user_id, + "leave" + ) + signed_event = self._sign_event(event) + except SynapseError: + raise + except CodeMessageException as e: + logger.warn("Failed to reject invite: %s", e) + raise SynapseError(500, "Failed to reject invite") # Try the host we successfully got a response to /make_join/ # request first. @@ -800,10 +830,16 @@ class FederationHandler(BaseHandler): except ValueError: pass - yield self.replication_layer.send_leave( - target_hosts, - signed_event - ) + try: + yield self.replication_layer.send_leave( + target_hosts, + signed_event + ) + except SynapseError: + raise + except CodeMessageException as e: + logger.warn("Failed to reject invite: %s", e) + raise SynapseError(500, "Failed to reject invite") context = yield self.state_handler.compute_event_context(event) @@ -883,7 +919,11 @@ class FederationHandler(BaseHandler): builder=builder, ) - self.auth.check(event, auth_events=context.current_state) + try: + self.auth.check(event, auth_events=context.current_state) + except AuthError as e: + logger.warn("Failed to create new leave %r because %s", event, e) + raise e defer.returnValue(event) @@ -1064,7 +1104,8 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def _handle_new_event(self, origin, event, state=None, auth_events=None): + def _handle_new_event(self, origin, event, state=None, auth_events=None, + backfilled=False): context = yield self._prep_event( origin, event, state=state, @@ -1080,12 +1121,24 @@ class FederationHandler(BaseHandler): event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, + backfilled=backfilled, + ) + + # this intentionally does not yield: we don't care about the result + # and don't need to wait for it. + preserve_fn(self.hs.get_pusherpool().on_new_notifications)( + event_stream_id, max_stream_id ) defer.returnValue((context, event_stream_id, max_stream_id)) @defer.inlineCallbacks def _handle_new_events(self, origin, event_infos, backfilled=False): + """Creates the appropriate contexts and persists events. The events + should not depend on one another, e.g. this should be used to persist + a bunch of outliers, but not a chunk of individual events that depend + on each other for state calculations. + """ contexts = yield defer.gatherResults( [ self._prep_event( @@ -1467,8 +1520,9 @@ class FederationHandler(BaseHandler): try: self.auth.check(event, auth_events=auth_events) - except AuthError: - raise + except AuthError as e: + logger.warn("Failed auth resolution for %r because %s", event, e) + raise e @defer.inlineCallbacks def construct_auth_difference(self, local_auth, remote_auth): @@ -1644,7 +1698,12 @@ class FederationHandler(BaseHandler): event_dict, event, context ) - self.auth.check(event, context.current_state) + try: + self.auth.check(event, context.current_state) + except AuthError as e: + logger.warn("Denying new third party invite %r because %s", event, e) + raise e + yield self._check_signature(event, auth_events=context.current_state) member_handler = self.hs.get_handlers().room_member_handler yield member_handler.send_membership_event(None, event, context) @@ -1669,7 +1728,11 @@ class FederationHandler(BaseHandler): event_dict, event, context ) - self.auth.check(event, auth_events=context.current_state) + try: + self.auth.check(event, auth_events=context.current_state) + except AuthError as e: + logger.warn("Denying third party invite %r because %s", event, e) + raise e yield self._check_signature(event, auth_events=context.current_state) returned_invite = yield self.send_invite(origin, event) |