diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
new file mode 100644
index 0000000000..a990aec4fd
--- /dev/null
+++ b/synapse/federation/federation_base.py
@@ -0,0 +1,118 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from twisted.internet import defer
+
+from synapse.events.utils import prune_event
+
+from syutil.jsonutil import encode_canonical_json
+
+from synapse.crypto.event_signing import check_event_content_hash
+
+from synapse.api.errors import SynapseError
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class FederationBase(object):
+ @defer.inlineCallbacks
+ def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False):
+ """Takes a list of PDUs and checks the signatures and hashs of each
+ one. If a PDU fails its signature check then we check if we have it in
+ the database and if not then request if from the originating server of
+ that PDU.
+
+ If a PDU fails its content hash check then it is redacted.
+
+ The given list of PDUs are not modified, instead the function returns
+ a new list.
+
+ Args:
+ pdu (list)
+ outlier (bool)
+
+ Returns:
+ Deferred : A list of PDUs that have valid signatures and hashes.
+ """
+ signed_pdus = []
+ for pdu in pdus:
+ try:
+ new_pdu = yield self._check_sigs_and_hash(pdu)
+ signed_pdus.append(new_pdu)
+ except SynapseError:
+ # FIXME: We should handle signature failures more gracefully.
+
+ # Check local db.
+ new_pdu = yield self.store.get_event(
+ pdu.event_id,
+ allow_rejected=True
+ )
+ if new_pdu:
+ signed_pdus.append(new_pdu)
+ continue
+
+ # Check pdu.origin
+ if pdu.origin != origin:
+ new_pdu = yield self.get_pdu(
+ destinations=[pdu.origin],
+ event_id=pdu.event_id,
+ outlier=outlier,
+ )
+
+ if new_pdu:
+ signed_pdus.append(new_pdu)
+ continue
+
+ logger.warn("Failed to find copy of %s with valid signature")
+
+ defer.returnValue(signed_pdus)
+
+ @defer.inlineCallbacks
+ def _check_sigs_and_hash(self, pdu):
+ """Throws a SynapseError if the PDU does not have the correct
+ signatures.
+
+ Returns:
+ FrozenEvent: Either the given event or it redacted if it failed the
+ content hash check.
+ """
+ # Check signatures are correct.
+ redacted_event = prune_event(pdu)
+ redacted_pdu_json = redacted_event.get_pdu_json()
+
+ try:
+ yield self.keyring.verify_json_for_server(
+ pdu.origin, redacted_pdu_json
+ )
+ except SynapseError:
+ logger.warn(
+ "Signature check failed for %s redacted to %s",
+ encode_canonical_json(pdu.get_pdu_json()),
+ encode_canonical_json(redacted_pdu_json),
+ )
+ raise
+
+ if not check_event_content_hash(pdu):
+ logger.warn(
+ "Event content has been tampered, redacting %s, %s",
+ pdu.event_id, encode_canonical_json(pdu.get_dict())
+ )
+ defer.returnValue(redacted_event)
+
+ defer.returnValue(pdu)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index e1539bd0e0..70c9a6f46b 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -16,17 +16,12 @@
from twisted.internet import defer
+from .federation_base import FederationBase
from .units import Edu
+from synapse.api.errors import CodeMessageException
from synapse.util.logutils import log_function
from synapse.events import FrozenEvent
-from synapse.events.utils import prune_event
-
-from syutil.jsonutil import encode_canonical_json
-
-from synapse.crypto.event_signing import check_event_content_hash
-
-from synapse.api.errors import SynapseError
import logging
@@ -34,7 +29,7 @@ import logging
logger = logging.getLogger(__name__)
-class FederationClient(object):
+class FederationClient(FederationBase):
@log_function
def send_pdu(self, pdu, destinations):
"""Informs the replication layer about a new PDU generated within the
@@ -186,7 +181,8 @@ class FederationClient(object):
pdu = yield self._check_sigs_and_hash(pdu)
break
-
+ except CodeMessageException:
+ raise
except Exception as e:
logger.info(
"Failed to get PDU %s from %s because %s",
@@ -224,17 +220,17 @@ class FederationClient(object):
for p in result.get("auth_chain", [])
]
- for i, pdu in enumerate(pdus):
- pdus[i] = yield self._check_sigs_and_hash(pdu)
-
- # FIXME: We should handle signature failures more gracefully.
+ signed_pdus = yield self._check_sigs_and_hash_and_fetch(
+ destination, pdus, outlier=True
+ )
- for i, pdu in enumerate(auth_chain):
- auth_chain[i] = yield self._check_sigs_and_hash(pdu)
+ signed_auth = yield self._check_sigs_and_hash_and_fetch(
+ destination, auth_chain, outlier=True
+ )
- # FIXME: We should handle signature failures more gracefully.
+ signed_auth.sort(key=lambda e: e.depth)
- defer.returnValue((pdus, auth_chain))
+ defer.returnValue((signed_pdus, signed_auth))
@defer.inlineCallbacks
@log_function
@@ -248,65 +244,88 @@ class FederationClient(object):
for p in res["auth_chain"]
]
- for i, pdu in enumerate(auth_chain):
- auth_chain[i] = yield self._check_sigs_and_hash(pdu)
-
- # FIXME: We should handle signature failures more gracefully.
+ signed_auth = yield self._check_sigs_and_hash_and_fetch(
+ destination, auth_chain, outlier=True
+ )
- auth_chain.sort(key=lambda e: e.depth)
+ signed_auth.sort(key=lambda e: e.depth)
- defer.returnValue(auth_chain)
+ defer.returnValue(signed_auth)
@defer.inlineCallbacks
- def make_join(self, destination, room_id, user_id):
- ret = yield self.transport_layer.make_join(
- destination, room_id, user_id
- )
+ def make_join(self, destinations, room_id, user_id):
+ for destination in destinations:
+ try:
+ ret = yield self.transport_layer.make_join(
+ destination, room_id, user_id
+ )
- pdu_dict = ret["event"]
+ pdu_dict = ret["event"]
- logger.debug("Got response to make_join: %s", pdu_dict)
+ logger.debug("Got response to make_join: %s", pdu_dict)
- defer.returnValue(self.event_from_pdu_json(pdu_dict))
+ defer.returnValue(
+ (destination, self.event_from_pdu_json(pdu_dict))
+ )
+ break
+ except CodeMessageException:
+ raise
+ except Exception as e:
+ logger.warn(
+ "Failed to make_join via %s: %s",
+ destination, e.message
+ )
- @defer.inlineCallbacks
- def send_join(self, destination, pdu):
- time_now = self._clock.time_msec()
- _, content = yield self.transport_layer.send_join(
- destination=destination,
- room_id=pdu.room_id,
- event_id=pdu.event_id,
- content=pdu.get_pdu_json(time_now),
- )
+ raise RuntimeError("Failed to send to any server.")
- logger.debug("Got content: %s", content)
+ @defer.inlineCallbacks
+ def send_join(self, destinations, pdu):
+ for destination in destinations:
+ try:
+ time_now = self._clock.time_msec()
+ _, content = yield self.transport_layer.send_join(
+ destination=destination,
+ room_id=pdu.room_id,
+ event_id=pdu.event_id,
+ content=pdu.get_pdu_json(time_now),
+ )
- state = [
- self.event_from_pdu_json(p, outlier=True)
- for p in content.get("state", [])
- ]
+ logger.debug("Got content: %s", content)
- auth_chain = [
- self.event_from_pdu_json(p, outlier=True)
- for p in content.get("auth_chain", [])
- ]
+ state = [
+ self.event_from_pdu_json(p, outlier=True)
+ for p in content.get("state", [])
+ ]
- for i, pdu in enumerate(state):
- state[i] = yield self._check_sigs_and_hash(pdu)
+ auth_chain = [
+ self.event_from_pdu_json(p, outlier=True)
+ for p in content.get("auth_chain", [])
+ ]
- # FIXME: We should handle signature failures more gracefully.
+ signed_state = yield self._check_sigs_and_hash_and_fetch(
+ destination, state, outlier=True
+ )
- for i, pdu in enumerate(auth_chain):
- auth_chain[i] = yield self._check_sigs_and_hash(pdu)
+ signed_auth = yield self._check_sigs_and_hash_and_fetch(
+ destination, auth_chain, outlier=True
+ )
- # FIXME: We should handle signature failures more gracefully.
+ auth_chain.sort(key=lambda e: e.depth)
- auth_chain.sort(key=lambda e: e.depth)
+ defer.returnValue({
+ "state": signed_state,
+ "auth_chain": signed_auth,
+ "origin": destination,
+ })
+ except CodeMessageException:
+ raise
+ except Exception as e:
+ logger.warn(
+ "Failed to send_join via %s: %s",
+ destination, e.message
+ )
- defer.returnValue({
- "state": state,
- "auth_chain": auth_chain,
- })
+ raise RuntimeError("Failed to send to any server.")
@defer.inlineCallbacks
def send_invite(self, destination, room_id, event_id, pdu):
@@ -353,12 +372,18 @@ class FederationClient(object):
)
auth_chain = [
- (yield self._check_sigs_and_hash(self.event_from_pdu_json(e)))
+ self.event_from_pdu_json(e)
for e in content["auth_chain"]
]
+ signed_auth = yield self._check_sigs_and_hash_and_fetch(
+ destination, auth_chain, outlier=True
+ )
+
+ signed_auth.sort(key=lambda e: e.depth)
+
ret = {
- "auth_chain": auth_chain,
+ "auth_chain": signed_auth,
"rejects": content.get("rejects", []),
"missing": content.get("missing", []),
}
@@ -373,37 +398,3 @@ class FederationClient(object):
event.internal_metadata.outlier = outlier
return event
-
- @defer.inlineCallbacks
- def _check_sigs_and_hash(self, pdu):
- """Throws a SynapseError if the PDU does not have the correct
- signatures.
-
- Returns:
- FrozenEvent: Either the given event or it redacted if it failed the
- content hash check.
- """
- # Check signatures are correct.
- redacted_event = prune_event(pdu)
- redacted_pdu_json = redacted_event.get_pdu_json()
-
- try:
- yield self.keyring.verify_json_for_server(
- pdu.origin, redacted_pdu_json
- )
- except SynapseError:
- logger.warn(
- "Signature check failed for %s redacted to %s",
- encode_canonical_json(pdu.get_pdu_json()),
- encode_canonical_json(redacted_pdu_json),
- )
- raise
-
- if not check_event_content_hash(pdu):
- logger.warn(
- "Event content has been tampered, redacting %s, %s",
- pdu.event_id, encode_canonical_json(pdu.get_dict())
- )
- defer.returnValue(redacted_event)
-
- defer.returnValue(pdu)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 5fbd8b19de..4742ca9390 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -16,16 +16,12 @@
from twisted.internet import defer
+from .federation_base import FederationBase
from .units import Transaction, Edu
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
from synapse.events import FrozenEvent
-from synapse.events.utils import prune_event
-
-from syutil.jsonutil import encode_canonical_json
-
-from synapse.crypto.event_signing import check_event_content_hash
from synapse.api.errors import FederationError, SynapseError
@@ -35,7 +31,7 @@ import logging
logger = logging.getLogger(__name__)
-class FederationServer(object):
+class FederationServer(FederationBase):
def set_handler(self, handler):
"""Sets the handler that the replication layer will use to communicate
receipt of new PDUs from other home servers. The required methods are
@@ -251,17 +247,20 @@ class FederationServer(object):
Deferred: Results in `dict` with the same format as `content`
"""
auth_chain = [
- (yield self._check_sigs_and_hash(self.event_from_pdu_json(e)))
+ self.event_from_pdu_json(e)
for e in content["auth_chain"]
]
- missing = [
- (yield self._check_sigs_and_hash(self.event_from_pdu_json(e)))
- for e in content.get("missing", [])
- ]
+ signed_auth = yield self._check_sigs_and_hash_and_fetch(
+ origin, auth_chain, outlier=True
+ )
ret = yield self.handler.on_query_auth(
- origin, event_id, auth_chain, content.get("rejects", []), missing
+ origin,
+ event_id,
+ signed_auth,
+ content.get("rejects", []),
+ content.get("missing", []),
)
time_now = self._clock.time_msec()
@@ -426,37 +425,3 @@ class FederationServer(object):
event.internal_metadata.outlier = outlier
return event
-
- @defer.inlineCallbacks
- def _check_sigs_and_hash(self, pdu):
- """Throws a SynapseError if the PDU does not have the correct
- signatures.
-
- Returns:
- FrozenEvent: Either the given event or it redacted if it failed the
- content hash check.
- """
- # Check signatures are correct.
- redacted_event = prune_event(pdu)
- redacted_pdu_json = redacted_event.get_pdu_json()
-
- try:
- yield self.keyring.verify_json_for_server(
- pdu.origin, redacted_pdu_json
- )
- except SynapseError:
- logger.warn(
- "Signature check failed for %s redacted to %s",
- encode_canonical_json(pdu.get_pdu_json()),
- encode_canonical_json(redacted_pdu_json),
- )
- raise
-
- if not check_event_content_hash(pdu):
- logger.warn(
- "Event content has been tampered, redacting %s, %s",
- pdu.event_id, encode_canonical_json(pdu.get_dict())
- )
- defer.returnValue(redacted_event)
-
- defer.returnValue(pdu)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 9d4f2c09a2..f38aeba7cc 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
from .persistence import TransactionActions
from .units import Transaction
+from synapse.api.errors import HttpResponseException
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
@@ -238,9 +239,14 @@ class TransactionQueue(object):
del p["age_ts"]
return data
- code, response = yield self.transport_layer.send_transaction(
- transaction, json_data_cb
- )
+ try:
+ response = yield self.transport_layer.send_transaction(
+ transaction, json_data_cb
+ )
+ code = 200
+ except HttpResponseException as e:
+ code = e.code
+ response = e.response
logger.info("TX [%s] got %d response", destination, code)
@@ -274,8 +280,7 @@ class TransactionQueue(object):
pass
logger.debug("TX [%s] Yielded to callbacks", destination)
-
- except Exception as e:
+ except RuntimeError as e:
# We capture this here as there as nothing actually listens
# for this finishing functions deferred.
logger.warn(
@@ -283,6 +288,14 @@ class TransactionQueue(object):
destination,
e,
)
+ except Exception as e:
+ # We capture this here as there as nothing actually listens
+ # for this finishing functions deferred.
+ logger.exception(
+ "TX [%s] Problem in _attempt_transaction: %s",
+ destination,
+ e,
+ )
self.set_retrying(destination, retry_interval)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 4cb1dea2de..8b137e7128 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -19,7 +19,6 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX
from synapse.util.logutils import log_function
import logging
-import json
logger = logging.getLogger(__name__)
@@ -129,7 +128,7 @@ class TransportLayerClient(object):
# generated by the json_data_callback.
json_data = transaction.get_dict()
- code, response = yield self.client.put_json(
+ response = yield self.client.put_json(
transaction.destination,
path=PREFIX + "/send/%s/" % transaction.transaction_id,
data=json_data,
@@ -137,95 +136,86 @@ class TransportLayerClient(object):
)
logger.debug(
- "send_data dest=%s, txid=%s, got response: %d",
- transaction.destination, transaction.transaction_id, code
+ "send_data dest=%s, txid=%s, got response: 200",
+ transaction.destination, transaction.transaction_id,
)
- defer.returnValue((code, response))
+ defer.returnValue(response)
@defer.inlineCallbacks
@log_function
def make_query(self, destination, query_type, args, retry_on_dns_fail):
path = PREFIX + "/query/%s" % query_type
- response = yield self.client.get_json(
+ content = yield self.client.get_json(
destination=destination,
path=path,
args=args,
retry_on_dns_fail=retry_on_dns_fail,
)
- defer.returnValue(response)
+ defer.returnValue(content)
@defer.inlineCallbacks
@log_function
def make_join(self, destination, room_id, user_id, retry_on_dns_fail=True):
path = PREFIX + "/make_join/%s/%s" % (room_id, user_id)
- response = yield self.client.get_json(
+ content = yield self.client.get_json(
destination=destination,
path=path,
retry_on_dns_fail=retry_on_dns_fail,
)
- defer.returnValue(response)
+ defer.returnValue(content)
@defer.inlineCallbacks
@log_function
def send_join(self, destination, room_id, event_id, content):
path = PREFIX + "/send_join/%s/%s" % (room_id, event_id)
- code, content = yield self.client.put_json(
+ response = yield self.client.put_json(
destination=destination,
path=path,
data=content,
)
- if not 200 <= code < 300:
- raise RuntimeError("Got %d from send_join", code)
-
- defer.returnValue(json.loads(content))
+ defer.returnValue(response)
@defer.inlineCallbacks
@log_function
def send_invite(self, destination, room_id, event_id, content):
path = PREFIX + "/invite/%s/%s" % (room_id, event_id)
- code, content = yield self.client.put_json(
+ response = yield self.client.put_json(
destination=destination,
path=path,
data=content,
)
- if not 200 <= code < 300:
- raise RuntimeError("Got %d from send_invite", code)
-
- defer.returnValue(json.loads(content))
+ defer.returnValue(response)
@defer.inlineCallbacks
@log_function
def get_event_auth(self, destination, room_id, event_id):
path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id)
- response = yield self.client.get_json(
+ content = yield self.client.get_json(
destination=destination,
path=path,
)
- defer.returnValue(response)
+ defer.returnValue(content)
@defer.inlineCallbacks
@log_function
def send_query_auth(self, destination, room_id, event_id, content):
path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id)
- code, content = yield self.client.post_json(
+ content = yield self.client.post_json(
destination=destination,
path=path,
data=content,
)
- if not 200 <= code < 300:
- raise RuntimeError("Got %d from send_invite", code)
-
- defer.returnValue(json.loads(content))
+ defer.returnValue(content)
|