diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index fa4ec2ad3c..a8034bddc6 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -12,14 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-
from twisted.internet import defer
from .federation_base import FederationBase
from .units import Transaction, Edu
-from synapse.util.async import Linearizer
+from synapse.util import async
from synapse.util.logutils import log_function
from synapse.util.caches.response_cache import ResponseCache
from synapse.events import FrozenEvent
@@ -33,6 +31,9 @@ from synapse.crypto.event_signing import compute_event_signature
import simplejson as json
import logging
+# when processing incoming transactions, we try to handle multiple rooms in
+# parallel, up to this limit.
+TRANSACTION_CONCURRENCY_LIMIT = 10
logger = logging.getLogger(__name__)
@@ -52,7 +53,8 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth()
- self._server_linearizer = Linearizer("fed_server")
+ self._server_linearizer = async.Linearizer("fed_server")
+ self._transaction_linearizer = async.Linearizer("fed_txn_handler")
# We cache responses to state queries, as they take a while and often
# come in waves.
@@ -111,12 +113,39 @@ class FederationServer(FederationBase):
def on_incoming_transaction(self, transaction_data):
# keep this as early as possible to make the calculated origin ts as
# accurate as possible.
- request_time = int(self._clock.time_msec())
+ request_time = self._clock.time_msec()
transaction = Transaction(**transaction_data)
+ if not transaction.transaction_id:
+ raise Exception("Transaction missing transaction_id")
+ if not transaction.origin:
+ raise Exception("Transaction missing origin")
+
logger.debug("[%s] Got transaction", transaction.transaction_id)
+ # use a linearizer to ensure that we don't process the same transaction
+ # multiple times in parallel.
+ with (yield self._transaction_linearizer.queue(
+ (transaction.origin, transaction.transaction_id),
+ )):
+ result = yield self._handle_incoming_transaction(
+ transaction, request_time,
+ )
+
+ defer.returnValue(result)
+
+ @defer.inlineCallbacks
+ def _handle_incoming_transaction(self, transaction, request_time):
+ """ Process an incoming transaction and return the HTTP response
+
+ Args:
+ transaction (Transaction): incoming transaction
+ request_time (int): timestamp that the HTTP request arrived at
+
+ Returns:
+ Deferred[(int, object)]: http response code and body
+ """
response = yield self.transaction_actions.have_responded(transaction)
if response:
@@ -131,7 +160,7 @@ class FederationServer(FederationBase):
received_pdus_counter.inc_by(len(transaction.pdus))
- pdu_list = []
+ pdus_by_room = {}
for p in transaction.pdus:
if "unsigned" in p:
@@ -143,22 +172,36 @@ class FederationServer(FederationBase):
del p["age"]
event = self.event_from_pdu_json(p)
- pdu_list.append(event)
+ room_id = event.room_id
+ pdus_by_room.setdefault(room_id, []).append(event)
pdu_results = {}
- for pdu in pdu_list:
- event_id = pdu.event_id
- try:
- yield self._handle_received_pdu(transaction.origin, pdu)
- pdu_results[event_id] = {}
- except FederationError as e:
- logger.warn("Error handling PDU %s: %s", event_id, e)
- self.send_failure(e, transaction.origin)
- pdu_results[event_id] = {"error": str(e)}
- except Exception as e:
- pdu_results[event_id] = {"error": str(e)}
- logger.exception("Failed to handle PDU")
+ # we can process different rooms in parallel (which is useful if they
+ # require callouts to other servers to fetch missing events), but
+ # impose a limit to avoid going too crazy with ram/cpu.
+ @defer.inlineCallbacks
+ def process_pdus_for_room(room_id):
+ logger.debug("Processing PDUs for %s", room_id)
+ for pdu in pdus_by_room[room_id]:
+ event_id = pdu.event_id
+ try:
+ yield self._handle_received_pdu(
+ transaction.origin, pdu
+ )
+ pdu_results[event_id] = {}
+ except FederationError as e:
+ logger.warn("Error handling PDU %s: %s", event_id, e)
+ self.send_failure(e, transaction.origin)
+ pdu_results[event_id] = {"error": str(e)}
+ except Exception as e:
+ pdu_results[event_id] = {"error": str(e)}
+ logger.exception("Failed to handle PDU %s", event_id)
+
+ yield async.concurrently_execute(
+ process_pdus_for_room, pdus_by_room.keys(),
+ TRANSACTION_CONCURRENCY_LIMIT,
+ )
if hasattr(transaction, "edus"):
for edu in (Edu(**x) for x in transaction.edus):
@@ -168,8 +211,9 @@ class FederationServer(FederationBase):
edu.content
)
- for failure in getattr(transaction, "pdu_failures", []):
- logger.info("Got failure %r", failure)
+ pdu_failures = getattr(transaction, "pdu_failures", [])
+ for failure in pdu_failures:
+ logger.info("Got failure %r", failure)
response = {
"pdus": pdu_results,
|