summary refs log tree commit diff
diff options
context:
space:
mode:
author Erik Johnston <erik@matrix.org> 2018-11-29 15:50:04 +0000
committer Brendan Abolivier <babolivier@matrix.org> 2019-02-13 20:54:34 +0000
commit eca7ece93f419f8381d63d44d0b605f2f7cf25dc (patch)
tree 990c1a36a0b8094e8b43a186c2042ff55d9c3f9e
parent Actually fix exceptions (diff)
download synapse-eca7ece93f419f8381d63d44d0b605f2f7cf25dc.tar.xz
Handle slow/lossy connections better when sending transactions
-rw-r--r-- synapse/federation/federation_server.py 19
-rw-r--r-- synapse/federation/transaction_queue.py 16
-rw-r--r-- synapse/federation/units.py 5
-rw-r--r-- synapse/handlers/federation.py 32
4 files changed, 40 insertions, 32 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 6bd48db497..fdce831e6f 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -36,7 +36,7 @@ from synapse.api.errors import (
 from synapse.crypto.event_signing import compute_event_signature
 from synapse.federation.federation_base import FederationBase, event_from_pdu_json
 from synapse.federation.persistence import TransactionActions
-from synapse.federation.units import Edu, Transaction
+from synapse.federation.units import Edu, Transaction, _mangle_pdu
 from synapse.http.endpoint import parse_server_name
 from synapse.replication.http.federation import (
     ReplicationFederationSendEduRestServlet,
@@ -49,6 +49,7 @@ from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.logcontext import nested_logging_context
 from synapse.util.logutils import log_function
 
+
 # when processing incoming transactions, we try to handle multiple rooms in
 # parallel, up to this limit.
 TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -338,8 +339,8 @@ class FederationServer(FederationBase):
                 )
 
         defer.returnValue({
-            "pdus": [pdu.get_pdu_json() for pdu in pdus],
-            "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
+            "pdus": [_mangle_pdu(pdu.get_pdu_json()) for pdu in pdus],
+            "auth_chain": [_mangle_pdu(pdu.get_pdu_json()) for pdu in auth_chain],
         })
 
     @defer.inlineCallbacks
@@ -384,7 +385,7 @@ class FederationServer(FederationBase):
         yield self.check_server_matches_acl(origin_host, pdu.room_id)
         ret_pdu = yield self.handler.on_invite_request(origin, pdu)
         time_now = self._clock.time_msec()
-        defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
+        defer.returnValue((200, {"event": _mangle_pdu(ret_pdu.get_pdu_json(time_now))}))
 
     @defer.inlineCallbacks
     def on_send_join_request(self, origin, content):
@@ -398,9 +399,9 @@ class FederationServer(FederationBase):
         res_pdus = yield self.handler.on_send_join_request(origin, pdu)
         time_now = self._clock.time_msec()
         defer.returnValue((200, {
-            "state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
+            "state": [_mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["state"]],
             "auth_chain": [
-                p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]
+                _mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["auth_chain"]
             ],
         }))
 
@@ -433,7 +434,7 @@ class FederationServer(FederationBase):
             time_now = self._clock.time_msec()
             auth_pdus = yield self.handler.on_event_auth(event_id)
             res = {
-                "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
+                "auth_chain": [_mangle_pdu(a.get_pdu_json(time_now)) for a in auth_pdus],
             }
         defer.returnValue((200, res))
 
@@ -482,7 +483,7 @@ class FederationServer(FederationBase):
             time_now = self._clock.time_msec()
             send_content = {
                 "auth_chain": [
-                    e.get_pdu_json(time_now)
+                    _mangle_pdu(e.get_pdu_json(time_now))
                     for e in ret["auth_chain"]
                 ],
                 "rejects": ret.get("rejects", []),
@@ -558,7 +559,7 @@ class FederationServer(FederationBase):
             time_now = self._clock.time_msec()
 
         defer.returnValue({
-            "events": [ev.get_pdu_json(time_now) for ev in missing_events],
+            "events": [_mangle_pdu(ev.get_pdu_json(time_now)) for ev in missing_events],
         })
 
     @log_function
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index a4e760f935..2f70a148e9 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -455,16 +455,22 @@ class TransactionQueue(object):
                 pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
 
                 # We can only include at most 50 PDUs per transactions
-                pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:]
+                pending_pdus, leftover_pdus = pending_pdus[-5:], pending_pdus[:-5]
                 if leftover_pdus:
-                    self.pending_pdus_by_dest[destination] = leftover_pdus
+                    # self.pending_pdus_by_dest[destination] = leftover_pdus
+                    for _, _, p_span in leftover_pdus:
+                        p_span.set_tag("success", False)
+                        p_span.log_kv({"result": "dropped"})
+                        p_span.finish()
+
+                logger.info("TX [%s] Sending PDUs: %s", destination, pending_pdus)
 
                 pending_edus = self.pending_edus_by_dest.pop(destination, [])
 
                 # We can only include at most 100 EDUs per transactions
-                pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
-                if leftover_edus:
-                    self.pending_edus_by_dest[destination] = leftover_edus
+                pending_edus, leftover_edus = pending_edus[-5:], pending_edus[:-5]
+                # if leftover_edus:
+                #     self.pending_edus_by_dest[destination] = leftover_edus
 
                 pending_presence = self.pending_presence_by_dest.pop(destination, {})
 
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 580e8972b5..fba6cf9a88 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -19,7 +19,7 @@ server protocol.
 
 import logging
 
-from synapse.types import get_localpart_from_id
+from synapse.types import get_localpart_from_id, get_domain_from_id
 from synapse.util.jsonobject import JsonEncodedObject
 
 logger = logging.getLogger(__name__)
@@ -126,7 +126,8 @@ def _mangle_pdu(pdu_json):
     pdu_json["auth_events"] = list(_strip_hashes(pdu_json["auth_events"]))
     pdu_json["prev_events"] = list(_strip_hashes(pdu_json["prev_events"]))
 
-    pdu_json["event_id"] = get_localpart_from_id(pdu_json["event_id"])
+    if get_domain_from_id(pdu_json["event_id"]) == get_domain_from_id(pdu_json["sender"]):
+        pdu_json["event_id"] = get_localpart_from_id(pdu_json["event_id"])
 
     logger.info("Mangled PDU: %s", pdu_json)
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 7dbb9d49fb..15a04bb777 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -304,20 +304,20 @@ class FederationHandler(BaseHandler):
                 # but there is an interaction with min_depth that I'm not really
                 # following.
 
-                if sent_to_us_directly:
-                    logger.warn(
-                        "[%s %s] Rejecting: failed to fetch %d prev events: %s",
-                        room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
-                    )
-                    raise FederationError(
-                        "ERROR",
-                        403,
-                        (
-                            "Your server isn't divulging details about prev_events "
-                            "referenced in this event."
-                        ),
-                        affected=pdu.event_id,
-                    )
+                # if sent_to_us_directly:
+                #     logger.warn(
+                #         "[%s %s] Rejecting: failed to fetch %d prev events: %s",
+                #         room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
+                #     )
+                #     raise FederationError(
+                #         "ERROR",
+                #         403,
+                #         (
+                #             "Your server isn't divulging details about prev_events "
+                #             "referenced in this event."
+                #         ),
+                #         affected=pdu.event_id,
+                #     )
 
                 # Calculate the state after each of the previous events, and
                 # resolve them to find the correct state at the current event.
@@ -506,9 +506,9 @@ class FederationHandler(BaseHandler):
             room_id,
             earliest_events_ids=list(latest),
             latest_events=[pdu],
-            limit=10,
+            limit=5,
             min_depth=min_depth,
-            timeout=60000,
+            timeout=15000,
         )
 
         logger.info(