From 2e223a8c22ef7f65aa42fd149f178a842b60e3c7 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 31 Oct 2018 04:24:59 +1100 Subject: Remove the unused /pull federation API (#4118) --- synapse/federation/federation_server.py | 5 ----- synapse/federation/transport/server.py | 9 --------- 2 files changed, 14 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 0f9302a6a8..fa2cc550e2 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -323,11 +323,6 @@ class FederationServer(FederationBase): else: defer.returnValue((404, "")) - @defer.inlineCallbacks - @log_function - def on_pull_request(self, origin, versions): - raise NotImplementedError("Pull transactions not implemented") - @defer.inlineCallbacks def on_query_request(self, query_type, args): received_queries_counter.labels(query_type).inc() diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 7288d49074..3553f418f1 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -362,14 +362,6 @@ class FederationSendServlet(BaseFederationServlet): defer.returnValue((code, response)) -class FederationPullServlet(BaseFederationServlet): - PATH = "/pull/" - - # This is for when someone asks us for everything since version X - def on_GET(self, origin, content, query): - return self.handler.on_pull_request(query["origin"][0], query["v"]) - - class FederationEventServlet(BaseFederationServlet): PATH = "/event/(?P[^/]*)/" @@ -1261,7 +1253,6 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet): FEDERATION_SERVLET_CLASSES = ( FederationSendServlet, - FederationPullServlet, FederationEventServlet, FederationStateServlet, FederationStateIdsServlet, -- cgit 1.4.1 From b86d05a279051b59d711c5a1982474994f302803 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 2 Nov 2018 13:44:12 +0000 Subject: Clean up event accesses and tests This is in preparation to refactor FrozenEvent to support different event formats for different room versions --- synapse/federation/units.py | 3 --- synapse/push/httppusher.py | 4 ++-- synapse/push/push_rule_evaluator.py | 4 ++-- tests/replication/slave/storage/test_events.py | 4 ++-- tests/test_federation.py | 2 +- 5 files changed, 7 insertions(+), 10 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/units.py b/synapse/federation/units.py index c5ab14314e..025a79c022 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -117,9 +117,6 @@ class Transaction(JsonEncodedObject): "Require 'transaction_id' to construct a Transaction" ) - for p in pdus: - p.transaction_id = kwargs["transaction_id"] - kwargs["pdus"] = [p.get_pdu_json() for p in pdus] return Transaction(**kwargs) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 6bd703632d..87fa7f006a 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -311,10 +311,10 @@ class HttpPusher(object): ] } } - if event.type == 'm.room.member': + if event.type == 'm.room.member' and event.is_state(): d['notification']['membership'] = event.content['membership'] d['notification']['user_is_target'] = event.state_key == self.user_id - if self.hs.config.push_include_content and 'content' in event: + if self.hs.config.push_include_content and event.content: d['notification']['content'] = event.content # We no longer send aliases separately, instead, we send the human diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 2bd321d530..cf6c8b875e 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -124,7 +124,7 @@ class PushRuleEvaluatorForEvent(object): # XXX: optimisation: cache our pattern regexps if condition['key'] == 'content.body': - body = self._event["content"].get("body", None) + body = self._event.content.get("body", None) if not body: return False @@ -140,7 +140,7 @@ class PushRuleEvaluatorForEvent(object): if not display_name: return False - body = self._event["content"].get("body", None) + body = self._event.content.get("body", None) if not body: return False diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 41be5d5a1a..1688a741d1 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -28,8 +28,8 @@ ROOM_ID = "!room:blue" def dict_equals(self, other): - me = encode_canonical_json(self._event_dict) - them = encode_canonical_json(other._event_dict) + me = encode_canonical_json(self.get_pdu_json()) + them = encode_canonical_json(other.get_pdu_json()) return me == them diff --git a/tests/test_federation.py b/tests/test_federation.py index 952a0a7b51..e1a34ccffd 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -112,7 +112,7 @@ class MessageAcceptTests(unittest.TestCase): "origin_server_ts": 1, "type": "m.room.message", "origin": "test.serv", - "content": "hewwo?", + "content": {"body": "hewwo?"}, "auth_events": [], "prev_events": [("two:test.serv", {}), (most_recent, {})], } -- cgit 1.4.1 From bc80b3f454aa9b9ca8bc710ff502b83892ac0a91 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Nov 2018 13:35:15 +0000 Subject: Add helpers for getting prev and auth events (#4139) * Add helpers for getting prev and auth events This is in preparation for allowing the event format to change between room versions. --- changelog.d/4139.misc | 1 + synapse/event_auth.py | 4 +-- synapse/events/__init__.py | 18 +++++++++++++ synapse/federation/transaction_queue.py | 4 +-- synapse/handlers/federation.py | 48 ++++++++++++++++----------------- synapse/state/__init__.py | 2 +- synapse/state/v2.py | 16 +++++------ synapse/storage/event_federation.py | 4 +-- synapse/storage/events.py | 8 +++--- tests/state/test_v2.py | 2 +- 10 files changed, 62 insertions(+), 45 deletions(-) create mode 100644 changelog.d/4139.misc (limited to 'synapse/federation') diff --git a/changelog.d/4139.misc b/changelog.d/4139.misc new file mode 100644 index 0000000000..d63d9e7003 --- /dev/null +++ b/changelog.d/4139.misc @@ -0,0 +1 @@ +Add helpers functions for getting prev and auth events of an event diff --git a/synapse/event_auth.py b/synapse/event_auth.py index d4d4474847..c81d8e6729 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -200,11 +200,11 @@ def _is_membership_change_allowed(event, auth_events): membership = event.content["membership"] # Check if this is the room creator joining: - if len(event.prev_events) == 1 and Membership.JOIN == membership: + if len(event.prev_event_ids()) == 1 and Membership.JOIN == membership: # Get room creation event: key = (EventTypes.Create, "", ) create = auth_events.get(key) - if create and event.prev_events[0][0] == create.event_id: + if create and event.prev_event_ids()[0] == create.event_id: if create.content["creator"] == event.state_key: return diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 12f1eb0a3e..84c75495d5 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -159,6 +159,24 @@ class EventBase(object): def keys(self): return six.iterkeys(self._event_dict) + def prev_event_ids(self): + """Returns the list of prev event IDs. The order matches the order + specified in the event, though there is no meaning to it. + + Returns: + list[str]: The list of event IDs of this event's prev_events + """ + return [e for e, _ in self.prev_events] + + def auth_event_ids(self): + """Returns the list of auth event IDs. The order matches the order + specified in the event, though there is no meaning to it. + + Returns: + list[str]: The list of event IDs of this event's auth_events + """ + return [e for e, _ in self.auth_events] + class FrozenEvent(EventBase): def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None): diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 3fdd63be95..099ace28c1 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -183,9 +183,7 @@ class TransactionQueue(object): # banned then it won't receive the event because it won't # be in the room after the ban. destinations = yield self.state.get_current_hosts_in_room( - event.room_id, latest_event_ids=[ - prev_id for prev_id, _ in event.prev_events - ], + event.room_id, latest_event_ids=event.prev_event_ids(), ) except Exception: logger.exception( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index cd5b9bbb19..9ca5fd8724 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -239,7 +239,7 @@ class FederationHandler(BaseHandler): room_id, event_id, min_depth, ) - prevs = {e_id for e_id, _ in pdu.prev_events} + prevs = set(pdu.prev_event_ids()) seen = yield self.store.have_seen_events(prevs) if min_depth and pdu.depth < min_depth: @@ -607,7 +607,7 @@ class FederationHandler(BaseHandler): if e.event_id in seen_ids: continue e.internal_metadata.outlier = True - auth_ids = [e_id for e_id, _ in e.auth_events] + auth_ids = e.auth_event_ids() auth = { (e.type, e.state_key): e for e in auth_chain if e.event_id in auth_ids or e.type == EventTypes.Create @@ -726,7 +726,7 @@ class FederationHandler(BaseHandler): edges = [ ev.event_id for ev in events - if set(e_id for e_id, _ in ev.prev_events) - event_ids + if set(ev.prev_event_ids()) - event_ids ] logger.info( @@ -753,7 +753,7 @@ class FederationHandler(BaseHandler): required_auth = set( a_id for event in events + list(state_events.values()) + list(auth_events.values()) - for a_id, _ in event.auth_events + for a_id in event.auth_event_ids() ) auth_events.update({ e_id: event_map[e_id] for e_id in required_auth if e_id in event_map @@ -769,7 +769,7 @@ class FederationHandler(BaseHandler): auth_events.update(ret_events) required_auth.update( - a_id for event in ret_events.values() for a_id, _ in event.auth_events + a_id for event in ret_events.values() for a_id in event.auth_event_ids() ) missing_auth = required_auth - set(auth_events) @@ -796,7 +796,7 @@ class FederationHandler(BaseHandler): required_auth.update( a_id for event in results if event - for a_id, _ in event.auth_events + for a_id in event.auth_event_ids() ) missing_auth = required_auth - set(auth_events) @@ -816,7 +816,7 @@ class FederationHandler(BaseHandler): "auth_events": { (auth_events[a_id].type, auth_events[a_id].state_key): auth_events[a_id] - for a_id, _ in a.auth_events + for a_id in a.auth_event_ids() if a_id in auth_events } }) @@ -828,7 +828,7 @@ class FederationHandler(BaseHandler): "auth_events": { (auth_events[a_id].type, auth_events[a_id].state_key): auth_events[a_id] - for a_id, _ in event_map[e_id].auth_events + for a_id in event_map[e_id].auth_event_ids() if a_id in auth_events } }) @@ -1041,17 +1041,17 @@ class FederationHandler(BaseHandler): Raises: SynapseError if the event does not pass muster """ - if len(ev.prev_events) > 20: + if len(ev.prev_event_ids()) > 20: logger.warn("Rejecting event %s which has %i prev_events", - ev.event_id, len(ev.prev_events)) + ev.event_id, len(ev.prev_event_ids())) raise SynapseError( http_client.BAD_REQUEST, "Too many prev_events", ) - if len(ev.auth_events) > 10: + if len(ev.auth_event_ids()) > 10: logger.warn("Rejecting event %s which has %i auth_events", - ev.event_id, len(ev.auth_events)) + ev.event_id, len(ev.auth_event_ids())) raise SynapseError( http_client.BAD_REQUEST, "Too many auth_events", @@ -1076,7 +1076,7 @@ class FederationHandler(BaseHandler): def on_event_auth(self, event_id): event = yield self.store.get_event(event_id) auth = yield self.store.get_auth_chain( - [auth_id for auth_id, _ in event.auth_events], + [auth_id for auth_id in event.auth_event_ids()], include_given=True ) defer.returnValue([e for e in auth]) @@ -1698,7 +1698,7 @@ class FederationHandler(BaseHandler): missing_auth_events = set() for e in itertools.chain(auth_events, state, [event]): - for e_id, _ in e.auth_events: + for e_id in e.auth_event_ids(): if e_id not in event_map: missing_auth_events.add(e_id) @@ -1717,7 +1717,7 @@ class FederationHandler(BaseHandler): for e in itertools.chain(auth_events, state, [event]): auth_for_e = { (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id] - for e_id, _ in e.auth_events + for e_id in e.auth_event_ids() if e_id in event_map } if create_event: @@ -1785,10 +1785,10 @@ class FederationHandler(BaseHandler): # This is a hack to fix some old rooms where the initial join event # didn't reference the create event in its auth events. - if event.type == EventTypes.Member and not event.auth_events: - if len(event.prev_events) == 1 and event.depth < 5: + if event.type == EventTypes.Member and not event.auth_event_ids(): + if len(event.prev_event_ids()) == 1 and event.depth < 5: c = yield self.store.get_event( - event.prev_events[0][0], + event.prev_event_ids()[0], allow_none=True, ) if c and c.type == EventTypes.Create: @@ -1835,7 +1835,7 @@ class FederationHandler(BaseHandler): # Now get the current auth_chain for the event. local_auth_chain = yield self.store.get_auth_chain( - [auth_id for auth_id, _ in event.auth_events], + [auth_id for auth_id in event.auth_event_ids()], include_given=True ) @@ -1891,7 +1891,7 @@ class FederationHandler(BaseHandler): """ # Check if we have all the auth events. current_state = set(e.event_id for e in auth_events.values()) - event_auth_events = set(e_id for e_id, _ in event.auth_events) + event_auth_events = set(event.auth_event_ids()) if event.is_state(): event_key = (event.type, event.state_key) @@ -1935,7 +1935,7 @@ class FederationHandler(BaseHandler): continue try: - auth_ids = [e_id for e_id, _ in e.auth_events] + auth_ids = e.auth_event_ids() auth = { (e.type, e.state_key): e for e in remote_auth_chain if e.event_id in auth_ids or e.type == EventTypes.Create @@ -1956,7 +1956,7 @@ class FederationHandler(BaseHandler): pass have_events = yield self.store.get_seen_events_with_rejections( - [e_id for e_id, _ in event.auth_events] + event.auth_event_ids() ) seen_events = set(have_events.keys()) except Exception: @@ -2058,7 +2058,7 @@ class FederationHandler(BaseHandler): continue try: - auth_ids = [e_id for e_id, _ in ev.auth_events] + auth_ids = ev.auth_event_ids() auth = { (e.type, e.state_key): e for e in result["auth_chain"] @@ -2250,7 +2250,7 @@ class FederationHandler(BaseHandler): missing_remote_ids = [e.event_id for e in missing_remotes] base_remote_rejected = list(missing_remotes) for e in missing_remotes: - for e_id, _ in e.auth_events: + for e_id in e.auth_event_ids(): if e_id in missing_remote_ids: try: base_remote_rejected.remove(e) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 943d5d6bb5..70048b0c09 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -261,7 +261,7 @@ class StateHandler(object): logger.debug("calling resolve_state_groups from compute_event_context") entry = yield self.resolve_state_groups_for_events( - event.room_id, [e for e, _ in event.prev_events], + event.room_id, event.prev_event_ids(), ) prev_state_ids = entry.state diff --git a/synapse/state/v2.py b/synapse/state/v2.py index dbc9688c56..3573bb0028 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py @@ -159,7 +159,7 @@ def _get_power_level_for_sender(event_id, event_map, state_res_store): event = yield _get_event(event_id, event_map, state_res_store) pl = None - for aid, _ in event.auth_events: + for aid in event.auth_event_ids(): aev = yield _get_event(aid, event_map, state_res_store) if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""): pl = aev @@ -167,7 +167,7 @@ def _get_power_level_for_sender(event_id, event_map, state_res_store): if pl is None: # Couldn't find power level. Check if they're the creator of the room - for aid, _ in event.auth_events: + for aid in event.auth_event_ids(): aev = yield _get_event(aid, event_map, state_res_store) if (aev.type, aev.state_key) == (EventTypes.Create, ""): if aev.content.get("creator") == event.sender: @@ -299,7 +299,7 @@ def _add_event_and_auth_chain_to_graph(graph, event_id, event_map, graph.setdefault(eid, set()) event = yield _get_event(eid, event_map, state_res_store) - for aid, _ in event.auth_events: + for aid in event.auth_event_ids(): if aid in auth_diff: if aid not in graph: state.append(aid) @@ -369,7 +369,7 @@ def _iterative_auth_checks(event_ids, base_state, event_map, state_res_store): event = event_map[event_id] auth_events = {} - for aid, _ in event.auth_events: + for aid in event.auth_event_ids(): ev = yield _get_event(aid, event_map, state_res_store) if ev.rejected_reason is None: @@ -417,9 +417,9 @@ def _mainline_sort(event_ids, resolved_power_event_id, event_map, while pl: mainline.append(pl) pl_ev = yield _get_event(pl, event_map, state_res_store) - auth_events = pl_ev.auth_events + auth_events = pl_ev.auth_event_ids() pl = None - for aid, _ in auth_events: + for aid in auth_events: ev = yield _get_event(aid, event_map, state_res_store) if (ev.type, ev.state_key) == (EventTypes.PowerLevels, ""): pl = aid @@ -464,10 +464,10 @@ def _get_mainline_depth_for_event(event, mainline_map, event_map, state_res_stor if depth is not None: defer.returnValue(depth) - auth_events = event.auth_events + auth_events = event.auth_event_ids() event = None - for aid, _ in auth_events: + for aid in auth_events: aev = yield _get_event(aid, event_map, state_res_store) if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""): event = aev diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 3faca2a042..d3b9dea1d6 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -477,7 +477,7 @@ class EventFederationStore(EventFederationWorkerStore): "is_state": False, } for ev in events - for e_id, _ in ev.prev_events + for e_id in ev.prev_event_ids() ], ) @@ -510,7 +510,7 @@ class EventFederationStore(EventFederationWorkerStore): txn.executemany(query, [ (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False) - for ev in events for e_id, _ in ev.prev_events + for ev in events for e_id in ev.prev_event_ids() if not ev.internal_metadata.is_outlier() ]) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 919e855f3b..2047110b1d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -416,7 +416,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore ) if len_1: all_single_prev_not_state = all( - len(event.prev_events) == 1 + len(event.prev_event_ids()) == 1 and not event.is_state() for event, ctx in ev_ctx_rm ) @@ -440,7 +440,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # guess this by looking at the prev_events and checking # if they match the current forward extremities. for ev, _ in ev_ctx_rm: - prev_event_ids = set(e for e, _ in ev.prev_events) + prev_event_ids = set(ev.prev_event_ids()) if latest_event_ids == prev_event_ids: state_delta_reuse_delta_counter.inc() break @@ -551,7 +551,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore result.difference_update( e_id for event in new_events - for e_id, _ in event.prev_events + for e_id in event.prev_event_ids() ) # Finally, remove any events which are prev_events of any existing events. @@ -869,7 +869,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore "auth_id": auth_id, } for event, _ in events_and_contexts - for auth_id, _ in event.auth_events + for auth_id in event.auth_event_ids() if event.is_state() ], ) diff --git a/tests/state/test_v2.py b/tests/state/test_v2.py index d67f59b2c7..2e073a3afc 100644 --- a/tests/state/test_v2.py +++ b/tests/state/test_v2.py @@ -753,7 +753,7 @@ class TestStateResolutionStore(object): result.add(event_id) event = self.event_map[event_id] - for aid, _ in event.auth_events: + for aid in event.auth_event_ids(): stack.append(aid) return list(result) -- cgit 1.4.1 From 9417986f7799bdcd609db16c2c964af09d9fe9b5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Nov 2018 12:11:20 +0000 Subject: Drop PDUs of unknown rooms When we receive events over federation we will need to know the room version to be able to correctly handle them, e.g. once we start changing event formats. Currently, we attempt to handle events in unknown rooms. --- synapse/federation/federation_server.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index fa2cc550e2..38949fb49a 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -162,8 +162,30 @@ class FederationServer(FederationBase): p["age_ts"] = request_time - int(p["age"]) del p["age"] + # We try and pull out an event ID so that if latter checks fail we + # can log something sensible. We don't mandate an event ID here in + # case future event formats get rid of the key. + possible_event_id = p.get("event_id", "") + + # Now we get the room ID so that we can check that we know the + # version of the room. + room_id = p.get("room_id") + if not room_id: + logger.info( + "Ignoring PDU as does not have a room_id. Event ID: %s", + possible_event_id, + ) + continue + + try: + # In future we will actually use the room version to parse the + # PDU into an event. + yield self.store.get_room_version(room_id) + except NotFoundError: + logger.info("Ignoring PUD for unknown room_id: %s", room_id) + continue + event = event_from_pdu_json(p) - room_id = event.room_id pdus_by_room.setdefault(room_id, []).append(event) pdu_results = {} -- cgit 1.4.1 From 9bce065a53799eb6403480736630ca5d7fdfe19c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 9 Nov 2018 11:28:22 +0000 Subject: Update synapse/federation/federation_server.py Co-Authored-By: erikjohnston --- synapse/federation/federation_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 38949fb49a..3bdc6024f8 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -162,7 +162,7 @@ class FederationServer(FederationBase): p["age_ts"] = request_time - int(p["age"]) del p["age"] - # We try and pull out an event ID so that if latter checks fail we + # We try and pull out an event ID so that if later checks fail we # can log something sensible. We don't mandate an event ID here in # case future event formats get rid of the key. possible_event_id = p.get("event_id", "") -- cgit 1.4.1 From 3cecf5340d4e2ad8944a79b0fd4213e242791cbe Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 9 Nov 2018 11:28:25 +0000 Subject: Update synapse/federation/federation_server.py Co-Authored-By: erikjohnston --- synapse/federation/federation_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 3bdc6024f8..98722ae543 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -182,7 +182,7 @@ class FederationServer(FederationBase): # PDU into an event. yield self.store.get_room_version(room_id) except NotFoundError: - logger.info("Ignoring PUD for unknown room_id: %s", room_id) + logger.info("Ignoring PDU for unknown room_id: %s", room_id) continue event = event_from_pdu_json(p) -- cgit 1.4.1 From b970cb0e9618c636e6df3b85a3a85c47bc4ca64a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Jan 2019 11:04:28 +0000 Subject: Refactor request sending to have better excpetions (#4358) * Correctly retry and back off if we get a HTTPerror response * Refactor request sending to have better excpetions MatrixFederationHttpClient blindly reraised exceptions to the caller without differentiating "expected" failures (e.g. connection timeouts etc) versus more severe problems (e.g. programming errors). This commit adds a RequestSendFailed exception that is raised when "expected" failures happen, allowing the TransactionQueue to log them as warnings while allowing us to log other exceptions as actual exceptions. --- changelog.d/4358.misc | 1 + synapse/api/errors.py | 18 +++++ synapse/federation/transaction_queue.py | 19 ++++-- synapse/http/matrixfederationclient.py | 105 ++++++++++++++++++++---------- synapse/rest/media/v1/media_repository.py | 7 +- tests/http/test_fedclient.py | 13 +++- 6 files changed, 119 insertions(+), 44 deletions(-) create mode 100644 changelog.d/4358.misc (limited to 'synapse/federation') diff --git a/changelog.d/4358.misc b/changelog.d/4358.misc new file mode 100644 index 0000000000..020dacb547 --- /dev/null +++ b/changelog.d/4358.misc @@ -0,0 +1 @@ +Add better logging for unexpected errors while sending transactions diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 48b903374d..0b464834ce 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -348,6 +348,24 @@ class IncompatibleRoomVersionError(SynapseError): ) +class RequestSendFailed(RuntimeError): + """Sending a HTTP request over federation failed due to not being able to + talk to the remote server for some reason. + + This exception is used to differentiate "expected" errors that arise due to + networking (e.g. DNS failures, connection timeouts etc), versus unexpected + errors (like programming errors). + """ + def __init__(self, inner_exception, can_retry): + super(RequestSendFailed, self).__init__( + "Failed to send request: %s: %s" % ( + type(inner_exception).__name__, inner_exception, + ) + ) + self.inner_exception = inner_exception + self.can_retry = can_retry + + def cs_error(msg, code=Codes.UNKNOWN, **kwargs): """ Utility method for constructing an error response for client-server interactions. diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 099ace28c1..4640513497 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -22,7 +22,11 @@ from prometheus_client import Counter from twisted.internet import defer import synapse.metrics -from synapse.api.errors import FederationDeniedError, HttpResponseException +from synapse.api.errors import ( + FederationDeniedError, + HttpResponseException, + RequestSendFailed, +) from synapse.handlers.presence import format_user_presence_state, get_interested_remotes from synapse.metrics import ( LaterGauge, @@ -518,11 +522,16 @@ class TransactionQueue(object): ) except FederationDeniedError as e: logger.info(e) - except Exception as e: - logger.warn( - "TX [%s] Failed to send transaction: %s", + except RequestSendFailed as e: + logger.warning("(TX [%s] Failed to send transaction: %s", destination, e) + + for p, _ in pending_pdus: + logger.info("Failed to send event %s to %s", p.event_id, + destination) + except Exception: + logger.exception( + "TX [%s] Failed to send transaction", destination, - e, ) for p, _ in pending_pdus: logger.info("Failed to send event %s to %s", p.event_id, diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 24b6110c20..7a2b4f0957 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -19,7 +19,7 @@ import random import sys from io import BytesIO -from six import PY3, string_types +from six import PY3, raise_from, string_types from six.moves import urllib import attr @@ -41,6 +41,7 @@ from synapse.api.errors import ( Codes, FederationDeniedError, HttpResponseException, + RequestSendFailed, SynapseError, ) from synapse.http.endpoint import matrix_federation_endpoint @@ -231,7 +232,7 @@ class MatrixFederationHttpClient(object): Deferred: resolves with the http response object on success. Fails with ``HttpResponseException``: if we get an HTTP response - code >= 300. + code >= 300 (except 429). Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. @@ -239,8 +240,8 @@ class MatrixFederationHttpClient(object): Fails with ``FederationDeniedError`` if this destination is not on our federation whitelist - (May also fail with plenty of other Exceptions for things like DNS - failures, connection failures, SSL failures.) + Fails with ``RequestSendFailed`` if there were problems connecting to + the remote, due to e.g. DNS failures, connection timeouts etc. """ if timeout: _sec_timeout = timeout / 1000 @@ -335,23 +336,74 @@ class MatrixFederationHttpClient(object): reactor=self.hs.get_reactor(), ) - with Measure(self.clock, "outbound_request"): - response = yield make_deferred_yieldable( - request_deferred, + try: + with Measure(self.clock, "outbound_request"): + response = yield make_deferred_yieldable( + request_deferred, + ) + except DNSLookupError as e: + raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e) + except Exception as e: + raise_from(RequestSendFailed(e, can_retry=True), e) + + logger.info( + "{%s} [%s] Got response headers: %d %s", + request.txn_id, + request.destination, + response.code, + response.phrase.decode('ascii', errors='replace'), + ) + + if 200 <= response.code < 300: + pass + else: + # :'( + # Update transactions table? + d = treq.content(response) + d = timeout_deferred( + d, + timeout=_sec_timeout, + reactor=self.hs.get_reactor(), ) + try: + body = yield make_deferred_yieldable(d) + except Exception as e: + # Eh, we're already going to raise an exception so lets + # ignore if this fails. + logger.warn( + "{%s} [%s] Failed to get error response: %s %s: %s", + request.txn_id, + request.destination, + request.method, + url_str, + _flatten_response_never_received(e), + ) + body = None + + e = HttpResponseException( + response.code, response.phrase, body + ) + + # Retry if the error is a 429 (Too Many Requests), + # otherwise just raise a standard HttpResponseException + if response.code == 429: + raise_from(RequestSendFailed(e, can_retry=True), e) + else: + raise e + break - except Exception as e: + except RequestSendFailed as e: logger.warn( "{%s} [%s] Request failed: %s %s: %s", request.txn_id, request.destination, request.method, url_str, - _flatten_response_never_received(e), + _flatten_response_never_received(e.inner_exception), ) - if not retry_on_dns_fail and isinstance(e, DNSLookupError): + if not e.can_retry: raise if retries_left and not timeout: @@ -376,29 +428,16 @@ class MatrixFederationHttpClient(object): else: raise - logger.info( - "{%s} [%s] Got response headers: %d %s", - request.txn_id, - request.destination, - response.code, - response.phrase.decode('ascii', errors='replace'), - ) - - if 200 <= response.code < 300: - pass - else: - # :'( - # Update transactions table? - d = treq.content(response) - d = timeout_deferred( - d, - timeout=_sec_timeout, - reactor=self.hs.get_reactor(), - ) - body = yield make_deferred_yieldable(d) - raise HttpResponseException( - response.code, response.phrase, body - ) + except Exception as e: + logger.warn( + "{%s} [%s] Request failed: %s %s: %s", + request.txn_id, + request.destination, + request.method, + url_str, + _flatten_response_never_received(e), + ) + raise defer.returnValue(response) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index e117836e9a..bdffa97805 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -30,6 +30,7 @@ from synapse.api.errors import ( FederationDeniedError, HttpResponseException, NotFoundError, + RequestSendFailed, SynapseError, ) from synapse.metrics.background_process_metrics import run_as_background_process @@ -372,10 +373,10 @@ class MediaRepository(object): "allow_remote": "false", } ) - except twisted.internet.error.DNSLookupError as e: - logger.warn("HTTP error fetching remote media %s/%s: %r", + except RequestSendFailed as e: + logger.warn("Request failed fetching remote media %s/%s: %r", server_name, media_id, e) - raise NotFoundError() + raise SynapseError(502, "Failed to fetch remote media") except HttpResponseException as e: logger.warn("HTTP error fetching remote media %s/%s: %s", diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py index f3cb1423f0..b2e38276d8 100644 --- a/tests/http/test_fedclient.py +++ b/tests/http/test_fedclient.py @@ -20,6 +20,7 @@ from twisted.internet.error import ConnectingCancelledError, DNSLookupError from twisted.web.client import ResponseNeverReceived from twisted.web.http import HTTPChannel +from synapse.api.errors import RequestSendFailed from synapse.http.matrixfederationclient import ( MatrixFederationHttpClient, MatrixFederationRequest, @@ -49,7 +50,8 @@ class FederationClientTests(HomeserverTestCase): self.pump() f = self.failureResultOf(d) - self.assertIsInstance(f.value, DNSLookupError) + self.assertIsInstance(f.value, RequestSendFailed) + self.assertIsInstance(f.value.inner_exception, DNSLookupError) def test_client_never_connect(self): """ @@ -76,7 +78,11 @@ class FederationClientTests(HomeserverTestCase): self.reactor.advance(10.5) f = self.failureResultOf(d) - self.assertIsInstance(f.value, (ConnectingCancelledError, TimeoutError)) + self.assertIsInstance(f.value, RequestSendFailed) + self.assertIsInstance( + f.value.inner_exception, + (ConnectingCancelledError, TimeoutError), + ) def test_client_connect_no_response(self): """ @@ -107,7 +113,8 @@ class FederationClientTests(HomeserverTestCase): self.reactor.advance(10.5) f = self.failureResultOf(d) - self.assertIsInstance(f.value, ResponseNeverReceived) + self.assertIsInstance(f.value, RequestSendFailed) + self.assertIsInstance(f.value.inner_exception, ResponseNeverReceived) def test_client_gets_headers(self): """ -- cgit 1.4.1 From 1371d5b798dc308bef8b0adbf3229919981fafa4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Jan 2019 12:24:48 +0000 Subject: Don't log stack traces for HTTP error responses --- synapse/federation/transaction_queue.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 4640513497..fe787abaeb 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -522,8 +522,13 @@ class TransactionQueue(object): ) except FederationDeniedError as e: logger.info(e) + except HttpResponseException as e: + logger.warning( + "TX [%s] Received %d response to transaction: %s", + destination, e.code, e, + ) except RequestSendFailed as e: - logger.warning("(TX [%s] Failed to send transaction: %s", destination, e) + logger.warning("TX [%s] Failed to send transaction: %s", destination, e) for p, _ in pending_pdus: logger.info("Failed to send event %s to %s", p.event_id, -- cgit 1.4.1 From bb63e7ca4f989d980de9925adcf14f0393ac0406 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 15 Jan 2019 11:14:34 +0000 Subject: Add groundwork for new versions of federation APIs --- synapse/api/urls.py | 3 +- synapse/federation/transport/client.py | 132 +++++++++++++++++---------------- synapse/federation/transport/server.py | 6 +- 3 files changed, 73 insertions(+), 68 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/urls.py b/synapse/api/urls.py index f78695b657..ba019001ff 100644 --- a/synapse/api/urls.py +++ b/synapse/api/urls.py @@ -24,7 +24,8 @@ from synapse.config import ConfigError CLIENT_PREFIX = "/_matrix/client/api/v1" CLIENT_V2_ALPHA_PREFIX = "/_matrix/client/v2_alpha" -FEDERATION_PREFIX = "/_matrix/federation/v1" +FEDERATION_PREFIX = "/_matrix/federation" +FEDERATION_V1_PREFIX = FEDERATION_PREFIX + "/v1" STATIC_PREFIX = "/_matrix/static" WEB_CLIENT_PREFIX = "/_matrix/client" CONTENT_REPO_PREFIX = "/_matrix/content" diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index edba5a9808..260178c47b 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -21,7 +21,7 @@ from six.moves import urllib from twisted.internet import defer from synapse.api.constants import Membership -from synapse.api.urls import FEDERATION_PREFIX as PREFIX +from synapse.api.urls import FEDERATION_V1_PREFIX from synapse.util.logutils import log_function logger = logging.getLogger(__name__) @@ -51,7 +51,7 @@ class TransportLayerClient(object): logger.debug("get_room_state dest=%s, room=%s", destination, room_id) - path = _create_path(PREFIX, "/state/%s/", room_id) + path = _create_v1_path("/state/%s/", room_id) return self.client.get_json( destination, path=path, args={"event_id": event_id}, ) @@ -73,7 +73,7 @@ class TransportLayerClient(object): logger.debug("get_room_state_ids dest=%s, room=%s", destination, room_id) - path = _create_path(PREFIX, "/state_ids/%s/", room_id) + path = _create_v1_path("/state_ids/%s/", room_id) return self.client.get_json( destination, path=path, args={"event_id": event_id}, ) @@ -95,7 +95,7 @@ class TransportLayerClient(object): logger.debug("get_pdu dest=%s, event_id=%s", destination, event_id) - path = _create_path(PREFIX, "/event/%s/", event_id) + path = _create_v1_path("/event/%s/", event_id) return self.client.get_json(destination, path=path, timeout=timeout) @log_function @@ -121,7 +121,7 @@ class TransportLayerClient(object): # TODO: raise? return - path = _create_path(PREFIX, "/backfill/%s/", room_id) + path = _create_v1_path("/backfill/%s/", room_id) args = { "v": event_tuples, @@ -167,7 +167,7 @@ class TransportLayerClient(object): # generated by the json_data_callback. json_data = transaction.get_dict() - path = _create_path(PREFIX, "/send/%s/", transaction.transaction_id) + path = _create_v1_path("/send/%s/", transaction.transaction_id) response = yield self.client.put_json( transaction.destination, @@ -184,7 +184,7 @@ class TransportLayerClient(object): @log_function def make_query(self, destination, query_type, args, retry_on_dns_fail, ignore_backoff=False): - path = _create_path(PREFIX, "/query/%s", query_type) + path = _create_v1_path("/query/%s", query_type) content = yield self.client.get_json( destination=destination, @@ -231,7 +231,7 @@ class TransportLayerClient(object): "make_membership_event called with membership='%s', must be one of %s" % (membership, ",".join(valid_memberships)) ) - path = _create_path(PREFIX, "/make_%s/%s/%s", membership, room_id, user_id) + path = _create_v1_path("/make_%s/%s/%s", membership, room_id, user_id) ignore_backoff = False retry_on_dns_fail = False @@ -258,7 +258,7 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function def send_join(self, destination, room_id, event_id, content): - path = _create_path(PREFIX, "/send_join/%s/%s", room_id, event_id) + path = _create_v1_path("/send_join/%s/%s", room_id, event_id) response = yield self.client.put_json( destination=destination, @@ -271,7 +271,7 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function def send_leave(self, destination, room_id, event_id, content): - path = _create_path(PREFIX, "/send_leave/%s/%s", room_id, event_id) + path = _create_v1_path("/send_leave/%s/%s", room_id, event_id) response = yield self.client.put_json( destination=destination, @@ -290,7 +290,7 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function def send_invite(self, destination, room_id, event_id, content): - path = _create_path(PREFIX, "/invite/%s/%s", room_id, event_id) + path = _create_v1_path("/invite/%s/%s", room_id, event_id) response = yield self.client.put_json( destination=destination, @@ -306,7 +306,7 @@ class TransportLayerClient(object): def get_public_rooms(self, remote_server, limit, since_token, search_filter=None, include_all_networks=False, third_party_instance_id=None): - path = PREFIX + "/publicRooms" + path = _create_v1_path("/publicRooms") args = { "include_all_networks": "true" if include_all_networks else "false", @@ -332,7 +332,7 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function def exchange_third_party_invite(self, destination, room_id, event_dict): - path = _create_path(PREFIX, "/exchange_third_party_invite/%s", room_id,) + path = _create_v1_path("/exchange_third_party_invite/%s", room_id,) response = yield self.client.put_json( destination=destination, @@ -345,7 +345,7 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function def get_event_auth(self, destination, room_id, event_id): - path = _create_path(PREFIX, "/event_auth/%s/%s", room_id, event_id) + path = _create_v1_path("/event_auth/%s/%s", room_id, event_id) content = yield self.client.get_json( destination=destination, @@ -357,7 +357,7 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function def send_query_auth(self, destination, room_id, event_id, content): - path = _create_path(PREFIX, "/query_auth/%s/%s", room_id, event_id) + path = _create_v1_path("/query_auth/%s/%s", room_id, event_id) content = yield self.client.post_json( destination=destination, @@ -392,7 +392,7 @@ class TransportLayerClient(object): Returns: A dict containg the device keys. """ - path = PREFIX + "/user/keys/query" + path = _create_v1_path("/user/keys/query") content = yield self.client.post_json( destination=destination, @@ -419,7 +419,7 @@ class TransportLayerClient(object): Returns: A dict containg the device keys. """ - path = _create_path(PREFIX, "/user/devices/%s", user_id) + path = _create_v1_path("/user/devices/%s", user_id) content = yield self.client.get_json( destination=destination, @@ -455,7 +455,7 @@ class TransportLayerClient(object): A dict containg the one-time keys. """ - path = PREFIX + "/user/keys/claim" + path = _create_v1_path("/user/keys/claim") content = yield self.client.post_json( destination=destination, @@ -469,7 +469,7 @@ class TransportLayerClient(object): @log_function def get_missing_events(self, destination, room_id, earliest_events, latest_events, limit, min_depth, timeout): - path = _create_path(PREFIX, "/get_missing_events/%s", room_id,) + path = _create_v1_path("/get_missing_events/%s", room_id,) content = yield self.client.post_json( destination=destination, @@ -489,7 +489,7 @@ class TransportLayerClient(object): def get_group_profile(self, destination, group_id, requester_user_id): """Get a group profile """ - path = _create_path(PREFIX, "/groups/%s/profile", group_id,) + path = _create_v1_path("/groups/%s/profile", group_id,) return self.client.get_json( destination=destination, @@ -508,7 +508,7 @@ class TransportLayerClient(object): requester_user_id (str) content (dict): The new profile of the group """ - path = _create_path(PREFIX, "/groups/%s/profile", group_id,) + path = _create_v1_path("/groups/%s/profile", group_id,) return self.client.post_json( destination=destination, @@ -522,7 +522,7 @@ class TransportLayerClient(object): def get_group_summary(self, destination, group_id, requester_user_id): """Get a group summary """ - path = _create_path(PREFIX, "/groups/%s/summary", group_id,) + path = _create_v1_path("/groups/%s/summary", group_id,) return self.client.get_json( destination=destination, @@ -535,7 +535,7 @@ class TransportLayerClient(object): def get_rooms_in_group(self, destination, group_id, requester_user_id): """Get all rooms in a group """ - path = _create_path(PREFIX, "/groups/%s/rooms", group_id,) + path = _create_v1_path("/groups/%s/rooms", group_id,) return self.client.get_json( destination=destination, @@ -548,7 +548,7 @@ class TransportLayerClient(object): content): """Add a room to a group """ - path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,) + path = _create_v1_path("/groups/%s/room/%s", group_id, room_id,) return self.client.post_json( destination=destination, @@ -562,8 +562,8 @@ class TransportLayerClient(object): config_key, content): """Update room in group """ - path = _create_path( - PREFIX, "/groups/%s/room/%s/config/%s", + path = _create_v1_path( + "/groups/%s/room/%s/config/%s", group_id, room_id, config_key, ) @@ -578,7 +578,7 @@ class TransportLayerClient(object): def remove_room_from_group(self, destination, group_id, requester_user_id, room_id): """Remove a room from a group """ - path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,) + path = _create_v1_path("/groups/%s/room/%s", group_id, room_id,) return self.client.delete_json( destination=destination, @@ -591,7 +591,7 @@ class TransportLayerClient(object): def get_users_in_group(self, destination, group_id, requester_user_id): """Get users in a group """ - path = _create_path(PREFIX, "/groups/%s/users", group_id,) + path = _create_v1_path("/groups/%s/users", group_id,) return self.client.get_json( destination=destination, @@ -604,7 +604,7 @@ class TransportLayerClient(object): def get_invited_users_in_group(self, destination, group_id, requester_user_id): """Get users that have been invited to a group """ - path = _create_path(PREFIX, "/groups/%s/invited_users", group_id,) + path = _create_v1_path("/groups/%s/invited_users", group_id,) return self.client.get_json( destination=destination, @@ -617,8 +617,8 @@ class TransportLayerClient(object): def accept_group_invite(self, destination, group_id, user_id, content): """Accept a group invite """ - path = _create_path( - PREFIX, "/groups/%s/users/%s/accept_invite", + path = _create_v1_path( + "/groups/%s/users/%s/accept_invite", group_id, user_id, ) @@ -633,7 +633,7 @@ class TransportLayerClient(object): def join_group(self, destination, group_id, user_id, content): """Attempts to join a group """ - path = _create_path(PREFIX, "/groups/%s/users/%s/join", group_id, user_id) + path = _create_v1_path("/groups/%s/users/%s/join", group_id, user_id) return self.client.post_json( destination=destination, @@ -646,7 +646,7 @@ class TransportLayerClient(object): def invite_to_group(self, destination, group_id, user_id, requester_user_id, content): """Invite a user to a group """ - path = _create_path(PREFIX, "/groups/%s/users/%s/invite", group_id, user_id) + path = _create_v1_path("/groups/%s/users/%s/invite", group_id, user_id) return self.client.post_json( destination=destination, @@ -662,7 +662,7 @@ class TransportLayerClient(object): invited. """ - path = _create_path(PREFIX, "/groups/local/%s/users/%s/invite", group_id, user_id) + path = _create_v1_path("/groups/local/%s/users/%s/invite", group_id, user_id) return self.client.post_json( destination=destination, @@ -676,7 +676,7 @@ class TransportLayerClient(object): user_id, content): """Remove a user fron a group """ - path = _create_path(PREFIX, "/groups/%s/users/%s/remove", group_id, user_id) + path = _create_v1_path("/groups/%s/users/%s/remove", group_id, user_id) return self.client.post_json( destination=destination, @@ -693,7 +693,7 @@ class TransportLayerClient(object): kicked from the group. """ - path = _create_path(PREFIX, "/groups/local/%s/users/%s/remove", group_id, user_id) + path = _create_v1_path("/groups/local/%s/users/%s/remove", group_id, user_id) return self.client.post_json( destination=destination, @@ -708,7 +708,7 @@ class TransportLayerClient(object): the attestations """ - path = _create_path(PREFIX, "/groups/%s/renew_attestation/%s", group_id, user_id) + path = _create_v1_path("/groups/%s/renew_attestation/%s", group_id, user_id) return self.client.post_json( destination=destination, @@ -723,12 +723,12 @@ class TransportLayerClient(object): """Update a room entry in a group summary """ if category_id: - path = _create_path( - PREFIX, "/groups/%s/summary/categories/%s/rooms/%s", + path = _create_v1_path( + "/groups/%s/summary/categories/%s/rooms/%s", group_id, category_id, room_id, ) else: - path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,) + path = _create_v1_path("/groups/%s/summary/rooms/%s", group_id, room_id,) return self.client.post_json( destination=destination, @@ -744,12 +744,12 @@ class TransportLayerClient(object): """Delete a room entry in a group summary """ if category_id: - path = _create_path( - PREFIX + "/groups/%s/summary/categories/%s/rooms/%s", + path = _create_v1_path( + "/groups/%s/summary/categories/%s/rooms/%s", group_id, category_id, room_id, ) else: - path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,) + path = _create_v1_path("/groups/%s/summary/rooms/%s", group_id, room_id,) return self.client.delete_json( destination=destination, @@ -762,7 +762,7 @@ class TransportLayerClient(object): def get_group_categories(self, destination, group_id, requester_user_id): """Get all categories in a group """ - path = _create_path(PREFIX, "/groups/%s/categories", group_id,) + path = _create_v1_path("/groups/%s/categories", group_id,) return self.client.get_json( destination=destination, @@ -775,7 +775,7 @@ class TransportLayerClient(object): def get_group_category(self, destination, group_id, requester_user_id, category_id): """Get category info in a group """ - path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,) + path = _create_v1_path("/groups/%s/categories/%s", group_id, category_id,) return self.client.get_json( destination=destination, @@ -789,7 +789,7 @@ class TransportLayerClient(object): content): """Update a category in a group """ - path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,) + path = _create_v1_path("/groups/%s/categories/%s", group_id, category_id,) return self.client.post_json( destination=destination, @@ -804,7 +804,7 @@ class TransportLayerClient(object): category_id): """Delete a category in a group """ - path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,) + path = _create_v1_path("/groups/%s/categories/%s", group_id, category_id,) return self.client.delete_json( destination=destination, @@ -817,7 +817,7 @@ class TransportLayerClient(object): def get_group_roles(self, destination, group_id, requester_user_id): """Get all roles in a group """ - path = _create_path(PREFIX, "/groups/%s/roles", group_id,) + path = _create_v1_path("/groups/%s/roles", group_id,) return self.client.get_json( destination=destination, @@ -830,7 +830,7 @@ class TransportLayerClient(object): def get_group_role(self, destination, group_id, requester_user_id, role_id): """Get a roles info """ - path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,) + path = _create_v1_path("/groups/%s/roles/%s", group_id, role_id,) return self.client.get_json( destination=destination, @@ -844,7 +844,7 @@ class TransportLayerClient(object): content): """Update a role in a group """ - path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,) + path = _create_v1_path("/groups/%s/roles/%s", group_id, role_id,) return self.client.post_json( destination=destination, @@ -858,7 +858,7 @@ class TransportLayerClient(object): def delete_group_role(self, destination, group_id, requester_user_id, role_id): """Delete a role in a group """ - path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,) + path = _create_v1_path("/groups/%s/roles/%s", group_id, role_id,) return self.client.delete_json( destination=destination, @@ -873,12 +873,12 @@ class TransportLayerClient(object): """Update a users entry in a group """ if role_id: - path = _create_path( - PREFIX, "/groups/%s/summary/roles/%s/users/%s", + path = _create_v1_path( + "/groups/%s/summary/roles/%s/users/%s", group_id, role_id, user_id, ) else: - path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,) + path = _create_v1_path("/groups/%s/summary/users/%s", group_id, user_id,) return self.client.post_json( destination=destination, @@ -893,7 +893,7 @@ class TransportLayerClient(object): content): """Sets the join policy for a group """ - path = _create_path(PREFIX, "/groups/%s/settings/m.join_policy", group_id,) + path = _create_v1_path("/groups/%s/settings/m.join_policy", group_id,) return self.client.put_json( destination=destination, @@ -909,12 +909,12 @@ class TransportLayerClient(object): """Delete a users entry in a group """ if role_id: - path = _create_path( - PREFIX, "/groups/%s/summary/roles/%s/users/%s", + path = _create_v1_path( + "/groups/%s/summary/roles/%s/users/%s", group_id, role_id, user_id, ) else: - path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,) + path = _create_v1_path("/groups/%s/summary/users/%s", group_id, user_id,) return self.client.delete_json( destination=destination, @@ -927,7 +927,7 @@ class TransportLayerClient(object): """Get the groups a list of users are publicising """ - path = PREFIX + "/get_groups_publicised" + path = _create_v1_path("/get_groups_publicised") content = {"user_ids": user_ids} @@ -939,20 +939,22 @@ class TransportLayerClient(object): ) -def _create_path(prefix, path, *args): - """Creates a path from the prefix, path template and args. Ensures that - all args are url encoded. +def _create_v1_path(path, *args): + """Creates a path against V1 federation API from the path template and + args. Ensures that all args are url encoded. Example: - _create_path(PREFIX, "/event/%s/", event_id) + _create_v1_path("/event/%s/", event_id) Args: - prefix (str) path (str): String template for the path args: ([str]): Args to insert into path. Each arg will be url encoded Returns: str """ - return prefix + path % tuple(urllib.parse.quote(arg, "") for arg in args) + return ( + FEDERATION_V1_PREFIX + + path % tuple(urllib.parse.quote(arg, "") for arg in args) + ) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 3553f418f1..e592269cf4 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -22,7 +22,7 @@ from twisted.internet import defer import synapse from synapse.api.errors import Codes, FederationDeniedError, SynapseError -from synapse.api.urls import FEDERATION_PREFIX as PREFIX +from synapse.api.urls import FEDERATION_V1_PREFIX from synapse.http.endpoint import parse_and_validate_server_name from synapse.http.server import JsonResource from synapse.http.servlet import ( @@ -227,6 +227,8 @@ class BaseFederationServlet(object): """ REQUIRE_AUTH = True + PREFIX = FEDERATION_V1_PREFIX # Allows specifying the API version + def __init__(self, handler, authenticator, ratelimiter, server_name): self.handler = handler self.authenticator = authenticator @@ -286,7 +288,7 @@ class BaseFederationServlet(object): return new_func def register(self, server): - pattern = re.compile("^" + PREFIX + self.PATH + "$") + pattern = re.compile("^" + self.PREFIX + self.PATH + "$") for method in ("GET", "PUT", "POST"): code = getattr(self, "on_%s" % (method), None) -- cgit 1.4.1 From 4a4d2e17bc28562cc6d0de55bc8c8c05335414cd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 15 Jan 2019 13:22:44 +0000 Subject: Add /v2/invite federation API --- synapse/api/urls.py | 1 + synapse/federation/federation_server.py | 4 ++-- synapse/federation/transport/server.py | 42 +++++++++++++++++++++++++++++---- 3 files changed, 41 insertions(+), 6 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/urls.py b/synapse/api/urls.py index ba019001ff..8102176653 100644 --- a/synapse/api/urls.py +++ b/synapse/api/urls.py @@ -26,6 +26,7 @@ CLIENT_PREFIX = "/_matrix/client/api/v1" CLIENT_V2_ALPHA_PREFIX = "/_matrix/client/v2_alpha" FEDERATION_PREFIX = "/_matrix/federation" FEDERATION_V1_PREFIX = FEDERATION_PREFIX + "/v1" +FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2" STATIC_PREFIX = "/_matrix/static" WEB_CLIENT_PREFIX = "/_matrix/client" CONTENT_REPO_PREFIX = "/_matrix/content" diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 98722ae543..37d29e7027 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -369,13 +369,13 @@ class FederationServer(FederationBase): }) @defer.inlineCallbacks - def on_invite_request(self, origin, content): + def on_invite_request(self, origin, content, room_version): pdu = event_from_pdu_json(content) origin_host, _ = parse_server_name(origin) yield self.check_server_matches_acl(origin_host, pdu.room_id) ret_pdu = yield self.handler.on_invite_request(origin, pdu) time_now = self._clock.time_msec() - defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)})) + defer.returnValue({"event": ret_pdu.get_pdu_json(time_now)}) @defer.inlineCallbacks def on_send_join_request(self, origin, content): diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index e592269cf4..4557a9e66e 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -21,8 +21,9 @@ import re from twisted.internet import defer import synapse +from synapse.api.constants import RoomVersions from synapse.api.errors import Codes, FederationDeniedError, SynapseError -from synapse.api.urls import FEDERATION_V1_PREFIX +from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX from synapse.http.endpoint import parse_and_validate_server_name from synapse.http.server import JsonResource from synapse.http.servlet import ( @@ -490,14 +491,46 @@ class FederationSendJoinServlet(BaseFederationServlet): defer.returnValue((200, content)) -class FederationInviteServlet(BaseFederationServlet): +class FederationV1InviteServlet(BaseFederationServlet): PATH = "/invite/(?P[^/]*)/(?P[^/]*)" + @defer.inlineCallbacks + def on_PUT(self, origin, content, query, context, event_id): + # We don't get a room version, so we have to assume its EITHER v1 or + # v2. This is "fine" as the only difference between V1 and V2 is the + # state resolution algorithm, and we don't use that for processing + # invites + content = yield self.handler.on_invite_request( + origin, content, room_version=RoomVersions.V1, + ) + + # V1 federation API is defined to return a content of `[200, {...}]` + # due to a historical bug. + defer.returnValue((200, (200, content))) + + +class FederationV2InviteServlet(BaseFederationServlet): + PATH = "/invite/(?P[^/]*)/(?P[^/]*)" + + PREFIX = FEDERATION_V2_PREFIX + @defer.inlineCallbacks def on_PUT(self, origin, content, query, context, event_id): # TODO(paul): assert that context/event_id parsed from path actually # match those given in content - content = yield self.handler.on_invite_request(origin, content) + + room_version = content["room_version"] + event = content["event"] + invite_room_state = content["invite_room_state"] + + # Synapse expects invite_room_state to be in unsigned, as it is in v1 + # API + + event.setdefault("unsigned", {})["invite_room_state"] = invite_room_state + + content = yield self.handler.on_invite_request( + origin, event, room_version=room_version, + ) defer.returnValue((200, content)) @@ -1265,7 +1298,8 @@ FEDERATION_SERVLET_CLASSES = ( FederationEventServlet, FederationSendJoinServlet, FederationSendLeaveServlet, - FederationInviteServlet, + FederationV1InviteServlet, + FederationV2InviteServlet, FederationQueryAuthServlet, FederationGetMissingEventsServlet, FederationEventAuthServlet, -- cgit 1.4.1 From 82e13662c03a41c085e784a594b423711e0caffa Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Mon, 21 Jan 2019 01:54:43 +0200 Subject: Split federation OpenID userinfo endpoint out of the federation resource This allows the OpenID userinfo endpoint to be active even if the federation resource is not active. The OpenID userinfo endpoint is called by integration managers to verify user actions using the client API OpenID access token. Without this verification, the integration manager cannot know that the access token is valid. The OpenID userinfo endpoint will be loaded in the case that either "federation" or "openid" resource is defined. The new "openid" resource is defaulted to active in default configuration. Signed-off-by: Jason Robinson --- synapse/app/federation_reader.py | 7 ++ synapse/app/homeserver.py | 9 +++ synapse/config/server.py | 9 +-- synapse/federation/transport/server.py | 114 +++++++++++++++++++++------------ 4 files changed, 93 insertions(+), 46 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index ea594f0f1a..99e8a4cf6a 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -87,6 +87,13 @@ class FederationReaderServer(HomeServer): resources.update({ FEDERATION_PREFIX: TransportLayerServer(self), }) + if name == "openid" and "federation" not in res["names"]: + # Only load the openid resource separately if federation resource + # is not specified since federation resource includes openid + # resource. + resources.update({ + FEDERATION_PREFIX: TransportLayerServer(self, servlet_groups=["openid"]), + }) root_resource = create_resource_tree(resources, NoResource()) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 5652c6201c..0a924d7a80 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -95,6 +95,10 @@ class SynapseHomeServer(HomeServer): resources = {} for res in listener_config["resources"]: for name in res["names"]: + if name == "openid" and "federation" in res["names"]: + # Skip loading openid resource if federation is defined + # since federation resource will include openid + continue resources.update(self._configure_named_resource( name, res.get("compress", False), )) @@ -192,6 +196,11 @@ class SynapseHomeServer(HomeServer): FEDERATION_PREFIX: TransportLayerServer(self), }) + if name == "openid": + resources.update({ + FEDERATION_PREFIX: TransportLayerServer(self, servlet_groups=["openid"]), + }) + if name in ["static", "client"]: resources.update({ STATIC_PREFIX: File( diff --git a/synapse/config/server.py b/synapse/config/server.py index fb57791098..556f1efee5 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -151,7 +151,7 @@ class ServerConfig(Config): "compress": gzip_responses, }, { - "names": ["federation"], + "names": ["federation", "openid"], "compress": False, } ] @@ -170,7 +170,7 @@ class ServerConfig(Config): "compress": gzip_responses, }, { - "names": ["federation"], + "names": ["federation", "openid"], "compress": False, } ] @@ -328,7 +328,7 @@ class ServerConfig(Config): # that can do automatic compression. compress: true - - names: [federation] # Federation APIs + - names: [federation, openid] # Federation APIs compress: false # optional list of additional endpoints which can be loaded via @@ -350,7 +350,7 @@ class ServerConfig(Config): resources: - names: [client] compress: true - - names: [federation] + - names: [federation, openid] compress: false # Turn on the twisted ssh manhole service on localhost on the given @@ -477,6 +477,7 @@ KNOWN_RESOURCES = ( 'keys', 'media', 'metrics', + 'openid', 'replication', 'static', 'webclient', diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 4557a9e66e..49b80beecb 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -43,9 +43,10 @@ logger = logging.getLogger(__name__) class TransportLayerServer(JsonResource): """Handles incoming federation HTTP requests""" - def __init__(self, hs): + def __init__(self, hs, servlet_groups=None): self.hs = hs self.clock = hs.get_clock() + self.servlet_groups = servlet_groups super(TransportLayerServer, self).__init__(hs, canonical_json=False) @@ -67,6 +68,7 @@ class TransportLayerServer(JsonResource): resource=self, ratelimiter=self.ratelimiter, authenticator=self.authenticator, + servlet_groups=self.servlet_groups, ) @@ -1308,10 +1310,12 @@ FEDERATION_SERVLET_CLASSES = ( FederationClientKeysClaimServlet, FederationThirdPartyInviteExchangeServlet, On3pidBindServlet, - OpenIdUserInfo, FederationVersionServlet, ) +OPENID_SERVLET_CLASSES = ( + OpenIdUserInfo, +) ROOM_LIST_CLASSES = ( PublicRoomList, @@ -1350,44 +1354,70 @@ GROUP_ATTESTATION_SERVLET_CLASSES = ( FederationGroupsRenewAttestaionServlet, ) +DEFAULT_SERVLET_GROUPS = ( + "federation", + "room_list", + "group_server", + "group_local", + "group_attestation", + "openid", +) + -def register_servlets(hs, resource, authenticator, ratelimiter): - for servletclass in FEDERATION_SERVLET_CLASSES: - servletclass( - handler=hs.get_federation_server(), - authenticator=authenticator, - ratelimiter=ratelimiter, - server_name=hs.hostname, - ).register(resource) - - for servletclass in ROOM_LIST_CLASSES: - servletclass( - handler=hs.get_room_list_handler(), - authenticator=authenticator, - ratelimiter=ratelimiter, - server_name=hs.hostname, - ).register(resource) - - for servletclass in GROUP_SERVER_SERVLET_CLASSES: - servletclass( - handler=hs.get_groups_server_handler(), - authenticator=authenticator, - ratelimiter=ratelimiter, - server_name=hs.hostname, - ).register(resource) - - for servletclass in GROUP_LOCAL_SERVLET_CLASSES: - servletclass( - handler=hs.get_groups_local_handler(), - authenticator=authenticator, - ratelimiter=ratelimiter, - server_name=hs.hostname, - ).register(resource) - - for servletclass in GROUP_ATTESTATION_SERVLET_CLASSES: - servletclass( - handler=hs.get_groups_attestation_renewer(), - authenticator=authenticator, - ratelimiter=ratelimiter, - server_name=hs.hostname, - ).register(resource) +def register_servlets(hs, resource, authenticator, ratelimiter, servlet_groups=None): + if not servlet_groups: + servlet_groups = DEFAULT_SERVLET_GROUPS + + if "federation" in servlet_groups: + for servletclass in FEDERATION_SERVLET_CLASSES: + servletclass( + handler=hs.get_federation_server(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) + + if "openid" in servlet_groups: + for servletclass in OPENID_SERVLET_CLASSES: + servletclass( + handler=hs.get_federation_server(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) + + if "room_list" in servlet_groups: + for servletclass in ROOM_LIST_CLASSES: + servletclass( + handler=hs.get_room_list_handler(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) + + if "group_server" in servlet_groups: + for servletclass in GROUP_SERVER_SERVLET_CLASSES: + servletclass( + handler=hs.get_groups_server_handler(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) + + if "group_local" in servlet_groups: + for servletclass in GROUP_LOCAL_SERVLET_CLASSES: + servletclass( + handler=hs.get_groups_local_handler(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) + + if "group_attestation" in servlet_groups: + for servletclass in GROUP_ATTESTATION_SERVLET_CLASSES: + servletclass( + handler=hs.get_groups_attestation_renewer(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) -- cgit 1.4.1 From d39b7b6d38b0d9876ad305491ea05af92ea35b04 Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Tue, 22 Jan 2019 11:00:17 +0200 Subject: Document `servlet_groups` parameters Signed-off-by: Jason Robinson --- synapse/federation/transport/server.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 49b80beecb..9c1c9c39e1 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -44,6 +44,16 @@ class TransportLayerServer(JsonResource): """Handles incoming federation HTTP requests""" def __init__(self, hs, servlet_groups=None): + """Initialize the TransportLayerServer + + Will by default register all servlets. For custom behaviour, pass in + a list of servlet_groups to register. + + Args: + hs (synapse.server.HomeServer): homeserver + servlet_groups (list[str], optional): List of servlet groups to register. + Defaults to ``DEFAULT_SERVLET_GROUPS``. + """ self.hs = hs self.clock = hs.get_clock() self.servlet_groups = servlet_groups @@ -1365,6 +1375,19 @@ DEFAULT_SERVLET_GROUPS = ( def register_servlets(hs, resource, authenticator, ratelimiter, servlet_groups=None): + """Initialize and register servlet classes. + + Will by default register all servlets. For custom behaviour, pass in + a list of servlet_groups to register. + + Args: + hs (synapse.server.HomeServer): homeserver + resource (TransportLayerServer): resource class to register to + authenticator (Authenticator): authenticator to use + ratelimiter (util.ratelimitutils.FederationRateLimiter): ratelimiter to use + servlet_groups (list[str], optional): List of servlet groups to register. + Defaults to ``DEFAULT_SERVLET_GROUPS``. + """ if not servlet_groups: servlet_groups = DEFAULT_SERVLET_GROUPS -- cgit 1.4.1 From 67cd4dad81ed2932009472da2d13648ca11eab73 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Jan 2019 16:50:06 +0000 Subject: Implement MSC 1813 - Add room version to make APIs We also implement `make_membership_event` converting the returned room version to an event format version. --- synapse/events/__init__.py | 17 ++++++++++++++++- synapse/federation/federation_client.py | 21 ++++++++++++++++----- synapse/federation/federation_server.py | 8 +++++++- synapse/handlers/federation.py | 2 +- 4 files changed, 40 insertions(+), 8 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 9dd6940385..e6f94e68af 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -18,7 +18,7 @@ from distutils.util import strtobool import six -from synapse.api.constants import EventFormatVersions +from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventFormatVersions from synapse.util.caches import intern_dict from synapse.util.frozenutils import freeze @@ -235,3 +235,18 @@ class FrozenEvent(EventBase): self.get("type", None), self.get("state_key", None), ) + + +def room_version_to_event_format(room_version): + """Converts a room version string to the event format + + Args: + room_version (str) + + Returns: + int + """ + if room_version not in KNOWN_ROOM_VERSIONS: + raise + + return EventFormatVersions.V1 diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index d05ed91d64..0757ad12f4 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -25,14 +25,19 @@ from prometheus_client import Counter from twisted.internet import defer -from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership +from synapse.api.constants import ( + KNOWN_ROOM_VERSIONS, + EventTypes, + Membership, + RoomVersions, +) from synapse.api.errors import ( CodeMessageException, FederationDeniedError, HttpResponseException, SynapseError, ) -from synapse.events import builder +from synapse.events import builder, room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache @@ -536,8 +541,9 @@ class FederationClient(FederationBase): params (dict[str, str|Iterable[str]]): Query parameters to include in the request. Return: - Deferred: resolves to a tuple of (origin (str), event (object)) - where origin is the remote homeserver which generated the event. + Deferred[tuple[str, dict, int]]: resolves to a tuple of + `(origin, event, event_format)` where origin is the remote + homeserver which generated the event. Fails with a ``SynapseError`` if the chosen remote server returns a 300/400 code. @@ -557,6 +563,11 @@ class FederationClient(FederationBase): destination, room_id, user_id, membership, params, ) + # Note: If not supplied, the room version may be either v1 or v2, + # however either way the event format version will be v1. + room_version = ret.get("room_version", RoomVersions.V1) + event_format = room_version_to_event_format(room_version) + pdu_dict = ret.get("event", None) if not isinstance(pdu_dict, dict): raise InvalidResponseError("Bad 'event' field in response") @@ -574,7 +585,7 @@ class FederationClient(FederationBase): ev = builder.EventBuilder(pdu_dict) defer.returnValue( - (destination, ev) + (destination, ev, event_format) ) return self._try_destination_list( diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 37d29e7027..17eccaaea0 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -400,8 +400,14 @@ class FederationServer(FederationBase): origin_host, _ = parse_server_name(origin) yield self.check_server_matches_acl(origin_host, room_id) pdu = yield self.handler.on_make_leave_request(room_id, user_id) + + room_version = yield self.store.get_room_version(room_id) + time_now = self._clock.time_msec() - defer.returnValue({"event": pdu.get_pdu_json(time_now)}) + defer.returnValue({ + "event": pdu.get_pdu_json(time_now), + "room_version": room_version, + }) @defer.inlineCallbacks def on_send_leave_request(self, origin, content): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a3bb864bb2..d1ba1450e9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1336,7 +1336,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, content={}, params=None): - origin, pdu = yield self.federation_client.make_membership_event( + origin, pdu, _ = yield self.federation_client.make_membership_event( target_hosts, room_id, user_id, -- cgit 1.4.1 From 6a41d2a187ecef484a3aa67518ec9b4b0638c614 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Jan 2019 17:19:58 +0000 Subject: Add room_version param to get_pdu When we add new event format we'll need to know the event format or room version when parsing events. --- synapse/federation/federation_base.py | 11 +++++--- synapse/federation/federation_client.py | 46 ++++++++++++++++++++++++++++----- synapse/federation/federation_server.py | 4 ++- synapse/handlers/federation.py | 12 +++++++-- 4 files changed, 60 insertions(+), 13 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index b7ad729c63..d749bfdd3a 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -43,8 +43,8 @@ class FederationBase(object): self._clock = hs.get_clock() @defer.inlineCallbacks - def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False, - include_none=False): + def _check_sigs_and_hash_and_fetch(self, origin, pdus, room_version, + outlier=False, include_none=False): """Takes a list of PDUs and checks the signatures and hashs of each one. If a PDU fails its signature check then we check if we have it in the database and if not then request if from the originating server of @@ -56,8 +56,12 @@ class FederationBase(object): a new list. Args: + origin (str) pdu (list) - outlier (bool) + room_version (str) + outlier (bool): Whether the events are outliers or not + include_none (str): Whether to include None in the returned list + for events that have failed their checks Returns: Deferred : A list of PDUs that have valid signatures and hashes. @@ -84,6 +88,7 @@ class FederationBase(object): res = yield self.get_pdu( destinations=[pdu.origin], event_id=pdu.event_id, + room_version=room_version, outlier=outlier, timeout=10000, ) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index d05ed91d64..4e171f9b56 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -25,7 +25,12 @@ from prometheus_client import Counter from twisted.internet import defer -from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership +from synapse.api.constants import ( + KNOWN_ROOM_VERSIONS, + EventTypes, + Membership, + RoomVersions, +) from synapse.api.errors import ( CodeMessageException, FederationDeniedError, @@ -202,7 +207,8 @@ class FederationClient(FederationBase): @defer.inlineCallbacks @log_function - def get_pdu(self, destinations, event_id, outlier=False, timeout=None): + def get_pdu(self, destinations, event_id, room_version, outlier=False, + timeout=None): """Requests the PDU with given origin and ID from the remote home servers. @@ -212,6 +218,7 @@ class FederationClient(FederationBase): Args: destinations (list): Which home servers to query event_id (str): event to fetch + room_version (str): version of the room outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if it's from an arbitary point in the context as opposed to part of the current block of PDUs. Defaults to `False` @@ -352,10 +359,13 @@ class FederationClient(FederationBase): ev.event_id for ev in itertools.chain(pdus, auth_chain) ]) + room_version = yield self.store.get_room_version(room_id) + signed_pdus = yield self._check_sigs_and_hash_and_fetch( destination, [p for p in pdus if p.event_id not in seen_events], - outlier=True + outlier=True, + room_version=room_version, ) signed_pdus.extend( seen_events[p.event_id] for p in pdus if p.event_id in seen_events @@ -364,7 +374,8 @@ class FederationClient(FederationBase): signed_auth = yield self._check_sigs_and_hash_and_fetch( destination, [p for p in auth_chain if p.event_id not in seen_events], - outlier=True + outlier=True, + room_version=room_version, ) signed_auth.extend( seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events @@ -411,6 +422,8 @@ class FederationClient(FederationBase): random.shuffle(srvs) return srvs + room_version = yield self.store.get_room_version(room_id) + batch_size = 20 missing_events = list(missing_events) for i in range(0, len(missing_events), batch_size): @@ -421,6 +434,7 @@ class FederationClient(FederationBase): self.get_pdu, destinations=random_server_list(), event_id=e_id, + room_version=room_version, ) for e_id in batch ] @@ -450,8 +464,11 @@ class FederationClient(FederationBase): for p in res["auth_chain"] ] + room_version = yield self.store.get_room_version(room_id) + signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True + destination, auth_chain, + outlier=True, room_version=room_version, ) signed_auth.sort(key=lambda e: e.depth) @@ -650,9 +667,20 @@ class FederationClient(FederationBase): for p in itertools.chain(state, auth_chain) } + room_version = None + for e in state: + if (e.type, e.state_key) == (EventTypes.Create, ""): + room_version = e.content.get("room_version", RoomVersions.V1) + break + + if room_version is None: + # We use this error has that is what + raise SynapseError(400, "No create event in state") + valid_pdus = yield self._check_sigs_and_hash_and_fetch( destination, list(pdus.values()), outlier=True, + room_version=room_version, ) valid_pdus_map = { @@ -790,8 +818,10 @@ class FederationClient(FederationBase): for e in content["auth_chain"] ] + room_version = yield self.store.get_room_version(room_id) + signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True + destination, auth_chain, outlier=True, room_version=room_version, ) signed_auth.sort(key=lambda e: e.depth) @@ -838,8 +868,10 @@ class FederationClient(FederationBase): for e in content.get("events", []) ] + room_version = yield self.store.get_room_version(room_id) + signed_events = yield self._check_sigs_and_hash_and_fetch( - destination, events, outlier=False + destination, events, outlier=False, room_version=room_version, ) except HttpResponseException as e: if not e.code == 400: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 37d29e7027..cb729c69ea 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -457,8 +457,10 @@ class FederationServer(FederationBase): for e in content["auth_chain"] ] + room_version = yield self.store.get_room_version(room_id) + signed_auth = yield self._check_sigs_and_hash_and_fetch( - origin, auth_chain, outlier=True + origin, auth_chain, outlier=True, room_version=room_version, ) ret = yield self.handler.on_query_auth( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a3bb864bb2..a9dc4a4e4e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -34,6 +34,7 @@ from synapse.api.constants import ( EventTypes, Membership, RejectedReason, + RoomVersions, ) from synapse.api.errors import ( AuthError, @@ -342,6 +343,8 @@ class FederationHandler(BaseHandler): room_id, event_id, p, ) + room_version = yield self.store.get_room_version(room_id) + with logcontext.nested_logging_context(p): # note that if any of the missing prevs share missing state or # auth events, the requests to fetch those events are deduped @@ -355,7 +358,7 @@ class FederationHandler(BaseHandler): # we want the state *after* p; get_state_for_room returns the # state *before* p. remote_event = yield self.federation_client.get_pdu( - [origin], p, outlier=True, + [origin], p, room_version, outlier=True, ) if remote_event is None: @@ -379,7 +382,6 @@ class FederationHandler(BaseHandler): for x in remote_state: event_map[x.event_id] = x - room_version = yield self.store.get_room_version(room_id) state_map = yield resolve_events_with_store( room_version, state_maps, event_map, state_res_store=StateResolutionStore(self.store), @@ -655,6 +657,8 @@ class FederationHandler(BaseHandler): if dest == self.server_name: raise SynapseError(400, "Can't backfill from self.") + room_version = yield self.store.get_room_version(room_id) + events = yield self.federation_client.backfill( dest, room_id, @@ -748,6 +752,7 @@ class FederationHandler(BaseHandler): self.federation_client.get_pdu, [dest], event_id, + room_version=room_version, outlier=True, timeout=10000, ) @@ -1659,6 +1664,8 @@ class FederationHandler(BaseHandler): create_event = e break + room_version = create_event.content.get("room_version", RoomVersions.V1) + missing_auth_events = set() for e in itertools.chain(auth_events, state, [event]): for e_id in e.auth_event_ids(): @@ -1669,6 +1676,7 @@ class FederationHandler(BaseHandler): m_ev = yield self.federation_client.get_pdu( [origin], e_id, + room_version=room_version, outlier=True, timeout=10000, ) -- cgit 1.4.1 From 886e5acc762b879b606773b511ff92345aef14c6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 16 Jan 2019 15:13:07 +0000 Subject: Store rejected remote invite events as outliers Currently they're stored as non-outliers even though the server isn't in the room, which can be problematic in places where the code assumes it has the state for all non outlier events. In particular, there is an edge case where persisting the leave event triggers a state resolution, which requires looking up the room version from state. Since the server doesn't have the state, this causes an exception to be thrown. --- synapse/federation/federation_client.py | 10 ++++++-- synapse/handlers/federation.py | 44 +++++++++------------------------ synapse/storage/roommember.py | 5 +--- 3 files changed, 21 insertions(+), 38 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index d05ed91d64..8fa726759e 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -32,7 +32,6 @@ from synapse.api.errors import ( HttpResponseException, SynapseError, ) -from synapse.events import builder from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache @@ -66,6 +65,8 @@ class FederationClient(FederationBase): self.state = hs.get_state_handler() self.transport_layer = hs.get_federation_transport_client() + self.event_builder_factory = hs.get_event_builder_factory() + self._get_pdu_cache = ExpiringCache( cache_name="get_pdu_cache", clock=self._clock, @@ -571,7 +572,12 @@ class FederationClient(FederationBase): if "prev_state" not in pdu_dict: pdu_dict["prev_state"] = [] - ev = builder.EventBuilder(pdu_dict) + # Strip off the fields that we want to clobber. + pdu_dict.pop("origin", None) + pdu_dict.pop("origin_server_ts", None) + pdu_dict.pop("unsigned", None) + + ev = self.event_builder_factory.new(pdu_dict) defer.returnValue( (destination, ev) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a3bb864bb2..70be87cd3d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -43,10 +43,7 @@ from synapse.api.errors import ( StoreError, SynapseError, ) -from synapse.crypto.event_signing import ( - add_hashes_and_signatures, - compute_event_signature, -) +from synapse.crypto.event_signing import compute_event_signature from synapse.events.validator import EventValidator from synapse.replication.http.federation import ( ReplicationCleanRoomRestServlet, @@ -58,7 +55,6 @@ from synapse.types import UserID, get_domain_from_id from synapse.util import logcontext, unwrapFirstError from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room -from synapse.util.frozenutils import unfreeze from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination from synapse.visibility import filter_events_for_server @@ -1083,7 +1079,9 @@ class FederationHandler(BaseHandler): handled_events = set() try: - event = self._sign_event(event) + self._sign_event(event) + event.internal_metadata.outlier = False + # Try the host we successfully got a response to /make_join/ # request first. try: @@ -1289,13 +1287,7 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = True event.internal_metadata.invite_from_remote = True - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) + self._sign_event(event) context = yield self.state_handler.compute_event_context(event) yield self.persist_events_and_notify([(event, context)]) @@ -1313,7 +1305,7 @@ class FederationHandler(BaseHandler): # Mark as outlier as we don't have any state for this event; we're not # even in the room. event.internal_metadata.outlier = True - event = self._sign_event(event) + self._sign_event(event) # Try the host that we succesfully called /make_leave/ on first for # the /send_leave/ request. @@ -1358,26 +1350,14 @@ class FederationHandler(BaseHandler): defer.returnValue((origin, event)) def _sign_event(self, event): - event.internal_metadata.outlier = False - - builder = self.event_builder_factory.new( - unfreeze(event.get_pdu_json()) - ) - - builder.event_id = self.event_builder_factory.create_event_id() - builder.origin = self.hs.hostname - - if not hasattr(event, "signatures"): - builder.signatures = {} - - add_hashes_and_signatures( - builder, - self.hs.hostname, - self.hs.config.signing_key[0], + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) ) - return builder.build() - @defer.inlineCallbacks @log_function def on_make_leave_request(self, room_id, user_id): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 0707f9a86a..c7488f4259 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -591,10 +591,7 @@ class RoomMemberStore(RoomMemberWorkerStore): # i.e., its something that has just happened. # The only current event that can also be an outlier is if its an # invite that has come in across federation. - is_new_state = not backfilled and ( - not event.internal_metadata.is_outlier() - or event.internal_metadata.is_invite_from_remote() - ) + is_new_state = not backfilled is_mine = self.hs.is_mine_id(event.state_key) if is_new_state and is_mine: if event.membership == Membership.INVITE: -- cgit 1.4.1 From 07f62da55ac8903f7ea224255b8defd122724ec4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Jan 2019 19:44:37 +0000 Subject: Remove unnecessary '_sign_event' --- synapse/federation/federation_client.py | 9 ++++++++- synapse/handlers/federation.py | 14 -------------- 2 files changed, 8 insertions(+), 15 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 8fa726759e..f4adcb556d 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -32,6 +32,7 @@ from synapse.api.errors import ( HttpResponseException, SynapseError, ) +from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache @@ -577,7 +578,13 @@ class FederationClient(FederationBase): pdu_dict.pop("origin_server_ts", None) pdu_dict.pop("unsigned", None) - ev = self.event_builder_factory.new(pdu_dict) + builder = self.event_builder_factory.new(pdu_dict) + add_hashes_and_signatures( + builder, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ev = builder.build() defer.returnValue( (destination, ev) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 70be87cd3d..9a14ba4517 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -43,7 +43,6 @@ from synapse.api.errors import ( StoreError, SynapseError, ) -from synapse.crypto.event_signing import compute_event_signature from synapse.events.validator import EventValidator from synapse.replication.http.federation import ( ReplicationCleanRoomRestServlet, @@ -1079,7 +1078,6 @@ class FederationHandler(BaseHandler): handled_events = set() try: - self._sign_event(event) event.internal_metadata.outlier = False # Try the host we successfully got a response to /make_join/ @@ -1287,8 +1285,6 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = True event.internal_metadata.invite_from_remote = True - self._sign_event(event) - context = yield self.state_handler.compute_event_context(event) yield self.persist_events_and_notify([(event, context)]) @@ -1305,7 +1301,6 @@ class FederationHandler(BaseHandler): # Mark as outlier as we don't have any state for this event; we're not # even in the room. event.internal_metadata.outlier = True - self._sign_event(event) # Try the host that we succesfully called /make_leave/ on first for # the /send_leave/ request. @@ -1349,15 +1344,6 @@ class FederationHandler(BaseHandler): assert(event.room_id == room_id) defer.returnValue((origin, event)) - def _sign_event(self, event): - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - @defer.inlineCallbacks @log_function def on_make_leave_request(self, room_id, user_id): -- cgit 1.4.1 From 5ee1f997a8e7177077e2c5f0750e28725a452791 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 24 Jan 2019 18:08:08 +0000 Subject: Update make_membership_event docs --- synapse/federation/federation_client.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f4adcb556d..df7d18700c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -524,6 +524,8 @@ class FederationClient(FederationBase): Does so by asking one of the already participating servers to create an event with proper context. + Returns a fully signed and hashed event. + Note that this does not append any events to any graphs. Args: @@ -538,8 +540,9 @@ class FederationClient(FederationBase): params (dict[str, str|Iterable[str]]): Query parameters to include in the request. Return: - Deferred: resolves to a tuple of (origin (str), event (object)) - where origin is the remote homeserver which generated the event. + Deferred[tuple[str, FrozenEvent]]: resolves to a tuple of `origin` + and event where origin is the remote homeserver which generated + the event. Fails with a ``SynapseError`` if the chosen remote server returns a 300/400 code. -- cgit 1.4.1 From 26f44164c82c89c08f7ad930f365dbf69759e317 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 24 Jan 2019 18:28:00 +0000 Subject: Review comments --- synapse/events/__init__.py | 3 ++- synapse/federation/federation_client.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index e6f94e68af..154a7c0198 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -247,6 +247,7 @@ def room_version_to_event_format(room_version): int """ if room_version not in KNOWN_ROOM_VERSIONS: - raise + # We should have already checked version, so this should not happen + raise RuntimeError("Unrecognized room version %s" % (room_version,)) return EventFormatVersions.V1 diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 0757ad12f4..c43f21a0ba 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -543,7 +543,8 @@ class FederationClient(FederationBase): Return: Deferred[tuple[str, dict, int]]: resolves to a tuple of `(origin, event, event_format)` where origin is the remote - homeserver which generated the event. + homeserver which generated the event, and event_format is one of + `synapse.api.constants.EventFormatVersions`. Fails with a ``SynapseError`` if the chosen remote server returns a 300/400 code. -- cgit 1.4.1 From d148c43050f7a85523a743ff6069683c644a517d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 24 Jan 2019 18:31:23 +0000 Subject: Review comments --- synapse/federation/federation_client.py | 3 ++- synapse/handlers/federation.py | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 4e171f9b56..7fb5736142 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -674,7 +674,8 @@ class FederationClient(FederationBase): break if room_version is None: - # We use this error has that is what + # If the state doesn't have a create event then the room is + # invalid, and it would fail auth checks anyway. raise SynapseError(400, "No create event in state") valid_pdus = yield self._check_sigs_and_hash_and_fetch( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a9dc4a4e4e..5280d88a50 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1664,6 +1664,11 @@ class FederationHandler(BaseHandler): create_event = e break + if create_event is None: + # If the state doesn't have a create event then the room is + # invalid, and it would fail auth checks anyway. + raise SynapseError(400, "No create event in state") + room_version = create_event.content.get("room_version", RoomVersions.V1) missing_auth_events = set() -- cgit 1.4.1 From a50cf929c13044f25c3776802287458fe5695c37 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Jan 2019 20:21:33 +0000 Subject: Require event format version to parse or create events --- synapse/events/__init__.py | 24 ++++++++++- synapse/events/builder.py | 51 ++++++++++++++++++++++- synapse/federation/federation_base.py | 9 +++-- synapse/federation/federation_client.py | 60 ++++++++++++++++----------- synapse/federation/federation_server.py | 33 +++++++++------ synapse/federation/transport/server.py | 4 +- synapse/handlers/federation.py | 72 ++++++++++++++++++++------------- synapse/handlers/message.py | 10 ++++- 8 files changed, 191 insertions(+), 72 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 38470ad176..3fe52aaa45 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -18,7 +18,11 @@ from distutils.util import strtobool import six -from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventFormatVersions +from synapse.api.constants import ( + KNOWN_EVENT_FORMAT_VERSIONS, + KNOWN_ROOM_VERSIONS, + EventFormatVersions, +) from synapse.util.caches import intern_dict from synapse.util.frozenutils import freeze @@ -256,3 +260,21 @@ def room_version_to_event_format(room_version): raise RuntimeError("Unrecognized room version %s" % (room_version,)) return EventFormatVersions.V1 + + +def event_type_from_format_version(format_version): + """Returns the python type to use to construct an Event object for the + given event format version. + + Args: + format_version (int): The event format version + + Returns: + type: A type that can be initialized as per the initializer of + `FrozenEvent` + """ + if format_version not in KNOWN_EVENT_FORMAT_VERSIONS: + raise Exception( + "No event format %r" % (format_version,) + ) + return FrozenEvent diff --git a/synapse/events/builder.py b/synapse/events/builder.py index e662eaef10..7e63371095 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -15,12 +15,39 @@ import copy +from synapse.api.constants import RoomVersions from synapse.types import EventID from synapse.util.stringutils import random_string from . import EventBase, FrozenEvent, _event_dict_property +def get_event_builder(room_version, key_values={}, internal_metadata_dict={}): + """Generate an event builder appropriate for the given room version + + Args: + room_version (str): Version of the room that we're creating an + event builder for + key_values (dict): Fields used as the basis of the new event + internal_metadata_dict (dict): Used to create the `_EventInternalMetadata` + object. + + Returns: + EventBuilder + """ + if room_version in { + RoomVersions.V1, + RoomVersions.V2, + RoomVersions.VDH_TEST, + RoomVersions.STATE_V2_TEST, + }: + return EventBuilder(key_values, internal_metadata_dict) + else: + raise Exception( + "No event format defined for version %r" % (room_version,) + ) + + class EventBuilder(EventBase): def __init__(self, key_values={}, internal_metadata_dict={}): signatures = copy.deepcopy(key_values.pop("signatures", {})) @@ -58,7 +85,29 @@ class EventBuilderFactory(object): return e_id.to_string() - def new(self, key_values={}): + def new(self, room_version, key_values={}): + """Generate an event builder appropriate for the given room version + + Args: + room_version (str): Version of the room that we're creating an + event builder for + key_values (dict): Fields used as the basis of the new event + + Returns: + EventBuilder + """ + + # There's currently only the one event version defined + if room_version not in { + RoomVersions.V1, + RoomVersions.V2, + RoomVersions.VDH_TEST, + RoomVersions.STATE_V2_TEST, + }: + raise Exception( + "No event format defined for version %r" % (room_version,) + ) + key_values["event_id"] = self.create_event_id() time_now = int(self.clock.time_msec()) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index d749bfdd3a..5c31e5f85f 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -23,7 +23,7 @@ from twisted.internet.defer import DeferredList from synapse.api.constants import MAX_DEPTH, EventTypes, Membership from synapse.api.errors import Codes, SynapseError from synapse.crypto.event_signing import check_event_content_hash -from synapse.events import FrozenEvent +from synapse.events import event_type_from_format_version from synapse.events.utils import prune_event from synapse.http.servlet import assert_params_in_dict from synapse.types import get_domain_from_id @@ -302,11 +302,12 @@ def _is_invite_via_3pid(event): ) -def event_from_pdu_json(pdu_json, outlier=False): +def event_from_pdu_json(pdu_json, event_format_version, outlier=False): """Construct a FrozenEvent from an event json received over federation Args: pdu_json (object): pdu as received over federation + event_format_version (int): The event format version outlier (bool): True to mark this event as an outlier Returns: @@ -330,8 +331,8 @@ def event_from_pdu_json(pdu_json, outlier=False): elif depth > MAX_DEPTH: raise SynapseError(400, "Depth too large", Codes.BAD_JSON) - event = FrozenEvent( - pdu_json + event = event_type_from_format_version(event_format_version)( + pdu_json, ) event.internal_metadata.outlier = outlier diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 33ecabca29..71809893c5 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -170,13 +170,13 @@ class FederationClient(FederationBase): @defer.inlineCallbacks @log_function - def backfill(self, dest, context, limit, extremities): + def backfill(self, dest, room_id, limit, extremities): """Requests some more historic PDUs for the given context from the given destination server. Args: dest (str): The remote home server to ask. - context (str): The context to backfill. + room_id (str): The room_id to backfill. limit (int): The maximum number of PDUs to return. extremities (list): List of PDU id and origins of the first pdus we have seen from the context @@ -191,12 +191,15 @@ class FederationClient(FederationBase): return transaction_data = yield self.transport_layer.backfill( - dest, context, extremities, limit) + dest, room_id, extremities, limit) logger.debug("backfill transaction_data=%s", repr(transaction_data)) + room_version = yield self.store.get_room_version(room_id) + format_ver = room_version_to_event_format(room_version) + pdus = [ - event_from_pdu_json(p, outlier=False) + event_from_pdu_json(p, format_ver, outlier=False) for p in transaction_data["pdus"] ] @@ -240,6 +243,8 @@ class FederationClient(FederationBase): pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) + format_ver = room_version_to_event_format(room_version) + signed_pdu = None for destination in destinations: now = self._clock.time_msec() @@ -255,7 +260,7 @@ class FederationClient(FederationBase): logger.debug("transaction_data %r", transaction_data) pdu_list = [ - event_from_pdu_json(p, outlier=outlier) + event_from_pdu_json(p, format_ver, outlier=outlier) for p in transaction_data["pdus"] ] @@ -349,12 +354,16 @@ class FederationClient(FederationBase): destination, room_id, event_id=event_id, ) + room_version = yield self.store.get_room_version(room_id) + format_ver = room_version_to_event_format(room_version) + pdus = [ - event_from_pdu_json(p, outlier=True) for p in result["pdus"] + event_from_pdu_json(p, format_ver, outlier=True) + for p in result["pdus"] ] auth_chain = [ - event_from_pdu_json(p, outlier=True) + event_from_pdu_json(p, format_ver, outlier=True) for p in result.get("auth_chain", []) ] @@ -362,8 +371,6 @@ class FederationClient(FederationBase): ev.event_id for ev in itertools.chain(pdus, auth_chain) ]) - room_version = yield self.store.get_room_version(room_id) - signed_pdus = yield self._check_sigs_and_hash_and_fetch( destination, [p for p in pdus if p.event_id not in seen_events], @@ -462,13 +469,14 @@ class FederationClient(FederationBase): destination, room_id, event_id, ) + room_version = yield self.store.get_room_version(room_id) + format_ver = room_version_to_event_format(room_version) + auth_chain = [ - event_from_pdu_json(p, outlier=True) + event_from_pdu_json(p, format_ver, outlier=True) for p in res["auth_chain"] ] - room_version = yield self.store.get_room_version(room_id) - signed_auth = yield self._check_sigs_and_hash_and_fetch( destination, auth_chain, outlier=True, room_version=room_version, @@ -605,7 +613,7 @@ class FederationClient(FederationBase): pdu_dict.pop("origin_server_ts", None) pdu_dict.pop("unsigned", None) - builder = self.event_builder_factory.new(pdu_dict) + builder = self.event_builder_factory.new(room_version, pdu_dict) add_hashes_and_signatures( builder, self.hs.hostname, @@ -621,7 +629,7 @@ class FederationClient(FederationBase): "make_" + membership, destinations, send_request, ) - def send_join(self, destinations, pdu): + def send_join(self, destinations, pdu, event_format_version): """Sends a join event to one of a list of homeservers. Doing so will cause the remote server to add the event to the graph, @@ -631,6 +639,7 @@ class FederationClient(FederationBase): destinations (str): Candidate homeservers which are probably participating in the room. pdu (BaseEvent): event to be sent + event_format_version (int): The event format version Return: Deferred: resolves to a dict with members ``origin`` (a string @@ -676,12 +685,12 @@ class FederationClient(FederationBase): logger.debug("Got content: %s", content) state = [ - event_from_pdu_json(p, outlier=True) + event_from_pdu_json(p, event_format_version, outlier=True) for p in content.get("state", []) ] auth_chain = [ - event_from_pdu_json(p, outlier=True) + event_from_pdu_json(p, event_format_version, outlier=True) for p in content.get("auth_chain", []) ] @@ -759,7 +768,10 @@ class FederationClient(FederationBase): logger.debug("Got response to send_invite: %s", pdu_dict) - pdu = event_from_pdu_json(pdu_dict) + room_version = yield self.store.get_room_version(room_id) + format_ver = room_version_to_event_format(room_version) + + pdu = event_from_pdu_json(pdu_dict, format_ver) # Check signatures are correct. pdu = yield self._check_sigs_and_hash(pdu) @@ -837,13 +849,14 @@ class FederationClient(FederationBase): content=send_content, ) + room_version = yield self.store.get_room_version(room_id) + format_ver = room_version_to_event_format(room_version) + auth_chain = [ - event_from_pdu_json(e) + event_from_pdu_json(e, format_ver) for e in content["auth_chain"] ] - room_version = yield self.store.get_room_version(room_id) - signed_auth = yield self._check_sigs_and_hash_and_fetch( destination, auth_chain, outlier=True, room_version=room_version, ) @@ -887,13 +900,14 @@ class FederationClient(FederationBase): timeout=timeout, ) + room_version = yield self.store.get_room_version(room_id) + format_ver = room_version_to_event_format(room_version) + events = [ - event_from_pdu_json(e) + event_from_pdu_json(e, format_ver) for e in content.get("events", []) ] - room_version = yield self.store.get_room_version(room_id) - signed_events = yield self._check_sigs_and_hash_and_fetch( destination, events, outlier=False, room_version=room_version, ) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index dde166e295..4aa04b9588 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -34,6 +34,7 @@ from synapse.api.errors import ( SynapseError, ) from synapse.crypto.event_signing import compute_event_signature +from synapse.events import room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction @@ -178,14 +179,13 @@ class FederationServer(FederationBase): continue try: - # In future we will actually use the room version to parse the - # PDU into an event. - yield self.store.get_room_version(room_id) + room_version = yield self.store.get_room_version(room_id) + format_ver = room_version_to_event_format(room_version) except NotFoundError: logger.info("Ignoring PDU for unknown room_id: %s", room_id) continue - event = event_from_pdu_json(p) + event = event_from_pdu_json(p, format_ver) pdus_by_room.setdefault(room_id, []).append(event) pdu_results = {} @@ -370,7 +370,9 @@ class FederationServer(FederationBase): @defer.inlineCallbacks def on_invite_request(self, origin, content, room_version): - pdu = event_from_pdu_json(content) + format_ver = room_version_to_event_format(room_version) + + pdu = event_from_pdu_json(content, format_ver) origin_host, _ = parse_server_name(origin) yield self.check_server_matches_acl(origin_host, pdu.room_id) ret_pdu = yield self.handler.on_invite_request(origin, pdu) @@ -378,9 +380,12 @@ class FederationServer(FederationBase): defer.returnValue({"event": ret_pdu.get_pdu_json(time_now)}) @defer.inlineCallbacks - def on_send_join_request(self, origin, content): + def on_send_join_request(self, origin, content, room_id): logger.debug("on_send_join_request: content: %s", content) - pdu = event_from_pdu_json(content) + + room_version = yield self.store.get_room_version(room_id) + format_ver = room_version_to_event_format(room_version) + pdu = event_from_pdu_json(content, format_ver) origin_host, _ = parse_server_name(origin) yield self.check_server_matches_acl(origin_host, pdu.room_id) @@ -410,9 +415,12 @@ class FederationServer(FederationBase): }) @defer.inlineCallbacks - def on_send_leave_request(self, origin, content): + def on_send_leave_request(self, origin, content, room_id): logger.debug("on_send_leave_request: content: %s", content) - pdu = event_from_pdu_json(content) + + room_version = yield self.store.get_room_version(room_id) + format_ver = room_version_to_event_format(room_version) + pdu = event_from_pdu_json(content, format_ver) origin_host, _ = parse_server_name(origin) yield self.check_server_matches_acl(origin_host, pdu.room_id) @@ -458,13 +466,14 @@ class FederationServer(FederationBase): origin_host, _ = parse_server_name(origin) yield self.check_server_matches_acl(origin_host, room_id) + room_version = yield self.store.get_room_version(room_id) + format_ver = room_version_to_event_format(room_version) + auth_chain = [ - event_from_pdu_json(e) + event_from_pdu_json(e, format_ver) for e in content["auth_chain"] ] - room_version = yield self.store.get_room_version(room_id) - signed_auth = yield self._check_sigs_and_hash_and_fetch( origin, auth_chain, outlier=True, room_version=room_version, ) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 4557a9e66e..67ae0212c3 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -469,7 +469,7 @@ class FederationSendLeaveServlet(BaseFederationServlet): @defer.inlineCallbacks def on_PUT(self, origin, content, query, room_id, event_id): - content = yield self.handler.on_send_leave_request(origin, content) + content = yield self.handler.on_send_leave_request(origin, content, room_id) defer.returnValue((200, content)) @@ -487,7 +487,7 @@ class FederationSendJoinServlet(BaseFederationServlet): def on_PUT(self, origin, content, query, context, event_id): # TODO(paul): assert that context/event_id parsed from path actually # match those given in content - content = yield self.handler.on_send_join_request(origin, content) + content = yield self.handler.on_send_join_request(origin, content, context) defer.returnValue((200, content)) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c52dca1b81..a4b771049c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1061,7 +1061,7 @@ class FederationHandler(BaseHandler): """ logger.debug("Joining %s to %s", joinee, room_id) - origin, event = yield self._make_and_verify_event( + origin, event, event_format_version = yield self._make_and_verify_event( target_hosts, room_id, joinee, @@ -1091,7 +1091,9 @@ class FederationHandler(BaseHandler): target_hosts.insert(0, origin) except ValueError: pass - ret = yield self.federation_client.send_join(target_hosts, event) + ret = yield self.federation_client.send_join( + target_hosts, event, event_format_version, + ) origin = ret["origin"] state = ret["state"] @@ -1164,13 +1166,18 @@ class FederationHandler(BaseHandler): """ event_content = {"membership": Membership.JOIN} - builder = self.event_builder_factory.new({ - "type": EventTypes.Member, - "content": event_content, - "room_id": room_id, - "sender": user_id, - "state_key": user_id, - }) + room_version = yield self.store.get_room_version(room_id) + + builder = self.event_builder_factory.new( + room_version, + { + "type": EventTypes.Member, + "content": event_content, + "room_id": room_id, + "sender": user_id, + "state_key": user_id, + } + ) try: event, context = yield self.event_creation_handler.create_new_client_event( @@ -1304,7 +1311,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def do_remotely_reject_invite(self, target_hosts, room_id, user_id): - origin, event = yield self._make_and_verify_event( + origin, event, event_format_version = yield self._make_and_verify_event( target_hosts, room_id, user_id, @@ -1336,7 +1343,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, content={}, params=None): - origin, pdu, _ = yield self.federation_client.make_membership_event( + origin, event, format_ver = yield self.federation_client.make_membership_event( target_hosts, room_id, user_id, @@ -1345,9 +1352,7 @@ class FederationHandler(BaseHandler): params=params, ) - logger.debug("Got response to make_%s: %s", membership, pdu) - - event = pdu + logger.debug("Got response to make_%s: %s", membership, event) # We should assert some things. # FIXME: Do this in a nicer way @@ -1355,7 +1360,7 @@ class FederationHandler(BaseHandler): assert(event.user_id == user_id) assert(event.state_key == user_id) assert(event.room_id == room_id) - defer.returnValue((origin, event)) + defer.returnValue((origin, event, format_ver)) @defer.inlineCallbacks @log_function @@ -1364,13 +1369,17 @@ class FederationHandler(BaseHandler): leave event for the room and return that. We do *not* persist or process it until the other server has signed it and sent it back. """ - builder = self.event_builder_factory.new({ - "type": EventTypes.Member, - "content": {"membership": Membership.LEAVE}, - "room_id": room_id, - "sender": user_id, - "state_key": user_id, - }) + room_version = yield self.store.get_room_version(room_id) + builder = self.event_builder_factory.new( + room_version, + { + "type": EventTypes.Member, + "content": {"membership": Membership.LEAVE}, + "room_id": room_id, + "sender": user_id, + "state_key": user_id, + } + ) event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, @@ -2266,14 +2275,16 @@ class FederationHandler(BaseHandler): } if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)): - builder = self.event_builder_factory.new(event_dict) + room_version = yield self.store.get_room_version(room_id) + builder = self.event_builder_factory.new(room_version, event_dict) + EventValidator().validate_new(builder) event, context = yield self.event_creation_handler.create_new_client_event( builder=builder ) event, context = yield self.add_display_name_to_third_party_invite( - event_dict, event, context + room_version, event_dict, event, context ) try: @@ -2304,14 +2315,18 @@ class FederationHandler(BaseHandler): Returns: Deferred: resolves (to None) """ - builder = self.event_builder_factory.new(event_dict) + room_version = yield self.store.get_room_version(room_id) + + # NB: event_dict has a particular specced format we might need to fudge + # if we change event formats too much. + builder = self.event_builder_factory.new(room_version, event_dict) event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, ) event, context = yield self.add_display_name_to_third_party_invite( - event_dict, event, context + room_version, event_dict, event, context ) try: @@ -2331,7 +2346,8 @@ class FederationHandler(BaseHandler): yield member_handler.send_membership_event(None, event, context) @defer.inlineCallbacks - def add_display_name_to_third_party_invite(self, event_dict, event, context): + def add_display_name_to_third_party_invite(self, room_version, event_dict, + event, context): key = ( EventTypes.ThirdPartyInvite, event.content["third_party_invite"]["signed"]["token"] @@ -2355,7 +2371,7 @@ class FederationHandler(BaseHandler): # auth checks. If we need the invite and don't have it then the # auth check code will explode appropriately. - builder = self.event_builder_factory.new(event_dict) + builder = self.event_builder_factory.new(room_version, event_dict) EventValidator().validate_new(builder) event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a7cd779b02..7aaa4fba33 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -278,7 +278,15 @@ class EventCreationHandler(object): """ yield self.auth.check_auth_blocking(requester.user.to_string()) - builder = self.event_builder_factory.new(event_dict) + if event_dict["type"] == EventTypes.Create and event_dict["state_key"] == "": + room_version = event_dict["content"]["room_version"] + else: + try: + room_version = yield self.store.get_room_version(event_dict["room_id"]) + except NotFoundError: + raise AuthError(403, "Unknown room") + + builder = self.event_builder_factory.new(room_version, event_dict) self.validator.validate_new(builder) -- cgit 1.4.1 From d414f300190a1182742685415ef7a6419bc1dda8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 28 Jan 2019 14:55:53 +0000 Subject: Implement fallback for V2 invite API If the room version is either 1 or 2 then a server should retry failed `/v2/invite` requests with the v1 API --- synapse/federation/federation_client.py | 64 ++++++++++++++++++++++++++------- synapse/federation/transport/client.py | 39 ++++++++++++++++++-- 2 files changed, 89 insertions(+), 14 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 71809893c5..cacb1c8aaf 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -751,18 +751,9 @@ class FederationClient(FederationBase): @defer.inlineCallbacks def send_invite(self, destination, room_id, event_id, pdu): - time_now = self._clock.time_msec() - try: - code, content = yield self.transport_layer.send_invite( - destination=destination, - room_id=room_id, - event_id=event_id, - content=pdu.get_pdu_json(time_now), - ) - except HttpResponseException as e: - if e.code == 403: - raise e.to_synapse_error() - raise + room_version = yield self.store.get_room_version(room_id) + + content = yield self._do_send_invite(destination, pdu, room_version) pdu_dict = content["event"] @@ -780,6 +771,55 @@ class FederationClient(FederationBase): defer.returnValue(pdu) + @defer.inlineCallbacks + def _do_send_invite(self, destination, pdu, room_version): + """Actually sends the invite, first trying v2 API and falling back to + v1 API if necessary. + + Args: + destination (str): Target server + pdu (FrozenEvent) + room_version (str) + + Returns: + dict: The event as a dict as returned by the remote server + """ + time_now = self._clock.time_msec() + + try: + content = yield self.transport_layer.send_invite_v2( + destination=destination, + room_id=pdu.room_id, + event_id=pdu.event_id, + content={ + "event": pdu.get_pdu_json(time_now), + "room_version": room_version, + "invite_room_state": pdu.unsigned.get("invite_room_state", []), + }, + ) + defer.returnValue(content) + except HttpResponseException as e: + if e.code in [400, 404]: + if room_version in (RoomVersions.V1, RoomVersions.V2): + pass # We'll fall through + else: + raise Exception("Remote server is too old") + elif e.code == 403: + raise e.to_synapse_error() + else: + raise + + # Didn't work, try v1 API. + # Note the v1 API returns a tuple of `(200, content)` + + _, content = yield self.transport_layer.send_invite_v1( + destination=destination, + room_id=pdu.room_id, + event_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), + ) + defer.returnValue(content) + def send_leave(self, destinations, pdu): """Sends a leave event to one of a list of homeservers. diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 260178c47b..8e2be218e2 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -21,7 +21,7 @@ from six.moves import urllib from twisted.internet import defer from synapse.api.constants import Membership -from synapse.api.urls import FEDERATION_V1_PREFIX +from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX from synapse.util.logutils import log_function logger = logging.getLogger(__name__) @@ -289,7 +289,7 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def send_invite(self, destination, room_id, event_id, content): + def send_invite_v1(self, destination, room_id, event_id, content): path = _create_v1_path("/invite/%s/%s", room_id, event_id) response = yield self.client.put_json( @@ -301,6 +301,20 @@ class TransportLayerClient(object): defer.returnValue(response) + @defer.inlineCallbacks + @log_function + def send_invite_v2(self, destination, room_id, event_id, content): + path = _create_v2_path("/invite/%s/%s", room_id, event_id) + + response = yield self.client.put_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) + + defer.returnValue(response) + @defer.inlineCallbacks @log_function def get_public_rooms(self, remote_server, limit, since_token, @@ -958,3 +972,24 @@ def _create_v1_path(path, *args): FEDERATION_V1_PREFIX + path % tuple(urllib.parse.quote(arg, "") for arg in args) ) + + +def _create_v2_path(path, *args): + """Creates a path against V2 federation API from the path template and + args. Ensures that all args are url encoded. + + Example: + + _create_v2_path("/event/%s/", event_id) + + Args: + path (str): String template for the path + args: ([str]): Args to insert into path. Each arg will be url encoded + + Returns: + str + """ + return ( + FEDERATION_V2_PREFIX + + path % tuple(urllib.parse.quote(arg, "") for arg in args) + ) -- cgit 1.4.1 From be47cfa9c97b4acfd884440f1953ed000225eb37 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 25 Jan 2019 17:19:31 +0000 Subject: Refactor event building into EventBuilder This is so that everything is done in one place, making it easier to change the event format based on room version --- synapse/events/builder.py | 284 ++++++++++++++++++++++++-------- synapse/federation/federation_client.py | 20 +-- synapse/handlers/message.py | 34 +--- synapse/server.py | 5 +- synapse/storage/event_federation.py | 23 +++ 5 files changed, 254 insertions(+), 112 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 7e63371095..225b5fd670 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -13,79 +13,156 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy +import attr -from synapse.api.constants import RoomVersions +from twisted.internet import defer + +from synapse.api.constants import ( + KNOWN_EVENT_FORMAT_VERSIONS, + KNOWN_ROOM_VERSIONS, + MAX_DEPTH, +) +from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.types import EventID from synapse.util.stringutils import random_string -from . import EventBase, FrozenEvent, _event_dict_property +from . import ( + _EventInternalMetadata, + event_type_from_format_version, + room_version_to_event_format, +) -def get_event_builder(room_version, key_values={}, internal_metadata_dict={}): - """Generate an event builder appropriate for the given room version +@attr.s(slots=True, cmp=False, frozen=True) +class EventBuilder(object): + """A format independent event builder used to build up the event content + before signing the event. - Args: - room_version (str): Version of the room that we're creating an - event builder for - key_values (dict): Fields used as the basis of the new event - internal_metadata_dict (dict): Used to create the `_EventInternalMetadata` - object. + (Note that while objects of this class are frozen, the + content/unsigned/internal_metadata fields are still mutable) - Returns: - EventBuilder + Attributes: + format_version (int): Event format version + room_id (str) + type (str) + sender (str) + content (dict) + unsigned (dict) + internal_metadata (_EventInternalMetadata) + + _state (StateHandler) + _auth (synapse.api.Auth) + _store (DataStore) + _clock (Clock) + _hostname (str): The hostname of the server creating the event + _signing_key: The signing key to use to sign the event as the server """ - if room_version in { - RoomVersions.V1, - RoomVersions.V2, - RoomVersions.VDH_TEST, - RoomVersions.STATE_V2_TEST, - }: - return EventBuilder(key_values, internal_metadata_dict) - else: - raise Exception( - "No event format defined for version %r" % (room_version,) - ) + _state = attr.ib() + _auth = attr.ib() + _store = attr.ib() + _clock = attr.ib() + _hostname = attr.ib() + _signing_key = attr.ib() + + format_version = attr.ib() + + room_id = attr.ib() + type = attr.ib() + sender = attr.ib() + + content = attr.ib(default=attr.Factory(dict)) + unsigned = attr.ib(default=attr.Factory(dict)) + + # These only exist on a subset of events, so they raise AttributeError if + # someone tries to get them when they don't exist. + _state_key = attr.ib(default=None) + _redacts = attr.ib(default=None) + + internal_metadata = attr.ib(default=attr.Factory(lambda: _EventInternalMetadata({}))) + + @property + def state_key(self): + if self._state_key is not None: + return self._state_key + + raise AttributeError("state_key") + + def is_state(self): + return self._state_key is not None -class EventBuilder(EventBase): - def __init__(self, key_values={}, internal_metadata_dict={}): - signatures = copy.deepcopy(key_values.pop("signatures", {})) - unsigned = copy.deepcopy(key_values.pop("unsigned", {})) + @defer.inlineCallbacks + def build(self, prev_event_ids): + """Transform into a fully signed and hashed event - super(EventBuilder, self).__init__( - key_values, - signatures=signatures, - unsigned=unsigned, - internal_metadata_dict=internal_metadata_dict, + Args: + prev_event_ids (list[str]): The event IDs to use as the prev events + + Returns: + Deferred[FrozenEvent] + """ + + state_ids = yield self._state.get_current_state_ids( + self.room_id, prev_event_ids, + ) + auth_ids = yield self._auth.compute_auth_events( + self, state_ids, ) - event_id = _event_dict_property("event_id") - state_key = _event_dict_property("state_key") - type = _event_dict_property("type") + auth_events = yield self._store.add_event_hashes(auth_ids) + prev_events = yield self._store.add_event_hashes(prev_event_ids) - def build(self): - return FrozenEvent.from_event(self) + old_depth = yield self._store.get_max_depth_of( + prev_event_ids, + ) + depth = old_depth + 1 + # we cap depth of generated events, to ensure that they are not + # rejected by other servers (and so that they can be persisted in + # the db) + depth = min(depth, MAX_DEPTH) -class EventBuilderFactory(object): - def __init__(self, clock, hostname): - self.clock = clock - self.hostname = hostname + event_dict = { + "auth_events": auth_events, + "prev_events": prev_events, + "type": self.type, + "room_id": self.room_id, + "sender": self.sender, + "content": self.content, + "unsigned": self.unsigned, + "depth": depth, + "prev_state": [], + } + + if self.is_state(): + event_dict["state_key"] = self._state_key - self.event_id_count = 0 + if self._redacts is not None: + event_dict["redacts"] = self._redacts - def create_event_id(self): - i = str(self.event_id_count) - self.event_id_count += 1 + defer.returnValue( + create_local_event_from_event_dict( + clock=self._clock, + hostname=self._hostname, + signing_key=self._signing_key, + format_version=self.format_version, + event_dict=event_dict, + internal_metadata_dict=self.internal_metadata.get_dict(), + ) + ) - local_part = str(int(self.clock.time())) + i + random_string(5) - e_id = EventID(local_part, self.hostname) +class EventBuilderFactory(object): + def __init__(self, hs): + self.clock = hs.get_clock() + self.hostname = hs.hostname + self.signing_key = hs.config.signing_key[0] - return e_id.to_string() + self.store = hs.get_datastore() + self.state = hs.get_state_handler() + self.auth = hs.get_auth() - def new(self, room_version, key_values={}): + def new(self, room_version, key_values): """Generate an event builder appropriate for the given room version Args: @@ -98,27 +175,104 @@ class EventBuilderFactory(object): """ # There's currently only the one event version defined - if room_version not in { - RoomVersions.V1, - RoomVersions.V2, - RoomVersions.VDH_TEST, - RoomVersions.STATE_V2_TEST, - }: + if room_version not in KNOWN_ROOM_VERSIONS: raise Exception( "No event format defined for version %r" % (room_version,) ) - key_values["event_id"] = self.create_event_id() + key_values["event_id"] = _create_event_id(self.clock, self.hostname) + + return EventBuilder( + store=self.store, + state=self.state, + auth=self.auth, + clock=self.clock, + hostname=self.hostname, + signing_key=self.signing_key, + format_version=room_version_to_event_format(room_version), + type=key_values["type"], + state_key=key_values.get("state_key"), + room_id=key_values["room_id"], + sender=key_values["sender"], + content=key_values.get("content", {}), + unsigned=key_values.get("unsigned", {}), + redacts=key_values.get("redacts", None), + ) + + +def create_local_event_from_event_dict(clock, hostname, signing_key, + format_version, event_dict, + internal_metadata_dict=None): + """Takes a fully formed event dict, ensuring that fields like `origin` + and `origin_server_ts` have correct values for a locally produced event, + then signs and hashes it. + + Args: + clock (Clock) + hostname (str) + signing_key + format_version (int) + event_dict (dict) + internal_metadata_dict (dict|None) + + Returns: + FrozenEvent + """ + + # There's currently only the one event version defined + if format_version not in KNOWN_EVENT_FORMAT_VERSIONS: + raise Exception( + "No event format defined for version %r" % (format_version,) + ) + + if internal_metadata_dict is None: + internal_metadata_dict = {} + + time_now = int(clock.time_msec()) + + event_dict["event_id"] = _create_event_id(clock, hostname) + + event_dict["origin"] = hostname + event_dict["origin_server_ts"] = time_now + + event_dict.setdefault("unsigned", {}) + age = event_dict["unsigned"].pop("age", 0) + event_dict["unsigned"].setdefault("age_ts", time_now - age) + + event_dict.setdefault("signatures", {}) + + add_hashes_and_signatures( + event_dict, + hostname, + signing_key, + ) + return event_type_from_format_version(format_version)( + event_dict, internal_metadata_dict=internal_metadata_dict, + ) + + +# A counter used when generating new event IDs +_event_id_counter = 0 + + +def _create_event_id(clock, hostname): + """Create a new event ID + + Args: + clock (Clock) + hostname (str): The server name for the event ID + + Returns: + str + """ - time_now = int(self.clock.time_msec()) + global _event_id_counter - key_values.setdefault("origin", self.hostname) - key_values.setdefault("origin_server_ts", time_now) + i = str(_event_id_counter) + _event_id_counter += 1 - key_values.setdefault("unsigned", {}) - age = key_values["unsigned"].pop("age", 0) - key_values["unsigned"].setdefault("age_ts", time_now - age) + local_part = str(int(clock.time())) + i + random_string(5) - key_values["signatures"] = {} + e_id = EventID(local_part, hostname) - return EventBuilder(key_values=key_values,) + return e_id.to_string() diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 71809893c5..be3bb59431 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -37,8 +37,7 @@ from synapse.api.errors import ( HttpResponseException, SynapseError, ) -from synapse.crypto.event_signing import add_hashes_and_signatures -from synapse.events import room_version_to_event_format +from synapse.events import builder, room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache @@ -72,7 +71,8 @@ class FederationClient(FederationBase): self.state = hs.get_state_handler() self.transport_layer = hs.get_federation_transport_client() - self.event_builder_factory = hs.get_event_builder_factory() + self.hostname = hs.hostname + self.signing_key = hs.config.signing_key[0] self._get_pdu_cache = ExpiringCache( cache_name="get_pdu_cache", @@ -608,18 +608,10 @@ class FederationClient(FederationBase): if "prev_state" not in pdu_dict: pdu_dict["prev_state"] = [] - # Strip off the fields that we want to clobber. - pdu_dict.pop("origin", None) - pdu_dict.pop("origin_server_ts", None) - pdu_dict.pop("unsigned", None) - - builder = self.event_builder_factory.new(room_version, pdu_dict) - add_hashes_and_signatures( - builder, - self.hs.hostname, - self.hs.config.signing_key[0] + ev = builder.create_local_event_from_event_dict( + self._clock, self.hostname, self.signing_key, + format_version=event_format, event_dict=pdu_dict, ) - ev = builder.build() defer.returnValue( (destination, ev, event_format) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 05d1370c18..ac6f4fd985 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -22,7 +22,7 @@ from canonicaljson import encode_canonical_json, json from twisted.internet import defer from twisted.internet.defer import succeed -from synapse.api.constants import MAX_DEPTH, EventTypes, Membership, RoomVersions +from synapse.api.constants import EventTypes, Membership, RoomVersions from synapse.api.errors import ( AuthError, Codes, @@ -31,7 +31,6 @@ from synapse.api.errors import ( SynapseError, ) from synapse.api.urls import ConsentURIBuilder -from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.replication.http.send_event import ReplicationSendEventRestServlet @@ -545,40 +544,17 @@ class EventCreationHandler(object): prev_events_and_hashes = \ yield self.store.get_prev_events_for_room(builder.room_id) - if prev_events_and_hashes: - depth = max([d for _, _, d in prev_events_and_hashes]) + 1 - # we cap depth of generated events, to ensure that they are not - # rejected by other servers (and so that they can be persisted in - # the db) - depth = min(depth, MAX_DEPTH) - else: - depth = 1 - prev_events = [ (event_id, prev_hashes) for event_id, prev_hashes, _ in prev_events_and_hashes ] - builder.prev_events = prev_events - builder.depth = depth - - context = yield self.state.compute_event_context(builder) - if requester: - context.app_service = requester.app_service - - if builder.is_state(): - builder.prev_state = yield self.store.add_event_hashes( - context.prev_state_events - ) - - yield self.auth.add_auth_events(builder, context) - - signing_key = self.hs.config.signing_key[0] - add_hashes_and_signatures( - builder, self.server_name, signing_key + event = yield builder.build( + prev_event_ids=[p for p, _ in prev_events], ) + context = yield self.state.compute_event_context(event) - event = builder.build() + self.validator.validate_new(event) logger.debug( "Created event %s", diff --git a/synapse/server.py b/synapse/server.py index c8914302cf..6c52101616 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -355,10 +355,7 @@ class HomeServer(object): return Keyring(self) def build_event_builder_factory(self): - return EventBuilderFactory( - clock=self.get_clock(), - hostname=self.hostname, - ) + return EventBuilderFactory(self) def build_filtering(self): return Filtering(self) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index d3b9dea1d6..38809ed0fc 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -125,6 +125,29 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, return dict(txn) + @defer.inlineCallbacks + def get_max_depth_of(self, event_ids): + """Returns the max depth of a set of event IDs + + Args: + event_ids (list[str]) + + Returns + Deferred[int] + """ + rows = yield self._simple_select_many_batch( + table="events", + column="event_id", + iterable=event_ids, + retcols=("depth",), + desc="get_max_depth_of", + ) + + if not rows: + defer.returnValue(0) + else: + defer.returnValue(max(row["depth"] for row in rows)) + def _get_oldest_events_in_room_txn(self, txn, room_id): return self._simple_select_onecol_txn( txn, -- cgit 1.4.1 From 6598992b01629062791cd4d98f062aeabd4f7baf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 29 Jan 2019 11:28:02 +0000 Subject: Fixup calls to `comput_event_signature` We currently pass FrozenEvent instead of `dict` to `compute_event_signature`, which works by accident due to `dict(event)` producing the correct result. This fixes PR #4493 commit 855a151 --- synapse/federation/federation_server.py | 2 +- synapse/handlers/federation.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 4aa04b9588..6681614232 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -322,7 +322,7 @@ class FederationServer(FederationBase): if self.hs.is_mine_id(event.event_id): event.signatures.update( compute_event_signature( - event, + event.get_pdu_json(), self.hs.hostname, self.hs.config.signing_key[0] ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index adf59db7a8..fcaf7530b0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1300,7 +1300,7 @@ class FederationHandler(BaseHandler): event.signatures.update( compute_event_signature( - event, + event.get_pdu_json(), self.hs.hostname, self.hs.config.signing_key[0] ) -- cgit 1.4.1 From 55d90248356b0068b201c5be7298e0f3ae1c8ace Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 29 Jan 2019 16:15:16 +0000 Subject: Use snder and not event ID domain to check if ours The transaction queue only sends out events that we generate. This was done by checking domain of event ID, but that can no longer be used. Instead, we may as well use the sender field. --- synapse/federation/transaction_queue.py | 2 +- synapse/handlers/federation.py | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index fe787abaeb..1f0b67f5f8 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -175,7 +175,7 @@ class TransactionQueue(object): def handle_event(event): # Only send events for this server. send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() - is_mine = self.is_mine_id(event.event_id) + is_mine = self.is_mine_id(event.sender) if not is_mine and send_on_behalf_of is None: return diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fcaf7530b0..f89dabb9eb 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2293,6 +2293,10 @@ class FederationHandler(BaseHandler): EventValidator().validate_new(event) + # We need to tell the transaction queue to send this out, even + # though the sender isn't a local user. + event.internal_metadata.send_on_behalf_of = self.hs.hostname + try: yield self.auth.check_from_context(room_version, event, context) except AuthError as e: @@ -2342,6 +2346,10 @@ class FederationHandler(BaseHandler): raise e yield self._check_signature(event, context) + # We need to tell the transaction queue to send this out, even + # though the sender isn't a local user. + event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender) + # XXX we send the invite here, but send_membership_event also sends it, # so we end up making two requests. I think this is redundant. returned_invite = yield self.send_invite(origin, event) -- cgit 1.4.1 From 8e3d34e3c58374ab32f3aaace916ddcb4b1a150c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 29 Jan 2019 16:26:40 +0000 Subject: Use event origin for filtering incoming events We only process events sent to us from a server if the event ID matches the server, to help guard against federation storms. We replace this with a check against the event origin. --- synapse/federation/federation_server.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 6681614232..5c3784c560 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -25,7 +25,7 @@ from twisted.internet import defer from twisted.internet.abstract import isIPAddress from twisted.python import failure -from synapse.api.constants import EventTypes +from synapse.api.constants import EventTypes, Membership from synapse.api.errors import ( AuthError, FederationError, @@ -620,16 +620,19 @@ class FederationServer(FederationBase): """ # check that it's actually being sent from a valid destination to # workaround bug #1753 in 0.18.5 and 0.18.6 - if origin != get_domain_from_id(pdu.event_id): + if origin != get_domain_from_id(pdu.sender): # We continue to accept join events from any server; this is # necessary for the federation join dance to work correctly. # (When we join over federation, the "helper" server is # responsible for sending out the join event, rather than the - # origin. See bug #1893). + # origin. See bug #1893. This is also true for some third party + # invites). if not ( pdu.type == 'm.room.member' and pdu.content and - pdu.content.get("membership", None) == 'join' + pdu.content.get("membership", None) in ( + Membership.JOIN, Membership.INVITE, + ) ): logger.info( "Discarding PDU %s from invalid origin %s", -- cgit 1.4.1 From 840068bd787dbf4a8640549578af5ad39b8fb156 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 29 Jan 2019 17:21:48 +0000 Subject: Only check event ID domain for signatures for V1 events In future version events won't have an event ID, so we won't be able to do this check. --- synapse/federation/federation_base.py | 64 +++++++++++++++++++-------------- synapse/federation/federation_client.py | 6 ++-- synapse/federation/federation_server.py | 5 ++- 3 files changed, 44 insertions(+), 31 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 5c31e5f85f..0bff8686e0 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -20,7 +20,7 @@ import six from twisted.internet import defer from twisted.internet.defer import DeferredList -from synapse.api.constants import MAX_DEPTH, EventTypes, Membership +from synapse.api.constants import KNOWN_ROOM_VERSIONS, MAX_DEPTH, EventTypes, Membership from synapse.api.errors import Codes, SynapseError from synapse.crypto.event_signing import check_event_content_hash from synapse.events import event_type_from_format_version @@ -66,7 +66,7 @@ class FederationBase(object): Returns: Deferred : A list of PDUs that have valid signatures and hashes. """ - deferreds = self._check_sigs_and_hashes(pdus) + deferreds = self._check_sigs_and_hashes(room_version, pdus) @defer.inlineCallbacks def handle_check_result(pdu, deferred): @@ -121,16 +121,17 @@ class FederationBase(object): else: defer.returnValue([p for p in valid_pdus if p]) - def _check_sigs_and_hash(self, pdu): + def _check_sigs_and_hash(self, room_version, pdu): return logcontext.make_deferred_yieldable( - self._check_sigs_and_hashes([pdu])[0], + self._check_sigs_and_hashes(room_version, [pdu])[0], ) - def _check_sigs_and_hashes(self, pdus): + def _check_sigs_and_hashes(self, room_version, pdus): """Checks that each of the received events is correctly signed by the sending server. Args: + room_version (str): The room version of the PDUs pdus (list[FrozenEvent]): the events to be checked Returns: @@ -141,7 +142,7 @@ class FederationBase(object): * throws a SynapseError if the signature check failed. The deferreds run their callbacks in the sentinel logcontext. """ - deferreds = _check_sigs_on_pdus(self.keyring, pdus) + deferreds = _check_sigs_on_pdus(self.keyring, room_version, pdus) ctx = logcontext.LoggingContext.current_context() @@ -203,16 +204,17 @@ class FederationBase(object): class PduToCheckSig(namedtuple("PduToCheckSig", [ - "pdu", "redacted_pdu_json", "event_id_domain", "sender_domain", "deferreds", + "pdu", "redacted_pdu_json", "sender_domain", "deferreds", ])): pass -def _check_sigs_on_pdus(keyring, pdus): +def _check_sigs_on_pdus(keyring, room_version, pdus): """Check that the given events are correctly signed Args: keyring (synapse.crypto.Keyring): keyring object to do the checks + room_version (str): the room version of the PDUs pdus (Collection[EventBase]): the events to be checked Returns: @@ -243,32 +245,22 @@ def _check_sigs_on_pdus(keyring, pdus): # # let's start by getting the domain for each pdu, and flattening the event back # to JSON. + pdus_to_check = [ PduToCheckSig( pdu=p, redacted_pdu_json=prune_event(p).get_pdu_json(), - event_id_domain=get_domain_from_id(p.event_id), sender_domain=get_domain_from_id(p.sender), deferreds=[], ) for p in pdus ] - # first make sure that the event is signed by the event_id's domain - deferreds = keyring.verify_json_objects_for_server([ - (p.event_id_domain, p.redacted_pdu_json) - for p in pdus_to_check - ]) - - for p, d in zip(pdus_to_check, deferreds): - p.deferreds.append(d) - - # now let's look for events where the sender's domain is different to the - # event id's domain (normally only the case for joins/leaves), and add additional - # checks. + # First we check that the sender event is signed by the sender's domain + # (except if its a 3pid invite, in which case it may be sent by any server) pdus_to_check_sender = [ p for p in pdus_to_check - if p.sender_domain != p.event_id_domain and not _is_invite_via_3pid(p.pdu) + if not _is_invite_via_3pid(p.pdu) ] more_deferreds = keyring.verify_json_objects_for_server([ @@ -279,19 +271,37 @@ def _check_sigs_on_pdus(keyring, pdus): for p, d in zip(pdus_to_check_sender, more_deferreds): p.deferreds.append(d) + # now let's look for events where the sender's domain is different to the + # event id's domain (normally only the case for joins/leaves), and add additional + # checks. Only do this if the room version has a concept of event ID domain + if room_version in KNOWN_ROOM_VERSIONS: + pdus_to_check_event_id = [ + p for p in pdus_to_check + if p.sender_domain != get_domain_from_id(p.pdu.event_id) + ] + + more_deferreds = keyring.verify_json_objects_for_server([ + (get_domain_from_id(p.pdu.event_id), p.redacted_pdu_json) + for p in pdus_to_check_event_id + ]) + + for p, d in zip(pdus_to_check_event_id, more_deferreds): + p.deferreds.append(d) + # replace lists of deferreds with single Deferreds return [_flatten_deferred_list(p.deferreds) for p in pdus_to_check] def _flatten_deferred_list(deferreds): - """Given a list of one or more deferreds, either return the single deferred, or - combine into a DeferredList. + """Given a list of deferreds, either return the single deferred, + combine into a DeferredList, or return an already resolved deferred. """ if len(deferreds) > 1: return DeferredList(deferreds, fireOnOneErrback=True, consumeErrors=True) - else: - assert len(deferreds) == 1 + elif len(deferreds) == 1: return deferreds[0] + else: + return defer.succeed(None) def _is_invite_via_3pid(event): @@ -319,7 +329,7 @@ def event_from_pdu_json(pdu_json, event_format_version, outlier=False): """ # we could probably enforce a bunch of other fields here (room_id, sender, # origin, etc etc) - assert_params_in_dict(pdu_json, ('event_id', 'type', 'depth')) + assert_params_in_dict(pdu_json, ('type', 'depth')) depth = pdu_json['depth'] if not isinstance(depth, six.integer_types): diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 9b4acd2ed7..4e4f58b418 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -205,7 +205,7 @@ class FederationClient(FederationBase): # FIXME: We should handle signature failures more gracefully. pdus[:] = yield logcontext.make_deferred_yieldable(defer.gatherResults( - self._check_sigs_and_hashes(pdus), + self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True, ).addErrback(unwrapFirstError)) @@ -268,7 +268,7 @@ class FederationClient(FederationBase): pdu = pdu_list[0] # Check signatures are correct. - signed_pdu = yield self._check_sigs_and_hash(pdu) + signed_pdu = yield self._check_sigs_and_hash(room_version, pdu) break @@ -757,7 +757,7 @@ class FederationClient(FederationBase): pdu = event_from_pdu_json(pdu_dict, format_ver) # Check signatures are correct. - pdu = yield self._check_sigs_and_hash(pdu) + pdu = yield self._check_sigs_and_hash(room_version, pdu) # FIXME: We should handle signature failures more gracefully. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 5c3784c560..aeadc9c564 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -645,9 +645,12 @@ class FederationServer(FederationBase): pdu.event_id, origin ) + # We've already checked that we know the room version by this point + room_version = yield self.store.get_room_version(pdu.room_id) + # Check signature. try: - pdu = yield self._check_sigs_and_hash(pdu) + pdu = yield self._check_sigs_and_hash(room_version, pdu) except SynapseError as e: raise FederationError( "ERROR", -- cgit 1.4.1 From ff2f65d737b3d36e2aef8c296d1ce731156d3847 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 29 Jan 2019 22:35:36 +0000 Subject: Update comment --- synapse/federation/federation_base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 0bff8686e0..a400091db7 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -227,9 +227,7 @@ def _check_sigs_on_pdus(keyring, room_version, pdus): # we want to check that the event is signed by: # - # (a) the server which created the event_id - # - # (b) the sender's server. + # (a) the sender's server # # - except in the case of invites created from a 3pid invite, which are exempt # from this check, because the sender has to match that of the original 3pid @@ -243,6 +241,8 @@ def _check_sigs_on_pdus(keyring, room_version, pdus): # and signatures are *supposed* to be valid whether or not an event has been # redacted. But this isn't the worst of the ways that 3pid invites are broken. # + # (b) for V1 and V2 rooms, the server which created the event_id + # # let's start by getting the domain for each pdu, and flattening the event back # to JSON. -- cgit 1.4.1 From a1b0e1879b8bdc3dbff168f5948c35c423640f2a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 29 Jan 2019 17:35:31 +0000 Subject: Enable room version v3 --- synapse/api/constants.py | 3 ++- synapse/events/__init__.py | 2 ++ synapse/federation/federation_base.py | 11 +++++++++-- synapse/state/__init__.py | 2 +- 4 files changed, 14 insertions(+), 4 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 4912a55785..ba519005ca 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -104,7 +104,7 @@ class ThirdPartyEntityKind(object): class RoomVersions(object): V1 = "1" V2 = "2" - V3 = "3" # Not currently fully supported, so we don't add to known versions below + V3 = "3" STATE_V2_TEST = "state-v2-test" @@ -116,6 +116,7 @@ DEFAULT_ROOM_VERSION = RoomVersions.V1 KNOWN_ROOM_VERSIONS = { RoomVersions.V1, RoomVersions.V2, + RoomVersions.V3, RoomVersions.STATE_V2_TEST, } diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 697cf58582..20c1ab4203 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -346,6 +346,8 @@ def room_version_to_event_format(room_version): RoomVersions.V1, RoomVersions.V2, RoomVersions.STATE_V2_TEST, ): return EventFormatVersions.V1 + elif room_version in (RoomVersions.V3,): + return EventFormatVersions.V2 else: raise RuntimeError("Unrecognized room version %s" % (room_version,)) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index a400091db7..5188b7f0db 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -20,7 +20,7 @@ import six from twisted.internet import defer from twisted.internet.defer import DeferredList -from synapse.api.constants import KNOWN_ROOM_VERSIONS, MAX_DEPTH, EventTypes, Membership +from synapse.api.constants import MAX_DEPTH, EventTypes, Membership, RoomVersions from synapse.api.errors import Codes, SynapseError from synapse.crypto.event_signing import check_event_content_hash from synapse.events import event_type_from_format_version @@ -274,7 +274,10 @@ def _check_sigs_on_pdus(keyring, room_version, pdus): # now let's look for events where the sender's domain is different to the # event id's domain (normally only the case for joins/leaves), and add additional # checks. Only do this if the room version has a concept of event ID domain - if room_version in KNOWN_ROOM_VERSIONS: + if room_version in ( + RoomVersions.V1, RoomVersions.V2, RoomVersions.VDH_TEST, + RoomVersions.STATE_V2_TEST, + ): pdus_to_check_event_id = [ p for p in pdus_to_check if p.sender_domain != get_domain_from_id(p.pdu.event_id) @@ -287,6 +290,10 @@ def _check_sigs_on_pdus(keyring, room_version, pdus): for p, d in zip(pdus_to_check_event_id, more_deferreds): p.deferreds.append(d) + elif room_version in (RoomVersions.V3,): + pass # No further checks needed, as event IDs are hashes here + else: + raise RuntimeError("Unrecognized room version %s" % (room_version,)) # replace lists of deferreds with single Deferreds return [_flatten_deferred_list(p.deferreds) for p in pdus_to_check] diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 125635b01a..68058f613c 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -608,7 +608,7 @@ def resolve_events_with_store(room_version, state_sets, event_map, state_res_sto state_sets, event_map, state_res_store.get_events, ) elif room_version in ( - RoomVersions.STATE_V2_TEST, RoomVersions.V2, + RoomVersions.STATE_V2_TEST, RoomVersions.V2, RoomVersions.V3, ): return v2.resolve_events_with_store( room_version, state_sets, event_map, state_res_store, -- cgit 1.4.1 From 0b24d58e0548e2e7d274e50d7a55f570108b1228 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 29 Jan 2019 23:11:48 +0000 Subject: No vdh tests! --- synapse/federation/federation_base.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 5188b7f0db..a7a2ec4523 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -275,8 +275,7 @@ def _check_sigs_on_pdus(keyring, room_version, pdus): # event id's domain (normally only the case for joins/leaves), and add additional # checks. Only do this if the room version has a concept of event ID domain if room_version in ( - RoomVersions.V1, RoomVersions.V2, RoomVersions.VDH_TEST, - RoomVersions.STATE_V2_TEST, + RoomVersions.V1, RoomVersions.V2, RoomVersions.STATE_V2_TEST, ): pdus_to_check_event_id = [ p for p in pdus_to_check -- cgit 1.4.1 From 563f6a832b379e2cde6b5618a7c344c2bcd793a1 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Thu, 31 Jan 2019 11:44:04 +0000 Subject: Reject large transactions on federation (#4513) * Reject large transactions on federation * Add changelog * lint * Simplify large transaction handling --- changelog.d/4513.misc | 1 + synapse/federation/federation_server.py | 16 ++++++++++++++++ 2 files changed, 17 insertions(+) create mode 100644 changelog.d/4513.misc (limited to 'synapse/federation') diff --git a/changelog.d/4513.misc b/changelog.d/4513.misc new file mode 100644 index 0000000000..1f64a96465 --- /dev/null +++ b/changelog.d/4513.misc @@ -0,0 +1 @@ +Reject federation transactions if they include more than 50 PDUs or 100 EDUs. \ No newline at end of file diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index aeadc9c564..3da86d4ba6 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -148,6 +148,22 @@ class FederationServer(FederationBase): logger.debug("[%s] Transaction is new", transaction.transaction_id) + # Reject if PDU count > 50 and EDU count > 100 + if (len(transaction.pdus) > 50 + or (hasattr(transaction, "edus") and len(transaction.edus) > 100)): + + logger.info( + "Transaction PDU or EDU count too large. Returning 400", + ) + + response = {} + yield self.transaction_actions.set_response( + origin, + transaction, + 400, response + ) + defer.returnValue((400, response)) + received_pdus_counter.inc(len(transaction.pdus)) origin_host, _ = parse_server_name(origin) -- cgit 1.4.1 From 82ca6d1f9f43752eeec47dc2d6a4da93cd596f15 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 20 Feb 2019 14:13:14 +0000 Subject: Add metrics for number of outgoing EDUs, by type (#4695) --- changelog.d/4695.feature | 1 + synapse/federation/transaction_queue.py | 22 ++++++++++++++++++---- synapse/metrics/__init__.py | 2 -- 3 files changed, 19 insertions(+), 6 deletions(-) create mode 100644 changelog.d/4695.feature (limited to 'synapse/federation') diff --git a/changelog.d/4695.feature b/changelog.d/4695.feature new file mode 100644 index 0000000000..3816c9dec8 --- /dev/null +++ b/changelog.d/4695.feature @@ -0,0 +1 @@ +Add prometheus metrics for number of outgoing EDUs, by type. diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 1f0b67f5f8..30941f5ad6 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -33,7 +33,6 @@ from synapse.metrics import ( event_processing_loop_counter, event_processing_loop_room_count, events_processed_counter, - sent_edus_counter, sent_transactions_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process @@ -47,10 +46,24 @@ from .units import Edu, Transaction logger = logging.getLogger(__name__) sent_pdus_destination_dist_count = Counter( - "synapse_federation_client_sent_pdu_destinations:count", "" + "synapse_federation_client_sent_pdu_destinations:count", + "Number of PDUs queued for sending to one or more destinations", ) + sent_pdus_destination_dist_total = Counter( "synapse_federation_client_sent_pdu_destinations:total", "" + "Total number of PDUs queued for sending across all destinations", +) + +sent_edus_counter = Counter( + "synapse_federation_client_sent_edus", + "Total number of EDUs successfully sent", +) + +sent_edus_by_type = Counter( + "synapse_federation_client_sent_edus_by_type", + "Number of sent EDUs successfully sent, by event type", + ["type"], ) @@ -360,8 +373,6 @@ class TransactionQueue(object): logger.info("Not sending EDU to ourselves") return - sent_edus_counter.inc() - if key: self.pending_edus_keyed_by_dest.setdefault( destination, {} @@ -496,6 +507,9 @@ class TransactionQueue(object): ) if success: sent_transactions_counter.inc() + sent_edus_counter.inc(len(pending_edus)) + for edu in pending_edus: + sent_edus_by_type.labels(edu.edu_type).inc() # Remove the acknowledged device messages from the database # Only bother if we actually sent some device messages if device_message_edus: diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 59900aa5d1..ef48984fdd 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -274,8 +274,6 @@ pending_calls_metric = Histogram( # Federation Metrics # -sent_edus_counter = Counter("synapse_federation_client_sent_edus", "") - sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "") events_processed_counter = Counter("synapse_federation_client_events_processed", "") -- cgit 1.4.1