diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 79eaa31031..4cc98a3fe8 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -14,7 +14,10 @@
# limitations under the License.
import logging
-from synapse.api.errors import SynapseError
+import six
+
+from synapse.api.constants import MAX_DEPTH
+from synapse.api.errors import SynapseError, Codes
from synapse.crypto.event_signing import check_event_content_hash
from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
@@ -190,11 +193,23 @@ def event_from_pdu_json(pdu_json, outlier=False):
FrozenEvent
Raises:
- SynapseError: if the pdu is missing required fields
+ SynapseError: if the pdu is missing required fields or is otherwise
+ not a valid matrix event
"""
# we could probably enforce a bunch of other fields here (room_id, sender,
# origin, etc etc)
- assert_params_in_request(pdu_json, ('event_id', 'type'))
+ assert_params_in_request(pdu_json, ('event_id', 'type', 'depth'))
+
+ depth = pdu_json['depth']
+ if not isinstance(depth, six.integer_types):
+ raise SynapseError(400, "Depth %r not an intger" % (depth, ),
+ Codes.BAD_JSON)
+
+ if depth < 0:
+ raise SynapseError(400, "Depth too small", Codes.BAD_JSON)
+ elif depth > MAX_DEPTH:
+ raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
+
event = FrozenEvent(
pdu_json
)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 8e2c0c4cd2..6163f7c466 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -19,6 +19,8 @@ import itertools
import logging
import random
+from six.moves import range
+
from twisted.internet import defer
from synapse.api.constants import Membership
@@ -33,7 +35,7 @@ from synapse.federation.federation_base import (
import synapse.metrics
from synapse.util import logcontext, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
@@ -413,11 +415,12 @@ class FederationClient(FederationBase):
batch_size = 20
missing_events = list(missing_events)
- for i in xrange(0, len(missing_events), batch_size):
+ for i in range(0, len(missing_events), batch_size):
batch = set(missing_events[i:i + batch_size])
deferreds = [
- preserve_fn(self.get_pdu)(
+ run_in_background(
+ self.get_pdu,
destinations=random_server_list(),
event_id=e_id,
)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 963d938edd..ded2b1871a 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -323,6 +323,8 @@ class TransactionQueue(object):
break
yield self._process_presence_inner(states_map.values())
+ except Exception:
+ logger.exception("Error sending presence states to servers")
finally:
self._processing_pending_presence = False
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index ff0656df3e..19d09f5422 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -25,7 +25,7 @@ from synapse.http.servlet import (
)
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
import functools
@@ -152,11 +152,18 @@ class Authenticator(object):
# alive
retry_timings = yield self.store.get_destination_retry_timings(origin)
if retry_timings and retry_timings["retry_last_ts"]:
- logger.info("Marking origin %r as up", origin)
- preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0)
+ run_in_background(self._reset_retry_timings, origin)
defer.returnValue(origin)
+ @defer.inlineCallbacks
+ def _reset_retry_timings(self, origin):
+ try:
+ logger.info("Marking origin %r as up", origin)
+ yield self.store.set_destination_retry_timings(origin, 0, 0)
+ except Exception:
+ logger.exception("Error resetting retry timings on %s", origin)
+
class BaseFederationServlet(object):
REQUIRE_AUTH = True
|