summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-11-29 15:50:04 +0000
committerBrendan Abolivier <babolivier@matrix.org>2019-02-13 15:16:05 +0000
commit4e0ac33053053048b30c9fffeaf6d02d72959c95 (patch)
treeaf4477169db06deeb4885c198775eaa0a2c194a0
parentpep8 (diff)
downloadsynapse-4e0ac33053053048b30c9fffeaf6d02d72959c95.tar.xz
Handle slow/lossy connections better when sending transactions
-rw-r--r--synapse/federation/federation_server.py19
-rw-r--r--synapse/federation/transaction_queue.py24
-rw-r--r--synapse/federation/units.py5
-rw-r--r--synapse/handlers/federation.py32
4 files changed, 44 insertions, 36 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 5d4dc6370c..352453377a 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -37,7 +37,7 @@ from synapse.api.errors import (
 from synapse.crypto.event_signing import compute_event_signature
 from synapse.federation.federation_base import FederationBase, event_from_pdu_json
 from synapse.federation.persistence import TransactionActions
-from synapse.federation.units import Edu, Transaction
+from synapse.federation.units import Edu, Transaction, _mangle_pdu
 from synapse.http.endpoint import parse_server_name
 from synapse.replication.http.federation import (
     ReplicationFederationSendEduRestServlet,
@@ -49,6 +49,7 @@ from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.logcontext import nested_logging_context
 from synapse.util.logutils import log_function
 
+
 # when processing incoming transactions, we try to handle multiple rooms in
 # parallel, up to this limit.
 TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -365,8 +366,8 @@ class FederationServer(FederationBase):
                 )
 
         defer.returnValue({
-            "pdus": [pdu.get_pdu_json() for pdu in pdus],
-            "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
+            "pdus": [_mangle_pdu(pdu.get_pdu_json()) for pdu in pdus],
+            "auth_chain": [_mangle_pdu(pdu.get_pdu_json()) for pdu in auth_chain],
         })
 
     @defer.inlineCallbacks
@@ -411,7 +412,7 @@ class FederationServer(FederationBase):
         yield self.check_server_matches_acl(origin_host, pdu.room_id)
         ret_pdu = yield self.handler.on_invite_request(origin, pdu)
         time_now = self._clock.time_msec()
-        defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
+        defer.returnValue((200, {"event": _mangle_pdu(ret_pdu.get_pdu_json(time_now))}))
 
     @defer.inlineCallbacks
     def on_send_join_request(self, origin, content):
@@ -425,9 +426,9 @@ class FederationServer(FederationBase):
         res_pdus = yield self.handler.on_send_join_request(origin, pdu)
         time_now = self._clock.time_msec()
         defer.returnValue((200, {
-            "state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
+            "state": [_mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["state"]],
             "auth_chain": [
-                p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]
+                _mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["auth_chain"]
             ],
         }))
 
@@ -460,7 +461,7 @@ class FederationServer(FederationBase):
             time_now = self._clock.time_msec()
             auth_pdus = yield self.handler.on_event_auth(event_id)
             res = {
-                "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
+                "auth_chain": [_mangle_pdu(a.get_pdu_json(time_now)) for a in auth_pdus],
             }
         defer.returnValue((200, res))
 
@@ -509,7 +510,7 @@ class FederationServer(FederationBase):
             time_now = self._clock.time_msec()
             send_content = {
                 "auth_chain": [
-                    e.get_pdu_json(time_now)
+                    _mangle_pdu(e.get_pdu_json(time_now))
                     for e in ret["auth_chain"]
                 ],
                 "rejects": ret.get("rejects", []),
@@ -585,7 +586,7 @@ class FederationServer(FederationBase):
             time_now = self._clock.time_msec()
 
         defer.returnValue({
-            "events": [ev.get_pdu_json(time_now) for ev in missing_events],
+            "events": [_mangle_pdu(ev.get_pdu_json(time_now)) for ev in missing_events],
         })
 
     @log_function
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 00bbf01620..d1680c5e4a 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -509,16 +509,22 @@ class TransactionQueue(object):
                 pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
 
                 # We can only include at most 50 PDUs per transactions
-                pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:]
+                pending_pdus, leftover_pdus = pending_pdus[-5:], pending_pdus[:-5]
                 if leftover_pdus:
-                    self.pending_pdus_by_dest[destination] = leftover_pdus
+                    # self.pending_pdus_by_dest[destination] = leftover_pdus
+                    for _, _, p_span in leftover_pdus:
+                        p_span.set_tag("success", False)
+                        p_span.log_kv({"result": "dropped"})
+                        p_span.finish()
+
+                logger.info("TX [%s] Sending PDUs: %s", destination, pending_pdus)
 
                 pending_edus = self.pending_edus_by_dest.pop(destination, [])
 
                 # We can only include at most 100 EDUs per transactions
-                pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
-                if leftover_edus:
-                    self.pending_edus_by_dest[destination] = leftover_edus
+                pending_edus, leftover_edus = pending_edus[-5:], pending_edus[:-5]
+                # if leftover_edus:
+                #     self.pending_edus_by_dest[destination] = leftover_edus
 
                 pending_presence = self.pending_presence_by_dest.pop(destination, {})
 
@@ -834,7 +840,7 @@ class TransactionQueue(object):
 
             yield self._send_pdu(pdu, list(new_destinations), span)
 
-    @defer.inlineCallbacks
+    # @defer.inlineCallbacks
     def _pdu_send_txn_failed(self, destination, txn_id, pdu, span):
         """Gets called when sending a transaction failed (after retries)
         """
@@ -858,9 +864,9 @@ class TransactionQueue(object):
             },
         )
 
-        new_destinations = set(pdu.unsigned.get("destinations", []))
-        new_destinations.discard(destination)
-        yield self._send_pdu(pdu, list(new_destinations), span)
+        # new_destinations = set(pdu.unsigned.get("destinations", []))
+        # new_destinations.discard(destination)
+        # yield self._send_pdu(pdu, list(new_destinations), span)
 
 
 def _numberToBase(n, b):
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 20822414a5..885bb7e80c 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -20,7 +20,7 @@ server protocol.
 import itertools
 import logging
 
-from synapse.types import get_localpart_from_id
+from synapse.types import get_localpart_from_id, get_domain_from_id
 from synapse.util.jsonobject import JsonEncodedObject
 
 logger = logging.getLogger(__name__)
@@ -130,7 +130,8 @@ def _mangle_pdu(pdu_json):
     pdu_json["auth_events"] = list(_strip_hashes(pdu_json["auth_events"]))
     pdu_json["prev_events"] = list(_strip_hashes(pdu_json["prev_events"]))
 
-    pdu_json["event_id"] = get_localpart_from_id(pdu_json["event_id"])
+    if get_domain_from_id(pdu_json["event_id"]) == get_domain_from_id(pdu_json["sender"]):
+        pdu_json["event_id"] = get_localpart_from_id(pdu_json["event_id"])
 
     destinations = pdu_json["unsigned"].pop("destinations", None)
     if destinations:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 19dc6da4a7..5d8e6c71d8 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -325,20 +325,20 @@ class FederationHandler(BaseHandler):
                 # but there is an interaction with min_depth that I'm not really
                 # following.
 
-                if sent_to_us_directly:
-                    logger.warn(
-                        "[%s %s] Rejecting: failed to fetch %d prev events: %s",
-                        room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
-                    )
-                    raise FederationError(
-                        "ERROR",
-                        403,
-                        (
-                            "Your server isn't divulging details about prev_events "
-                            "referenced in this event."
-                        ),
-                        affected=pdu.event_id,
-                    )
+                # if sent_to_us_directly:
+                #     logger.warn(
+                #         "[%s %s] Rejecting: failed to fetch %d prev events: %s",
+                #         room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
+                #     )
+                #     raise FederationError(
+                #         "ERROR",
+                #         403,
+                #         (
+                #             "Your server isn't divulging details about prev_events "
+                #             "referenced in this event."
+                #         ),
+                #         affected=pdu.event_id,
+                #     )
 
                 # Calculate the state after each of the previous events, and
                 # resolve them to find the correct state at the current event.
@@ -563,9 +563,9 @@ class FederationHandler(BaseHandler):
             room_id,
             earliest_events_ids=list(latest),
             latest_events=[pdu],
-            limit=10,
+            limit=5,
             min_depth=min_depth,
-            timeout=60000,
+            timeout=15000,
         )
 
         logger.info(