diff options
author | Kegan Dougal <kegan@matrix.org> | 2015-02-05 14:28:03 +0000 |
---|---|---|
committer | Kegan Dougal <kegan@matrix.org> | 2015-02-05 14:28:03 +0000 |
commit | 951690e54d5673431abefafc30c94af6e11341e3 (patch) | |
tree | 779471113f16dfbf7e67504731dd412101eee180 /synapse/handlers | |
parent | Fix user query checks. HS>AS pushing now works. (diff) | |
parent | Merge branch 'federation_client_retries' of github.com:matrix-org/synapse int... (diff) | |
download | synapse-951690e54d5673431abefafc30c94af6e11341e3.tar.xz |
Merge branch 'develop' into application-services
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/directory.py | 11 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 193 | ||||
-rw-r--r-- | synapse/handlers/message.py | 17 | ||||
-rw-r--r-- | synapse/handlers/room.py | 12 |
4 files changed, 143 insertions, 90 deletions
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 000bf5793c..24ea3573d3 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -120,7 +120,16 @@ class DirectoryHandler(BaseHandler): ) extra_servers = yield self.store.get_joined_hosts_for_room(room_id) - servers = list(set(extra_servers) | set(servers)) + servers = set(extra_servers) | set(servers) + + # If this server is in the list of servers, return it first. + if self.server_name in servers: + servers = ( + [self.server_name] + + [s for s in servers if s != self.server_name] + ) + else: + servers = list(servers) defer.returnValue({ "room_id": room_id, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8bf5a4cc11..aba266c2bc 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -30,6 +30,7 @@ from synapse.types import UserID from twisted.internet import defer +import itertools import logging @@ -123,8 +124,21 @@ class FederationHandler(BaseHandler): logger.debug("Got event for room we're not in.") current_state = state + event_ids = set() + if state: + event_ids |= {e.event_id for e in state} + if auth_chain: + event_ids |= {e.event_id for e in auth_chain} + + seen_ids = (yield self.store.have_events(event_ids)).keys() + if state and auth_chain is not None: - for e in state: + # If we have any state or auth_chain given to us by the replication + # layer, then we should handle them (if we haven't before.) + for e in itertools.chain(auth_chain, state): + if e.event_id in seen_ids: + continue + e.internal_metadata.outlier = True try: auth_ids = [e_id for e_id, _ in e.auth_events] @@ -132,7 +146,10 @@ class FederationHandler(BaseHandler): (e.type, e.state_key): e for e in auth_chain if e.event_id in auth_ids } - yield self._handle_new_event(origin, e, auth_events=auth) + yield self._handle_new_event( + origin, e, auth_events=auth + ) + seen_ids.add(e.event_id) except: logger.exception( "Failed to handle state event %s", @@ -256,7 +273,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def do_invite_join(self, target_host, room_id, joinee, content, snapshot): + def do_invite_join(self, target_hosts, room_id, joinee, content, snapshot): """ Attempts to join the `joinee` to the room `room_id` via the server `target_host`. @@ -270,8 +287,8 @@ class FederationHandler(BaseHandler): """ logger.debug("Joining %s to %s", joinee, room_id) - pdu = yield self.replication_layer.make_join( - target_host, + origin, pdu = yield self.replication_layer.make_join( + target_hosts, room_id, joinee ) @@ -313,11 +330,17 @@ class FederationHandler(BaseHandler): new_event = builder.build() + # Try the host we successfully got a response to /make_join/ + # request first. + target_hosts.remove(origin) + target_hosts.insert(0, origin) + ret = yield self.replication_layer.send_join( - target_host, + target_hosts, new_event ) + origin = ret["origin"] state = ret["state"] auth_chain = ret["auth_chain"] auth_chain.sort(key=lambda e: e.depth) @@ -354,7 +377,7 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } yield self._handle_new_event( - target_host, e, auth_events=auth + origin, e, auth_events=auth ) except: logger.exception( @@ -374,7 +397,7 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } yield self._handle_new_event( - target_host, e, auth_events=auth + origin, e, auth_events=auth ) except: logger.exception( @@ -389,7 +412,7 @@ class FederationHandler(BaseHandler): } yield self._handle_new_event( - target_host, + origin, new_event, state=state, current_state=state, @@ -498,6 +521,8 @@ class FederationHandler(BaseHandler): "Failed to get destination from event %s", s.event_id ) + destinations.remove(origin) + logger.debug( "on_send_join_request: Sending event: %s, signatures: %s", event.event_id, @@ -618,6 +643,7 @@ class FederationHandler(BaseHandler): event = yield self.store.get_event( event_id, allow_none=True, + allow_rejected=True, ) if event: @@ -701,6 +727,8 @@ class FederationHandler(BaseHandler): context.rejected = RejectedReason.AUTH_ERROR + # FIXME: Don't store as rejected with AUTH_ERROR if we haven't + # seen all the auth events. yield self.store.persist_event( event, context=context, @@ -750,7 +778,7 @@ class FederationHandler(BaseHandler): ) ) - logger.debug("on_query_auth reutrning: %s", ret) + logger.debug("on_query_auth returning: %s", ret) defer.returnValue(ret) @@ -770,41 +798,45 @@ class FederationHandler(BaseHandler): if missing_auth: logger.debug("Missing auth: %s", missing_auth) # If we don't have all the auth events, we need to get them. - remote_auth_chain = yield self.replication_layer.get_event_auth( - origin, event.room_id, event.event_id - ) + try: + remote_auth_chain = yield self.replication_layer.get_event_auth( + origin, event.room_id, event.event_id + ) - seen_remotes = yield self.store.have_events( - [e.event_id for e in remote_auth_chain] - ) + seen_remotes = yield self.store.have_events( + [e.event_id for e in remote_auth_chain] + ) - for e in remote_auth_chain: - if e.event_id in seen_remotes.keys(): - continue + for e in remote_auth_chain: + if e.event_id in seen_remotes.keys(): + continue - if e.event_id == event.event_id: - continue + if e.event_id == event.event_id: + continue - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in remote_auth_chain - if e.event_id in auth_ids - } - e.internal_metadata.outlier = True + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in remote_auth_chain + if e.event_id in auth_ids + } + e.internal_metadata.outlier = True - logger.debug( - "do_auth %s missing_auth: %s", - event.event_id, e.event_id - ) - yield self._handle_new_event( - origin, e, auth_events=auth - ) + logger.debug( + "do_auth %s missing_auth: %s", + event.event_id, e.event_id + ) + yield self._handle_new_event( + origin, e, auth_events=auth + ) - if e.event_id in event_auth_events: - auth_events[(e.type, e.state_key)] = e - except AuthError: - pass + if e.event_id in event_auth_events: + auth_events[(e.type, e.state_key)] = e + except AuthError: + pass + except: + # FIXME: + logger.exception("Failed to get auth chain") # FIXME: Assumes we have and stored all the state for all the # prev_events @@ -816,50 +848,57 @@ class FederationHandler(BaseHandler): logger.debug("Different auth: %s", different_auth) # 1. Get what we think is the auth chain. - auth_ids = self.auth.compute_auth_events(event, context) - local_auth_chain = yield self.store.get_auth_chain(auth_ids) - - # 2. Get remote difference. - result = yield self.replication_layer.query_auth( - origin, - event.room_id, - event.event_id, - local_auth_chain, - ) - - seen_remotes = yield self.store.have_events( - [e.event_id for e in result["auth_chain"]] + auth_ids = self.auth.compute_auth_events( + event, context.current_state ) + local_auth_chain = yield self.store.get_auth_chain(auth_ids) - # 3. Process any remote auth chain events we haven't seen. - for ev in result["auth_chain"]: - if ev.event_id in seen_remotes.keys(): - continue + try: + # 2. Get remote difference. + result = yield self.replication_layer.query_auth( + origin, + event.room_id, + event.event_id, + local_auth_chain, + ) - if ev.event_id == event.event_id: - continue + seen_remotes = yield self.store.have_events( + [e.event_id for e in result["auth_chain"]] + ) - try: - auth_ids = [e_id for e_id, _ in ev.auth_events] - auth = { - (e.type, e.state_key): e for e in result["auth_chain"] - if e.event_id in auth_ids - } - ev.internal_metadata.outlier = True + # 3. Process any remote auth chain events we haven't seen. + for ev in result["auth_chain"]: + if ev.event_id in seen_remotes.keys(): + continue + + if ev.event_id == event.event_id: + continue + + try: + auth_ids = [e_id for e_id, _ in ev.auth_events] + auth = { + (e.type, e.state_key): e for e in result["auth_chain"] + if e.event_id in auth_ids + } + ev.internal_metadata.outlier = True + + logger.debug( + "do_auth %s different_auth: %s", + event.event_id, e.event_id + ) - logger.debug( - "do_auth %s different_auth: %s", - event.event_id, e.event_id - ) + yield self._handle_new_event( + origin, ev, auth_events=auth + ) - yield self._handle_new_event( - origin, ev, auth_events=auth - ) + if ev.event_id in event_auth_events: + auth_events[(ev.type, ev.state_key)] = ev + except AuthError: + pass - if ev.event_id in event_auth_events: - auth_events[(ev.type, ev.state_key)] = ev - except AuthError: - pass + except: + # FIXME: + logger.exception("Failed to query auth chain") # 4. Look at rejects and their proofs. # TODO. @@ -983,7 +1022,7 @@ class FederationHandler(BaseHandler): if reason is None: # FIXME: ERRR?! logger.warn("Could not find reason for %s", e.event_id) - raise RuntimeError("") + raise RuntimeError("Could not find reason for %s" % e.event_id) reason_map[e.event_id] = reason diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 6fbd2af4ab..3f51f38f18 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import RoomError +from synapse.api.errors import RoomError, SynapseError from synapse.streams.config import PaginationConfig from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator @@ -372,10 +372,17 @@ class MessageHandler(BaseHandler): as_event=True, ) presence.append(member_presence) - except Exception: - logger.exception( - "Failed to get member presence of %r", m.user_id - ) + except SynapseError as e: + if e.code == 404: + # FIXME: We are doing this as a warn since this gets hit a + # lot and spams the logs. Why is this happening? + logger.warn( + "Failed to get member presence of %r", m.user_id + ) + else: + logger.exception( + "Failed to get member presence of %r", m.user_id + ) time_now = self.clock.time_msec() diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 23821d321f..0369b907a5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -389,8 +389,6 @@ class RoomMemberHandler(BaseHandler): if not hosts: raise SynapseError(404, "No known servers") - host = hosts[0] - # If event doesn't include a display name, add one. yield self.distributor.fire( "collect_presencelike_data", joinee, content @@ -407,12 +405,12 @@ class RoomMemberHandler(BaseHandler): }) event, context = yield self._create_new_client_event(builder) - yield self._do_join(event, context, room_host=host, do_auth=True) + yield self._do_join(event, context, room_hosts=hosts, do_auth=True) defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def _do_join(self, event, context, room_host=None, do_auth=True): + def _do_join(self, event, context, room_hosts=None, do_auth=True): joinee = UserID.from_string(event.state_key) # room_id = RoomID.from_string(event.room_id, self.hs) room_id = event.room_id @@ -441,7 +439,7 @@ class RoomMemberHandler(BaseHandler): if is_host_in_room: should_do_dance = False - elif room_host: # TODO: Shouldn't this be remote_room_host? + elif room_hosts: # TODO: Shouldn't this be remote_room_host? should_do_dance = True else: # TODO(markjh): get prev_state from snapshot @@ -453,7 +451,7 @@ class RoomMemberHandler(BaseHandler): inviter = UserID.from_string(prev_state.user_id) should_do_dance = not self.hs.is_mine(inviter) - room_host = inviter.domain + room_hosts = [inviter.domain] else: # return the same error as join_room_alias does raise SynapseError(404, "No known servers") @@ -461,7 +459,7 @@ class RoomMemberHandler(BaseHandler): if should_do_dance: handler = self.hs.get_handlers().federation_handler yield handler.do_invite_join( - room_host, + room_hosts, room_id, event.user_id, event.get_dict()["content"], # FIXME To get a non-frozen dict |