diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 578e9250fb..9dc46aa15f 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -341,7 +341,7 @@ class E2eKeysHandler(object):
def _exception_to_failure(e):
if isinstance(e, CodeMessageException):
return {
- "status": e.code, "message": e.message,
+ "status": e.code, "message": str(e),
}
 
if isinstance(e, NotRetryingDestination):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8d6bd7976d..38bebbf598 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -136,7 +136,7 @@ class FederationHandler(BaseHandler):
 
@defer.inlineCallbacks
def on_receive_pdu(
- self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+ self, origin, pdu, sent_to_us_directly=False,
):
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
@@ -145,7 +145,8 @@ class FederationHandler(BaseHandler):
origin (str): server which initiated the /send/ transaction. Will
be used to fetch missing events or state.
pdu (FrozenEvent): received PDU
- get_missing (bool): True if we should fetch missing prev_events
+ sent_to_us_directly (bool): True if this event was pushed to us; False if
+ we pulled it as the result of a missing prev_event.
 
Returns (Deferred): completes with None
"""
@@ -250,7 +251,7 @@ class FederationHandler(BaseHandler):
pdu.internal_metadata.outlier = True
elif min_depth and pdu.depth > min_depth:
missing_prevs = prevs - seen
- if get_missing and missing_prevs:
+ if sent_to_us_directly and missing_prevs:
# If we're missing stuff, ensure we only fetch stuff one
# at a time.
logger.info(
@@ -282,24 +283,46 @@ class FederationHandler(BaseHandler):
room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
)
 
- if sent_to_us_directly and prevs - seen:
- # If they have sent it to us directly, and the server
- # isn't telling us about the auth events that it's
- # made a message referencing, we explode
- logger.warn(
- "[%s %s] Failed to fetch %d prev events: rejecting",
- room_id, event_id, len(prevs - seen),
- )
- raise FederationError(
- "ERROR",
- 403,
- (
- "Your server isn't divulging details about prev_events "
- "referenced in this event."
- ),
- affected=pdu.event_id,
- )
- elif prevs - seen:
+ if prevs - seen:
+ # We've still not been able to get all of the prev_events for this event.
+ #
+ # In this case, we need to fall back to asking another server in the
+ # federation for the state at this event. That's ok provided we then
+ # resolve the state against other bits of the DAG before using it (which
+ # will ensure that you can't just take over a room by sending an event,
+ # withholding its prev_events, and declaring yourself to be an admin in
+ # the subsequent state request).
+ #
+ # Now, if we're pulling this event as a missing prev_event, then clearly
+ # this event is not going to become the only forward-extremity and we are
+ # guaranteed to resolve its state against our existing forward
+ # extremities, so that should be fine.
+ #
+ # On the other hand, if this event was pushed to us, it is possible for
+ # it to become the only forward-extremity in the room, and we would then
+ # trust its state to be the state for the whole room. This is very bad.
+ # Further, if the event was pushed to us, there is no excuse for us not to
+ # have all the prev_events. We therefore reject any such events.
+ #
+ # XXX this really feels like it could/should be merged with the above,
+ # but there is an interaction with min_depth that I'm not really
+ # following.
+
+ if sent_to_us_directly:
+ logger.warn(
+ "[%s %s] Failed to fetch %d prev events: rejecting",
+ room_id, event_id, len(prevs - seen),
+ )
+ raise FederationError(
+ "ERROR",
+ 403,
+ (
+ "Your server isn't divulging details about prev_events "
+ "referenced in this event."
+ ),
+ affected=pdu.event_id,
+ )
+
# Calculate the state of the previous events, and
# de-conflict them to find the current state.
state_groups = []
@@ -316,14 +339,26 @@ class FederationHandler(BaseHandler):
"[%s %s] Requesting state at missing prev_event %s",
room_id, event_id, p,
)
- state, got_auth_chain = (
- yield self.federation_client.get_state_for_room(
- origin, room_id, p,
+
+ with logcontext.nested_logging_context(p):
+ # note that if any of the missing prevs share missing state or
+ # auth events, the requests to fetch those events are deduped
+ # by the get_pdu_cache in federation_client.
+ remote_state, got_auth_chain = (
+ yield self.federation_client.get_state_for_room(
+ origin, room_id, p,
+ )
)
- )
- auth_chains.update(got_auth_chain)
- state_group = {(x.type, x.state_key): x.event_id for x in state}
- state_groups.append(state_group)
+
+ # XXX hrm I'm not convinced that duplicate events will compare
+ # for equality, so I'm not sure this does what the author
+ # hoped.
+ auth_chains.update(got_auth_chain)
+
+ state_group = {
+ (x.type, x.state_key): x.event_id for x in remote_state
+ }
+ state_groups.append(state_group)
 
# Resolve any conflicting state
def fetch(ev_ids):
@@ -460,20 +495,21 @@ class FederationHandler(BaseHandler):
"[%s %s] Handling received prev_event %s",
room_id, event_id, ev.event_id,
)
- try:
- yield self.on_receive_pdu(
- origin,
- ev,
- get_missing=False
- )
- except FederationError as e:
- if e.code == 403:
- logger.warn(
- "[%s %s] Received prev_event %s failed history check.",
- room_id, event_id, ev.event_id,
+ with logcontext.nested_logging_context(ev.event_id):
+ try:
+ yield self.on_receive_pdu(
+ origin,
+ ev,
+ sent_to_us_directly=False,
)
- else:
- raise
+ except FederationError as e:
+ if e.code == 403:
+ logger.warn(
+ "[%s %s] Received prev_event %s failed history check.",
+ room_id, event_id, ev.event_id,
+ )
+ else:
+ raise
 
@defer.inlineCallbacks
def _process_received_pdu(self, origin, event, state, auth_chain):
@@ -549,6 +585,10 @@ class FederationHandler(BaseHandler):
})
seen_ids.add(e.event_id)
 
+ logger.info(
+ "[%s %s] persisting newly-received auth/state events %s",
+ room_id, event_id, [e["event"].event_id for e in event_infos]
+ )
yield self._handle_new_events(origin, event_infos)
 
try:
@@ -1112,7 +1152,8 @@ class FederationHandler(BaseHandler):
try:
logger.info("Processing queued PDU %s which was received "
"while we were joining %s", p.event_id, p.room_id)
- yield self.on_receive_pdu(origin, p)
+ with logcontext.nested_logging_context(p.event_id):
+ yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
except Exception as e:
logger.warn(
"Error handling queued PDU %s from %s: %s",
@@ -1558,15 +1599,22 @@ class FederationHandler(BaseHandler):
 
Notifies about the events where appropriate.
"""
- contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
- [
- logcontext.run_in_background(
- self._prep_event,
+
+ @defer.inlineCallbacks
+ def prep(ev_info):
+ event = ev_info["event"]
+ with logcontext.nested_logging_context(suffix=event.event_id):
+ res = yield self._prep_event(
origin,
- ev_info["event"],
+ event,
state=ev_info.get("state"),
auth_events=ev_info.get("auth_events"),
)
+ defer.returnValue(res)
+
+ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+ [
+ logcontext.run_in_background(prep, ev_info)
for ev_info in event_infos
], consumeErrors=True,
))
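
The hunk above moves the per-event work into a small prep() helper so that each event is prepared under its own nested logging context before the results are gathered. Below is a minimal sketch of that fan-out pattern using plain Twisted Deferreds; the prep_all name and the toy event dicts are illustrative only, and Synapse's logcontext plumbing (nested_logging_context, run_in_background, make_deferred_yieldable) is deliberately omitted.

from twisted.internet import defer

@defer.inlineCallbacks
def prep(ev_info):
    # per-event work; in the handler this is self._prep_event(...) wrapped in
    # logcontext.nested_logging_context(suffix=event.event_id)
    event = ev_info["event"]
    res = yield defer.succeed("context for %s" % (event,))
    defer.returnValue(res)

def prep_all(event_infos):
    # run every prep() concurrently and wait for all of them; consumeErrors=True
    # so a single failure doesn't leave the other Deferreds unhandled
    return defer.gatherResults(
        [prep(ev_info) for ev_info in event_infos],
        consumeErrors=True,
    )

# prep_all([{"event": "$a"}, {"event": "$b"}]) fires with the prep results,
# in the same order as the input list.
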
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 75b8b7ce6a..f284d5a385 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -278,7 +278,7 @@ class BaseProfileHandler(BaseHandler):
except Exception as e:
logger.warn(
"Failed to update join event for room %s - %s",
- room_id, str(e.message)
+ room_id, str(e)
)
 
 
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 2d2d3d5a0d..65f475d639 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -20,6 +20,7 @@ from twisted.internet import defer
 
from synapse.api.errors import AuthError, SynapseError
from synapse.types import UserID, get_domain_from_id
+from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@@ -68,6 +69,11 @@ class TypingHandler(object):
# map room IDs to sets of users currently typing
self._room_typing = {}
 
+ # caches which room_ids changed at which serials
+ self._typing_stream_change_cache = StreamChangeCache(
+ "TypingStreamChangeCache", self._latest_room_serial,
+ )
+
self.clock.looping_call(
self._handle_timeouts,
5000,
@@ -274,19 +280,29 @@ class TypingHandler(object):
 
self._latest_room_serial += 1
self._room_serials[member.room_id] = self._latest_room_serial
+ self._typing_stream_change_cache.entity_has_changed(
+ member.room_id, self._latest_room_serial,
+ )
 
self.notifier.on_new_event(
"typing_key", self._latest_room_serial, rooms=[member.room_id]
)
 
def get_all_typing_updates(self, last_id, current_id):
- # TODO: Work out a way to do this without scanning the entire state.
if last_id == current_id:
return []
 
+ changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
+ last_id,
+ )
+
+ if changed_rooms is None:
+ changed_rooms = self._room_serials
+
rows = []
- for room_id, serial in self._room_serials.items():
- if last_id < serial and serial <= current_id:
+ for room_id in changed_rooms:
+ serial = self._room_serials[room_id]
+ if last_id < serial <= current_id:
typing = self._room_typing[room_id]
rows.append((serial, room_id, list(typing)))
rows.sort()
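
The rewritten get_all_typing_updates above asks the StreamChangeCache which rooms changed since last_id and only falls back to scanning every room serial when the cache cannot answer. A self-contained sketch of that logic follows, with a deliberately simplified stand-in for StreamChangeCache (not the real synapse.util.caches.stream_change_cache implementation); all names here are illustrative.

class SimpleStreamChangeCache(object):
    """Toy stand-in for StreamChangeCache: remembers which entity changed at
    which stream serial, and says "don't know" for positions older than the
    point it started tracking."""

    def __init__(self, current_serial):
        self._earliest_known_serial = current_serial
        self._changed_at = {}  # entity -> serial of its most recent change

    def entity_has_changed(self, entity, serial):
        self._changed_at[entity] = serial

    def get_all_entities_changed(self, serial):
        if serial < self._earliest_known_serial:
            return None  # caller must fall back to a full scan
        return [e for e, s in self._changed_at.items() if s > serial]


def get_all_typing_updates(cache, room_serials, room_typing, last_id, current_id):
    if last_id == current_id:
        return []

    changed_rooms = cache.get_all_entities_changed(last_id)
    if changed_rooms is None:
        changed_rooms = room_serials  # scan everything, as before the change

    rows = []
    for room_id in changed_rooms:
        serial = room_serials[room_id]
        if last_id < serial <= current_id:
            rows.append((serial, room_id, list(room_typing[room_id])))
    rows.sort()
    return rows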