diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index aab18d7f71..0e9fd902af 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -88,6 +88,8 @@ def prune_event(event):
if "age_ts" in event.unsigned:
allowed_fields["unsigned"]["age_ts"] = event.unsigned["age_ts"]
+ if "replaces_state" in event.unsigned:
+ allowed_fields["unsigned"]["replaces_state"] = event.unsigned["replaces_state"]
return type(event)(
allowed_fields,
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index da95c2ad6d..9ba3151713 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -51,10 +51,34 @@ sent_edus_counter = metrics.register_counter("sent_edus")
sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
+PDU_RETRY_TIME_MS = 1 * 60 * 1000
+
+
class FederationClient(FederationBase):
def __init__(self, hs):
super(FederationClient, self).__init__(hs)
+ self.pdu_destination_tried = {}
+ self._clock.looping_call(
+ self._clear_tried_cache, 60 * 1000,
+ )
+
+ def _clear_tried_cache(self):
+ """Drop pdu_destination_tried entries whose retry interval has expired"""
+ now = self._clock.time_msec()
+
+ old_dict = self.pdu_destination_tried
+ self.pdu_destination_tried = {}
+
+ for event_id, destination_dict in old_dict.items():
+ destination_dict = {
+ dest: time
+ for dest, time in destination_dict.items()
+ if time + PDU_RETRY_TIME_MS > now
+ }
+ if destination_dict:
+ self.pdu_destination_tried[event_id] = destination_dict
+
def start_get_pdu_cache(self):
self._get_pdu_cache = ExpiringCache(
cache_name="get_pdu_cache",
@@ -240,8 +264,15 @@ class FederationClient(FederationBase):
if ev:
defer.returnValue(ev)
+ pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
+
pdu = None
for destination in destinations:
+ now = self._clock.time_msec()
+ last_attempt = pdu_attempts.get(destination, 0)
+ if last_attempt + PDU_RETRY_TIME_MS > now:
+ continue
+
try:
limiter = yield get_retry_limiter(
destination,
@@ -269,25 +300,19 @@ class FederationClient(FederationBase):
break
- except SynapseError as e:
- logger.info(
- "Failed to get PDU %s from %s because %s",
- event_id, destination, e,
- )
- continue
- except CodeMessageException as e:
- if 400 <= e.code < 500:
- raise
+ pdu_attempts[destination] = now
+ except SynapseError as e:
logger.info(
"Failed to get PDU %s from %s because %s",
event_id, destination, e,
)
- continue
except NotRetryingDestination as e:
logger.info(e.message)
continue
except Exception as e:
+ pdu_attempts[destination] = now
+
logger.info(
"Failed to get PDU %s from %s because %s",
event_id, destination, e,
@@ -406,7 +431,7 @@ class FederationClient(FederationBase):
events and the second is a list of event ids that we failed to fetch.
"""
if return_local:
- seen_events = yield self.store.get_events(event_ids)
+ seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
signed_events = seen_events.values()
else:
seen_events = yield self.store.have_events(event_ids)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 5787f854d4..cb2ef0210c 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -21,11 +21,11 @@ from .units import Transaction
from synapse.api.errors import HttpResponseException
from synapse.util.async import run_on_reactor
-from synapse.util.logutils import log_function
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import preserve_context_over_fn
from synapse.util.retryutils import (
get_retry_limiter, NotRetryingDestination,
)
+from synapse.util.metrics import measure_func
import synapse.metrics
import logging
@@ -51,7 +51,7 @@ class TransactionQueue(object):
self.transport_layer = transport_layer
- self._clock = hs.get_clock()
+ self.clock = hs.get_clock()
# Is a mapping from destinations -> deferreds. Used to keep track
# of which destinations have transactions in flight and when they are
@@ -82,7 +82,7 @@ class TransactionQueue(object):
self.pending_failures_by_dest = {}
# HACK to get unique tx id
- self._next_txn_id = int(self._clock.time_msec())
+ self._next_txn_id = int(self.clock.time_msec())
def can_send_to(self, destination):
"""Can we send messages to the given server?
@@ -119,266 +119,215 @@ class TransactionQueue(object):
if not destinations:
return
- deferreds = []
-
for destination in destinations:
- deferred = defer.Deferred()
self.pending_pdus_by_dest.setdefault(destination, []).append(
- (pdu, deferred, order)
+ (pdu, order)
)
- def chain(failure):
- if not deferred.called:
- deferred.errback(failure)
-
- def log_failure(f):
- logger.warn("Failed to send pdu to %s: %s", destination, f.value)
-
- deferred.addErrback(log_failure)
-
- with PreserveLoggingContext():
- self._attempt_new_transaction(destination).addErrback(chain)
-
- deferreds.append(deferred)
+ preserve_context_over_fn(
+ self._attempt_new_transaction, destination
+ )
- # NO inlineCallbacks
def enqueue_edu(self, edu):
destination = edu.destination
if not self.can_send_to(destination):
return
- deferred = defer.Deferred()
- self.pending_edus_by_dest.setdefault(destination, []).append(
- (edu, deferred)
- )
+ self.pending_edus_by_dest.setdefault(destination, []).append(edu)
- def chain(failure):
- if not deferred.called:
- deferred.errback(failure)
-
- def log_failure(f):
- logger.warn("Failed to send edu to %s: %s", destination, f.value)
-
- deferred.addErrback(log_failure)
-
- with PreserveLoggingContext():
- self._attempt_new_transaction(destination).addErrback(chain)
-
- return deferred
+ preserve_context_over_fn(
+ self._attempt_new_transaction, destination
+ )
- @defer.inlineCallbacks
def enqueue_failure(self, failure, destination):
if destination == self.server_name or destination == "localhost":
return
- deferred = defer.Deferred()
-
if not self.can_send_to(destination):
return
self.pending_failures_by_dest.setdefault(
destination, []
- ).append(
- (failure, deferred)
- )
-
- def chain(f):
- if not deferred.called:
- deferred.errback(f)
-
- def log_failure(f):
- logger.warn("Failed to send failure to %s: %s", destination, f.value)
-
- deferred.addErrback(log_failure)
-
- with PreserveLoggingContext():
- self._attempt_new_transaction(destination).addErrback(chain)
+ ).append(failure)
- yield deferred
+ preserve_context_over_fn(
+ self._attempt_new_transaction, destination
+ )
@defer.inlineCallbacks
- @log_function
def _attempt_new_transaction(self, destination):
yield run_on_reactor()
+ while True:
+ # list of (pending_pdu, order)
+ if destination in self.pending_transactions:
+ # XXX: pending_transactions can be left set forever by a never-ending
+ # request, at which point pending_pdus_by_dest just keeps growing.
+ # We need application-layer timeouts of some flavour for these
+ # requests.
+ logger.debug(
+ "TX [%s] Transaction already in progress",
+ destination
+ )
+ return
- # list of (pending_pdu, deferred, order)
- if destination in self.pending_transactions:
- # XXX: pending_transactions can get stuck on by a never-ending
- # request at which point pending_pdus_by_dest just keeps growing.
- # we need application-layer timeouts of some flavour of these
- # requests
- logger.debug(
- "TX [%s] Transaction already in progress",
- destination
- )
- return
-
- pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
- pending_edus = self.pending_edus_by_dest.pop(destination, [])
- pending_failures = self.pending_failures_by_dest.pop(destination, [])
+ pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
+ pending_edus = self.pending_edus_by_dest.pop(destination, [])
+ pending_failures = self.pending_failures_by_dest.pop(destination, [])
- if pending_pdus:
- logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
- destination, len(pending_pdus))
+ if pending_pdus:
+ logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
+ destination, len(pending_pdus))
- if not pending_pdus and not pending_edus and not pending_failures:
- logger.debug("TX [%s] Nothing to send", destination)
- return
+ if not pending_pdus and not pending_edus and not pending_failures:
+ logger.debug("TX [%s] Nothing to send", destination)
+ return
- try:
- self.pending_transactions[destination] = 1
+ yield self._send_new_transaction(
+ destination, pending_pdus, pending_edus, pending_failures
+ )
- logger.debug("TX [%s] _attempt_new_transaction", destination)
+ @measure_func("_send_new_transaction")
+ @defer.inlineCallbacks
+ def _send_new_transaction(self, destination, pending_pdus, pending_edus,
+ pending_failures):
# Sort based on the order field
- pending_pdus.sort(key=lambda t: t[2])
-
+ pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
- edus = [x[0] for x in pending_edus]
- failures = [x[0].get_dict() for x in pending_failures]
- deferreds = [
- x[1]
- for x in pending_pdus + pending_edus + pending_failures
- ]
-
- txn_id = str(self._next_txn_id)
-
- limiter = yield get_retry_limiter(
- destination,
- self._clock,
- self.store,
- )
+ edus = pending_edus
+ failures = [x.get_dict() for x in pending_failures]
- logger.debug(
- "TX [%s] {%s} Attempting new transaction"
- " (pdus: %d, edus: %d, failures: %d)",
- destination, txn_id,
- len(pending_pdus),
- len(pending_edus),
- len(pending_failures)
- )
+ try:
+ self.pending_transactions[destination] = 1
- logger.debug("TX [%s] Persisting transaction...", destination)
+ logger.debug("TX [%s] _attempt_new_transaction", destination)
- transaction = Transaction.create_new(
- origin_server_ts=int(self._clock.time_msec()),
- transaction_id=txn_id,
- origin=self.server_name,
- destination=destination,
- pdus=pdus,
- edus=edus,
- pdu_failures=failures,
- )
+ txn_id = str(self._next_txn_id)
- self._next_txn_id += 1
+ limiter = yield get_retry_limiter(
+ destination,
+ self.clock,
+ self.store,
+ )
- yield self.transaction_actions.prepare_to_send(transaction)
+ logger.debug(
+ "TX [%s] {%s} Attempting new transaction"
+ " (pdus: %d, edus: %d, failures: %d)",
+ destination, txn_id,
+ len(pending_pdus),
+ len(pending_edus),
+ len(pending_failures)
+ )
- logger.debug("TX [%s] Persisted transaction", destination)
- logger.info(
- "TX [%s] {%s} Sending transaction [%s],"
- " (PDUs: %d, EDUs: %d, failures: %d)",
- destination, txn_id,
- transaction.transaction_id,
- len(pending_pdus),
- len(pending_edus),
- len(pending_failures),
- )
+ logger.debug("TX [%s] Persisting transaction...", destination)
- with limiter:
- # Actually send the transaction
-
- # FIXME (erikj): This is a bit of a hack to make the Pdu age
- # keys work
- def json_data_cb():
- data = transaction.get_dict()
- now = int(self._clock.time_msec())
- if "pdus" in data:
- for p in data["pdus"]:
- if "age_ts" in p:
- unsigned = p.setdefault("unsigned", {})
- unsigned["age"] = now - int(p["age_ts"])
- del p["age_ts"]
- return data
-
- try:
- response = yield self.transport_layer.send_transaction(
- transaction, json_data_cb
- )
- code = 200
-
- if response:
- for e_id, r in response.get("pdus", {}).items():
- if "error" in r:
- logger.warn(
- "Transaction returned error for %s: %s",
- e_id, r,
- )
- except HttpResponseException as e:
- code = e.code
- response = e.response
+ transaction = Transaction.create_new(
+ origin_server_ts=int(self.clock.time_msec()),
+ transaction_id=txn_id,
+ origin=self.server_name,
+ destination=destination,
+ pdus=pdus,
+ edus=edus,
+ pdu_failures=failures,
+ )
+
+ self._next_txn_id += 1
+
+ yield self.transaction_actions.prepare_to_send(transaction)
+ logger.debug("TX [%s] Persisted transaction", destination)
logger.info(
- "TX [%s] {%s} got %d response",
- destination, txn_id, code
+ "TX [%s] {%s} Sending transaction [%s],"
+ " (PDUs: %d, EDUs: %d, failures: %d)",
+ destination, txn_id,
+ transaction.transaction_id,
+ len(pending_pdus),
+ len(pending_edus),
+ len(pending_failures),
)
- logger.debug("TX [%s] Sent transaction", destination)
- logger.debug("TX [%s] Marking as delivered...", destination)
+ with limiter:
+ # Actually send the transaction
+
+ # FIXME (erikj): This is a bit of a hack to make the Pdu age
+ # keys work
+ def json_data_cb():
+ data = transaction.get_dict()
+ now = int(self.clock.time_msec())
+ if "pdus" in data:
+ for p in data["pdus"]:
+ if "age_ts" in p:
+ unsigned = p.setdefault("unsigned", {})
+ unsigned["age"] = now - int(p["age_ts"])
+ del p["age_ts"]
+ return data
+
+ try:
+ response = yield self.transport_layer.send_transaction(
+ transaction, json_data_cb
+ )
+ code = 200
+
+ if response:
+ for e_id, r in response.get("pdus", {}).items():
+ if "error" in r:
+ logger.warn(
+ "Transaction returned error for %s: %s",
+ e_id, r,
+ )
+ except HttpResponseException as e:
+ code = e.code
+ response = e.response
+
+ logger.info(
+ "TX [%s] {%s} got %d response",
+ destination, txn_id, code
+ )
- yield self.transaction_actions.delivered(
- transaction, code, response
- )
+ logger.debug("TX [%s] Sent transaction", destination)
+ logger.debug("TX [%s] Marking as delivered...", destination)
- logger.debug("TX [%s] Marked as delivered", destination)
-
- logger.debug("TX [%s] Yielding to callbacks...", destination)
-
- for deferred in deferreds:
- if code == 200:
- deferred.callback(None)
- else:
- deferred.errback(RuntimeError("Got status %d" % code))
-
- # Ensures we don't continue until all callbacks on that
- # deferred have fired
- try:
- yield deferred
- except:
- pass
-
- logger.debug("TX [%s] Yielded to callbacks", destination)
- except NotRetryingDestination:
- logger.info(
- "TX [%s] not ready for retry yet - "
- "dropping transaction for now",
- destination,
- )
- except RuntimeError as e:
- # We capture this here as there as nothing actually listens
- # for this finishing functions deferred.
- logger.warn(
- "TX [%s] Problem in _attempt_transaction: %s",
- destination,
- e,
- )
- except Exception as e:
- # We capture this here as there as nothing actually listens
- # for this finishing functions deferred.
- logger.warn(
- "TX [%s] Problem in _attempt_transaction: %s",
- destination,
- e,
- )
+ yield self.transaction_actions.delivered(
+ transaction, code, response
+ )
- for deferred in deferreds:
- if not deferred.called:
- deferred.errback(e)
+ logger.debug("TX [%s] Marked as delivered", destination)
+
+ if code != 200:
+ for p in pdus:
+ logger.info(
+ "Failed to send event %s to %s", p.event_id, destination
+ )
+ except NotRetryingDestination:
+ logger.info(
+ "TX [%s] not ready for retry yet - "
+ "dropping transaction for now",
+ destination,
+ )
+ except RuntimeError as e:
+ # We capture this here as there is nothing actually listening
+ # for this function's deferred to finish.
+ logger.warn(
+ "TX [%s] Problem in _attempt_transaction: %s",
+ destination,
+ e,
+ )
+
+ for p in pdus:
+ logger.info("Failed to send event %s to %s", p.event_id, destination)
+ except Exception as e:
+ # We capture this here as there is nothing actually listening
+ # for this function's deferred to finish.
+ logger.warn(
+ "TX [%s] Problem in _attempt_transaction: %s",
+ destination,
+ e,
+ )
- finally:
- # We want to be *very* sure we delete this after we stop processing
- self.pending_transactions.pop(destination, None)
+ for p in pdus:
+ logger.info("Failed to send event %s to %s", p.event_id, destination)
- # Check to see if there is anything else to send.
- self._attempt_new_transaction(destination)
+ finally:
+ # We want to be *very* sure we delete this after we stop processing
+ self.pending_transactions.pop(destination, None)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 618cb53629..ff6bb475b5 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -249,7 +249,7 @@ class FederationHandler(BaseHandler):
if ev.type != EventTypes.Member:
continue
try:
- domain = UserID.from_string(ev.state_key).domain
+ domain = get_domain_from_id(ev.state_key)
except:
continue
@@ -1093,16 +1093,17 @@ class FederationHandler(BaseHandler):
)
if event:
- # FIXME: This is a temporary work around where we occasionally
- # return events slightly differently than when they were
- # originally signed
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
+ if self.hs.is_mine_id(event.event_id):
+ # FIXME: This is a temporary work around where we occasionally
+ # return events slightly differently than when they were
+ # originally signed
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
)
- )
if do_auth:
in_room = yield self.auth.check_host_in_room(
@@ -1112,6 +1113,12 @@ class FederationHandler(BaseHandler):
if not in_room:
raise AuthError(403, "Host not in room.")
+ events = yield self._filter_events_for_server(
+ origin, event.room_id, [event]
+ )
+
+ event = events[0]
+
defer.returnValue(event)
else:
defer.returnValue(None)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index c3589534f8..f93093dd85 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -155,9 +155,7 @@ class MatrixFederationHttpClient(object):
time_out=timeout / 1000. if timeout else 60,
)
- response = yield preserve_context_over_fn(
- send_request,
- )
+ response = yield preserve_context_over_fn(send_request)
log_result = "%d %s" % (response.code, response.phrase,)
break
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 5316259d15..7a87045f87 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -317,7 +317,6 @@ def preserve_fn(f):
def g(*args, **kwargs):
with PreserveLoggingContext(current):
return f(*args, **kwargs)
-
return g
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 0b944d3e63..76f301f549 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -13,10 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer
from synapse.util.logcontext import LoggingContext
import synapse.metrics
+from functools import wraps
import logging
@@ -47,6 +49,18 @@ block_db_txn_duration = metrics.register_distribution(
)
+def measure_func(name):
+ def wrapper(func):
+ @wraps(func)
+ @defer.inlineCallbacks
+ def measured_func(self, *args, **kwargs):
+ with Measure(self.clock, name):
+ r = yield func(self, *args, **kwargs)
+ defer.returnValue(r)
+ return measured_func
+ return wrapper
+
+
class Measure(object):
__slots__ = [
"clock", "name", "start_context", "start", "new_context", "ru_utime",
@@ -64,7 +78,6 @@ class Measure(object):
self.start = self.clock.time_msec()
self.start_context = LoggingContext.current_context()
if not self.start_context:
- logger.warn("Entered Measure without log context: %s", self.name)
self.start_context = LoggingContext("Measure")
self.start_context.__enter__()
self.created_context = True
@@ -85,7 +98,7 @@ class Measure(object):
if context != self.start_context:
logger.warn(
"Context has unexpectedly changed from '%s' to '%s'. (%r)",
- context, self.start_context, self.name
+ self.start_context, context, self.name
)
return
|