From 2d21d43c34751cffb5f324bd58ceff060f65f679 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Jul 2016 10:28:51 +0100 Subject: Add purge_history API --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6c0bc7eafa..351b218247 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1413,7 +1413,7 @@ class FederationHandler(BaseHandler): local_view = dict(auth_events) remote_view = dict(auth_events) remote_view.update({ - (d.type, d.state_key): d for d in different_events + (d.type, d.state_key): d for d in different_events if d }) new_state, prev_state = self.state_handler.resolve_events( -- cgit 1.4.1 From ebdafd8114d1aed631a3497ad142f79efa9face7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Jul 2016 16:49:37 +0100 Subject: Check sender signed event --- synapse/api/auth.py | 10 ++++++++-- synapse/handlers/federation.py | 4 ++-- synapse/state.py | 4 ++-- 3 files changed, 12 insertions(+), 6 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index e05defd7d8..e2f40ee65a 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -63,7 +63,7 @@ class Auth(object): "user_id = ", ]) - def check(self, event, auth_events): + def check(self, event, auth_events, do_sig_check=True): """ Checks if this event is correctly authed. Args: @@ -79,6 +79,13 @@ class Auth(object): if not hasattr(event, "room_id"): raise AuthError(500, "Event has no room_id: %s" % event) + + sender_domain = get_domain_from_id(event.sender) + + # Check the sender's domain has signed the event + if do_sig_check and not event.signatures.get(sender_domain): + raise AuthError(403, "Event not signed by sending server") + if auth_events is None: # Oh, we don't know what the state of the room was, so we # are trusting that this is allowed (at least for now) @@ -87,7 +94,6 @@ class Auth(object): if event.type == EventTypes.Create: room_id_domain = get_domain_from_id(event.room_id) - sender_domain = get_domain_from_id(event.sender) if room_id_domain != sender_domain: raise AuthError( 403, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 351b218247..4e8ffa8f7b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -688,7 +688,7 @@ class FederationHandler(BaseHandler): logger.warn("Failed to create join %r because %s", event, e) raise e - self.auth.check(event, auth_events=context.current_state) + self.auth.check(event, auth_events=context.current_state, do_sig_check=False) defer.returnValue(event) @@ -918,7 +918,7 @@ class FederationHandler(BaseHandler): ) try: - self.auth.check(event, auth_events=context.current_state) + self.auth.check(event, auth_events=context.current_state, do_sig_check=False) except AuthError as e: logger.warn("Failed to create new leave %r because %s", event, e) raise e diff --git a/synapse/state.py b/synapse/state.py index d0f76dc4f5..d7d08570c9 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -379,7 +379,7 @@ class StateHandler(object): try: # FIXME: hs.get_auth() is bad style, but we need to do it to # get around circular deps. - self.hs.get_auth().check(event, auth_events) + self.hs.get_auth().check(event, auth_events, do_sig_check=False) prev_event = event except AuthError: return prev_event @@ -391,7 +391,7 @@ class StateHandler(object): try: # FIXME: hs.get_auth() is bad style, but we need to do it to # get around circular deps. - self.hs.get_auth().check(event, auth_events) + self.hs.get_auth().check(event, auth_events, do_sig_check=False) return event except AuthError: pass -- cgit 1.4.1 From 9e1b43bcbf46c38510cd8348b7df3eb5f6374e81 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Jul 2016 09:29:54 +0100 Subject: Comment --- synapse/handlers/federation.py | 4 ++++ synapse/state.py | 2 ++ 2 files changed, 6 insertions(+) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 4e8ffa8f7b..7622962d46 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -688,6 +688,8 @@ class FederationHandler(BaseHandler): logger.warn("Failed to create join %r because %s", event, e) raise e + # The remote hasn't signed it yet, obviously. We'll do the full checks + # when we get the event back in `on_send_join_request` self.auth.check(event, auth_events=context.current_state, do_sig_check=False) defer.returnValue(event) @@ -918,6 +920,8 @@ class FederationHandler(BaseHandler): ) try: + # The remote hasn't signed it yet, obviously. We'll do the full checks + # when we get the event back in `on_send_leave_request` self.auth.check(event, auth_events=context.current_state, do_sig_check=False) except AuthError as e: logger.warn("Failed to create new leave %r because %s", event, e) diff --git a/synapse/state.py b/synapse/state.py index d7d08570c9..ef1bc470be 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -379,6 +379,7 @@ class StateHandler(object): try: # FIXME: hs.get_auth() is bad style, but we need to do it to # get around circular deps. + # The signatures have already been checked at this point self.hs.get_auth().check(event, auth_events, do_sig_check=False) prev_event = event except AuthError: @@ -391,6 +392,7 @@ class StateHandler(object): try: # FIXME: hs.get_auth() is bad style, but we need to do it to # get around circular deps. + # The signatures have already been checked at this point self.hs.get_auth().check(event, auth_events, do_sig_check=False) return event except AuthError: -- cgit 1.4.1 From 57dca356923f220026d31fbb58fcf37ae9b27c8e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 Jul 2016 13:25:06 +0100 Subject: Don't notify pusher pool for backfilled events --- synapse/handlers/federation.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7622962d46..3f138daf17 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1118,11 +1118,12 @@ class FederationHandler(BaseHandler): backfilled=backfilled, ) - # this intentionally does not yield: we don't care about the result - # and don't need to wait for it. - preserve_fn(self.hs.get_pusherpool().on_new_notifications)( - event_stream_id, max_stream_id - ) + if not backfilled: + # this intentionally does not yield: we don't care about the result + # and don't need to wait for it. + preserve_fn(self.hs.get_pusherpool().on_new_notifications)( + event_stream_id, max_stream_id + ) defer.returnValue((context, event_stream_id, max_stream_id)) -- cgit 1.4.1 From 248e6770ca0faadf574cfd62f72d8e200cb5b57a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Jul 2016 10:30:12 +0100 Subject: Cache federation state responses --- synapse/federation/federation_server.py | 66 ++++++++++++++++++++++----------- synapse/handlers/federation.py | 7 +--- synapse/handlers/room.py | 4 +- synapse/handlers/sync.py | 2 +- synapse/util/caches/response_cache.py | 13 ++++++- 5 files changed, 60 insertions(+), 32 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 85f5e752fe..d15c7e1b40 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -21,10 +21,11 @@ from .units import Transaction, Edu from synapse.util.async import Linearizer from synapse.util.logutils import log_function +from synapse.util.caches.response_cache import ResponseCache from synapse.events import FrozenEvent import synapse.metrics -from synapse.api.errors import FederationError, SynapseError +from synapse.api.errors import AuthError, FederationError, SynapseError from synapse.crypto.event_signing import compute_event_signature @@ -48,9 +49,15 @@ class FederationServer(FederationBase): def __init__(self, hs): super(FederationServer, self).__init__(hs) + self.auth = hs.get_auth() + self._room_pdu_linearizer = Linearizer() self._server_linearizer = Linearizer() + # We cache responses to state queries, as they take a while and often + # come in waves. + self._state_resp_cache = ResponseCache(hs, timeout_ms=30000) + def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate receipt of new PDUs from other home servers. The required methods are @@ -188,28 +195,45 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_context_state_request(self, origin, room_id, event_id): - with (yield self._server_linearizer.queue((origin, room_id))): - if event_id: - pdus = yield self.handler.get_state_for_pdu( - origin, room_id, event_id, - ) - auth_chain = yield self.store.get_auth_chain( - [pdu.event_id for pdu in pdus] + if not event_id: + raise NotImplementedError("Specify an event") + + in_room = yield self.auth.check_host_in_room(room_id, origin) + if not in_room: + raise AuthError(403, "Host not in room.") + + result = self._state_resp_cache.get((room_id, event_id)) + if not result: + with (yield self._server_linearizer.queue((origin, room_id))): + resp = yield self.response_cache.set( + (room_id, event_id), + self._on_context_state_request_compute(room_id, event_id) ) + else: + resp = yield result - for event in auth_chain: - # We sign these again because there was a bug where we - # incorrectly signed things the first time round - if self.hs.is_mine_id(event.event_id): - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - else: - raise NotImplementedError("Specify an event") + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def _on_context_state_request_compute(self, room_id, event_id): + pdus = yield self.handler.get_state_for_pdu( + room_id, event_id, + ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) + + for event in auth_chain: + # We sign these again because there was a bug where we + # incorrectly signed things the first time round + if self.hs.is_mine_id(event.event_id): + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) defer.returnValue((200, { "pdus": [pdu.get_pdu_json() for pdu in pdus], diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3f138daf17..fcad41d7b6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -991,14 +991,9 @@ class FederationHandler(BaseHandler): defer.returnValue(None) @defer.inlineCallbacks - def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True): + def get_state_for_pdu(self, room_id, event_id): yield run_on_reactor() - if do_auth: - in_room = yield self.auth.check_host_in_room(room_id, origin) - if not in_room: - raise AuthError(403, "Host not in room.") - state_groups = yield self.store.get_state_groups( room_id, [event_id] ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ae44c7a556..bf6b1c1535 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -345,8 +345,8 @@ class RoomCreationHandler(BaseHandler): class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) - self.response_cache = ResponseCache() - self.remote_list_request_cache = ResponseCache() + self.response_cache = ResponseCache(hs) + self.remote_list_request_cache = ResponseCache(hs) self.remote_list_cache = {} self.fetch_looping_call = hs.get_clock().looping_call( self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index be26a491ff..0ee4ebe504 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -138,7 +138,7 @@ class SyncHandler(object): self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() - self.response_cache = ResponseCache() + self.response_cache = ResponseCache(hs) def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 36686b479e..00af539880 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -24,9 +24,12 @@ class ResponseCache(object): used rather than trying to compute a new response. """ - def __init__(self): + def __init__(self, hs, timeout_ms=0): self.pending_result_cache = {} # Requests that haven't finished yet. + self.clock = hs.get_clock() + self.timeout_sec = timeout_ms / 1000. + def get(self, key): result = self.pending_result_cache.get(key) if result is not None: @@ -39,7 +42,13 @@ class ResponseCache(object): self.pending_result_cache[key] = result def remove(r): - self.pending_result_cache.pop(key, None) + if self.timeout_sec: + self.clock.call_later( + self.timeout_sec, + self.pending_result_cache.pop, key, None, + ) + else: + self.pending_result_cache.pop(key, None) return r result.addBoth(remove) -- cgit 1.4.1 From 1e2740caabe348e4131fe6bd2d777fc7483909a4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 16:08:33 +0100 Subject: Handle the case of missing auth events when joining a room --- synapse/handlers/federation.py | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3f138daf17..cab7efb5db 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -124,7 +124,7 @@ class FederationHandler(BaseHandler): try: event_stream_id, max_stream_id = yield self._persist_auth_tree( - auth_chain, state, event + origin, auth_chain, state, event ) except AuthError as e: raise FederationError( @@ -637,7 +637,7 @@ class FederationHandler(BaseHandler): pass event_stream_id, max_stream_id = yield self._persist_auth_tree( - auth_chain, state, event + origin, auth_chain, state, event ) with PreserveLoggingContext(): @@ -1155,7 +1155,7 @@ class FederationHandler(BaseHandler): ) @defer.inlineCallbacks - def _persist_auth_tree(self, auth_events, state, event): + def _persist_auth_tree(self, origin, auth_events, state, event): """Checks the auth chain is valid (and passes auth checks) for the state and event. Then persists the auth chain and state atomically. Persists the event seperately. @@ -1172,7 +1172,7 @@ class FederationHandler(BaseHandler): event_map = { e.event_id: e - for e in auth_events + for e in itertools.chain(auth_events, state, [event]) } create_event = None @@ -1181,10 +1181,29 @@ class FederationHandler(BaseHandler): create_event = e break + missing_auth_events = set() + for e in itertools.chain(auth_events, state, [event]): + for e_id, _ in e.auth_events: + if e_id not in event_map: + missing_auth_events.add(e_id) + + for e_id in missing_auth_events: + m_ev = yield self.replication_layer.get_pdu( + [origin], + e_id, + outlier=True, + timeout=10000, + ) + if m_ev and m_ev.event_id == e_id: + event_map[e_id] = m_ev + else: + logger.info("Failed to find auth event %r", e_id) + for e in itertools.chain(auth_events, state, [event]): auth_for_e = { (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id] for e_id, _ in e.auth_events + if e_id in event_map } if create_event: auth_for_e[(EventTypes.Create, "")] = create_event -- cgit 1.4.1 From 3d13c3a2952263c38111fcf95d625e316416b52b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jul 2016 10:45:05 +0100 Subject: Update docstring --- synapse/handlers/federation.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index cab7efb5db..9583629388 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1160,6 +1160,12 @@ class FederationHandler(BaseHandler): state and event. Then persists the auth chain and state atomically. Persists the event seperately. + Args: + origin (str): Where the events came from + auth_events (list) + state (list) + event (Event) + Returns: 2-tuple of (event_stream_id, max_stream_id) from the persist_event call for `event` -- cgit 1.4.1 From c51a52f3002abf4597952e07759c6ab3016e3497 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jul 2016 11:17:04 +0100 Subject: Mention that func will fetch auth events --- synapse/handlers/federation.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9583629388..1323235b62 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1160,6 +1160,8 @@ class FederationHandler(BaseHandler): state and event. Then persists the auth chain and state atomically. Persists the event seperately. + Will attempt to fetch missing auth events. + Args: origin (str): Where the events came from auth_events (list) -- cgit 1.4.1 From 93acf49e9b6d601c65ccb73b46ffe24c9b0f26a2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Aug 2016 12:59:04 +0100 Subject: Fix backfill auth events --- synapse/handlers/federation.py | 71 +++++++++++++++++++++++++++++------------- 1 file changed, 50 insertions(+), 21 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 187bfc4315..618cb53629 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -335,31 +335,58 @@ class FederationHandler(BaseHandler): state_events.update({s.event_id: s for s in state}) events_to_state[e_id] = state - seen_events = yield self.store.have_events( - set(auth_events.keys()) | set(state_events.keys()) - ) - - all_events = events + state_events.values() + auth_events.values() required_auth = set( - a_id for event in all_events for a_id, _ in event.auth_events + a_id + for event in events + state_events.values() + auth_events.values() + for a_id, _ in event.auth_events ) - + auth_events.update({ + e_id: event_map[e_id] for e_id in required_auth if e_id in event_map + }) missing_auth = required_auth - set(auth_events) - if missing_auth: + failed_to_fetch = set() + + # Try and fetch any missing auth events from both DB and remote servers. + # We repeatedly do this until we stop finding new auth events. + while missing_auth - failed_to_fetch: logger.info("Missing auth for backfill: %r", missing_auth) - results = yield defer.gatherResults( - [ - self.replication_layer.get_pdu( - [dest], - event_id, - outlier=True, - timeout=10000, - ) - for event_id in missing_auth - ], - consumeErrors=True - ).addErrback(unwrapFirstError) - auth_events.update({a.event_id: a for a in results}) + ret_events = yield self.store.get_events(missing_auth - failed_to_fetch) + auth_events.update(ret_events) + + required_auth.update( + a_id for event in ret_events.values() for a_id, _ in event.auth_events + ) + missing_auth = required_auth - set(auth_events) + + if missing_auth - failed_to_fetch: + logger.info( + "Fetching missing auth for backfill: %r", + missing_auth - failed_to_fetch + ) + + results = yield defer.gatherResults( + [ + self.replication_layer.get_pdu( + [dest], + event_id, + outlier=True, + timeout=10000, + ) + for event_id in missing_auth - failed_to_fetch + ], + consumeErrors=True + ).addErrback(unwrapFirstError) + auth_events.update({a.event_id: a for a in results}) + required_auth.update( + a_id for event in results for a_id, _ in event.auth_events + ) + missing_auth = required_auth - set(auth_events) + + failed_to_fetch = missing_auth - set(auth_events) + + seen_events = yield self.store.have_events( + set(auth_events.keys()) | set(state_events.keys()) + ) ev_infos = [] for a in auth_events.values(): @@ -372,6 +399,7 @@ class FederationHandler(BaseHandler): (auth_events[a_id].type, auth_events[a_id].state_key): auth_events[a_id] for a_id, _ in a.auth_events + if a_id in auth_events } }) @@ -383,6 +411,7 @@ class FederationHandler(BaseHandler): (auth_events[a_id].type, auth_events[a_id].state_key): auth_events[a_id] for a_id, _ in event_map[e_id].auth_events + if a_id in auth_events } }) -- cgit 1.4.1