From e1625d62a8313ff34662aa72ae4d0574e540cc2b Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 31 May 2016 11:55:57 +0100 Subject: Add federation room list servlet --- synapse/federation/transport/server.py | 65 +++++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 5b6c7d11dd..b82f72fd57 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -134,10 +134,11 @@ class Authenticator(object): class BaseFederationServlet(object): - def __init__(self, handler, authenticator, ratelimiter, server_name): + def __init__(self, handler, authenticator, ratelimiter, server_name, room_list_handler): self.handler = handler self.authenticator = authenticator self.ratelimiter = ratelimiter + self.room_list_handler = room_list_handler def _wrap(self, code): authenticator = self.authenticator @@ -491,6 +492,66 @@ class OpenIdUserInfo(BaseFederationServlet): def _wrap(self, code): return code +class PublicRoomList(BaseFederationServlet): + """ + Fetch the public room list for this server. + + This API returns information in the same format as /publicRooms on the + client API, but will only ever include local public rooms and hence is + intended for consumption by other home servers. + + GET /publicRooms HTTP/1.1 + + HTTP/1.1 200 OK + Content-Type: application/json + + { + "chunk": [ + { + "aliases": [ + "#test:localhost" + ], + "guest_can_join": false, + "name": "test room", + "num_joined_members": 3, + "room_id": "!whkydVegtvatLfXmPN:localhost", + "world_readable": false + } + ], + "end": "END", + "start": "START" + } + """ + + PATH = "/publicRooms" + + @defer.inlineCallbacks + def on_GET(self, request): + data = yield self.room_list_handler.get_public_room_list() + defer.returnValue((200, data)) + + token = parse_string(request, "access_token") + if token is None: + defer.returnValue((401, { + "errcode": "M_MISSING_TOKEN", "error": "Access Token required" + })) + return + + user_id = yield self.handler.on_openid_userinfo(token) + + if user_id is None: + defer.returnValue((401, { + "errcode": "M_UNKNOWN_TOKEN", + "error": "Access Token unknown or expired" + })) + + defer.returnValue((200, {"sub": user_id})) + + # Avoid doing remote HS authorization checks which are done by default by + # BaseFederationServlet. + def _wrap(self, code): + return code + SERVLET_CLASSES = ( FederationSendServlet, @@ -513,6 +574,7 @@ SERVLET_CLASSES = ( FederationThirdPartyInviteExchangeServlet, On3pidBindServlet, OpenIdUserInfo, + PublicRoomList, ) @@ -523,4 +585,5 @@ def register_servlets(hs, resource, authenticator, ratelimiter): authenticator=authenticator, ratelimiter=ratelimiter, server_name=hs.hostname, + room_list_handler=hs.get_room_list_handler(), ).register(resource) -- cgit 1.4.1 From 70ecb415f553e5de86833034ce184a8a905b7ed5 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 31 May 2016 12:00:54 +0100 Subject: Fix c+p fail --- synapse/federation/transport/server.py | 17 ----------------- 1 file changed, 17 deletions(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index b82f72fd57..f23c02efde 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -530,23 +530,6 @@ class PublicRoomList(BaseFederationServlet): data = yield self.room_list_handler.get_public_room_list() defer.returnValue((200, data)) - token = parse_string(request, "access_token") - if token is None: - defer.returnValue((401, { - "errcode": "M_MISSING_TOKEN", "error": "Access Token required" - })) - return - - user_id = yield self.handler.on_openid_userinfo(token) - - if user_id is None: - defer.returnValue((401, { - "errcode": "M_UNKNOWN_TOKEN", - "error": "Access Token unknown or expired" - })) - - defer.returnValue((200, {"sub": user_id})) - # Avoid doing remote HS authorization checks which are done by default by # BaseFederationServlet. def _wrap(self, code): -- cgit 1.4.1 From d240796dedcfae1f6929c1501e7e335df417cfaf Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 31 May 2016 17:20:07 +0100 Subject: Basic, un-cached support for secondary_directory_servers --- synapse/federation/federation_client.py | 21 +++++++++++++++++++++ synapse/federation/transport/client.py | 12 ++++++++++++ synapse/federation/transport/server.py | 2 +- synapse/handlers/room.py | 33 ++++++++++++++++++++++++++++++++- synapse/rest/client/v1/room.py | 3 ++- 5 files changed, 68 insertions(+), 3 deletions(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 37ee469fa2..ba8d71c050 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -24,6 +24,7 @@ from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, ) from synapse.util import unwrapFirstError +from synapse.util.async import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent @@ -550,6 +551,26 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to send to any server.") + @defer.inlineCallbacks + def get_public_rooms(self, destinations): + results_by_server = {} + + @defer.inlineCallbacks + def _get_result(s): + if s == self.server_name: + defer.returnValue() + + try: + result = yield self.transport_layer.get_public_rooms(s) + results_by_server[s] = result + except: + logger.exception("Error getting room list from server %r", s) + + + yield concurrently_execute(_get_result, destinations, 3) + + defer.returnValue(results_by_server) + @defer.inlineCallbacks def query_auth(self, destination, room_id, event_id, local_auth): """ diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index cd2841c4db..ebb698e278 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -224,6 +224,18 @@ class TransportLayerClient(object): defer.returnValue(response) + @defer.inlineCallbacks + @log_function + def get_public_rooms(self, remote_server): + path = PREFIX + "/publicRooms" + + response = yield self.client.get_json( + destination=remote_server, + path=path, + ) + + defer.returnValue(response) + @defer.inlineCallbacks @log_function def exchange_third_party_invite(self, destination, room_id, event_dict): diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index f23c02efde..da9e7a326d 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -527,7 +527,7 @@ class PublicRoomList(BaseFederationServlet): @defer.inlineCallbacks def on_GET(self, request): - data = yield self.room_list_handler.get_public_room_list() + data = yield self.room_list_handler.get_local_public_room_list() defer.returnValue((200, data)) # Avoid doing remote HS authorization checks which are done by default by diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3d63b3c513..b0aa9fb511 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -345,7 +345,7 @@ class RoomListHandler(BaseHandler): super(RoomListHandler, self).__init__(hs) self.response_cache = ResponseCache() - def get_public_room_list(self): + def get_local_public_room_list(self): result = self.response_cache.get(()) if not result: result = self.response_cache.set((), self._get_public_room_list()) @@ -427,6 +427,37 @@ class RoomListHandler(BaseHandler): # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": results}) + @defer.inlineCallbacks + def get_aggregated_public_room_list(self): + """ + Get the public room list from this server and the servers + specified in the secondary_directory_servers config option. + XXX: Pagination... + """ + federated_by_server = yield self.hs.get_replication_layer().get_public_rooms( + self.hs.config.secondary_directory_servers + ) + public_rooms = yield self.get_local_public_room_list() + + # keep track of which room IDs we've seen so we can de-dup + room_ids = set() + + # tag all the ones in our list with our server name. + # Also add the them to the de-deping set + for room in public_rooms['chunk']: + room["server_name"] = self.hs.hostname + room_ids.add(room["room_id"]) + + # Now add the results from federation + for server_name, server_result in federated_by_server.items(): + for room in server_result["chunk"]: + if room["room_id"] not in room_ids: + room["server_name"] = server_name + public_rooms["chunk"].append(room) + room_ids.add(room["room_id"]) + + defer.returnValue(public_rooms) + class RoomContextHandler(BaseHandler): @defer.inlineCallbacks diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 2d22bbdaa3..db52a1fc39 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -280,7 +280,8 @@ class PublicRoomListRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request): handler = self.hs.get_room_list_handler() - data = yield handler.get_public_room_list() + data = yield handler.get_aggregated_public_room_list() + defer.returnValue((200, data)) -- cgit 1.4.1 From 6ecb2ca4ec3fae8c6f2e837b4ec99cc6929de638 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 1 Jun 2016 09:48:55 +0100 Subject: pep8 --- synapse/federation/transport/server.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index da9e7a326d..a1a334955f 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -134,7 +134,8 @@ class Authenticator(object): class BaseFederationServlet(object): - def __init__(self, handler, authenticator, ratelimiter, server_name, room_list_handler): + def __init__(self, handler, authenticator, ratelimiter, server_name, + room_list_handler): self.handler = handler self.authenticator = authenticator self.ratelimiter = ratelimiter @@ -492,6 +493,7 @@ class OpenIdUserInfo(BaseFederationServlet): def _wrap(self, code): return code + class PublicRoomList(BaseFederationServlet): """ Fetch the public room list for this server. -- cgit 1.4.1 From 1fd6eb695d1fffbe830faf50c13607116300095b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jun 2016 14:15:18 +0100 Subject: Enable auth on federation PublicRoomList --- synapse/federation/transport/server.py | 5 ----- 1 file changed, 5 deletions(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index a1a334955f..ab9f38f010 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -532,11 +532,6 @@ class PublicRoomList(BaseFederationServlet): data = yield self.room_list_handler.get_local_public_room_list() defer.returnValue((200, data)) - # Avoid doing remote HS authorization checks which are done by default by - # BaseFederationServlet. - def _wrap(self, code): - return code - SERVLET_CLASSES = ( FederationSendServlet, -- cgit 1.4.1 From d88faf92d16d9384433452e4fb7901fd2bd6eda4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jun 2016 14:39:31 +0100 Subject: Fix up federation PublicRoomList --- synapse/federation/transport/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index ab9f38f010..6fc3e2207c 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -528,7 +528,7 @@ class PublicRoomList(BaseFederationServlet): PATH = "/publicRooms" @defer.inlineCallbacks - def on_GET(self, request): + def on_GET(self, origin, content, query): data = yield self.room_list_handler.get_local_public_room_list() defer.returnValue((200, data)) -- cgit 1.4.1 From 120c2387053bdc30824d6b15931532664f739192 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Jun 2016 16:10:37 +0100 Subject: Disable responding with canonical json for federation --- synapse/federation/transport/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 6fc3e2207c..8a1965f45a 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -37,7 +37,7 @@ class TransportLayerServer(JsonResource): self.hs = hs self.clock = hs.get_clock() - super(TransportLayerServer, self).__init__(hs) + super(TransportLayerServer, self).__init__(hs, canonical_json=False) self.authenticator = Authenticator(hs) self.ratelimiter = FederationRateLimiter( -- cgit 1.4.1 From 8f4a9bbc16e6b54f1ab110085e42884fd16abb6a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Jun 2016 16:43:45 +0100 Subject: Linearize some federation endpoints based on (origin, room_id) --- synapse/federation/federation_server.py | 143 +++++++++++++++++--------------- synapse/federation/transport/server.py | 2 +- 2 files changed, 78 insertions(+), 67 deletions(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 2a589524a4..85f5e752fe 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -49,6 +49,7 @@ class FederationServer(FederationBase): super(FederationServer, self).__init__(hs) self._room_pdu_linearizer = Linearizer() + self._server_linearizer = Linearizer() def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate @@ -89,11 +90,14 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_backfill_request(self, origin, room_id, versions, limit): - pdus = yield self.handler.on_backfill_request( - origin, room_id, versions, limit - ) + with (yield self._server_linearizer.queue((origin, room_id))): + pdus = yield self.handler.on_backfill_request( + origin, room_id, versions, limit + ) + + res = self._transaction_from_pdus(pdus).get_dict() - defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) + defer.returnValue((200, res)) @defer.inlineCallbacks @log_function @@ -184,27 +188,28 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_context_state_request(self, origin, room_id, event_id): - if event_id: - pdus = yield self.handler.get_state_for_pdu( - origin, room_id, event_id, - ) - auth_chain = yield self.store.get_auth_chain( - [pdu.event_id for pdu in pdus] - ) + with (yield self._server_linearizer.queue((origin, room_id))): + if event_id: + pdus = yield self.handler.get_state_for_pdu( + origin, room_id, event_id, + ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) - for event in auth_chain: - # We sign these again because there was a bug where we - # incorrectly signed things the first time round - if self.hs.is_mine_id(event.event_id): - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] + for event in auth_chain: + # We sign these again because there was a bug where we + # incorrectly signed things the first time round + if self.hs.is_mine_id(event.event_id): + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) ) - ) - else: - raise NotImplementedError("Specify an event") + else: + raise NotImplementedError("Specify an event") defer.returnValue((200, { "pdus": [pdu.get_pdu_json() for pdu in pdus], @@ -283,14 +288,16 @@ class FederationServer(FederationBase): @defer.inlineCallbacks def on_event_auth(self, origin, room_id, event_id): - time_now = self._clock.time_msec() - auth_pdus = yield self.handler.on_event_auth(event_id) - defer.returnValue((200, { - "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], - })) + with (yield self._server_linearizer.queue((origin, room_id))): + time_now = self._clock.time_msec() + auth_pdus = yield self.handler.on_event_auth(event_id) + res = { + "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], + } + defer.returnValue((200, res)) @defer.inlineCallbacks - def on_query_auth_request(self, origin, content, event_id): + def on_query_auth_request(self, origin, content, room_id, event_id): """ Content is a dict with keys:: auth_chain (list): A list of events that give the auth chain. @@ -309,32 +316,33 @@ class FederationServer(FederationBase): Returns: Deferred: Results in `dict` with the same format as `content` """ - auth_chain = [ - self.event_from_pdu_json(e) - for e in content["auth_chain"] - ] - - signed_auth = yield self._check_sigs_and_hash_and_fetch( - origin, auth_chain, outlier=True - ) + with (yield self._server_linearizer.queue((origin, room_id))): + auth_chain = [ + self.event_from_pdu_json(e) + for e in content["auth_chain"] + ] + + signed_auth = yield self._check_sigs_and_hash_and_fetch( + origin, auth_chain, outlier=True + ) - ret = yield self.handler.on_query_auth( - origin, - event_id, - signed_auth, - content.get("rejects", []), - content.get("missing", []), - ) + ret = yield self.handler.on_query_auth( + origin, + event_id, + signed_auth, + content.get("rejects", []), + content.get("missing", []), + ) - time_now = self._clock.time_msec() - send_content = { - "auth_chain": [ - e.get_pdu_json(time_now) - for e in ret["auth_chain"] - ], - "rejects": ret.get("rejects", []), - "missing": ret.get("missing", []), - } + time_now = self._clock.time_msec() + send_content = { + "auth_chain": [ + e.get_pdu_json(time_now) + for e in ret["auth_chain"] + ], + "rejects": ret.get("rejects", []), + "missing": ret.get("missing", []), + } defer.returnValue( (200, send_content) @@ -386,21 +394,24 @@ class FederationServer(FederationBase): @log_function def on_get_missing_events(self, origin, room_id, earliest_events, latest_events, limit, min_depth): - logger.info( - "on_get_missing_events: earliest_events: %r, latest_events: %r," - " limit: %d, min_depth: %d", - earliest_events, latest_events, limit, min_depth - ) - missing_events = yield self.handler.on_get_missing_events( - origin, room_id, earliest_events, latest_events, limit, min_depth - ) + with (yield self._server_linearizer.queue((origin, room_id))): + logger.info( + "on_get_missing_events: earliest_events: %r, latest_events: %r," + " limit: %d, min_depth: %d", + earliest_events, latest_events, limit, min_depth + ) + missing_events = yield self.handler.on_get_missing_events( + origin, room_id, earliest_events, latest_events, limit, min_depth + ) - if len(missing_events) < 5: - logger.info("Returning %d events: %r", len(missing_events), missing_events) - else: - logger.info("Returning %d events", len(missing_events)) + if len(missing_events) < 5: + logger.info( + "Returning %d events: %r", len(missing_events), missing_events + ) + else: + logger.info("Returning %d events", len(missing_events)) - time_now = self._clock.time_msec() + time_now = self._clock.time_msec() defer.returnValue({ "events": [ev.get_pdu_json(time_now) for ev in missing_events], diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 8a1965f45a..26fa88ae84 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -388,7 +388,7 @@ class FederationQueryAuthServlet(BaseFederationServlet): @defer.inlineCallbacks def on_POST(self, origin, content, query, context, event_id): new_content = yield self.handler.on_query_auth_request( - origin, content, event_id + origin, content, context, event_id ) defer.returnValue((200, new_content)) -- cgit 1.4.1 From 1efee2f52b931ddcd90e87d06c7ea614da2c9cd0 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 2 Aug 2016 18:06:31 +0100 Subject: E2E keys: Make federation query share code with client query Refactor the e2e query handler to separate out the local query, and then make the federation handler use it. --- synapse/federation/federation_server.py | 20 +----- synapse/federation/transport/server.py | 4 +- synapse/handlers/e2e_keys.py | 115 +++++++++++++++++++++++++------- 3 files changed, 92 insertions(+), 47 deletions(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 85f5e752fe..e637f2a8bd 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -348,27 +348,9 @@ class FederationServer(FederationBase): (200, send_content) ) - @defer.inlineCallbacks @log_function def on_query_client_keys(self, origin, content): - query = [] - for user_id, device_ids in content.get("device_keys", {}).items(): - if not device_ids: - query.append((user_id, None)) - else: - for device_id in device_ids: - query.append((user_id, device_id)) - - results = yield self.store.get_e2e_device_keys(query) - - json_result = {} - for user_id, device_keys in results.items(): - for device_id, json_bytes in device_keys.items(): - json_result.setdefault(user_id, {})[device_id] = json.loads( - json_bytes - ) - - defer.returnValue({"device_keys": json_result}) + return self.on_query_request("client_keys", content) @defer.inlineCallbacks @log_function diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 26fa88ae84..1a88413d18 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -367,10 +367,8 @@ class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet): class FederationClientKeysQueryServlet(BaseFederationServlet): PATH = "/user/keys/query" - @defer.inlineCallbacks def on_POST(self, origin, content, query): - response = yield self.handler.on_query_client_keys(origin, content) - defer.returnValue((200, response)) + return self.handler.on_query_client_keys(origin, content) class FederationClientKeysClaimServlet(BaseFederationServlet): diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 73a14cf952..9c7e9494d6 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -13,12 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections import json import logging from twisted.internet import defer +from synapse.api import errors import synapse.types + from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -29,39 +32,101 @@ class E2eKeysHandler(BaseHandler): super(E2eKeysHandler, self).__init__(hs) self.store = hs.get_datastore() self.federation = hs.get_replication_layer() - self.is_mine = hs.is_mine + self.is_mine_id = hs.is_mine_id + + # doesn't really work as part of the generic query API, because the + # query request requires an object POST, but we abuse the + # "query handler" interface. + self.federation.register_query_handler( + "client_keys", self.on_federation_query_client_keys + ) @defer.inlineCallbacks def query_devices(self, query_body): - local_query = [] - remote_queries = {} - for user_id, device_ids in query_body.get("device_keys", {}).items(): + """ Handle a device key query from a client + + { + "device_keys": { + "": [""] + } + } + -> + { + "device_keys": { + "": { + "": { + ... + } + } + } + } + """ + device_keys_query = query_body.get("device_keys", {}) + + # separate users by domain. + # make a map from domain to user_id to device_ids + queries_by_domain = collections.defaultdict(dict) + for user_id, device_ids in device_keys_query.items(): user = synapse.types.UserID.from_string(user_id) - if self.is_mine(user): - if not device_ids: - local_query.append((user_id, None)) - else: - for device_id in device_ids: - local_query.append((user_id, device_id)) + queries_by_domain[user.domain][user_id] = device_ids + + # do the queries + # TODO: do these in parallel + results = {} + for destination, destination_query in queries_by_domain.items(): + if destination == self.hs.hostname: + res = yield self.query_local_devices(destination_query) else: - remote_queries.setdefault(user.domain, {})[user_id] = list( - device_ids + res = yield self.federation.query_client_keys( + destination, {"device_keys": destination_query} ) + res = res["device_keys"] + for user_id, keys in res.items(): + if user_id in destination_query: + results[user_id] = keys + + defer.returnValue((200, {"device_keys": results})) + + @defer.inlineCallbacks + def query_local_devices(self, query): + """Get E2E device keys for local users + + Args: + query (dict[string, list[string]|None): map from user_id to a list + of devices to query (None for all devices) + + Returns: + defer.Deferred: (resolves to dict[string, dict[string, dict]]): + map from user_id -> device_id -> device details + """ + local_query = [] + + for user_id, device_ids in query.items(): + if not self.is_mine_id(user_id): + logger.warning("Request for keys for non-local user %s", + user_id) + raise errors.SynapseError(400, "Not a user here") + + if not device_ids: + local_query.append((user_id, None)) + else: + for device_id in device_ids: + local_query.append((user_id, device_id)) + results = yield self.store.get_e2e_device_keys(local_query) - json_result = {} + # un-jsonify the results + json_result = collections.defaultdict(dict) for user_id, device_keys in results.items(): for device_id, json_bytes in device_keys.items(): - json_result.setdefault(user_id, {})[ - device_id] = json.loads( - json_bytes - ) + json_result[user_id][device_id] = json.loads(json_bytes) - for destination, device_keys in remote_queries.items(): - remote_result = yield self.federation.query_client_keys( - destination, {"device_keys": device_keys} - ) - for user_id, keys in remote_result["device_keys"].items(): - if user_id in device_keys: - json_result[user_id] = keys - defer.returnValue((200, {"device_keys": json_result})) + defer.returnValue(json_result) + + @defer.inlineCallbacks + def on_federation_query_client_keys(self, query_body): + """ Handle a device key query from a federated server + """ + device_keys_query = query_body.get("device_keys", {}) + res = yield self.query_local_devices(device_keys_query) + defer.returnValue({"device_keys": res}) -- cgit 1.4.1 From e3a720217a9d200a7c3db8305df53ef8bf76f565 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 14:47:37 +0100 Subject: Add /state_ids federation API The new API only returns the event_ids for the state, as most requesters will already have the vast majority of the events already. --- synapse/federation/federation_client.py | 73 +++++++++++++++++++++++++++++++-- synapse/federation/federation_server.py | 21 ++++++++++ synapse/federation/transport/client.py | 22 ++++++++++ synapse/federation/transport/server.py | 12 ++++++ 4 files changed, 125 insertions(+), 3 deletions(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b06387051c..03f6133e61 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -314,9 +314,32 @@ class FederationClient(FederationBase): Deferred: Results in a list of PDUs. """ - result = yield self.transport_layer.get_room_state( - destination, room_id, event_id=event_id, - ) + try: + # First we try and ask for just the IDs, as thats far quicker if + # we have most of the state and auth_chain already. + # However, this may 404 if the other side has an old synapse. + result = yield self.transport_layer.get_room_state_ids( + destination, room_id, event_id=event_id, + ) + + state_event_ids = result["pdus"] + auth_event_ids = result.get("auth_chain", []) + + event_map, _failed_to_fetch = yield self.get_events( + [destination], room_id, set(state_event_ids + auth_event_ids) + ) + + pdus = [event_map[e_id] for e_id in state_event_ids] + auth_chain = [event_map[e_id] for e_id in auth_event_ids] + + auth_chain.sort(key=lambda e: e.depth) + + defer.returnValue((pdus, auth_chain)) + except HttpResponseException as e: + if e.code == 404: + logger.info("Failed to use get_room_state_ids API, falling back") + else: + raise e pdus = [ self.event_from_pdu_json(p, outlier=True) for p in result["pdus"] @@ -339,6 +362,50 @@ class FederationClient(FederationBase): defer.returnValue((signed_pdus, signed_auth)) + @defer.inlineCallbacks + def get_events(self, destinations, room_id, event_ids, return_local=True): + if return_local: + seen_events = yield self.store.get_events(event_ids) + signed_events = seen_events.values() + else: + seen_events = yield self.store.have_events(event_ids) + signed_events = [] + + failed_to_fetch = [] + + missing_events = set(event_ids) + for k in seen_events: + missing_events.discard(k) + + if not missing_events: + defer.returnValue((signed_events, failed_to_fetch)) + + def random_server_list(): + srvs = list(destinations) + random.shuffle(srvs) + return srvs + + batch_size = 20 + for i in xrange(0, len(missing_events), batch_size): + batch = missing_events[i:i + batch_size] + + deferreds = [ + self.get_pdu( + destinations=random_server_list(), + event_id=e_id, + ).addBoth(lambda r, e: (r, e), e_id) + for e_id in batch + ] + + res = yield defer.DeferredList(deferreds, consumeErrors=True) + for (result, val), (e_id, _) in res: + if result and val: + signed_events.append(val) + else: + failed_to_fetch.add(e_id) + + defer.returnValue((signed_events, failed_to_fetch)) + @defer.inlineCallbacks @log_function def get_event_auth(self, destination, room_id, event_id): diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 612d274bdb..40e9fda0eb 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -214,6 +214,27 @@ class FederationServer(FederationBase): defer.returnValue((200, resp)) + @defer.inlineCallbacks + def on_state_ids_request(self, origin, room_id, event_id): + if not event_id: + raise NotImplementedError("Specify an event") + + in_room = yield self.auth.check_host_in_room(room_id, origin) + if not in_room: + raise AuthError(403, "Host not in room.") + + pdus = yield self.handler.get_state_for_pdu( + room_id, event_id, + ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) + + defer.returnValue((200, { + "pdus": [pdu.event_id for pdu in pdus], + "auth_chain": [pdu.event_id for pdu in auth_chain], + })) + @defer.inlineCallbacks def _on_context_state_request_compute(self, room_id, event_id): pdus = yield self.handler.get_state_for_pdu( diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index ebb698e278..3d088e43cb 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -54,6 +54,28 @@ class TransportLayerClient(object): destination, path=path, args={"event_id": event_id}, ) + @log_function + def get_room_state_ids(self, destination, room_id, event_id): + """ Requests all state for a given room from the given server at the + given event. Returns the state's event_id's + + Args: + destination (str): The host name of the remote home server we want + to get the state from. + context (str): The name of the context we want the state of + event_id (str): The event we want the context at. + + Returns: + Deferred: Results in a dict received from the remote homeserver. + """ + logger.debug("get_room_state_ids dest=%s, room=%s", + destination, room_id) + + path = PREFIX + "/state_ids/%s/" % room_id + return self.client.get_json( + destination, path=path, args={"event_id": event_id}, + ) + @log_function def get_event(self, destination, event_id, timeout=None): """ Requests the pdu with give id and origin from the given server. diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 26fa88ae84..3ae7c48457 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -271,6 +271,17 @@ class FederationStateServlet(BaseFederationServlet): ) +class FederationStateIdsServlet(BaseFederationServlet): + PATH = "/state_ids/(?P[^/]*)/" + + def on_GET(self, origin, content, query, room_id): + return self.handler.on_state_ids_request( + origin, + room_id, + query.get("event_id", [None])[0], + ) + + class FederationBackfillServlet(BaseFederationServlet): PATH = "/backfill/(?P[^/]*)/" @@ -538,6 +549,7 @@ SERVLET_CLASSES = ( FederationPullServlet, FederationEventServlet, FederationStateServlet, + FederationStateIdsServlet, FederationBackfillServlet, FederationQueryServlet, FederationMakeJoinServlet, -- cgit 1.4.1 From 597c79be1001f748d467c0c64acfe85262ee00f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Aug 2016 16:17:04 +0100 Subject: Change the way we specify if we require auth or not --- synapse/federation/transport/server.py | 95 ++++++++++++++++++++-------------- 1 file changed, 55 insertions(+), 40 deletions(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 0bc6e0801d..ee8f94e340 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.api.errors import Codes, SynapseError from synapse.http.server import JsonResource -from synapse.http.servlet import parse_json_object_from_request, parse_string +from synapse.http.servlet import parse_json_object_from_request from synapse.util.ratelimitutils import FederationRateLimiter import functools @@ -60,6 +60,16 @@ class TransportLayerServer(JsonResource): ) +class AuthenticationError(SynapseError): + """There was a problem authenticating the request""" + pass + + +class NoAuthenticationError(AuthenticationError): + """The request had no authentication information""" + pass + + class Authenticator(object): def __init__(self, hs): self.keyring = hs.get_keyring() @@ -67,7 +77,7 @@ class Authenticator(object): # A method just so we can pass 'self' as the authenticator to the Servlets @defer.inlineCallbacks - def authenticate_request(self, request): + def authenticate_request(self, request, content): json_request = { "method": request.method, "uri": request.uri, @@ -75,17 +85,10 @@ class Authenticator(object): "signatures": {}, } - content = None - origin = None + if content is not None: + json_request["content"] = content - if request.method in ["PUT", "POST"]: - # TODO: Handle other method types? other content types? - try: - content_bytes = request.content.read() - content = json.loads(content_bytes) - json_request["content"] = content - except: - raise SynapseError(400, "Unable to parse JSON", Codes.BAD_JSON) + origin = None def parse_auth_header(header_str): try: @@ -103,14 +106,14 @@ class Authenticator(object): sig = strip_quotes(param_dict["sig"]) return (origin, key, sig) except: - raise SynapseError( + raise AuthenticationError( 400, "Malformed Authorization header", Codes.UNAUTHORIZED ) auth_headers = request.requestHeaders.getRawHeaders(b"Authorization") if not auth_headers: - raise SynapseError( + raise NoAuthenticationError( 401, "Missing Authorization headers", Codes.UNAUTHORIZED, ) @@ -121,7 +124,7 @@ class Authenticator(object): json_request["signatures"].setdefault(origin, {})[key] = sig if not json_request["signatures"]: - raise SynapseError( + raise NoAuthenticationError( 401, "Missing Authorization headers", Codes.UNAUTHORIZED, ) @@ -130,10 +133,12 @@ class Authenticator(object): logger.info("Request from %s", origin) request.authenticated_entity = origin - defer.returnValue((origin, content)) + defer.returnValue(origin) class BaseFederationServlet(object): + REQUIRE_AUTH = True + def __init__(self, handler, authenticator, ratelimiter, server_name, room_list_handler): self.handler = handler @@ -141,29 +146,46 @@ class BaseFederationServlet(object): self.ratelimiter = ratelimiter self.room_list_handler = room_list_handler - def _wrap(self, code): + def _wrap(self, func): authenticator = self.authenticator ratelimiter = self.ratelimiter @defer.inlineCallbacks - @functools.wraps(code) - def new_code(request, *args, **kwargs): + @functools.wraps(func) + def new_func(request, *args, **kwargs): + content = None + if request.method in ["PUT", "POST"]: + # TODO: Handle other method types? other content types? + content = parse_json_object_from_request(request) + try: - (origin, content) = yield authenticator.authenticate_request(request) + origin = yield authenticator.authenticate_request(request, content) + except NoAuthenticationError: + origin = None + if self.REQUIRE_AUTH: + logger.exception("authenticate_request failed") + raise + except: + logger.exception("authenticate_request failed") + raise + + if origin: with ratelimiter.ratelimit(origin) as d: yield d - response = yield code( + response = yield func( origin, content, request.args, *args, **kwargs ) - except: - logger.exception("authenticate_request failed") - raise + else: + response = yield func( + origin, content, request.args, *args, **kwargs + ) + defer.returnValue(response) # Extra logic that functools.wraps() doesn't finish - new_code.__self__ = code.__self__ + new_func.__self__ = func.__self__ - return new_code + return new_func def register(self, server): pattern = re.compile("^" + PREFIX + self.PATH + "$") @@ -429,9 +451,10 @@ class FederationGetMissingEventsServlet(BaseFederationServlet): class On3pidBindServlet(BaseFederationServlet): PATH = "/3pid/onbind" + REQUIRE_AUTH = False + @defer.inlineCallbacks - def on_POST(self, request): - content = parse_json_object_from_request(request) + def on_POST(self, origin, content, query): if "invites" in content: last_exception = None for invite in content["invites"]: @@ -453,11 +476,6 @@ class On3pidBindServlet(BaseFederationServlet): raise last_exception defer.returnValue((200, {})) - # Avoid doing remote HS authorization checks which are done by default by - # BaseFederationServlet. - def _wrap(self, code): - return code - class OpenIdUserInfo(BaseFederationServlet): """ @@ -478,9 +496,11 @@ class OpenIdUserInfo(BaseFederationServlet): PATH = "/openid/userinfo" + REQUIRE_AUTH = False + @defer.inlineCallbacks - def on_GET(self, request): - token = parse_string(request, "access_token") + def on_GET(self, origin, content, query): + token = query.get("access_token", [None])[0] if token is None: defer.returnValue((401, { "errcode": "M_MISSING_TOKEN", "error": "Access Token required" @@ -497,11 +517,6 @@ class OpenIdUserInfo(BaseFederationServlet): defer.returnValue((200, {"sub": user_id})) - # Avoid doing remote HS authorization checks which are done by default by - # BaseFederationServlet. - def _wrap(self, code): - return code - class PublicRoomList(BaseFederationServlet): """ -- cgit 1.4.1 From 24f36469bc5c634ff49c87e49e32579d6ac43d7c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Aug 2016 16:36:07 +0100 Subject: Add federation /version API --- synapse/app/federation_reader.py | 2 +- synapse/app/homeserver.py | 2 +- synapse/app/pusher.py | 2 +- synapse/app/synchrotron.py | 2 +- synapse/federation/transport/server.py | 18 +++++++++++++++++- synapse/util/versionstring.py | 8 ++++---- 6 files changed, 25 insertions(+), 9 deletions(-) (limited to 'synapse/federation/transport/server.py') diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 58d425f9ac..7355499ae2 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -165,7 +165,7 @@ def start(config_options): db_config=config.database_config, tls_server_context_factory=tls_server_context_factory, config=config, - version_string=get_version_string("Synapse", synapse), + version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, ) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index fe68ceb07c..40e6f65236 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -285,7 +285,7 @@ def setup(config_options): # check any extra requirements we have now we have a config check_requirements(config) - version_string = get_version_string("Synapse", synapse) + version_string = "Synapse/" + get_version_string(synapse) logger.info("Server hostname: %s", config.server_name) logger.info("Server version: %s", version_string) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 4f1d18ab5f..c8dde0fcb8 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -273,7 +273,7 @@ def start(config_options): config.server_name, db_config=config.database_config, config=config, - version_string=get_version_string("Synapse", synapse), + version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, ) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 8cf5bbbb6d..215ccfd522 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -424,7 +424,7 @@ def start(config_options): config.server_name, db_config=config.database_config, config=config, - version_string=get_version_string("Synapse", synapse), + version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, application_service_handler=SynchrotronApplicationService(), ) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index ee8f94e340..37c0d4fbc4 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -20,11 +20,12 @@ from synapse.api.errors import Codes, SynapseError from synapse.http.server import JsonResource from synapse.http.servlet import parse_json_object_from_request from synapse.util.ratelimitutils import FederationRateLimiter +from synapse.util.versionstring import get_version_string import functools import logging -import simplejson as json import re +import synapse logger = logging.getLogger(__name__) @@ -557,6 +558,20 @@ class PublicRoomList(BaseFederationServlet): defer.returnValue((200, data)) +class FederationVersionServlet(BaseFederationServlet): + PATH = "/version" + + REQUIRE_AUTH = False + + def on_GET(self, origin, content, query): + return defer.succeed((200, { + "server": { + "name": "Synapse", + "version": get_version_string(synapse) + }, + })) + + SERVLET_CLASSES = ( FederationSendServlet, FederationPullServlet, @@ -580,6 +595,7 @@ SERVLET_CLASSES = ( On3pidBindServlet, OpenIdUserInfo, PublicRoomList, + FederationVersionServlet, ) diff --git a/synapse/util/versionstring.py b/synapse/util/versionstring.py index a4f156cb3b..52086df465 100644 --- a/synapse/util/versionstring.py +++ b/synapse/util/versionstring.py @@ -21,7 +21,7 @@ import logging logger = logging.getLogger(__name__) -def get_version_string(name, module): +def get_version_string(module): try: null = open(os.devnull, 'w') cwd = os.path.dirname(os.path.abspath(module.__file__)) @@ -74,11 +74,11 @@ def get_version_string(name, module): ) return ( - "%s/%s (%s)" % ( - name, module.__version__, git_version, + "%s (%s)" % ( + module.__version__, git_version, ) ).encode("ascii") except Exception as e: logger.info("Failed to check for git repository: %s", e) - return ("%s/%s" % (name, module.__version__,)).encode("ascii") + return module.__version__.encode("ascii") -- cgit 1.4.1