From e7ca813dd476c83497d4130ad8efa9424d86e921 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 10:38:14 +0000 Subject: Try to ensure we don't persist an event we have already persisted. In persist_event check if we already have the event, if so then update instead of replacing so that we don't cause a bump of the stream_ordering. --- synapse/handlers/federation.py | 42 ++++++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 14 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8bf5a4cc11..c384789c2f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -112,6 +112,14 @@ class FederationHandler(BaseHandler): logger.debug("Event: %s", event) + event_ids = set() + if state: + event_ids += {e.event_id for e in state} + if auth_chain: + event_ids += {e.event_id for e in auth_chain} + + seen_ids = (yield self.store.have_events(event_ids)).keys() + # FIXME (erikj): Awful hack to make the case where we are not currently # in the room work current_state = None @@ -124,20 +132,26 @@ class FederationHandler(BaseHandler): current_state = state if state and auth_chain is not None: - for e in state: - e.internal_metadata.outlier = True - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - yield self._handle_new_event(origin, e, auth_events=auth) - except: - logger.exception( - "Failed to handle state event %s", - e.event_id, - ) + for list_of_pdus in [auth_chain, state]: + for e in list_of_pdus: + if e.event_id in seen_ids: + continue + + e.internal_metadata.outlier = True + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event( + origin, e, auth_events=auth + ) + except: + logger.exception( + "Failed to handle state event %s", + e.event_id, + ) try: yield self._handle_new_event( -- cgit 1.4.1 From 51969f9e5f4774e4d0ddcc2e8ebcf96803cbf295 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 10:40:14 +0000 Subject: Return rejected events if asked for it over federation. --- synapse/handlers/federation.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c384789c2f..0161fbe49c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -632,6 +632,7 @@ class FederationHandler(BaseHandler): event = yield self.store.get_event( event_id, allow_none=True, + allow_rejected=True, ) if event: -- cgit 1.4.1 From 4ff2273b300c0a40fcf054a613aac24c51af22f1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 11:23:26 +0000 Subject: Add FIXME note. --- synapse/handlers/federation.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0161fbe49c..322c1b407d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -716,6 +716,8 @@ class FederationHandler(BaseHandler): context.rejected = RejectedReason.AUTH_ERROR + # FIXME: Don't store as rejected with AUTH_ERROR if we haven't + # seen all the auth events. yield self.store.persist_event( event, context=context, -- cgit 1.4.1 From 06c34bfbaee553b851e0f02d8e17d5bff7360376 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 11:23:44 +0000 Subject: Give exception better message --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 322c1b407d..9d7557f578 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1000,7 +1000,7 @@ class FederationHandler(BaseHandler): if reason is None: # FIXME: ERRR?! logger.warn("Could not find reason for %s", e.event_id) - raise RuntimeError("") + raise RuntimeError("Could not find reason for %s" % e.event_id) reason_map[e.event_id] = reason -- cgit 1.4.1 From fed29251d70b050ea5799708b6b13be9617d1f6d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 13:23:58 +0000 Subject: Spelling --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9d7557f578..b9b2e25d1f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -767,7 +767,7 @@ class FederationHandler(BaseHandler): ) ) - logger.debug("on_query_auth reutrning: %s", ret) + logger.debug("on_query_auth returning: %s", ret) defer.returnValue(ret) -- cgit 1.4.1 From 77a076bd25fc355c49251bcf489c0511c0f0f0af Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 13:35:17 +0000 Subject: Set combinations is | and not + --- synapse/handlers/federation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b9b2e25d1f..653ab0dbfa 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -114,9 +114,9 @@ class FederationHandler(BaseHandler): event_ids = set() if state: - event_ids += {e.event_id for e in state} + event_ids |= {e.event_id for e in state} if auth_chain: - event_ids += {e.event_id for e in auth_chain} + event_ids |= {e.event_id for e in auth_chain} seen_ids = (yield self.store.have_events(event_ids)).keys() -- cgit 1.4.1 From 6efd4d1649a539ba4bf1c884ebb90a48c2d1c8df Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 13:57:54 +0000 Subject: Don't completely die if get auth_chain or querying auth_chain requests fail --- synapse/handlers/federation.py | 135 ++++++++++++++++++++++------------------- 1 file changed, 72 insertions(+), 63 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 653ab0dbfa..6727155c38 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -787,41 +787,45 @@ class FederationHandler(BaseHandler): if missing_auth: logger.debug("Missing auth: %s", missing_auth) # If we don't have all the auth events, we need to get them. - remote_auth_chain = yield self.replication_layer.get_event_auth( - origin, event.room_id, event.event_id - ) + try: + remote_auth_chain = yield self.replication_layer.get_event_auth( + origin, event.room_id, event.event_id + ) - seen_remotes = yield self.store.have_events( - [e.event_id for e in remote_auth_chain] - ) + seen_remotes = yield self.store.have_events( + [e.event_id for e in remote_auth_chain] + ) - for e in remote_auth_chain: - if e.event_id in seen_remotes.keys(): - continue + for e in remote_auth_chain: + if e.event_id in seen_remotes.keys(): + continue - if e.event_id == event.event_id: - continue + if e.event_id == event.event_id: + continue - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in remote_auth_chain - if e.event_id in auth_ids - } - e.internal_metadata.outlier = True + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in remote_auth_chain + if e.event_id in auth_ids + } + e.internal_metadata.outlier = True - logger.debug( - "do_auth %s missing_auth: %s", - event.event_id, e.event_id - ) - yield self._handle_new_event( - origin, e, auth_events=auth - ) + logger.debug( + "do_auth %s missing_auth: %s", + event.event_id, e.event_id + ) + yield self._handle_new_event( + origin, e, auth_events=auth + ) - if e.event_id in event_auth_events: - auth_events[(e.type, e.state_key)] = e - except AuthError: - pass + if e.event_id in event_auth_events: + auth_events[(e.type, e.state_key)] = e + except AuthError: + pass + except: + # FIXME: + logger.exception("Failed to get auth chain") # FIXME: Assumes we have and stored all the state for all the # prev_events @@ -836,47 +840,52 @@ class FederationHandler(BaseHandler): auth_ids = self.auth.compute_auth_events(event, context) local_auth_chain = yield self.store.get_auth_chain(auth_ids) - # 2. Get remote difference. - result = yield self.replication_layer.query_auth( - origin, - event.room_id, - event.event_id, - local_auth_chain, - ) + try: + # 2. Get remote difference. + result = yield self.replication_layer.query_auth( + origin, + event.room_id, + event.event_id, + local_auth_chain, + ) - seen_remotes = yield self.store.have_events( - [e.event_id for e in result["auth_chain"]] - ) + seen_remotes = yield self.store.have_events( + [e.event_id for e in result["auth_chain"]] + ) - # 3. Process any remote auth chain events we haven't seen. - for ev in result["auth_chain"]: - if ev.event_id in seen_remotes.keys(): - continue + # 3. Process any remote auth chain events we haven't seen. + for ev in result["auth_chain"]: + if ev.event_id in seen_remotes.keys(): + continue - if ev.event_id == event.event_id: - continue + if ev.event_id == event.event_id: + continue - try: - auth_ids = [e_id for e_id, _ in ev.auth_events] - auth = { - (e.type, e.state_key): e for e in result["auth_chain"] - if e.event_id in auth_ids - } - ev.internal_metadata.outlier = True + try: + auth_ids = [e_id for e_id, _ in ev.auth_events] + auth = { + (e.type, e.state_key): e for e in result["auth_chain"] + if e.event_id in auth_ids + } + ev.internal_metadata.outlier = True - logger.debug( - "do_auth %s different_auth: %s", - event.event_id, e.event_id - ) + logger.debug( + "do_auth %s different_auth: %s", + event.event_id, e.event_id + ) - yield self._handle_new_event( - origin, ev, auth_events=auth - ) + yield self._handle_new_event( + origin, ev, auth_events=auth + ) + + if ev.event_id in event_auth_events: + auth_events[(ev.type, ev.state_key)] = ev + except AuthError: + pass - if ev.event_id in event_auth_events: - auth_events[(ev.type, ev.state_key)] = ev - except AuthError: - pass + except: + # FIXME: + logger.exception("Failed to query auth chain") # 4. Look at rejects and their proofs. # TODO. -- cgit 1.4.1 From 7dd1c5c542d58464085b11ababe746c6a60515ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 16:12:04 +0000 Subject: Neaten the handling of state and auth_chain up a bit --- synapse/handlers/federation.py | 57 ++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 27 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6727155c38..86953bf8c8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -30,6 +30,7 @@ from synapse.types import UserID from twisted.internet import defer +import itertools import logging @@ -112,14 +113,6 @@ class FederationHandler(BaseHandler): logger.debug("Event: %s", event) - event_ids = set() - if state: - event_ids |= {e.event_id for e in state} - if auth_chain: - event_ids |= {e.event_id for e in auth_chain} - - seen_ids = (yield self.store.have_events(event_ids)).keys() - # FIXME (erikj): Awful hack to make the case where we are not currently # in the room work current_state = None @@ -131,27 +124,37 @@ class FederationHandler(BaseHandler): logger.debug("Got event for room we're not in.") current_state = state + event_ids = set() + if state: + event_ids |= {e.event_id for e in state} + if auth_chain: + event_ids |= {e.event_id for e in auth_chain} + + seen_ids = (yield self.store.have_events(event_ids)).keys() + if state and auth_chain is not None: - for list_of_pdus in [auth_chain, state]: - for e in list_of_pdus: - if e.event_id in seen_ids: - continue + # If we have any state or auth_chain given to us by the replication + # layer, then we should handle them (if we haven't before.) + for e in itertools.chain(auth_chain, state): + if e.event_id in seen_ids: + continue - e.internal_metadata.outlier = True - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - yield self._handle_new_event( - origin, e, auth_events=auth - ) - except: - logger.exception( - "Failed to handle state event %s", - e.event_id, - ) + e.internal_metadata.outlier = True + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event( + origin, e, auth_events=auth + ) + seen_ids.add(e.event_id) + except: + logger.exception( + "Failed to handle state event %s", + e.event_id, + ) try: yield self._handle_new_event( -- cgit 1.4.1 From c0462dbf1533f285f632dcb0a74c0ef0c3e2475b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Feb 2015 10:16:51 +0000 Subject: Rearrange persist_event so that do all the queries that need to be done before returning early if we have already persisted that event. --- synapse/events/__init__.py | 2 +- synapse/handlers/federation.py | 2 + synapse/storage/__init__.py | 145 +++++++++++++++++++++-------------------- 3 files changed, 77 insertions(+), 72 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index bf07951027..8f0c6e959f 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -77,7 +77,7 @@ class EventBase(object): return self.content["membership"] def is_state(self): - return hasattr(self, "state_key") + return hasattr(self, "state_key") and self.state_key is not None def get_dict(self): d = dict(self._event_dict) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 86953bf8c8..0876589e31 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -515,6 +515,8 @@ class FederationHandler(BaseHandler): "Failed to get destination from event %s", s.event_id ) + destinations.remove(origin) + logger.debug( "on_send_join_request: Sending event: %s, signatures: %s", event.event_id, diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 93ab26fcd1..30ce378900 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -163,19 +163,70 @@ class DataStore(RoomMemberStore, RoomStore, def _persist_event_txn(self, txn, event, context, backfilled, stream_ordering=None, is_new_state=True, current_state=None): - if event.type == EventTypes.Member: - self._store_room_member_txn(txn, event) - elif event.type == EventTypes.Feedback: - self._store_feedback_txn(txn, event) - elif event.type == EventTypes.Name: - self._store_room_name_txn(txn, event) - elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, event) - elif event.type == EventTypes.Redaction: - self._store_redaction(txn, event) + + # We purposefully do this first since if we include a `current_state` + # key, we *want* to update the `current_state_events` table + if current_state: + txn.execute( + "DELETE FROM current_state_events WHERE room_id = ?", + (event.room_id,) + ) + + for s in current_state: + self._simple_insert_txn( + txn, + "current_state_events", + { + "event_id": s.event_id, + "room_id": s.room_id, + "type": s.type, + "state_key": s.state_key, + }, + or_replace=True, + ) + + if event.is_state() and is_new_state: + if not backfilled and not context.rejected: + self._simple_insert_txn( + txn, + table="state_forward_extremities", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + or_replace=True, + ) + + for prev_state_id, _ in event.prev_state: + self._simple_delete_txn( + txn, + table="state_forward_extremities", + keyvalues={ + "event_id": prev_state_id, + } + ) outlier = event.internal_metadata.is_outlier() + if not outlier: + self._store_state_groups_txn(txn, event, context) + + self._update_min_depth_for_room_txn( + txn, + event.room_id, + event.depth + ) + + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + have_persisted = self._simple_select_one_onecol_txn( txn, table="event_json", @@ -209,6 +260,17 @@ class DataStore(RoomMemberStore, RoomStore, ) return + if event.type == EventTypes.Member: + self._store_room_member_txn(txn, event) + elif event.type == EventTypes.Feedback: + self._store_feedback_txn(txn, event) + elif event.type == EventTypes.Name: + self._store_room_name_txn(txn, event) + elif event.type == EventTypes.Topic: + self._store_room_topic_txn(txn, event) + elif event.type == EventTypes.Redaction: + self._store_redaction(txn, event) + event_dict = { k: v for k, v in event.get_dict().items() @@ -273,41 +335,10 @@ class DataStore(RoomMemberStore, RoomStore, ) raise _RollbackButIsFineException("_persist_event") - self._handle_prev_events( - txn, - outlier=outlier, - event_id=event.event_id, - prev_events=event.prev_events, - room_id=event.room_id, - ) - - if not outlier: - self._store_state_groups_txn(txn, event, context) - if context.rejected: self._store_rejections_txn(txn, event.event_id, context.rejected) - if current_state: - txn.execute( - "DELETE FROM current_state_events WHERE room_id = ?", - (event.room_id,) - ) - - for s in current_state: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": s.event_id, - "room_id": s.room_id, - "type": s.type, - "state_key": s.state_key, - }, - or_replace=True, - ) - - is_state = hasattr(event, "state_key") and event.state_key is not None - if is_state: + if event.is_state(): vals = { "event_id": event.event_id, "room_id": event.room_id, @@ -315,6 +346,7 @@ class DataStore(RoomMemberStore, RoomStore, "state_key": event.state_key, } + # TODO: How does this work with backfilling? if hasattr(event, "replaces_state"): vals["prev_state"] = event.replaces_state @@ -351,28 +383,6 @@ class DataStore(RoomMemberStore, RoomStore, or_ignore=True, ) - if not backfilled and not context.rejected: - self._simple_insert_txn( - txn, - table="state_forward_extremities", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - or_replace=True, - ) - - for prev_state_id, _ in event.prev_state: - self._simple_delete_txn( - txn, - table="state_forward_extremities", - keyvalues={ - "event_id": prev_state_id, - } - ) - for hash_alg, hash_base64 in event.hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_event_content_hash_txn( @@ -403,13 +413,6 @@ class DataStore(RoomMemberStore, RoomStore, txn, event.event_id, ref_alg, ref_hash_bytes ) - if not outlier: - self._update_min_depth_for_room_txn( - txn, - event.room_id, - event.depth - ) - def _store_redaction(self, txn, event): txn.execute( "INSERT OR IGNORE INTO redactions " -- cgit 1.4.1 From 650e32d45580ddd13826364291bda6760c014df9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Feb 2015 14:06:42 +0000 Subject: Change context.auth_events to what the auth_events would be bases on context.current_state, rather than based on the auth_events from the event. --- synapse/api/auth.py | 12 ++++++------ synapse/handlers/federation.py | 4 +++- synapse/state.py | 8 ++++++-- 3 files changed, 15 insertions(+), 9 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 3471afd7e7..7105ee21dc 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -358,7 +358,7 @@ class Auth(object): def add_auth_events(self, builder, context): yield run_on_reactor() - auth_ids = self.compute_auth_events(builder, context) + auth_ids = self.compute_auth_events(builder, context.current_state) auth_events_entries = yield self.store.add_event_hashes( auth_ids @@ -372,26 +372,26 @@ class Auth(object): if v.event_id in auth_ids } - def compute_auth_events(self, event, context): + def compute_auth_events(self, event, current_state): if event.type == EventTypes.Create: return [] auth_ids = [] key = (EventTypes.PowerLevels, "", ) - power_level_event = context.current_state.get(key) + power_level_event = current_state.get(key) if power_level_event: auth_ids.append(power_level_event.event_id) key = (EventTypes.JoinRules, "", ) - join_rule_event = context.current_state.get(key) + join_rule_event = current_state.get(key) key = (EventTypes.Member, event.user_id, ) - member_event = context.current_state.get(key) + member_event = current_state.get(key) key = (EventTypes.Create, "", ) - create_event = context.current_state.get(key) + create_event = current_state.get(key) if create_event: auth_ids.append(create_event.event_id) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0876589e31..2e2c23ef63 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -842,7 +842,9 @@ class FederationHandler(BaseHandler): logger.debug("Different auth: %s", different_auth) # 1. Get what we think is the auth chain. - auth_ids = self.auth.compute_auth_events(event, context) + auth_ids = self.auth.compute_auth_events( + event, context.current_state + ) local_auth_chain = yield self.store.get_auth_chain(auth_ids) try: diff --git a/synapse/state.py b/synapse/state.py index 6a6fb8aea0..695a5e7ac4 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -103,7 +103,9 @@ class StateHandler(object): context.state_group = None if hasattr(event, "auth_events") and event.auth_events: - auth_ids = zip(*event.auth_events)[0] + auth_ids = self.hs.get_auth().compute_auth_events( + event, context.current_state + ) context.auth_events = { k: v for k, v in context.current_state.items() @@ -149,7 +151,9 @@ class StateHandler(object): event.unsigned["replaces_state"] = replaces.event_id if hasattr(event, "auth_events") and event.auth_events: - auth_ids = zip(*event.auth_events)[0] + auth_ids = self.hs.get_auth().compute_auth_events( + event, context.current_state + ) context.auth_events = { k: v for k, v in context.current_state.items() -- cgit 1.4.1 From 95e2d2d36d6dfb7205c40fa8e59ef350f8096395 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Feb 2015 15:02:23 +0000 Subject: When returning lists of servers from alias lookups, put the current server first in the list --- synapse/handlers/directory.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 58e9a91562..7b60921040 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -113,7 +113,16 @@ class DirectoryHandler(BaseHandler): ) extra_servers = yield self.store.get_joined_hosts_for_room(room_id) - servers = list(set(extra_servers) | set(servers)) + servers = set(extra_servers) | set(servers) + + # If this server is in the list of servers, return it first. + if self.server_name in servers: + servers = ( + [self.server_name] + + [s for s in servers if s != self.server_name] + ) + else: + servers = list(servers) defer.returnValue({ "room_id": room_id, -- cgit 1.4.1 From ae46f10fc5dba0f81518e3144ab8d9ed7a7d03bc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Feb 2015 16:28:12 +0000 Subject: Apply sanity to the transport client interface. Convert 'make_join' and 'send_join' to accept iterables of destinations --- synapse/api/errors.py | 8 +++- synapse/federation/federation_client.py | 82 ++++++++++++++++++++------------- synapse/federation/transaction_queue.py | 23 +++++++-- synapse/federation/transport/client.py | 42 +++++++---------- synapse/handlers/federation.py | 4 +- synapse/http/matrixfederationclient.py | 42 ++++++++++++++--- 6 files changed, 130 insertions(+), 71 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/api/errors.py b/synapse/api/errors.py index ad478aa6b7..5041828f18 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -39,7 +39,7 @@ class Codes(object): TOO_LARGE = "M_TOO_LARGE" -class CodeMessageException(Exception): +class CodeMessageException(RuntimeError): """An exception with integer code and message string attributes.""" def __init__(self, code, msg): @@ -227,3 +227,9 @@ class FederationError(RuntimeError): "affected": self.affected, "source": self.source if self.source else self.affected, } + + +class HttpResponseException(CodeMessageException): + def __init__(self, code, msg, response): + self.response = response + super(HttpResponseException, self).__init__(code, msg) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index d6b8c43916..eb36ec040b 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -19,6 +19,7 @@ from twisted.internet import defer from .federation_base import FederationBase from .units import Edu +from synapse.api.errors import CodeMessageException from synapse.util.logutils import log_function from synapse.events import FrozenEvent @@ -180,7 +181,8 @@ class FederationClient(FederationBase): pdu = yield self._check_sigs_and_hash(pdu) break - + except CodeMessageException: + raise except Exception as e: logger.info( "Failed to get PDU %s from %s because %s", @@ -264,45 +266,63 @@ class FederationClient(FederationBase): defer.returnValue(self.event_from_pdu_json(pdu_dict)) break - except Exception as e: - logger.warn("Failed to make_join via %s", destination) + except CodeMessageException: + raise + except RuntimeError as e: + logger.warn( + "Failed to make_join via %s: %s", + destination, e.message + ) + + raise RuntimeError("Failed to send to any server.") @defer.inlineCallbacks - def send_join(self, destination, pdu): - time_now = self._clock.time_msec() - _, content = yield self.transport_layer.send_join( - destination=destination, - room_id=pdu.room_id, - event_id=pdu.event_id, - content=pdu.get_pdu_json(time_now), - ) + def send_join(self, destinations, pdu): + for destination in destinations: + try: + time_now = self._clock.time_msec() + _, content = yield self.transport_layer.send_join( + destination=destination, + room_id=pdu.room_id, + event_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), + ) - logger.debug("Got content: %s", content) + logger.debug("Got content: %s", content) - state = [ - self.event_from_pdu_json(p, outlier=True) - for p in content.get("state", []) - ] + state = [ + self.event_from_pdu_json(p, outlier=True) + for p in content.get("state", []) + ] - auth_chain = [ - self.event_from_pdu_json(p, outlier=True) - for p in content.get("auth_chain", []) - ] + auth_chain = [ + self.event_from_pdu_json(p, outlier=True) + for p in content.get("auth_chain", []) + ] - signed_state = yield self._check_sigs_and_hash_and_fetch( - destination, state, outlier=True - ) + signed_state = yield self._check_sigs_and_hash_and_fetch( + destination, state, outlier=True + ) - signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True - ) + signed_auth = yield self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True + ) - auth_chain.sort(key=lambda e: e.depth) + auth_chain.sort(key=lambda e: e.depth) + + defer.returnValue({ + "state": signed_state, + "auth_chain": signed_auth, + }) + except CodeMessageException: + raise + except RuntimeError as e: + logger.warn( + "Failed to send_join via %s: %s", + destination, e.message + ) - defer.returnValue({ - "state": signed_state, - "auth_chain": signed_auth, - }) + raise RuntimeError("Failed to send to any server.") @defer.inlineCallbacks def send_invite(self, destination, room_id, event_id, pdu): diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 9d4f2c09a2..f38aeba7cc 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -19,6 +19,7 @@ from twisted.internet import defer from .persistence import TransactionActions from .units import Transaction +from synapse.api.errors import HttpResponseException from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext @@ -238,9 +239,14 @@ class TransactionQueue(object): del p["age_ts"] return data - code, response = yield self.transport_layer.send_transaction( - transaction, json_data_cb - ) + try: + response = yield self.transport_layer.send_transaction( + transaction, json_data_cb + ) + code = 200 + except HttpResponseException as e: + code = e.code + response = e.response logger.info("TX [%s] got %d response", destination, code) @@ -274,8 +280,7 @@ class TransactionQueue(object): pass logger.debug("TX [%s] Yielded to callbacks", destination) - - except Exception as e: + except RuntimeError as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. logger.warn( @@ -283,6 +288,14 @@ class TransactionQueue(object): destination, e, ) + except Exception as e: + # We capture this here as there as nothing actually listens + # for this finishing functions deferred. + logger.exception( + "TX [%s] Problem in _attempt_transaction: %s", + destination, + e, + ) self.set_retrying(destination, retry_interval) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 4cb1dea2de..8b137e7128 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -19,7 +19,6 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.util.logutils import log_function import logging -import json logger = logging.getLogger(__name__) @@ -129,7 +128,7 @@ class TransportLayerClient(object): # generated by the json_data_callback. json_data = transaction.get_dict() - code, response = yield self.client.put_json( + response = yield self.client.put_json( transaction.destination, path=PREFIX + "/send/%s/" % transaction.transaction_id, data=json_data, @@ -137,95 +136,86 @@ class TransportLayerClient(object): ) logger.debug( - "send_data dest=%s, txid=%s, got response: %d", - transaction.destination, transaction.transaction_id, code + "send_data dest=%s, txid=%s, got response: 200", + transaction.destination, transaction.transaction_id, ) - defer.returnValue((code, response)) + defer.returnValue(response) @defer.inlineCallbacks @log_function def make_query(self, destination, query_type, args, retry_on_dns_fail): path = PREFIX + "/query/%s" % query_type - response = yield self.client.get_json( + content = yield self.client.get_json( destination=destination, path=path, args=args, retry_on_dns_fail=retry_on_dns_fail, ) - defer.returnValue(response) + defer.returnValue(content) @defer.inlineCallbacks @log_function def make_join(self, destination, room_id, user_id, retry_on_dns_fail=True): path = PREFIX + "/make_join/%s/%s" % (room_id, user_id) - response = yield self.client.get_json( + content = yield self.client.get_json( destination=destination, path=path, retry_on_dns_fail=retry_on_dns_fail, ) - defer.returnValue(response) + defer.returnValue(content) @defer.inlineCallbacks @log_function def send_join(self, destination, room_id, event_id, content): path = PREFIX + "/send_join/%s/%s" % (room_id, event_id) - code, content = yield self.client.put_json( + response = yield self.client.put_json( destination=destination, path=path, data=content, ) - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_join", code) - - defer.returnValue(json.loads(content)) + defer.returnValue(response) @defer.inlineCallbacks @log_function def send_invite(self, destination, room_id, event_id, content): path = PREFIX + "/invite/%s/%s" % (room_id, event_id) - code, content = yield self.client.put_json( + response = yield self.client.put_json( destination=destination, path=path, data=content, ) - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_invite", code) - - defer.returnValue(json.loads(content)) + defer.returnValue(response) @defer.inlineCallbacks @log_function def get_event_auth(self, destination, room_id, event_id): path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id) - response = yield self.client.get_json( + content = yield self.client.get_json( destination=destination, path=path, ) - defer.returnValue(response) + defer.returnValue(content) @defer.inlineCallbacks @log_function def send_query_auth(self, destination, room_id, event_id, content): path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id) - code, content = yield self.client.post_json( + content = yield self.client.post_json( destination=destination, path=path, data=content, ) - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_invite", code) - - defer.returnValue(json.loads(content)) + defer.returnValue(content) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0876589e31..a968a87360 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -288,7 +288,7 @@ class FederationHandler(BaseHandler): logger.debug("Joining %s to %s", joinee, room_id) pdu = yield self.replication_layer.make_join( - target_host, + [target_host], room_id, joinee ) @@ -331,7 +331,7 @@ class FederationHandler(BaseHandler): new_event = builder.build() ret = yield self.replication_layer.send_join( - target_host, + [target_host], new_event ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index c7bf1b47b8..8559d06b7f 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -27,7 +27,9 @@ from synapse.util.logcontext import PreserveLoggingContext from syutil.jsonutil import encode_canonical_json -from synapse.api.errors import CodeMessageException, SynapseError, Codes +from synapse.api.errors import ( + SynapseError, Codes, HttpResponseException, +) from syutil.crypto.jsonsign import sign_json @@ -163,13 +165,12 @@ class MatrixFederationHttpClient(object): ) if 200 <= response.code < 300: - # We need to update the transactions table to say it was sent? pass else: # :'( # Update transactions table? - raise CodeMessageException( - response.code, response.phrase + raise HttpResponseException( + response.code, response.phrase, response ) defer.returnValue(response) @@ -238,11 +239,20 @@ class MatrixFederationHttpClient(object): headers_dict={"Content-Type": ["application/json"]}, ) + if 200 <= response.code < 300: + # We need to update the transactions table to say it was sent? + c_type = response.headers.getRawHeaders("Content-Type") + + if "application/json" not in c_type: + raise RuntimeError( + "Content-Type not application/json" + ) + logger.debug("Getting resp body") body = yield readBody(response) logger.debug("Got resp body") - defer.returnValue((response.code, body)) + defer.returnValue(json.loads(body)) @defer.inlineCallbacks def post_json(self, destination, path, data={}): @@ -275,11 +285,20 @@ class MatrixFederationHttpClient(object): headers_dict={"Content-Type": ["application/json"]}, ) + if 200 <= response.code < 300: + # We need to update the transactions table to say it was sent? + c_type = response.headers.getRawHeaders("Content-Type") + + if "application/json" not in c_type: + raise RuntimeError( + "Content-Type not application/json" + ) + logger.debug("Getting resp body") body = yield readBody(response) logger.debug("Got resp body") - defer.returnValue((response.code, body)) + defer.returnValue(json.loads(body)) @defer.inlineCallbacks def get_json(self, destination, path, args={}, retry_on_dns_fail=True): @@ -321,7 +340,18 @@ class MatrixFederationHttpClient(object): retry_on_dns_fail=retry_on_dns_fail ) + if 200 <= response.code < 300: + # We need to update the transactions table to say it was sent? + c_type = response.headers.getRawHeaders("Content-Type") + + if "application/json" not in c_type: + raise RuntimeError( + "Content-Type not application/json" + ) + + logger.debug("Getting resp body") body = yield readBody(response) + logger.debug("Got resp body") defer.returnValue(json.loads(body)) -- cgit 1.4.1 From 26a041541baed887dc069c3667a86ddef81802bc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Feb 2015 13:17:05 +0000 Subject: SYN-202: Log as WARN the 404 'Presence information not visible' errors instead of as ERROR since they were spamming the logs --- synapse/handlers/message.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 6fbd2af4ab..3f51f38f18 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import RoomError +from synapse.api.errors import RoomError, SynapseError from synapse.streams.config import PaginationConfig from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator @@ -372,10 +372,17 @@ class MessageHandler(BaseHandler): as_event=True, ) presence.append(member_presence) - except Exception: - logger.exception( - "Failed to get member presence of %r", m.user_id - ) + except SynapseError as e: + if e.code == 404: + # FIXME: We are doing this as a warn since this gets hit a + # lot and spams the logs. Why is this happening? + logger.warn( + "Failed to get member presence of %r", m.user_id + ) + else: + logger.exception( + "Failed to get member presence of %r", m.user_id + ) time_now = self.clock.time_msec() -- cgit 1.4.1 From e1515c3e91f2117adc3976b5e606728560ce9e96 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Feb 2015 13:43:28 +0000 Subject: Pass through list of room hosts from room alias query to federation so that it can retry against different room hosts --- synapse/federation/federation_client.py | 5 ++++- synapse/handlers/federation.py | 20 +++++++++++++------- synapse/handlers/room.py | 12 +++++------- 3 files changed, 22 insertions(+), 15 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index eb36ec040b..9923b3fc0d 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -264,7 +264,9 @@ class FederationClient(FederationBase): logger.debug("Got response to make_join: %s", pdu_dict) - defer.returnValue(self.event_from_pdu_json(pdu_dict)) + defer.returnValue( + (destination, self.event_from_pdu_json(pdu_dict)) + ) break except CodeMessageException: raise @@ -313,6 +315,7 @@ class FederationClient(FederationBase): defer.returnValue({ "state": signed_state, "auth_chain": signed_auth, + "origin": destination, }) except CodeMessageException: raise diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 04a4689483..aba266c2bc 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -273,7 +273,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def do_invite_join(self, target_host, room_id, joinee, content, snapshot): + def do_invite_join(self, target_hosts, room_id, joinee, content, snapshot): """ Attempts to join the `joinee` to the room `room_id` via the server `target_host`. @@ -287,8 +287,8 @@ class FederationHandler(BaseHandler): """ logger.debug("Joining %s to %s", joinee, room_id) - pdu = yield self.replication_layer.make_join( - [target_host], + origin, pdu = yield self.replication_layer.make_join( + target_hosts, room_id, joinee ) @@ -330,11 +330,17 @@ class FederationHandler(BaseHandler): new_event = builder.build() + # Try the host we successfully got a response to /make_join/ + # request first. + target_hosts.remove(origin) + target_hosts.insert(0, origin) + ret = yield self.replication_layer.send_join( - [target_host], + target_hosts, new_event ) + origin = ret["origin"] state = ret["state"] auth_chain = ret["auth_chain"] auth_chain.sort(key=lambda e: e.depth) @@ -371,7 +377,7 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } yield self._handle_new_event( - target_host, e, auth_events=auth + origin, e, auth_events=auth ) except: logger.exception( @@ -391,7 +397,7 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } yield self._handle_new_event( - target_host, e, auth_events=auth + origin, e, auth_events=auth ) except: logger.exception( @@ -406,7 +412,7 @@ class FederationHandler(BaseHandler): } yield self._handle_new_event( - target_host, + origin, new_event, state=state, current_state=state, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 23821d321f..0369b907a5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -389,8 +389,6 @@ class RoomMemberHandler(BaseHandler): if not hosts: raise SynapseError(404, "No known servers") - host = hosts[0] - # If event doesn't include a display name, add one. yield self.distributor.fire( "collect_presencelike_data", joinee, content @@ -407,12 +405,12 @@ class RoomMemberHandler(BaseHandler): }) event, context = yield self._create_new_client_event(builder) - yield self._do_join(event, context, room_host=host, do_auth=True) + yield self._do_join(event, context, room_hosts=hosts, do_auth=True) defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def _do_join(self, event, context, room_host=None, do_auth=True): + def _do_join(self, event, context, room_hosts=None, do_auth=True): joinee = UserID.from_string(event.state_key) # room_id = RoomID.from_string(event.room_id, self.hs) room_id = event.room_id @@ -441,7 +439,7 @@ class RoomMemberHandler(BaseHandler): if is_host_in_room: should_do_dance = False - elif room_host: # TODO: Shouldn't this be remote_room_host? + elif room_hosts: # TODO: Shouldn't this be remote_room_host? should_do_dance = True else: # TODO(markjh): get prev_state from snapshot @@ -453,7 +451,7 @@ class RoomMemberHandler(BaseHandler): inviter = UserID.from_string(prev_state.user_id) should_do_dance = not self.hs.is_mine(inviter) - room_host = inviter.domain + room_hosts = [inviter.domain] else: # return the same error as join_room_alias does raise SynapseError(404, "No known servers") @@ -461,7 +459,7 @@ class RoomMemberHandler(BaseHandler): if should_do_dance: handler = self.hs.get_handlers().federation_handler yield handler.do_invite_join( - room_host, + room_hosts, room_id, event.user_id, event.get_dict()["content"], # FIXME To get a non-frozen dict -- cgit 1.4.1