diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 6bd48db497..fdce831e6f 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -36,7 +36,7 @@ from synapse.api.errors import (
from synapse.crypto.event_signing import compute_event_signature
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.persistence import TransactionActions
-from synapse.federation.units import Edu, Transaction
+from synapse.federation.units import Edu, Transaction, _mangle_pdu
from synapse.http.endpoint import parse_server_name
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
@@ -49,6 +49,7 @@ from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import nested_logging_context
from synapse.util.logutils import log_function
+
# when processing incoming transactions, we try to handle multiple rooms in
# parallel, up to this limit.
TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -338,8 +339,8 @@ class FederationServer(FederationBase):
)
defer.returnValue({
- "pdus": [pdu.get_pdu_json() for pdu in pdus],
- "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
+ "pdus": [_mangle_pdu(pdu.get_pdu_json()) for pdu in pdus],
+ "auth_chain": [_mangle_pdu(pdu.get_pdu_json()) for pdu in auth_chain],
})
@defer.inlineCallbacks
@@ -384,7 +385,7 @@ class FederationServer(FederationBase):
yield self.check_server_matches_acl(origin_host, pdu.room_id)
ret_pdu = yield self.handler.on_invite_request(origin, pdu)
time_now = self._clock.time_msec()
- defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
+ defer.returnValue((200, {"event": _mangle_pdu(ret_pdu.get_pdu_json(time_now))}))
@defer.inlineCallbacks
def on_send_join_request(self, origin, content):
@@ -398,9 +399,9 @@ class FederationServer(FederationBase):
res_pdus = yield self.handler.on_send_join_request(origin, pdu)
time_now = self._clock.time_msec()
defer.returnValue((200, {
- "state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
+ "state": [_mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["state"]],
"auth_chain": [
- p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]
+ _mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["auth_chain"]
],
}))
@@ -433,7 +434,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
auth_pdus = yield self.handler.on_event_auth(event_id)
res = {
- "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
+ "auth_chain": [_mangle_pdu(a.get_pdu_json(time_now)) for a in auth_pdus],
}
defer.returnValue((200, res))
@@ -482,7 +483,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
send_content = {
"auth_chain": [
- e.get_pdu_json(time_now)
+ _mangle_pdu(e.get_pdu_json(time_now))
for e in ret["auth_chain"]
],
"rejects": ret.get("rejects", []),
@@ -558,7 +559,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
defer.returnValue({
- "events": [ev.get_pdu_json(time_now) for ev in missing_events],
+ "events": [_mangle_pdu(ev.get_pdu_json(time_now)) for ev in missing_events],
})
@log_function
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index a4e760f935..2f70a148e9 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -455,16 +455,22 @@ class TransactionQueue(object):
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
# We can only include at most 50 PDUs per transactions
- pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:]
+ pending_pdus, leftover_pdus = pending_pdus[-5:], pending_pdus[:-5]
if leftover_pdus:
- self.pending_pdus_by_dest[destination] = leftover_pdus
+ # self.pending_pdus_by_dest[destination] = leftover_pdus
+ for _, _, p_span in leftover_pdus:
+ p_span.set_tag("success", False)
+ p_span.log_kv({"result": "dropped"})
+ p_span.finish()
+
+ logger.info("TX [%s] Sending PDUs: %s", destination, pending_pdus)
pending_edus = self.pending_edus_by_dest.pop(destination, [])
# We can only include at most 100 EDUs per transactions
- pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
- if leftover_edus:
- self.pending_edus_by_dest[destination] = leftover_edus
+ pending_edus, leftover_edus = pending_edus[-5:], pending_edus[:-5]
+ # if leftover_edus:
+ # self.pending_edus_by_dest[destination] = leftover_edus
pending_presence = self.pending_presence_by_dest.pop(destination, {})
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 580e8972b5..fba6cf9a88 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -19,7 +19,7 @@ server protocol.
import logging
-from synapse.types import get_localpart_from_id
+from synapse.types import get_localpart_from_id, get_domain_from_id
from synapse.util.jsonobject import JsonEncodedObject
logger = logging.getLogger(__name__)
@@ -126,7 +126,8 @@ def _mangle_pdu(pdu_json):
pdu_json["auth_events"] = list(_strip_hashes(pdu_json["auth_events"]))
pdu_json["prev_events"] = list(_strip_hashes(pdu_json["prev_events"]))
- pdu_json["event_id"] = get_localpart_from_id(pdu_json["event_id"])
+ if get_domain_from_id(pdu_json["event_id"]) == get_domain_from_id(pdu_json["sender"]):
+ pdu_json["event_id"] = get_localpart_from_id(pdu_json["event_id"])
logger.info("Mangled PDU: %s", pdu_json)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 7dbb9d49fb..15a04bb777 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -304,20 +304,20 @@ class FederationHandler(BaseHandler):
# but there is an interaction with min_depth that I'm not really
# following.
- if sent_to_us_directly:
- logger.warn(
- "[%s %s] Rejecting: failed to fetch %d prev events: %s",
- room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
- )
- raise FederationError(
- "ERROR",
- 403,
- (
- "Your server isn't divulging details about prev_events "
- "referenced in this event."
- ),
- affected=pdu.event_id,
- )
+ # if sent_to_us_directly:
+ # logger.warn(
+ # "[%s %s] Rejecting: failed to fetch %d prev events: %s",
+ # room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
+ # )
+ # raise FederationError(
+ # "ERROR",
+ # 403,
+ # (
+ # "Your server isn't divulging details about prev_events "
+ # "referenced in this event."
+ # ),
+ # affected=pdu.event_id,
+ # )
# Calculate the state after each of the previous events, and
# resolve them to find the correct state at the current event.
@@ -506,9 +506,9 @@ class FederationHandler(BaseHandler):
room_id,
earliest_events_ids=list(latest),
latest_events=[pdu],
- limit=10,
+ limit=5,
min_depth=min_depth,
- timeout=60000,
+ timeout=15000,
)
logger.info(
|