diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 5217d91aab..f0430b2cb1 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -80,6 +80,7 @@ class FederationBase(object):
destinations=[pdu.origin],
event_id=pdu.event_id,
outlier=outlier,
+ timeout=10000,
)
if new_pdu:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 3a7bc0c9a7..3249060bcf 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -22,6 +22,7 @@ from .units import Edu
from synapse.api.errors import (
CodeMessageException, HttpResponseException, SynapseError,
)
+from synapse.util import unwrapFirstError
from synapse.util.expiringcache import ExpiringCache
from synapse.util.logutils import log_function
from synapse.events import FrozenEvent
@@ -173,7 +174,7 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
@log_function
- def get_pdu(self, destinations, event_id, outlier=False):
+ def get_pdu(self, destinations, event_id, outlier=False, timeout=None):
"""Requests the PDU with given origin and ID from the remote home
servers.
@@ -189,6 +190,8 @@ class FederationClient(FederationBase):
outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
it's from an arbitary point in the context as opposed to part
of the current block of PDUs. Defaults to `False`
+ timeout (int): How long to try (in ms) each destination for before
+ moving to the next destination. None indicates no timeout.
Returns:
Deferred: Results in the requested PDU.
@@ -212,7 +215,7 @@ class FederationClient(FederationBase):
with limiter:
transaction_data = yield self.transport_layer.get_event(
- destination, event_id
+ destination, event_id, timeout=timeout,
)
logger.debug("transaction_data %r", transaction_data)
@@ -370,13 +373,17 @@ class FederationClient(FederationBase):
for p in content.get("auth_chain", [])
]
- signed_state = yield self._check_sigs_and_hash_and_fetch(
- destination, state, outlier=True
- )
-
- signed_auth = yield self._check_sigs_and_hash_and_fetch(
- destination, auth_chain, outlier=True
- )
+ signed_state, signed_auth = yield defer.gatherResults(
+ [
+ self._check_sigs_and_hash_and_fetch(
+ destination, state, outlier=True
+ ),
+ self._check_sigs_and_hash_and_fetch(
+ destination, auth_chain, outlier=True
+ )
+ ],
+ consumeErrors=True
+ ).addErrback(unwrapFirstError)
auth_chain.sort(key=lambda e: e.depth)
@@ -518,7 +525,7 @@ class FederationClient(FederationBase):
# Are we missing any?
seen_events = set(earliest_events_ids)
- seen_events.update(e.event_id for e in signed_events)
+ seen_events.update(e.event_id for e in signed_events if e)
missing_events = {}
for e in itertools.chain(latest_events, signed_events):
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 80d03012b7..610a4c3163 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -50,13 +50,15 @@ class TransportLayerClient(object):
)
@log_function
- def get_event(self, destination, event_id):
+ def get_event(self, destination, event_id, timeout=None):
""" Requests the pdu with give id and origin from the given server.
Args:
destination (str): The host name of the remote home server we want
to get the state from.
event_id (str): The id of the event being requested.
+ timeout (int): How long to try (in ms) the destination for before
+ giving up. None indicates no timeout.
Returns:
Deferred: Results in a dict received from the remote homeserver.
@@ -65,7 +67,7 @@ class TransportLayerClient(object):
destination, event_id)
path = PREFIX + "/event/%s/" % (event_id, )
- return self.client.get_json(destination, path=path)
+ return self.client.get_json(destination, path=path, timeout=timeout)
@log_function
def backfill(self, destination, room_id, event_tuples, limit):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d35d9f603c..d85b1cf5de 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -517,30 +517,59 @@ class FederationHandler(BaseHandler):
# FIXME
pass
- for e in auth_chain:
- e.internal_metadata.outlier = True
+ auth_ids_to_deferred = {}
+
+ def process_auth_ev(ev):
+ auth_ids = [e_id for e_id, _ in ev.auth_events]
+
+ prev_ds = [
+ auth_ids_to_deferred[i]
+ for i in auth_ids
+ if i in auth_ids_to_deferred
+ ]
+
+ d = defer.Deferred()
+
+ auth_ids_to_deferred[ev.event_id] = d
+
+ @defer.inlineCallbacks
+ def f(*_):
+ ev.internal_metadata.outlier = True
+
+ try:
+ auth = {
+ (e.type, e.state_key): e for e in auth_chain
+ if e.event_id in auth_ids
+ }
+
+ yield self._handle_new_event(
+ origin, ev, auth_events=auth
+ )
+ except:
+ logger.exception(
+ "Failed to handle auth event %s",
+ ev.event_id,
+ )
+
+ d.callback(None)
+
+ if prev_ds:
+ dx = defer.DeferredList(prev_ds)
+ dx.addBoth(f)
+ else:
+ f()
+ for e in auth_chain:
if e.event_id == event.event_id:
- continue
+ return
+ process_auth_ev(e)
- try:
- auth_ids = [e_id for e_id, _ in e.auth_events]
- auth = {
- (e.type, e.state_key): e for e in auth_chain
- if e.event_id in auth_ids
- }
- yield self._handle_new_event(
- origin, e, auth_events=auth
- )
- except:
- logger.exception(
- "Failed to handle auth event %s",
- e.event_id,
- )
+ yield defer.DeferredList(auth_ids_to_deferred.values())
- for e in state:
+ @defer.inlineCallbacks
+ def handle_state(e):
if e.event_id == event.event_id:
- continue
+ return
e.internal_metadata.outlier = True
try:
@@ -558,6 +587,8 @@ class FederationHandler(BaseHandler):
e.event_id,
)
+ yield defer.DeferredList([handle_state(e) for e in state])
+
auth_ids = [e_id for e_id, _ in event.auth_events]
auth_events = {
(e.type, e.state_key): e for e in auth_chain
@@ -893,9 +924,12 @@ class FederationHandler(BaseHandler):
# This is a hack to fix some old rooms where the initial join event
# didn't reference the create event in its auth events.
if event.type == EventTypes.Member and not event.auth_events:
- if len(event.prev_events) == 1:
- c = yield self.store.get_event(event.prev_events[0][0])
- if c.type == EventTypes.Create:
+ if len(event.prev_events) == 1 and event.depth < 5:
+ c = yield self.store.get_event(
+ event.prev_events[0][0],
+ allow_none=True,
+ )
+ if c and c.type == EventTypes.Create:
auth_events[(c.type, c.state_key)] = c
try:
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index c99d237c73..6f976d5ce8 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -110,7 +110,8 @@ class MatrixFederationHttpClient(object):
@defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes,
body_callback, headers_dict={}, param_bytes=b"",
- query_bytes=b"", retry_on_dns_fail=True):
+ query_bytes=b"", retry_on_dns_fail=True,
+ timeout=None):
""" Creates and sends a request to the given url
"""
headers_dict[b"User-Agent"] = [self.version_string]
@@ -158,7 +159,7 @@ class MatrixFederationHttpClient(object):
response = yield self.clock.time_bound_deferred(
request_deferred,
- time_out=60,
+ time_out=timeout/1000. if timeout else 60,
)
logger.debug("Got response to %s", method)
@@ -181,7 +182,7 @@ class MatrixFederationHttpClient(object):
_flatten_response_never_received(e),
)
- if retries_left:
+ if retries_left and not timeout:
yield sleep(2 ** (5 - retries_left))
retries_left -= 1
else:
@@ -334,7 +335,8 @@ class MatrixFederationHttpClient(object):
defer.returnValue(json.loads(body))
@defer.inlineCallbacks
- def get_json(self, destination, path, args={}, retry_on_dns_fail=True):
+ def get_json(self, destination, path, args={}, retry_on_dns_fail=True,
+ timeout=None):
""" GETs some json from the given host homeserver and path
Args:
@@ -343,6 +345,9 @@ class MatrixFederationHttpClient(object):
path (str): The HTTP path.
args (dict): A dictionary used to create query strings, defaults to
None.
+ timeout (int): How long to try (in ms) the destination for before
+ giving up. None indicates no timeout and that the request will
+ be retried.
Returns:
Deferred: Succeeds when we get *any* HTTP response.
@@ -370,7 +375,8 @@ class MatrixFederationHttpClient(object):
path.encode("ascii"),
query_bytes=query_bytes,
body_callback=body_callback,
- retry_on_dns_fail=retry_on_dns_fail
+ retry_on_dns_fail=retry_on_dns_fail,
+ timeout=timeout,
)
if 200 <= response.code < 300:
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 23573e8b2b..114ccece65 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -330,12 +330,13 @@ class EventFederationStore(SQLBaseStore):
" WHERE event_id = ? AND room_id = ?"
" )"
" AND NOT EXISTS ("
- " SELECT 1 FROM events WHERE event_id = ? AND room_id = ?"
+ " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+ " AND outlier = ?"
" )"
)
txn.executemany(query, [
- (e_id, room_id, e_id, room_id, e_id, room_id, )
+ (e_id, room_id, e_id, room_id, e_id, room_id, False)
for e_id, _ in prev_events
])
|