From a6e3222fe5abf5b65b53678d1208c4c58f97b391 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 6 Oct 2017 14:24:06 +0100 Subject: Fed server: Move origin-check code to _handle_received_pdu The response-building code expects there to be an entry in the `results` list for each entry in the pdu_list, so the early `continue` was messing this up. That doesn't really matter, because all that the federation client does is log any errors, but it's pretty poor form. --- synapse/federation/federation_server.py | 48 ++++++++++++++++----------------- 1 file changed, 24 insertions(+), 24 deletions(-) (limited to 'synapse/federation/federation_server.py') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 51e3fdea06..e791a1266d 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -143,30 +143,6 @@ class FederationServer(FederationBase): results = [] for pdu in pdu_list: - # check that it's actually being sent from a valid destination to - # workaround bug #1753 in 0.18.5 and 0.18.6 - if transaction.origin != get_domain_from_id(pdu.event_id): - # We continue to accept join events from any server; this is - # necessary for the federation join dance to work correctly. - # (When we join over federation, the "helper" server is - # responsible for sending out the join event, rather than the - # origin. See bug #1893). - if not ( - pdu.type == 'm.room.member' and - pdu.content and - pdu.content.get("membership", None) == 'join' - ): - logger.info( - "Discarding PDU %s from invalid origin %s", - pdu.event_id, transaction.origin - ) - continue - else: - logger.info( - "Accepting join PDU %s from %s", - pdu.event_id, transaction.origin - ) - try: yield self._handle_received_pdu(transaction.origin, pdu) results.append({}) @@ -520,6 +496,30 @@ class FederationServer(FederationBase): Returns (Deferred): completes with None Raises: FederationError if the signatures / hash do not match """ + # check that it's actually being sent from a valid destination to + # workaround bug #1753 in 0.18.5 and 0.18.6 + if origin != get_domain_from_id(pdu.event_id): + # We continue to accept join events from any server; this is + # necessary for the federation join dance to work correctly. + # (When we join over federation, the "helper" server is + # responsible for sending out the join event, rather than the + # origin. See bug #1893). + if not ( + pdu.type == 'm.room.member' and + pdu.content and + pdu.content.get("membership", None) == 'join' + ): + logger.info( + "Discarding PDU %s from invalid origin %s", + pdu.event_id, origin + ) + return + else: + logger.info( + "Accepting join PDU %s from %s", + pdu.event_id, origin + ) + # Check signature. try: pdu = yield self._check_sigs_and_hash(pdu) -- cgit 1.4.1 From ba5b9b80a56a449ffab44afaf4661d5b44277898 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 6 Oct 2017 15:18:58 +0100 Subject: fed server: refactor on_incoming_transaction Move as much as possible to after the have_responded check, and reduce the number of times we iterate over the pdu list. --- synapse/federation/federation_server.py | 53 ++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 24 deletions(-) (limited to 'synapse/federation/federation_server.py') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index e791a1266d..fa4ec2ad3c 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -109,22 +109,11 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_incoming_transaction(self, transaction_data): - transaction = Transaction(**transaction_data) - - received_pdus_counter.inc_by(len(transaction.pdus)) - - for p in transaction.pdus: - if "unsigned" in p: - unsigned = p["unsigned"] - if "age" in unsigned: - p["age"] = unsigned["age"] - if "age" in p: - p["age_ts"] = int(self._clock.time_msec()) - int(p["age"]) - del p["age"] + # keep this as early as possible to make the calculated origin ts as + # accurate as possible. + request_time = int(self._clock.time_msec()) - pdu_list = [ - self.event_from_pdu_json(p) for p in transaction.pdus - ] + transaction = Transaction(**transaction_data) logger.debug("[%s] Got transaction", transaction.transaction_id) @@ -140,17 +129,35 @@ class FederationServer(FederationBase): logger.debug("[%s] Transaction is new", transaction.transaction_id) - results = [] + received_pdus_counter.inc_by(len(transaction.pdus)) + + pdu_list = [] + + for p in transaction.pdus: + if "unsigned" in p: + unsigned = p["unsigned"] + if "age" in unsigned: + p["age"] = unsigned["age"] + if "age" in p: + p["age_ts"] = request_time - int(p["age"]) + del p["age"] + + event = self.event_from_pdu_json(p) + pdu_list.append(event) + + pdu_results = {} for pdu in pdu_list: + event_id = pdu.event_id try: yield self._handle_received_pdu(transaction.origin, pdu) - results.append({}) + pdu_results[event_id] = {} except FederationError as e: + logger.warn("Error handling PDU %s: %s", event_id, e) self.send_failure(e, transaction.origin) - results.append({"error": str(e)}) + pdu_results[event_id] = {"error": str(e)} except Exception as e: - results.append({"error": str(e)}) + pdu_results[event_id] = {"error": str(e)} logger.exception("Failed to handle PDU") if hasattr(transaction, "edus"): @@ -164,14 +171,12 @@ class FederationServer(FederationBase): for failure in getattr(transaction, "pdu_failures", []): logger.info("Got failure %r", failure) - logger.debug("Returning: %s", str(results)) - response = { - "pdus": dict(zip( - (p.event_id for p in pdu_list), results - )), + "pdus": pdu_results, } + logger.debug("Returning: %s", str(response)) + yield self.transaction_actions.set_response( transaction, 200, response -- cgit 1.4.1 From 4c7c4d4061ae298ce6df445c888b91d3e5791164 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 6 Oct 2017 15:31:58 +0100 Subject: Fed server: use a linearizer for ongoing transactions We don't want to process the same transaction multiple times concurrently, so use a linearizer. --- synapse/federation/federation_server.py | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) (limited to 'synapse/federation/federation_server.py') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index fa4ec2ad3c..b2dffa2c3d 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -53,6 +53,7 @@ class FederationServer(FederationBase): self.auth = hs.get_auth() self._server_linearizer = Linearizer("fed_server") + self._transaction_linearizer = Linearizer("fed_txn_handler") # We cache responses to state queries, as they take a while and often # come in waves. @@ -111,12 +112,39 @@ class FederationServer(FederationBase): def on_incoming_transaction(self, transaction_data): # keep this as early as possible to make the calculated origin ts as # accurate as possible. - request_time = int(self._clock.time_msec()) + request_time = self._clock.time_msec() transaction = Transaction(**transaction_data) + if not transaction.transaction_id: + raise Exception("Transaction missing transaction_id") + if not transaction.origin: + raise Exception("Transaction missing origin") + logger.debug("[%s] Got transaction", transaction.transaction_id) + # use a linearizer to ensure that we don't process the same transaction + # multiple times in parallel. + with (yield self._transaction_linearizer.queue( + (transaction.origin, transaction.transaction_id), + )): + result = yield self._handle_incoming_transaction( + transaction, request_time, + ) + + defer.returnValue(result) + + @defer.inlineCallbacks + def _handle_incoming_transaction(self, transaction, request_time): + """ Process an incoming transaction and return the HTTP response + + Args: + transaction (Transaction): incoming transaction + request_time (int): timestamp that the HTTP request arrived at + + Returns: + Deferred[(int, object)]: http response code and body + """ response = yield self.transaction_actions.have_responded(transaction) if response: -- cgit 1.4.1 From 6a6cc27aee16ee045b6909d2c401a9d4f6e54324 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 6 Oct 2017 16:07:20 +0100 Subject: fed server: process PDUs for different rooms in parallel With luck, this will give a real-time improvement when there are many rooms and the server ends up calling out to fetch missing events. --- synapse/federation/federation_server.py | 53 +++++++++++++++++++++------------ 1 file changed, 34 insertions(+), 19 deletions(-) (limited to 'synapse/federation/federation_server.py') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index b2dffa2c3d..f00d59e701 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -12,14 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - from twisted.internet import defer from .federation_base import FederationBase from .units import Transaction, Edu -from synapse.util.async import Linearizer +from synapse.util import async from synapse.util.logutils import log_function from synapse.util.caches.response_cache import ResponseCache from synapse.events import FrozenEvent @@ -33,6 +31,9 @@ from synapse.crypto.event_signing import compute_event_signature import simplejson as json import logging +# when processing incoming transactions, we try to handle multiple rooms in +# parallel, up to this limit. +TRANSACTION_CONCURRENCY_LIMIT = 10 logger = logging.getLogger(__name__) @@ -52,8 +53,8 @@ class FederationServer(FederationBase): self.auth = hs.get_auth() - self._server_linearizer = Linearizer("fed_server") - self._transaction_linearizer = Linearizer("fed_txn_handler") + self._server_linearizer = async.Linearizer("fed_server") + self._transaction_linearizer = async.Linearizer("fed_txn_handler") # We cache responses to state queries, as they take a while and often # come in waves. @@ -159,7 +160,7 @@ class FederationServer(FederationBase): received_pdus_counter.inc_by(len(transaction.pdus)) - pdu_list = [] + pdus_by_room = {} for p in transaction.pdus: if "unsigned" in p: @@ -171,22 +172,36 @@ class FederationServer(FederationBase): del p["age"] event = self.event_from_pdu_json(p) - pdu_list.append(event) + room_id = event.room_id + pdus_by_room.setdefault(room_id, []).append(event) pdu_results = {} - for pdu in pdu_list: - event_id = pdu.event_id - try: - yield self._handle_received_pdu(transaction.origin, pdu) - pdu_results[event_id] = {} - except FederationError as e: - logger.warn("Error handling PDU %s: %s", event_id, e) - self.send_failure(e, transaction.origin) - pdu_results[event_id] = {"error": str(e)} - except Exception as e: - pdu_results[event_id] = {"error": str(e)} - logger.exception("Failed to handle PDU") + # we can process different rooms in parallel (which is useful if they + # require callouts to other servers to fetch missing events), but + # impose a limit to avoid going too crazy with ram/cpu. + @defer.inlineCallbacks + def process_pdus_for_room(room_id): + logger.debug("Processing PDUs for %s", room_id) + for pdu in pdus_by_room[room_id]: + event_id = pdu.event_id + try: + yield self._handle_received_pdu( + transaction.origin, pdu + ) + pdu_results[event_id] = {} + except FederationError as e: + logger.warn("Error handling PDU %s: %s", event_id, e) + self.send_failure(e, transaction.origin) + pdu_results[event_id] = {"error": str(e)} + except Exception as e: + pdu_results[event_id] = {"error": str(e)} + logger.exception("Failed to handle PDU %s", event_id) + + yield async.concurrently_execute( + process_pdus_for_room, pdus_by_room.keys(), + TRANSACTION_CONCURRENCY_LIMIT, + ) if hasattr(transaction, "edus"): for edu in (Edu(**x) for x in transaction.edus): -- cgit 1.4.1