diff options
author | David Baker <dbkr@matrix.org> | 2014-12-02 13:50:05 +0000 |
---|---|---|
committer | David Baker <dbkr@matrix.org> | 2014-12-02 13:50:05 +0000 |
commit | 7642d95d5e90bab130b33dc27f10e347072e0294 (patch) | |
tree | 08c618bc5daeceb90bb8a47e3800b88c12d83427 /synapse/handlers | |
parent | More work on pushers. Attempt to do HTTP pokes. Not sure if the actual HTTP p... (diff) | |
parent | Add non-working jitsi meet bridge (diff) | |
download | synapse-7642d95d5e90bab130b33dc27f10e347072e0294.tar.xz |
Merge branch 'develop' into pushers
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/_base.py | 2 | ||||
-rw-r--r-- | synapse/handlers/directory.py | 36 | ||||
-rw-r--r-- | synapse/handlers/events.py | 12 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 319 | ||||
-rw-r--r-- | synapse/handlers/message.py | 4 |
5 files changed, 271 insertions, 102 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index d53cd3df3e..15adc9dc2c 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -78,7 +78,7 @@ class BaseHandler(object): if not suppress_auth: logger.debug("Authing...") - self.auth.check(event, raises=True) + self.auth.check(event, auth_events=event.old_state_events) logger.debug("Authed") else: logger.debug("Suppressed auth.") diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index af4e7d49c8..3b37e49e6f 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -17,7 +17,7 @@ from twisted.internet import defer from ._base import BaseHandler -from synapse.api.errors import SynapseError +from synapse.api.errors import SynapseError, Codes, CodeMessageException from synapse.api.events.room import RoomAliasesEvent import logging @@ -84,22 +84,32 @@ class DirectoryHandler(BaseHandler): room_id = result.room_id servers = result.servers else: - result = yield self.federation.make_query( - destination=room_alias.domain, - query_type="directory", - args={ - "room_alias": room_alias.to_string(), - }, - retry_on_dns_fail=False, - ) + try: + result = yield self.federation.make_query( + destination=room_alias.domain, + query_type="directory", + args={ + "room_alias": room_alias.to_string(), + }, + retry_on_dns_fail=False, + ) + except CodeMessageException as e: + logging.warn("Error retrieving alias") + if e.code == 404: + result = None + else: + raise if result and "room_id" in result and "servers" in result: room_id = result["room_id"] servers = result["servers"] if not room_id: - defer.returnValue({}) - return + raise SynapseError( + 404, + "Room alias %r not found" % (room_alias.to_string(),), + Codes.NOT_FOUND + ) extra_servers = yield self.store.get_joined_hosts_for_room(room_id) servers = list(set(extra_servers) | set(servers)) @@ -129,7 +139,9 @@ class DirectoryHandler(BaseHandler): }) else: raise SynapseError( - 404, "Room alias \"%s\" not found" % (room_alias,) + 404, + "Room alias %r not found" % (room_alias.to_string(),), + Codes.NOT_FOUND ) @defer.inlineCallbacks diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index d59221a4fb..02202692d4 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -53,8 +53,12 @@ class EventStreamHandler(BaseHandler): if auth_user not in self._streams_per_user: self._streams_per_user[auth_user] = 0 if auth_user in self._stop_timer_per_user: - self.clock.cancel_call_later( - self._stop_timer_per_user.pop(auth_user)) + try: + self.clock.cancel_call_later( + self._stop_timer_per_user.pop(auth_user) + ) + except: + logger.exception("Failed to cancel event timer") else: yield self.distributor.fire( "started_user_eventstream", auth_user @@ -95,10 +99,12 @@ class EventStreamHandler(BaseHandler): logger.debug( "_later stopped_user_eventstream %s", auth_user ) + + self._stop_timer_per_user.pop(auth_user, None) + yield self.distributor.fire( "stopped_user_eventstream", auth_user ) - del self._stop_timer_per_user[auth_user] logger.debug("Scheduling _later: for %s", auth_user) self._stop_timer_per_user[auth_user] = ( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2e8b8a1f9a..925eb5376e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -18,13 +18,16 @@ from ._base import BaseHandler from synapse.api.events.utils import prune_event -from synapse.api.errors import AuthError, FederationError, SynapseError -from synapse.api.events.room import RoomMemberEvent +from synapse.api.errors import ( + AuthError, FederationError, SynapseError, StoreError, +) +from synapse.api.events.room import RoomMemberEvent, RoomCreateEvent from synapse.api.constants import Membership from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.crypto.event_signing import ( - compute_event_signature, check_event_content_hash + compute_event_signature, check_event_content_hash, + add_hashes_and_signatures, ) from syutil.jsonutil import encode_canonical_json @@ -98,7 +101,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def on_receive_pdu(self, pdu, backfilled, state=None): + def on_receive_pdu(self, origin, pdu, backfilled, state=None): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it through the StateHandler. """ @@ -109,7 +112,7 @@ class FederationHandler(BaseHandler): # If we are currently in the process of joining this room, then we # queue up events for later processing. if event.room_id in self.room_queues: - self.room_queues[event.room_id].append(pdu) + self.room_queues[event.room_id].append((pdu, origin)) return logger.debug("Processing event: %s", event.event_id) @@ -141,15 +144,62 @@ class FederationHandler(BaseHandler): ) event = redacted_event - is_new_state = yield self.state_handler.annotate_event_with_state( - event, - old_state=state + logger.debug("Event: %s", event) + + # FIXME (erikj): Awful hack to make the case where we are not currently + # in the room work + current_state = None + is_in_room = yield self.auth.check_host_in_room( + event.room_id, + self.server_name ) + if not is_in_room and not event.outlier: + logger.debug("Got event for room we're not in.") + + replication_layer = self.replication_layer + auth_chain = yield replication_layer.get_event_auth( + origin, + context=event.room_id, + event_id=event.event_id, + ) - logger.debug("Event: %s", event) + for e in auth_chain: + e.outlier = True + try: + yield self._handle_new_event(e, fetch_missing=False) + except: + logger.exception( + "Failed to parse auth event %s", + e.event_id, + ) + + if not state: + state = yield replication_layer.get_state_for_context( + origin, + context=event.room_id, + event_id=event.event_id, + ) + + current_state = state + + if state: + for e in state: + e.outlier = True + try: + yield self._handle_new_event(e) + except: + logger.exception( + "Failed to parse state event %s", + e.event_id, + ) try: - self.auth.check(event, raises=True) + yield self._handle_new_event( + event, + state=state, + backfilled=backfilled, + current_state=current_state, + ) except AuthError as e: raise FederationError( "ERROR", @@ -158,43 +208,17 @@ class FederationHandler(BaseHandler): affected=event.event_id, ) - is_new_state = is_new_state and not backfilled - - # TODO: Implement something in federation that allows us to - # respond to PDU. - - yield self.store.persist_event( - event, - backfilled, - is_new_state=is_new_state - ) - room = yield self.store.get_room(event.room_id) if not room: - # Huh, let's try and get the current state try: - yield self.replication_layer.get_state_for_context( - event.origin, event.room_id, event.event_id, - ) - - hosts = yield self.store.get_joined_hosts_for_room( - event.room_id - ) - if self.hs.hostname in hosts: - try: - yield self.store.store_room( - room_id=event.room_id, - room_creator_user_id="", - is_public=False, - ) - except: - pass - except: - logger.exception( - "Failed to get current state for room %s", - event.room_id + yield self.store.store_room( + room_id=event.room_id, + room_creator_user_id="", + is_public=False, ) + except StoreError: + logger.exception("Failed to store room.") if not backfilled: extra_users = [] @@ -255,11 +279,23 @@ class FederationHandler(BaseHandler): pdu=event ) + + defer.returnValue(pdu) @defer.inlineCallbacks def on_event_auth(self, event_id): auth = yield self.store.get_auth_chain(event_id) + + for event in auth: + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) + defer.returnValue([e for e in auth]) @log_function @@ -276,6 +312,8 @@ class FederationHandler(BaseHandler): We suspend processing of any received events from this room until we have finished processing the join. """ + logger.debug("Joining %s to %s", joinee, room_id) + pdu = yield self.replication_layer.make_join( target_host, room_id, @@ -298,19 +336,29 @@ class FederationHandler(BaseHandler): try: event.event_id = self.event_factory.create_event_id() + event.origin = self.hs.hostname event.content = content - state = yield self.replication_layer.send_join( + if not hasattr(event, "signatures"): + event.signatures = {} + + add_hashes_and_signatures( + event, + self.hs.hostname, + self.hs.config.signing_key[0], + ) + + ret = yield self.replication_layer.send_join( target_host, event ) - logger.debug("do_invite_join state: %s", state) + state = ret["state"] + auth_chain = ret["auth_chain"] + auth_chain.sort(key=lambda e: e.depth) - yield self.state_handler.annotate_event_with_state( - event, - old_state=state - ) + logger.debug("do_invite_join auth_chain: %s", auth_chain) + logger.debug("do_invite_join state: %s", state) logger.debug("do_invite_join event: %s", event) @@ -324,34 +372,50 @@ class FederationHandler(BaseHandler): # FIXME pass + for e in auth_chain: + e.outlier = True + try: + yield self._handle_new_event(e, fetch_missing=False) + except: + logger.exception( + "Failed to parse auth event %s", + e.event_id, + ) + for e in state: # FIXME: Auth these. e.outlier = True + try: + yield self._handle_new_event( + e, + fetch_missing=True + ) + except: + logger.exception( + "Failed to parse state event %s", + e.event_id, + ) - yield self.state_handler.annotate_event_with_state( - e, - ) - - yield self.store.persist_event( - e, - backfilled=False, - is_new_state=True - ) - - yield self.store.persist_event( + yield self._handle_new_event( event, - backfilled=False, - is_new_state=True + state=state, + current_state=state, ) + + yield self.notifier.on_new_room_event( + event, extra_users=[joinee] + ) + + logger.debug("Finished joining %s to %s", joinee, room_id) finally: room_queue = self.room_queues[room_id] del self.room_queues[room_id] - for p in room_queue: + for p, origin in room_queue: try: - yield self.on_receive_pdu(p, backfilled=False) + self.on_receive_pdu(origin, p, backfilled=False) except: - pass + logger.exception("Couldn't handle pdu") defer.returnValue(True) @@ -375,7 +439,7 @@ class FederationHandler(BaseHandler): yield self.state_handler.annotate_event_with_state(event) yield self.auth.add_auth_events(event) - self.auth.check(event, raises=True) + self.auth.check(event, auth_events=event.old_state_events) pdu = event @@ -391,17 +455,7 @@ class FederationHandler(BaseHandler): event.outlier = False - state_handler = self.state_handler - is_new_state = yield state_handler.annotate_event_with_state(event) - self.auth.check(event, raises=True) - - # FIXME (erikj): All this is duplicated above :( - - yield self.store.persist_event( - event, - backfilled=False, - is_new_state=is_new_state - ) + yield self._handle_new_event(event) extra_users = [] if event.type == RoomMemberEvent.TYPE: @@ -414,7 +468,7 @@ class FederationHandler(BaseHandler): ) if event.type == RoomMemberEvent.TYPE: - if event.membership == Membership.JOIN: + if event.content["membership"] == Membership.JOIN: user = self.hs.parse_userid(event.state_key) yield self.distributor.fire( "user_joined_room", user=user, room_id=event.room_id @@ -508,7 +562,17 @@ class FederationHandler(BaseHandler): else: del results[(event.type, event.state_key)] - defer.returnValue(results.values()) + res = results.values() + for event in res: + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) + + defer.returnValue(res) else: defer.returnValue([]) @@ -529,7 +593,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def get_persisted_pdu(self, origin, event_id): + def get_persisted_pdu(self, origin, event_id, do_auth=True): """ Get a PDU from the database with given origin and id. Returns: @@ -541,12 +605,24 @@ class FederationHandler(BaseHandler): ) if event: - in_room = yield self.auth.check_host_in_room( - event.room_id, - origin + # FIXME: This is a temporary work around where we occasionally + # return events slightly differently than when they were + # originally signed + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) ) - if not in_room: - raise AuthError(403, "Host not in room.") + + if do_auth: + in_room = yield self.auth.check_host_in_room( + event.room_id, + origin + ) + if not in_room: + raise AuthError(403, "Host not in room.") defer.returnValue(event) else: @@ -564,3 +640,78 @@ class FederationHandler(BaseHandler): ) while waiters: waiters.pop().callback(None) + + @defer.inlineCallbacks + def _handle_new_event(self, event, state=None, backfilled=False, + current_state=None, fetch_missing=True): + is_new_state = yield self.state_handler.annotate_event_with_state( + event, + old_state=state + ) + + if event.old_state_events: + known_ids = set( + [s.event_id for s in event.old_state_events.values()] + ) + for e_id, _ in event.auth_events: + if e_id not in known_ids: + e = yield self.store.get_event( + e_id, + allow_none=True, + ) + + if not e: + # TODO: Do some conflict res to make sure that we're + # not the ones who are wrong. + logger.info( + "Rejecting %s as %s not in %s", + event.event_id, e_id, known_ids, + ) + raise AuthError(403, "Auth events are stale") + + auth_events = event.old_state_events + else: + # We need to get the auth events from somewhere. + + # TODO: Don't just hit the DBs? + + auth_events = {} + for e_id, _ in event.auth_events: + e = yield self.store.get_event( + e_id, + allow_none=True, + ) + + if not e: + e = yield self.replication_layer.get_pdu( + event.origin, e_id, outlier=True + ) + + if e and fetch_missing: + try: + yield self.on_receive_pdu(event.origin, e, False) + except: + logger.exception( + "Failed to parse auth event %s", + e_id, + ) + + if not e: + logger.warn("Can't find auth event %s.", e_id) + + auth_events[(e.type, e.state_key)] = e + + if event.type == RoomMemberEvent.TYPE and not event.auth_events: + if len(event.prev_events) == 1: + c = yield self.store.get_event(event.prev_events[0][0]) + if c.type == RoomCreateEvent.TYPE: + auth_events[(c.type, c.state_key)] = c + + self.auth.check(event, auth_events=auth_events) + + yield self.store.persist_event( + event, + backfilled=backfilled, + is_new_state=(is_new_state and not backfilled), + current_state=current_state, + ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 06a4e173f6..42dc4d46f3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -243,7 +243,7 @@ class MessageHandler(BaseHandler): public_room_ids = [r["room_id"] for r in public_rooms] limit = pagin_config.limit - if not limit: + if limit is None: limit = 10 for event in room_list: @@ -306,7 +306,7 @@ class MessageHandler(BaseHandler): auth_user = self.hs.parse_userid(user_id) # TODO: These concurrently - state_tuples = yield self.store.get_current_state(room_id) + state_tuples = yield self.state_handler.get_current_state(room_id) state = [self.hs.serialize_event(x) for x in state_tuples] member_event = (yield self.store.get_room_member( |