diff options
author | Erik Johnston <erik@matrix.org> | 2018-11-29 15:50:04 +0000 |
---|---|---|
committer | Brendan Abolivier <babolivier@matrix.org> | 2019-02-13 15:16:05 +0000 |
commit | 4e0ac33053053048b30c9fffeaf6d02d72959c95 (patch) | |
tree | af4477169db06deeb4885c198775eaa0a2c194a0 | |
parent | pep8 (diff) | |
download | synapse-4e0ac33053053048b30c9fffeaf6d02d72959c95.tar.xz |
Handle slow/lossy connections better when sending transactions
-rw-r--r-- | synapse/federation/federation_server.py | 19 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 24 | ||||
-rw-r--r-- | synapse/federation/units.py | 5 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 32 |
4 files changed, 44 insertions, 36 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 5d4dc6370c..352453377a 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -37,7 +37,7 @@ from synapse.api.errors import ( from synapse.crypto.event_signing import compute_event_signature from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.federation.persistence import TransactionActions -from synapse.federation.units import Edu, Transaction +from synapse.federation.units import Edu, Transaction, _mangle_pdu from synapse.http.endpoint import parse_server_name from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, @@ -49,6 +49,7 @@ from synapse.util.caches.response_cache import ResponseCache from synapse.util.logcontext import nested_logging_context from synapse.util.logutils import log_function + # when processing incoming transactions, we try to handle multiple rooms in # parallel, up to this limit. TRANSACTION_CONCURRENCY_LIMIT = 10 @@ -365,8 +366,8 @@ class FederationServer(FederationBase): ) defer.returnValue({ - "pdus": [pdu.get_pdu_json() for pdu in pdus], - "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain], + "pdus": [_mangle_pdu(pdu.get_pdu_json()) for pdu in pdus], + "auth_chain": [_mangle_pdu(pdu.get_pdu_json()) for pdu in auth_chain], }) @defer.inlineCallbacks @@ -411,7 +412,7 @@ class FederationServer(FederationBase): yield self.check_server_matches_acl(origin_host, pdu.room_id) ret_pdu = yield self.handler.on_invite_request(origin, pdu) time_now = self._clock.time_msec() - defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)})) + defer.returnValue((200, {"event": _mangle_pdu(ret_pdu.get_pdu_json(time_now))})) @defer.inlineCallbacks def on_send_join_request(self, origin, content): @@ -425,9 +426,9 @@ class FederationServer(FederationBase): res_pdus = yield self.handler.on_send_join_request(origin, pdu) time_now = self._clock.time_msec() defer.returnValue((200, { - "state": [p.get_pdu_json(time_now) for p in res_pdus["state"]], + "state": [_mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["state"]], "auth_chain": [ - p.get_pdu_json(time_now) for p in res_pdus["auth_chain"] + _mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["auth_chain"] ], })) @@ -460,7 +461,7 @@ class FederationServer(FederationBase): time_now = self._clock.time_msec() auth_pdus = yield self.handler.on_event_auth(event_id) res = { - "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], + "auth_chain": [_mangle_pdu(a.get_pdu_json(time_now)) for a in auth_pdus], } defer.returnValue((200, res)) @@ -509,7 +510,7 @@ class FederationServer(FederationBase): time_now = self._clock.time_msec() send_content = { "auth_chain": [ - e.get_pdu_json(time_now) + _mangle_pdu(e.get_pdu_json(time_now)) for e in ret["auth_chain"] ], "rejects": ret.get("rejects", []), @@ -585,7 +586,7 @@ class FederationServer(FederationBase): time_now = self._clock.time_msec() defer.returnValue({ - "events": [ev.get_pdu_json(time_now) for ev in missing_events], + "events": [_mangle_pdu(ev.get_pdu_json(time_now)) for ev in missing_events], }) @log_function diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 00bbf01620..d1680c5e4a 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -509,16 +509,22 @@ class TransactionQueue(object): pending_pdus = self.pending_pdus_by_dest.pop(destination, []) # We can only include at most 50 PDUs per transactions - pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:] + pending_pdus, leftover_pdus = pending_pdus[-5:], pending_pdus[:-5] if leftover_pdus: - self.pending_pdus_by_dest[destination] = leftover_pdus + # self.pending_pdus_by_dest[destination] = leftover_pdus + for _, _, p_span in leftover_pdus: + p_span.set_tag("success", False) + p_span.log_kv({"result": "dropped"}) + p_span.finish() + + logger.info("TX [%s] Sending PDUs: %s", destination, pending_pdus) pending_edus = self.pending_edus_by_dest.pop(destination, []) # We can only include at most 100 EDUs per transactions - pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:] - if leftover_edus: - self.pending_edus_by_dest[destination] = leftover_edus + pending_edus, leftover_edus = pending_edus[-5:], pending_edus[:-5] + # if leftover_edus: + # self.pending_edus_by_dest[destination] = leftover_edus pending_presence = self.pending_presence_by_dest.pop(destination, {}) @@ -834,7 +840,7 @@ class TransactionQueue(object): yield self._send_pdu(pdu, list(new_destinations), span) - @defer.inlineCallbacks + # @defer.inlineCallbacks def _pdu_send_txn_failed(self, destination, txn_id, pdu, span): """Gets called when sending a transaction failed (after retries) """ @@ -858,9 +864,9 @@ class TransactionQueue(object): }, ) - new_destinations = set(pdu.unsigned.get("destinations", [])) - new_destinations.discard(destination) - yield self._send_pdu(pdu, list(new_destinations), span) + # new_destinations = set(pdu.unsigned.get("destinations", [])) + # new_destinations.discard(destination) + # yield self._send_pdu(pdu, list(new_destinations), span) def _numberToBase(n, b): diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 20822414a5..885bb7e80c 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -20,7 +20,7 @@ server protocol. import itertools import logging -from synapse.types import get_localpart_from_id +from synapse.types import get_localpart_from_id, get_domain_from_id from synapse.util.jsonobject import JsonEncodedObject logger = logging.getLogger(__name__) @@ -130,7 +130,8 @@ def _mangle_pdu(pdu_json): pdu_json["auth_events"] = list(_strip_hashes(pdu_json["auth_events"])) pdu_json["prev_events"] = list(_strip_hashes(pdu_json["prev_events"])) - pdu_json["event_id"] = get_localpart_from_id(pdu_json["event_id"]) + if get_domain_from_id(pdu_json["event_id"]) == get_domain_from_id(pdu_json["sender"]): + pdu_json["event_id"] = get_localpart_from_id(pdu_json["event_id"]) destinations = pdu_json["unsigned"].pop("destinations", None) if destinations: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 19dc6da4a7..5d8e6c71d8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -325,20 +325,20 @@ class FederationHandler(BaseHandler): # but there is an interaction with min_depth that I'm not really # following. - if sent_to_us_directly: - logger.warn( - "[%s %s] Rejecting: failed to fetch %d prev events: %s", - room_id, event_id, len(prevs - seen), shortstr(prevs - seen) - ) - raise FederationError( - "ERROR", - 403, - ( - "Your server isn't divulging details about prev_events " - "referenced in this event." - ), - affected=pdu.event_id, - ) + # if sent_to_us_directly: + # logger.warn( + # "[%s %s] Rejecting: failed to fetch %d prev events: %s", + # room_id, event_id, len(prevs - seen), shortstr(prevs - seen) + # ) + # raise FederationError( + # "ERROR", + # 403, + # ( + # "Your server isn't divulging details about prev_events " + # "referenced in this event." + # ), + # affected=pdu.event_id, + # ) # Calculate the state after each of the previous events, and # resolve them to find the correct state at the current event. @@ -563,9 +563,9 @@ class FederationHandler(BaseHandler): room_id, earliest_events_ids=list(latest), latest_events=[pdu], - limit=10, + limit=5, min_depth=min_depth, - timeout=60000, + timeout=15000, ) logger.info( |