From 8f4a9bbc16e6b54f1ab110085e42884fd16abb6a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Jun 2016 16:43:45 +0100 Subject: Linearize some federation endpoints based on (origin, room_id) --- synapse/federation/federation_server.py | 143 +++++++++++++++++--------------- synapse/federation/transport/server.py | 2 +- 2 files changed, 78 insertions(+), 67 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 2a589524a4..85f5e752fe 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -49,6 +49,7 @@ class FederationServer(FederationBase): super(FederationServer, self).__init__(hs) self._room_pdu_linearizer = Linearizer() + self._server_linearizer = Linearizer() def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate @@ -89,11 +90,14 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_backfill_request(self, origin, room_id, versions, limit): - pdus = yield self.handler.on_backfill_request( - origin, room_id, versions, limit - ) + with (yield self._server_linearizer.queue((origin, room_id))): + pdus = yield self.handler.on_backfill_request( + origin, room_id, versions, limit + ) + + res = self._transaction_from_pdus(pdus).get_dict() - defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) + defer.returnValue((200, res)) @defer.inlineCallbacks @log_function @@ -184,27 +188,28 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_context_state_request(self, origin, room_id, event_id): - if event_id: - pdus = yield self.handler.get_state_for_pdu( - origin, room_id, event_id, - ) - auth_chain = yield self.store.get_auth_chain( - [pdu.event_id for pdu in pdus] - ) + with (yield self._server_linearizer.queue((origin, room_id))): + if event_id: + pdus = yield self.handler.get_state_for_pdu( + origin, room_id, event_id, + ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) - for event in auth_chain: - # We sign these again because there was a bug where we - # incorrectly signed things the first time round - if self.hs.is_mine_id(event.event_id): - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] + for event in auth_chain: + # We sign these again because there was a bug where we + # incorrectly signed things the first time round + if self.hs.is_mine_id(event.event_id): + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) ) - ) - else: - raise NotImplementedError("Specify an event") + else: + raise NotImplementedError("Specify an event") defer.returnValue((200, { "pdus": [pdu.get_pdu_json() for pdu in pdus], @@ -283,14 +288,16 @@ class FederationServer(FederationBase): @defer.inlineCallbacks def on_event_auth(self, origin, room_id, event_id): - time_now = self._clock.time_msec() - auth_pdus = yield self.handler.on_event_auth(event_id) - defer.returnValue((200, { - "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], - })) + with (yield self._server_linearizer.queue((origin, room_id))): + time_now = self._clock.time_msec() + auth_pdus = yield self.handler.on_event_auth(event_id) + res = { + "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], + } + defer.returnValue((200, res)) @defer.inlineCallbacks - def on_query_auth_request(self, origin, content, event_id): + def on_query_auth_request(self, origin, content, room_id, event_id): """ Content is a dict with keys:: auth_chain (list): A list of events that give the auth chain. @@ -309,32 +316,33 @@ class FederationServer(FederationBase): Returns: Deferred: Results in `dict` with the same format as `content` """ - auth_chain = [ - self.event_from_pdu_json(e) - for e in content["auth_chain"] - ] - - signed_auth = yield self._check_sigs_and_hash_and_fetch( - origin, auth_chain, outlier=True - ) + with (yield self._server_linearizer.queue((origin, room_id))): + auth_chain = [ + self.event_from_pdu_json(e) + for e in content["auth_chain"] + ] + + signed_auth = yield self._check_sigs_and_hash_and_fetch( + origin, auth_chain, outlier=True + ) - ret = yield self.handler.on_query_auth( - origin, - event_id, - signed_auth, - content.get("rejects", []), - content.get("missing", []), - ) + ret = yield self.handler.on_query_auth( + origin, + event_id, + signed_auth, + content.get("rejects", []), + content.get("missing", []), + ) - time_now = self._clock.time_msec() - send_content = { - "auth_chain": [ - e.get_pdu_json(time_now) - for e in ret["auth_chain"] - ], - "rejects": ret.get("rejects", []), - "missing": ret.get("missing", []), - } + time_now = self._clock.time_msec() + send_content = { + "auth_chain": [ + e.get_pdu_json(time_now) + for e in ret["auth_chain"] + ], + "rejects": ret.get("rejects", []), + "missing": ret.get("missing", []), + } defer.returnValue( (200, send_content) @@ -386,21 +394,24 @@ class FederationServer(FederationBase): @log_function def on_get_missing_events(self, origin, room_id, earliest_events, latest_events, limit, min_depth): - logger.info( - "on_get_missing_events: earliest_events: %r, latest_events: %r," - " limit: %d, min_depth: %d", - earliest_events, latest_events, limit, min_depth - ) - missing_events = yield self.handler.on_get_missing_events( - origin, room_id, earliest_events, latest_events, limit, min_depth - ) + with (yield self._server_linearizer.queue((origin, room_id))): + logger.info( + "on_get_missing_events: earliest_events: %r, latest_events: %r," + " limit: %d, min_depth: %d", + earliest_events, latest_events, limit, min_depth + ) + missing_events = yield self.handler.on_get_missing_events( + origin, room_id, earliest_events, latest_events, limit, min_depth + ) - if len(missing_events) < 5: - logger.info("Returning %d events: %r", len(missing_events), missing_events) - else: - logger.info("Returning %d events", len(missing_events)) + if len(missing_events) < 5: + logger.info( + "Returning %d events: %r", len(missing_events), missing_events + ) + else: + logger.info("Returning %d events", len(missing_events)) - time_now = self._clock.time_msec() + time_now = self._clock.time_msec() defer.returnValue({ "events": [ev.get_pdu_json(time_now) for ev in missing_events], diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 8a1965f45a..26fa88ae84 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -388,7 +388,7 @@ class FederationQueryAuthServlet(BaseFederationServlet): @defer.inlineCallbacks def on_POST(self, origin, content, query, context, event_id): new_content = yield self.handler.on_query_auth_request( - origin, content, event_id + origin, content, context, event_id ) defer.returnValue((200, new_content)) -- cgit 1.4.1 From 248e6770ca0faadf574cfd62f72d8e200cb5b57a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Jul 2016 10:30:12 +0100 Subject: Cache federation state responses --- synapse/federation/federation_server.py | 66 ++++++++++++++++++++++----------- synapse/handlers/federation.py | 7 +--- synapse/handlers/room.py | 4 +- synapse/handlers/sync.py | 2 +- synapse/util/caches/response_cache.py | 13 ++++++- 5 files changed, 60 insertions(+), 32 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 85f5e752fe..d15c7e1b40 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -21,10 +21,11 @@ from .units import Transaction, Edu from synapse.util.async import Linearizer from synapse.util.logutils import log_function +from synapse.util.caches.response_cache import ResponseCache from synapse.events import FrozenEvent import synapse.metrics -from synapse.api.errors import FederationError, SynapseError +from synapse.api.errors import AuthError, FederationError, SynapseError from synapse.crypto.event_signing import compute_event_signature @@ -48,9 +49,15 @@ class FederationServer(FederationBase): def __init__(self, hs): super(FederationServer, self).__init__(hs) + self.auth = hs.get_auth() + self._room_pdu_linearizer = Linearizer() self._server_linearizer = Linearizer() + # We cache responses to state queries, as they take a while and often + # come in waves. + self._state_resp_cache = ResponseCache(hs, timeout_ms=30000) + def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate receipt of new PDUs from other home servers. The required methods are @@ -188,28 +195,45 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_context_state_request(self, origin, room_id, event_id): - with (yield self._server_linearizer.queue((origin, room_id))): - if event_id: - pdus = yield self.handler.get_state_for_pdu( - origin, room_id, event_id, - ) - auth_chain = yield self.store.get_auth_chain( - [pdu.event_id for pdu in pdus] + if not event_id: + raise NotImplementedError("Specify an event") + + in_room = yield self.auth.check_host_in_room(room_id, origin) + if not in_room: + raise AuthError(403, "Host not in room.") + + result = self._state_resp_cache.get((room_id, event_id)) + if not result: + with (yield self._server_linearizer.queue((origin, room_id))): + resp = yield self.response_cache.set( + (room_id, event_id), + self._on_context_state_request_compute(room_id, event_id) ) + else: + resp = yield result - for event in auth_chain: - # We sign these again because there was a bug where we - # incorrectly signed things the first time round - if self.hs.is_mine_id(event.event_id): - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - else: - raise NotImplementedError("Specify an event") + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def _on_context_state_request_compute(self, room_id, event_id): + pdus = yield self.handler.get_state_for_pdu( + room_id, event_id, + ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) + + for event in auth_chain: + # We sign these again because there was a bug where we + # incorrectly signed things the first time round + if self.hs.is_mine_id(event.event_id): + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) defer.returnValue((200, { "pdus": [pdu.get_pdu_json() for pdu in pdus], diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3f138daf17..fcad41d7b6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -991,14 +991,9 @@ class FederationHandler(BaseHandler): defer.returnValue(None) @defer.inlineCallbacks - def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True): + def get_state_for_pdu(self, room_id, event_id): yield run_on_reactor() - if do_auth: - in_room = yield self.auth.check_host_in_room(room_id, origin) - if not in_room: - raise AuthError(403, "Host not in room.") - state_groups = yield self.store.get_state_groups( room_id, [event_id] ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ae44c7a556..bf6b1c1535 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -345,8 +345,8 @@ class RoomCreationHandler(BaseHandler): class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) - self.response_cache = ResponseCache() - self.remote_list_request_cache = ResponseCache() + self.response_cache = ResponseCache(hs) + self.remote_list_request_cache = ResponseCache(hs) self.remote_list_cache = {} self.fetch_looping_call = hs.get_clock().looping_call( self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index be26a491ff..0ee4ebe504 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -138,7 +138,7 @@ class SyncHandler(object): self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() - self.response_cache = ResponseCache() + self.response_cache = ResponseCache(hs) def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 36686b479e..00af539880 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -24,9 +24,12 @@ class ResponseCache(object): used rather than trying to compute a new response. """ - def __init__(self): + def __init__(self, hs, timeout_ms=0): self.pending_result_cache = {} # Requests that haven't finished yet. + self.clock = hs.get_clock() + self.timeout_sec = timeout_ms / 1000. + def get(self, key): result = self.pending_result_cache.get(key) if result is not None: @@ -39,7 +42,13 @@ class ResponseCache(object): self.pending_result_cache[key] = result def remove(r): - self.pending_result_cache.pop(key, None) + if self.timeout_sec: + self.clock.call_later( + self.timeout_sec, + self.pending_result_cache.pop, key, None, + ) + else: + self.pending_result_cache.pop(key, None) return r result.addBoth(remove) -- cgit 1.4.1 From b3d5c4ad9d0c6d858cae1c46bebf0c9442f0187b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Aug 2016 16:42:21 +0100 Subject: Fix response cache --- synapse/federation/federation_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index d15c7e1b40..8f6955ac18 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -205,7 +205,7 @@ class FederationServer(FederationBase): result = self._state_resp_cache.get((room_id, event_id)) if not result: with (yield self._server_linearizer.queue((origin, room_id))): - resp = yield self.response_cache.set( + resp = yield self._state_resp_cache.set( (room_id, event_id), self._on_context_state_request_compute(room_id, event_id) ) -- cgit 1.4.1 From c9154b970c0af5eb19c43a401f44de95afd3f7de Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Aug 2016 16:45:53 +0100 Subject: Don't double wrap 200 --- synapse/federation/federation_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 8f6955ac18..612d274bdb 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -235,10 +235,10 @@ class FederationServer(FederationBase): ) ) - defer.returnValue((200, { + defer.returnValue({ "pdus": [pdu.get_pdu_json() for pdu in pdus], "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain], - })) + }) @defer.inlineCallbacks @log_function -- cgit 1.4.1 From 1efee2f52b931ddcd90e87d06c7ea614da2c9cd0 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 2 Aug 2016 18:06:31 +0100 Subject: E2E keys: Make federation query share code with client query Refactor the e2e query handler to separate out the local query, and then make the federation handler use it. --- synapse/federation/federation_server.py | 20 +----- synapse/federation/transport/server.py | 4 +- synapse/handlers/e2e_keys.py | 115 +++++++++++++++++++++++++------- 3 files changed, 92 insertions(+), 47 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 85f5e752fe..e637f2a8bd 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -348,27 +348,9 @@ class FederationServer(FederationBase): (200, send_content) ) - @defer.inlineCallbacks @log_function def on_query_client_keys(self, origin, content): - query = [] - for user_id, device_ids in content.get("device_keys", {}).items(): - if not device_ids: - query.append((user_id, None)) - else: - for device_id in device_ids: - query.append((user_id, device_id)) - - results = yield self.store.get_e2e_device_keys(query) - - json_result = {} - for user_id, device_keys in results.items(): - for device_id, json_bytes in device_keys.items(): - json_result.setdefault(user_id, {})[device_id] = json.loads( - json_bytes - ) - - defer.returnValue({"device_keys": json_result}) + return self.on_query_request("client_keys", content) @defer.inlineCallbacks @log_function diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 26fa88ae84..1a88413d18 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -367,10 +367,8 @@ class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet): class FederationClientKeysQueryServlet(BaseFederationServlet): PATH = "/user/keys/query" - @defer.inlineCallbacks def on_POST(self, origin, content, query): - response = yield self.handler.on_query_client_keys(origin, content) - defer.returnValue((200, response)) + return self.handler.on_query_client_keys(origin, content) class FederationClientKeysClaimServlet(BaseFederationServlet): diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 73a14cf952..9c7e9494d6 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -13,12 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections import json import logging from twisted.internet import defer +from synapse.api import errors import synapse.types + from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -29,39 +32,101 @@ class E2eKeysHandler(BaseHandler): super(E2eKeysHandler, self).__init__(hs) self.store = hs.get_datastore() self.federation = hs.get_replication_layer() - self.is_mine = hs.is_mine + self.is_mine_id = hs.is_mine_id + + # doesn't really work as part of the generic query API, because the + # query request requires an object POST, but we abuse the + # "query handler" interface. + self.federation.register_query_handler( + "client_keys", self.on_federation_query_client_keys + ) @defer.inlineCallbacks def query_devices(self, query_body): - local_query = [] - remote_queries = {} - for user_id, device_ids in query_body.get("device_keys", {}).items(): + """ Handle a device key query from a client + + { + "device_keys": { + "": [""] + } + } + -> + { + "device_keys": { + "": { + "": { + ... + } + } + } + } + """ + device_keys_query = query_body.get("device_keys", {}) + + # separate users by domain. + # make a map from domain to user_id to device_ids + queries_by_domain = collections.defaultdict(dict) + for user_id, device_ids in device_keys_query.items(): user = synapse.types.UserID.from_string(user_id) - if self.is_mine(user): - if not device_ids: - local_query.append((user_id, None)) - else: - for device_id in device_ids: - local_query.append((user_id, device_id)) + queries_by_domain[user.domain][user_id] = device_ids + + # do the queries + # TODO: do these in parallel + results = {} + for destination, destination_query in queries_by_domain.items(): + if destination == self.hs.hostname: + res = yield self.query_local_devices(destination_query) else: - remote_queries.setdefault(user.domain, {})[user_id] = list( - device_ids + res = yield self.federation.query_client_keys( + destination, {"device_keys": destination_query} ) + res = res["device_keys"] + for user_id, keys in res.items(): + if user_id in destination_query: + results[user_id] = keys + + defer.returnValue((200, {"device_keys": results})) + + @defer.inlineCallbacks + def query_local_devices(self, query): + """Get E2E device keys for local users + + Args: + query (dict[string, list[string]|None): map from user_id to a list + of devices to query (None for all devices) + + Returns: + defer.Deferred: (resolves to dict[string, dict[string, dict]]): + map from user_id -> device_id -> device details + """ + local_query = [] + + for user_id, device_ids in query.items(): + if not self.is_mine_id(user_id): + logger.warning("Request for keys for non-local user %s", + user_id) + raise errors.SynapseError(400, "Not a user here") + + if not device_ids: + local_query.append((user_id, None)) + else: + for device_id in device_ids: + local_query.append((user_id, device_id)) + results = yield self.store.get_e2e_device_keys(local_query) - json_result = {} + # un-jsonify the results + json_result = collections.defaultdict(dict) for user_id, device_keys in results.items(): for device_id, json_bytes in device_keys.items(): - json_result.setdefault(user_id, {})[ - device_id] = json.loads( - json_bytes - ) + json_result[user_id][device_id] = json.loads(json_bytes) - for destination, device_keys in remote_queries.items(): - remote_result = yield self.federation.query_client_keys( - destination, {"device_keys": device_keys} - ) - for user_id, keys in remote_result["device_keys"].items(): - if user_id in device_keys: - json_result[user_id] = keys - defer.returnValue((200, {"device_keys": json_result})) + defer.returnValue(json_result) + + @defer.inlineCallbacks + def on_federation_query_client_keys(self, query_body): + """ Handle a device key query from a federated server + """ + device_keys_query = query_body.get("device_keys", {}) + res = yield self.query_local_devices(device_keys_query) + defer.returnValue({"device_keys": res}) -- cgit 1.4.1 From e3a720217a9d200a7c3db8305df53ef8bf76f565 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 14:47:37 +0100 Subject: Add /state_ids federation API The new API only returns the event_ids for the state, as most requesters will already have the vast majority of the events already. --- synapse/federation/federation_client.py | 73 +++++++++++++++++++++++++++++++-- synapse/federation/federation_server.py | 21 ++++++++++ synapse/federation/transport/client.py | 22 ++++++++++ synapse/federation/transport/server.py | 12 ++++++ 4 files changed, 125 insertions(+), 3 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b06387051c..03f6133e61 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -314,9 +314,32 @@ class FederationClient(FederationBase): Deferred: Results in a list of PDUs. """ - result = yield self.transport_layer.get_room_state( - destination, room_id, event_id=event_id, - ) + try: + # First we try and ask for just the IDs, as thats far quicker if + # we have most of the state and auth_chain already. + # However, this may 404 if the other side has an old synapse. + result = yield self.transport_layer.get_room_state_ids( + destination, room_id, event_id=event_id, + ) + + state_event_ids = result["pdus"] + auth_event_ids = result.get("auth_chain", []) + + event_map, _failed_to_fetch = yield self.get_events( + [destination], room_id, set(state_event_ids + auth_event_ids) + ) + + pdus = [event_map[e_id] for e_id in state_event_ids] + auth_chain = [event_map[e_id] for e_id in auth_event_ids] + + auth_chain.sort(key=lambda e: e.depth) + + defer.returnValue((pdus, auth_chain)) + except HttpResponseException as e: + if e.code == 404: + logger.info("Failed to use get_room_state_ids API, falling back") + else: + raise e pdus = [ self.event_from_pdu_json(p, outlier=True) for p in result["pdus"] @@ -339,6 +362,50 @@ class FederationClient(FederationBase): defer.returnValue((signed_pdus, signed_auth)) + @defer.inlineCallbacks + def get_events(self, destinations, room_id, event_ids, return_local=True): + if return_local: + seen_events = yield self.store.get_events(event_ids) + signed_events = seen_events.values() + else: + seen_events = yield self.store.have_events(event_ids) + signed_events = [] + + failed_to_fetch = [] + + missing_events = set(event_ids) + for k in seen_events: + missing_events.discard(k) + + if not missing_events: + defer.returnValue((signed_events, failed_to_fetch)) + + def random_server_list(): + srvs = list(destinations) + random.shuffle(srvs) + return srvs + + batch_size = 20 + for i in xrange(0, len(missing_events), batch_size): + batch = missing_events[i:i + batch_size] + + deferreds = [ + self.get_pdu( + destinations=random_server_list(), + event_id=e_id, + ).addBoth(lambda r, e: (r, e), e_id) + for e_id in batch + ] + + res = yield defer.DeferredList(deferreds, consumeErrors=True) + for (result, val), (e_id, _) in res: + if result and val: + signed_events.append(val) + else: + failed_to_fetch.add(e_id) + + defer.returnValue((signed_events, failed_to_fetch)) + @defer.inlineCallbacks @log_function def get_event_auth(self, destination, room_id, event_id): diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 612d274bdb..40e9fda0eb 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -214,6 +214,27 @@ class FederationServer(FederationBase): defer.returnValue((200, resp)) + @defer.inlineCallbacks + def on_state_ids_request(self, origin, room_id, event_id): + if not event_id: + raise NotImplementedError("Specify an event") + + in_room = yield self.auth.check_host_in_room(room_id, origin) + if not in_room: + raise AuthError(403, "Host not in room.") + + pdus = yield self.handler.get_state_for_pdu( + room_id, event_id, + ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) + + defer.returnValue((200, { + "pdus": [pdu.event_id for pdu in pdus], + "auth_chain": [pdu.event_id for pdu in auth_chain], + })) + @defer.inlineCallbacks def _on_context_state_request_compute(self, room_id, event_id): pdus = yield self.handler.get_state_for_pdu( diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index ebb698e278..3d088e43cb 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -54,6 +54,28 @@ class TransportLayerClient(object): destination, path=path, args={"event_id": event_id}, ) + @log_function + def get_room_state_ids(self, destination, room_id, event_id): + """ Requests all state for a given room from the given server at the + given event. Returns the state's event_id's + + Args: + destination (str): The host name of the remote home server we want + to get the state from. + context (str): The name of the context we want the state of + event_id (str): The event we want the context at. + + Returns: + Deferred: Results in a dict received from the remote homeserver. + """ + logger.debug("get_room_state_ids dest=%s, room=%s", + destination, room_id) + + path = PREFIX + "/state_ids/%s/" % room_id + return self.client.get_json( + destination, path=path, args={"event_id": event_id}, + ) + @log_function def get_event(self, destination, event_id, timeout=None): """ Requests the pdu with give id and origin from the given server. diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 26fa88ae84..3ae7c48457 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -271,6 +271,17 @@ class FederationStateServlet(BaseFederationServlet): ) +class FederationStateIdsServlet(BaseFederationServlet): + PATH = "/state_ids/(?P[^/]*)/" + + def on_GET(self, origin, content, query, room_id): + return self.handler.on_state_ids_request( + origin, + room_id, + query.get("event_id", [None])[0], + ) + + class FederationBackfillServlet(BaseFederationServlet): PATH = "/backfill/(?P[^/]*)/" @@ -538,6 +549,7 @@ SERVLET_CLASSES = ( FederationPullServlet, FederationEventServlet, FederationStateServlet, + FederationStateIdsServlet, FederationBackfillServlet, FederationQueryServlet, FederationMakeJoinServlet, -- cgit 1.4.1 From a60a2eaa02f454dbc450cf821f6cd1c6b0b93993 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 14:52:43 +0100 Subject: Comment --- synapse/federation/federation_client.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 03f6133e61..0491f1c3fe 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -364,6 +364,20 @@ class FederationClient(FederationBase): @defer.inlineCallbacks def get_events(self, destinations, room_id, event_ids, return_local=True): + """Fetch events from some remote destinations, checking if we already + have them. + + Args: + destinations (list) + room_id (str) + event_ids (list) + return_local (bool): Whether to include events we already have in + the DB in the returned list of events + + Returns: + Deferred: A deferred resolving to a 2-tuple where the first is a list of + events and the second is a list of event ids that we failed to fetch. + """ if return_local: seen_events = yield self.store.get_events(event_ids) signed_events = seen_events.values() -- cgit 1.4.1 From 520ee9bd2c91a75eb1dc7ed923016967856c6bdf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 15:03:15 +0100 Subject: Fix syntax error --- synapse/federation/federation_client.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 0491f1c3fe..6c626cf12c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -325,10 +325,17 @@ class FederationClient(FederationBase): state_event_ids = result["pdus"] auth_event_ids = result.get("auth_chain", []) - event_map, _failed_to_fetch = yield self.get_events( + fetched_events, failed_to_fetch = yield self.get_events( [destination], room_id, set(state_event_ids + auth_event_ids) ) + if failed_to_fetch: + logger.warn("Failed to get %r", failed_to_fetch) + + event_map = { + ev.event_id: ev for ev in fetched_events + } + pdus = [event_map[e_id] for e_id in state_event_ids] auth_chain = [event_map[e_id] for e_id in auth_event_ids] -- cgit 1.4.1 From 4c56bedee3bb63d7035fca4a1a092b11de0b18cc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 15:04:29 +0100 Subject: Actually call get_room_state --- synapse/federation/federation_client.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 6c626cf12c..7eadcdd28c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -348,6 +348,10 @@ class FederationClient(FederationBase): else: raise e + result = yield self.transport_layer.get_room_state( + destination, room_id, event_id=event_id, + ) + pdus = [ self.event_from_pdu_json(p, outlier=True) for p in result["pdus"] ] -- cgit 1.4.1 From bcc9cda8ca75b5cc381ce10ba9b8e4af56c6bdaa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 17:17:26 +0100 Subject: Fix copy + paste fails --- synapse/federation/federation_client.py | 15 ++++++++++----- synapse/federation/federation_server.py | 2 +- 2 files changed, 11 insertions(+), 6 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 7eadcdd28c..dde10967be 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -411,8 +411,13 @@ class FederationClient(FederationBase): return srvs batch_size = 20 - for i in xrange(0, len(missing_events), batch_size): - batch = missing_events[i:i + batch_size] + while missing_events: + batch = [] + try: + for _ in range(0, batch_size): + batch.append(missing_events.pop()) + except KeyError: + pass deferreds = [ self.get_pdu( @@ -423,9 +428,9 @@ class FederationClient(FederationBase): ] res = yield defer.DeferredList(deferreds, consumeErrors=True) - for (result, val), (e_id, _) in res: - if result and val: - signed_events.append(val) + for success, (result, e_id) in res: + if success and result: + signed_events.append(result) else: failed_to_fetch.add(e_id) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 40e9fda0eb..35a01eecca 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -623,7 +623,7 @@ class FederationServer(FederationBase): origin, pdu.room_id, pdu.event_id, ) except: - logger.warn("Failed to get state for event: %s", pdu.event_id) + logger.exception("Failed to get state for event: %s", pdu.event_id) yield self.handler.on_receive_pdu( origin, -- cgit 1.4.1 From edb33eb163b6c60bfd2c3cab78a6bd13a47b6702 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 17:19:15 +0100 Subject: Rename fields to _ids --- synapse/federation/federation_client.py | 4 ++-- synapse/federation/federation_server.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index dde10967be..264f3c0aec 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -322,8 +322,8 @@ class FederationClient(FederationBase): destination, room_id, event_id=event_id, ) - state_event_ids = result["pdus"] - auth_event_ids = result.get("auth_chain", []) + state_event_ids = result["pdu_ids"] + auth_event_ids = result.get("auth_chain_ids", []) fetched_events, failed_to_fetch = yield self.get_events( [destination], room_id, set(state_event_ids + auth_event_ids) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 35a01eecca..2b91f93e09 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -231,8 +231,8 @@ class FederationServer(FederationBase): ) defer.returnValue((200, { - "pdus": [pdu.event_id for pdu in pdus], - "auth_chain": [pdu.event_id for pdu in auth_chain], + "pdu_ids": [pdu.event_id for pdu in pdus], + "auth_chain_ids": [pdu.event_id for pdu in auth_chain], })) @defer.inlineCallbacks -- cgit 1.4.1 From e3ee63578f335037c73675209bb7861045c2027a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Aug 2016 14:01:18 +0100 Subject: Tidy up get_events --- synapse/federation/federation_client.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 264f3c0aec..ae0d650700 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -411,28 +411,26 @@ class FederationClient(FederationBase): return srvs batch_size = 20 - while missing_events: - batch = [] - try: - for _ in range(0, batch_size): - batch.append(missing_events.pop()) - except KeyError: - pass + missing_events = len(missing_events) + for i in xrange(0, batch_size, batch_size): + batch = set(missing_events[i:i + batch_size]) deferreds = [ self.get_pdu( destinations=random_server_list(), event_id=e_id, - ).addBoth(lambda r, e: (r, e), e_id) + ) for e_id in batch ] res = yield defer.DeferredList(deferreds, consumeErrors=True) - for success, (result, e_id) in res: - if success and result: + for success, result in res: + if success: signed_events.append(result) - else: - failed_to_fetch.add(e_id) + batch.discard(result.event_id) + + # We removed all events we successfully fetched from `batch` + failed_to_fetch.update(batch) defer.returnValue((signed_events, failed_to_fetch)) -- cgit 1.4.1 From 257c41cc2e7163c42b8bcfa3a29d42ae5ac087b9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Aug 2016 14:05:45 +0100 Subject: Fix typos. --- synapse/federation/federation_client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ae0d650700..c6ed720166 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -396,7 +396,7 @@ class FederationClient(FederationBase): seen_events = yield self.store.have_events(event_ids) signed_events = [] - failed_to_fetch = [] + failed_to_fetch = set() missing_events = set(event_ids) for k in seen_events: @@ -411,8 +411,8 @@ class FederationClient(FederationBase): return srvs batch_size = 20 - missing_events = len(missing_events) - for i in xrange(0, batch_size, batch_size): + missing_events = list(missing_events) + for i in xrange(0, len(missing_events), batch_size): batch = set(missing_events[i:i + batch_size]) deferreds = [ -- cgit 1.4.1 From a2b7102eea23572c8a3619704f5f5c3f21a8edcd Mon Sep 17 00:00:00 2001 From: Benjamin Saunders Date: Thu, 4 Aug 2016 20:38:08 -0700 Subject: Tweak integrity error recovery to work as intended --- synapse/federation/federation_client.py | 2 +- synapse/storage/events.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c6ed720166..c5e99cebf7 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -343,7 +343,7 @@ class FederationClient(FederationBase): defer.returnValue((pdus, auth_chain)) except HttpResponseException as e: - if e.code == 404: + if e.code == 400: logger.info("Failed to use get_room_state_ids API, falling back") else: raise e diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 340c0621cc..643c3aed2a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -580,6 +580,7 @@ class EventsStore(SQLBaseStore): for table in ( "events", + "event_auth", "event_json", "event_content_hashes", "event_destinations", @@ -593,6 +594,8 @@ class EventsStore(SQLBaseStore): "event_to_state_groups", "rejections", "redactions", + "room_memberships", + "state_events" ): txn.executemany( "DELETE FROM %s WHERE event_id = ?" % (table,), -- cgit 1.4.1 From 1515d1b581158e98c6513cf8b3b9548787854a64 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Aug 2016 10:24:23 +0100 Subject: Fallback to /state/ on both 400 and 404 --- synapse/federation/federation_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c5e99cebf7..7e1690b0fb 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -343,7 +343,7 @@ class FederationClient(FederationBase): defer.returnValue((pdus, auth_chain)) except HttpResponseException as e: - if e.code == 400: + if e.code == 400 or e.code == 404: logger.info("Failed to use get_room_state_ids API, falling back") else: raise e -- cgit 1.4.1 From fccadb7719f872ecdabd6ebfb299156eca208454 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Aug 2016 10:43:47 +0100 Subject: Check if we already have the events returned by /state/ --- synapse/federation/federation_client.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 7e1690b0fb..b751d26de1 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -361,12 +361,26 @@ class FederationClient(FederationBase): for p in result.get("auth_chain", []) ] + seen_events = yield self.store.get_events([ + ev.event_id for ev in itertools.chain(pdus, auth_chain) + ]) + signed_pdus = yield self._check_sigs_and_hash_and_fetch( - destination, pdus, outlier=True + destination, + [p for p in pdus if p.event_id not in seen_events], + outlier=True + ) + signed_pdus.extend( + seen_events[p.event_id] for p in pdus if p.event_id in seen_events ) signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True + destination, + [p for p in pdus if p.event_id not in auth_chain], + outlier=True + ) + signed_auth.extend( + seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events ) signed_auth.sort(key=lambda e: e.depth) -- cgit 1.4.1 From 2d4de61fb7b6276255fac3c07ef4c242f5a93c71 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Aug 2016 10:48:56 +0100 Subject: Fix typo --- synapse/federation/federation_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b751d26de1..65778fd4ee 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -376,7 +376,7 @@ class FederationClient(FederationBase): signed_auth = yield self._check_sigs_and_hash_and_fetch( destination, - [p for p in pdus if p.event_id not in auth_chain], + [p for p in auth_chain if p.event_id not in seen_events], outlier=True ) signed_auth.extend( -- cgit 1.4.1 From 597c79be1001f748d467c0c64acfe85262ee00f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Aug 2016 16:17:04 +0100 Subject: Change the way we specify if we require auth or not --- synapse/federation/transport/server.py | 95 ++++++++++++++++++++-------------- 1 file changed, 55 insertions(+), 40 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 0bc6e0801d..ee8f94e340 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.api.errors import Codes, SynapseError from synapse.http.server import JsonResource -from synapse.http.servlet import parse_json_object_from_request, parse_string +from synapse.http.servlet import parse_json_object_from_request from synapse.util.ratelimitutils import FederationRateLimiter import functools @@ -60,6 +60,16 @@ class TransportLayerServer(JsonResource): ) +class AuthenticationError(SynapseError): + """There was a problem authenticating the request""" + pass + + +class NoAuthenticationError(AuthenticationError): + """The request had no authentication information""" + pass + + class Authenticator(object): def __init__(self, hs): self.keyring = hs.get_keyring() @@ -67,7 +77,7 @@ class Authenticator(object): # A method just so we can pass 'self' as the authenticator to the Servlets @defer.inlineCallbacks - def authenticate_request(self, request): + def authenticate_request(self, request, content): json_request = { "method": request.method, "uri": request.uri, @@ -75,17 +85,10 @@ class Authenticator(object): "signatures": {}, } - content = None - origin = None + if content is not None: + json_request["content"] = content - if request.method in ["PUT", "POST"]: - # TODO: Handle other method types? other content types? - try: - content_bytes = request.content.read() - content = json.loads(content_bytes) - json_request["content"] = content - except: - raise SynapseError(400, "Unable to parse JSON", Codes.BAD_JSON) + origin = None def parse_auth_header(header_str): try: @@ -103,14 +106,14 @@ class Authenticator(object): sig = strip_quotes(param_dict["sig"]) return (origin, key, sig) except: - raise SynapseError( + raise AuthenticationError( 400, "Malformed Authorization header", Codes.UNAUTHORIZED ) auth_headers = request.requestHeaders.getRawHeaders(b"Authorization") if not auth_headers: - raise SynapseError( + raise NoAuthenticationError( 401, "Missing Authorization headers", Codes.UNAUTHORIZED, ) @@ -121,7 +124,7 @@ class Authenticator(object): json_request["signatures"].setdefault(origin, {})[key] = sig if not json_request["signatures"]: - raise SynapseError( + raise NoAuthenticationError( 401, "Missing Authorization headers", Codes.UNAUTHORIZED, ) @@ -130,10 +133,12 @@ class Authenticator(object): logger.info("Request from %s", origin) request.authenticated_entity = origin - defer.returnValue((origin, content)) + defer.returnValue(origin) class BaseFederationServlet(object): + REQUIRE_AUTH = True + def __init__(self, handler, authenticator, ratelimiter, server_name, room_list_handler): self.handler = handler @@ -141,29 +146,46 @@ class BaseFederationServlet(object): self.ratelimiter = ratelimiter self.room_list_handler = room_list_handler - def _wrap(self, code): + def _wrap(self, func): authenticator = self.authenticator ratelimiter = self.ratelimiter @defer.inlineCallbacks - @functools.wraps(code) - def new_code(request, *args, **kwargs): + @functools.wraps(func) + def new_func(request, *args, **kwargs): + content = None + if request.method in ["PUT", "POST"]: + # TODO: Handle other method types? other content types? + content = parse_json_object_from_request(request) + try: - (origin, content) = yield authenticator.authenticate_request(request) + origin = yield authenticator.authenticate_request(request, content) + except NoAuthenticationError: + origin = None + if self.REQUIRE_AUTH: + logger.exception("authenticate_request failed") + raise + except: + logger.exception("authenticate_request failed") + raise + + if origin: with ratelimiter.ratelimit(origin) as d: yield d - response = yield code( + response = yield func( origin, content, request.args, *args, **kwargs ) - except: - logger.exception("authenticate_request failed") - raise + else: + response = yield func( + origin, content, request.args, *args, **kwargs + ) + defer.returnValue(response) # Extra logic that functools.wraps() doesn't finish - new_code.__self__ = code.__self__ + new_func.__self__ = func.__self__ - return new_code + return new_func def register(self, server): pattern = re.compile("^" + PREFIX + self.PATH + "$") @@ -429,9 +451,10 @@ class FederationGetMissingEventsServlet(BaseFederationServlet): class On3pidBindServlet(BaseFederationServlet): PATH = "/3pid/onbind" + REQUIRE_AUTH = False + @defer.inlineCallbacks - def on_POST(self, request): - content = parse_json_object_from_request(request) + def on_POST(self, origin, content, query): if "invites" in content: last_exception = None for invite in content["invites"]: @@ -453,11 +476,6 @@ class On3pidBindServlet(BaseFederationServlet): raise last_exception defer.returnValue((200, {})) - # Avoid doing remote HS authorization checks which are done by default by - # BaseFederationServlet. - def _wrap(self, code): - return code - class OpenIdUserInfo(BaseFederationServlet): """ @@ -478,9 +496,11 @@ class OpenIdUserInfo(BaseFederationServlet): PATH = "/openid/userinfo" + REQUIRE_AUTH = False + @defer.inlineCallbacks - def on_GET(self, request): - token = parse_string(request, "access_token") + def on_GET(self, origin, content, query): + token = query.get("access_token", [None])[0] if token is None: defer.returnValue((401, { "errcode": "M_MISSING_TOKEN", "error": "Access Token required" @@ -497,11 +517,6 @@ class OpenIdUserInfo(BaseFederationServlet): defer.returnValue((200, {"sub": user_id})) - # Avoid doing remote HS authorization checks which are done by default by - # BaseFederationServlet. - def _wrap(self, code): - return code - class PublicRoomList(BaseFederationServlet): """ -- cgit 1.4.1 From 24f36469bc5c634ff49c87e49e32579d6ac43d7c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Aug 2016 16:36:07 +0100 Subject: Add federation /version API --- synapse/app/federation_reader.py | 2 +- synapse/app/homeserver.py | 2 +- synapse/app/pusher.py | 2 +- synapse/app/synchrotron.py | 2 +- synapse/federation/transport/server.py | 18 +++++++++++++++++- synapse/util/versionstring.py | 8 ++++---- 6 files changed, 25 insertions(+), 9 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 58d425f9ac..7355499ae2 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -165,7 +165,7 @@ def start(config_options): db_config=config.database_config, tls_server_context_factory=tls_server_context_factory, config=config, - version_string=get_version_string("Synapse", synapse), + version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, ) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index fe68ceb07c..40e6f65236 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -285,7 +285,7 @@ def setup(config_options): # check any extra requirements we have now we have a config check_requirements(config) - version_string = get_version_string("Synapse", synapse) + version_string = "Synapse/" + get_version_string(synapse) logger.info("Server hostname: %s", config.server_name) logger.info("Server version: %s", version_string) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 4f1d18ab5f..c8dde0fcb8 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -273,7 +273,7 @@ def start(config_options): config.server_name, db_config=config.database_config, config=config, - version_string=get_version_string("Synapse", synapse), + version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, ) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 8cf5bbbb6d..215ccfd522 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -424,7 +424,7 @@ def start(config_options): config.server_name, db_config=config.database_config, config=config, - version_string=get_version_string("Synapse", synapse), + version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, application_service_handler=SynchrotronApplicationService(), ) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index ee8f94e340..37c0d4fbc4 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -20,11 +20,12 @@ from synapse.api.errors import Codes, SynapseError from synapse.http.server import JsonResource from synapse.http.servlet import parse_json_object_from_request from synapse.util.ratelimitutils import FederationRateLimiter +from synapse.util.versionstring import get_version_string import functools import logging -import simplejson as json import re +import synapse logger = logging.getLogger(__name__) @@ -557,6 +558,20 @@ class PublicRoomList(BaseFederationServlet): defer.returnValue((200, data)) +class FederationVersionServlet(BaseFederationServlet): + PATH = "/version" + + REQUIRE_AUTH = False + + def on_GET(self, origin, content, query): + return defer.succeed((200, { + "server": { + "name": "Synapse", + "version": get_version_string(synapse) + }, + })) + + SERVLET_CLASSES = ( FederationSendServlet, FederationPullServlet, @@ -580,6 +595,7 @@ SERVLET_CLASSES = ( On3pidBindServlet, OpenIdUserInfo, PublicRoomList, + FederationVersionServlet, ) diff --git a/synapse/util/versionstring.py b/synapse/util/versionstring.py index a4f156cb3b..52086df465 100644 --- a/synapse/util/versionstring.py +++ b/synapse/util/versionstring.py @@ -21,7 +21,7 @@ import logging logger = logging.getLogger(__name__) -def get_version_string(name, module): +def get_version_string(module): try: null = open(os.devnull, 'w') cwd = os.path.dirname(os.path.abspath(module.__file__)) @@ -74,11 +74,11 @@ def get_version_string(name, module): ) return ( - "%s/%s (%s)" % ( - name, module.__version__, git_version, + "%s (%s)" % ( + module.__version__, git_version, ) ).encode("ascii") except Exception as e: logger.info("Failed to check for git repository: %s", e) - return ("%s/%s" % (name, module.__version__,)).encode("ascii") + return module.__version__.encode("ascii") -- cgit 1.4.1 From 46453bfc2f203bae8ac7845f04ef64c04e172c92 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Aug 2016 18:02:03 +0100 Subject: Retry joining via other servers if first one failed --- synapse/federation/federation_client.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 65778fd4ee..92332cfdcf 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -523,14 +523,19 @@ class FederationClient(FederationBase): (destination, self.event_from_pdu_json(pdu_dict)) ) break - except CodeMessageException: - raise + except CodeMessageException as e: + if not 500 <= e.code < 600: + raise + else: + logger.warn( + "Failed to make_%s via %s: %s", + membership, destination, e.message + ) except Exception as e: logger.warn( "Failed to make_%s via %s: %s", membership, destination, e.message ) - raise raise RuntimeError("Failed to send to any server.") @@ -602,8 +607,14 @@ class FederationClient(FederationBase): "auth_chain": signed_auth, "origin": destination, }) - except CodeMessageException: - raise + except CodeMessageException as e: + if not 500 <= e.code < 600: + raise + else: + logger.exception( + "Failed to send_join via %s: %s", + destination, e.message + ) except Exception as e: logger.exception( "Failed to send_join via %s: %s", -- cgit 1.4.1 From 5f360182c65d1ef24417ee284c5a66239beca9fe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Aug 2016 18:08:32 +0100 Subject: Fix a couple of python bugs --- synapse/federation/federation_client.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 92332cfdcf..da95c2ad6d 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -236,9 +236,9 @@ class FederationClient(FederationBase): # TODO: Rate limit the number of times we try and get the same event. if self._get_pdu_cache: - e = self._get_pdu_cache.get(event_id) - if e: - defer.returnValue(e) + ev = self._get_pdu_cache.get(event_id) + if ev: + defer.returnValue(ev) pdu = None for destination in destinations: @@ -269,7 +269,7 @@ class FederationClient(FederationBase): break - except SynapseError: + except SynapseError as e: logger.info( "Failed to get PDU %s from %s because %s", event_id, destination, e, @@ -336,8 +336,10 @@ class FederationClient(FederationBase): ev.event_id: ev for ev in fetched_events } - pdus = [event_map[e_id] for e_id in state_event_ids] - auth_chain = [event_map[e_id] for e_id in auth_event_ids] + pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map] + auth_chain = [ + event_map[e_id] for e_id in auth_event_ids if e_id in event_map + ] auth_chain.sort(key=lambda e: e.depth) -- cgit 1.4.1