diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2e8b8a1f9a..925eb5376e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -18,13 +18,16 @@
from ._base import BaseHandler
from synapse.api.events.utils import prune_event
-from synapse.api.errors import AuthError, FederationError, SynapseError
-from synapse.api.events.room import RoomMemberEvent
+from synapse.api.errors import (
+ AuthError, FederationError, SynapseError, StoreError,
+)
+from synapse.api.events.room import RoomMemberEvent, RoomCreateEvent
from synapse.api.constants import Membership
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.crypto.event_signing import (
- compute_event_signature, check_event_content_hash
+ compute_event_signature, check_event_content_hash,
+ add_hashes_and_signatures,
)
from syutil.jsonutil import encode_canonical_json
@@ -98,7 +101,7 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
- def on_receive_pdu(self, pdu, backfilled, state=None):
+ def on_receive_pdu(self, origin, pdu, backfilled, state=None):
""" Called by the ReplicationLayer when we have a new pdu. We need to
do auth checks and put it through the StateHandler.
"""
@@ -109,7 +112,7 @@ class FederationHandler(BaseHandler):
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if event.room_id in self.room_queues:
- self.room_queues[event.room_id].append(pdu)
+ self.room_queues[event.room_id].append((pdu, origin))
return
logger.debug("Processing event: %s", event.event_id)
@@ -141,15 +144,62 @@ class FederationHandler(BaseHandler):
)
event = redacted_event
- is_new_state = yield self.state_handler.annotate_event_with_state(
- event,
- old_state=state
+ logger.debug("Event: %s", event)
+
+ # FIXME (erikj): Awful hack to make the case where we are not currently
+ # in the room work
+ current_state = None
+ is_in_room = yield self.auth.check_host_in_room(
+ event.room_id,
+ self.server_name
)
+ if not is_in_room and not event.outlier:
+ logger.debug("Got event for room we're not in.")
+
+ replication_layer = self.replication_layer
+ auth_chain = yield replication_layer.get_event_auth(
+ origin,
+ context=event.room_id,
+ event_id=event.event_id,
+ )
- logger.debug("Event: %s", event)
+ for e in auth_chain:
+ e.outlier = True
+ try:
+ yield self._handle_new_event(e, fetch_missing=False)
+ except:
+ logger.exception(
+ "Failed to parse auth event %s",
+ e.event_id,
+ )
+
+ if not state:
+ state = yield replication_layer.get_state_for_context(
+ origin,
+ context=event.room_id,
+ event_id=event.event_id,
+ )
+
+ current_state = state
+
+ if state:
+ for e in state:
+ e.outlier = True
+ try:
+ yield self._handle_new_event(e)
+ except:
+ logger.exception(
+ "Failed to parse state event %s",
+ e.event_id,
+ )
try:
- self.auth.check(event, raises=True)
+ yield self._handle_new_event(
+ event,
+ state=state,
+ backfilled=backfilled,
+ current_state=current_state,
+ )
except AuthError as e:
raise FederationError(
"ERROR",
@@ -158,43 +208,17 @@ class FederationHandler(BaseHandler):
affected=event.event_id,
)
- is_new_state = is_new_state and not backfilled
-
- # TODO: Implement something in federation that allows us to
- # respond to PDU.
-
- yield self.store.persist_event(
- event,
- backfilled,
- is_new_state=is_new_state
- )
-
room = yield self.store.get_room(event.room_id)
if not room:
- # Huh, let's try and get the current state
try:
- yield self.replication_layer.get_state_for_context(
- event.origin, event.room_id, event.event_id,
- )
-
- hosts = yield self.store.get_joined_hosts_for_room(
- event.room_id
- )
- if self.hs.hostname in hosts:
- try:
- yield self.store.store_room(
- room_id=event.room_id,
- room_creator_user_id="",
- is_public=False,
- )
- except:
- pass
- except:
- logger.exception(
- "Failed to get current state for room %s",
- event.room_id
+ yield self.store.store_room(
+ room_id=event.room_id,
+ room_creator_user_id="",
+ is_public=False,
)
+ except StoreError:
+ logger.exception("Failed to store room.")
if not backfilled:
extra_users = []
@@ -255,11 +279,23 @@ class FederationHandler(BaseHandler):
pdu=event
)
+
+
defer.returnValue(pdu)
@defer.inlineCallbacks
def on_event_auth(self, event_id):
auth = yield self.store.get_auth_chain(event_id)
+
+ for event in auth:
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
+ )
+
defer.returnValue([e for e in auth])
@log_function
@@ -276,6 +312,8 @@ class FederationHandler(BaseHandler):
We suspend processing of any received events from this room until we
have finished processing the join.
"""
+ logger.debug("Joining %s to %s", joinee, room_id)
+
pdu = yield self.replication_layer.make_join(
target_host,
room_id,
@@ -298,19 +336,29 @@ class FederationHandler(BaseHandler):
try:
event.event_id = self.event_factory.create_event_id()
+ event.origin = self.hs.hostname
event.content = content
- state = yield self.replication_layer.send_join(
+ if not hasattr(event, "signatures"):
+ event.signatures = {}
+
+ add_hashes_and_signatures(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0],
+ )
+
+ ret = yield self.replication_layer.send_join(
target_host,
event
)
- logger.debug("do_invite_join state: %s", state)
+ state = ret["state"]
+ auth_chain = ret["auth_chain"]
+ auth_chain.sort(key=lambda e: e.depth)
- yield self.state_handler.annotate_event_with_state(
- event,
- old_state=state
- )
+ logger.debug("do_invite_join auth_chain: %s", auth_chain)
+ logger.debug("do_invite_join state: %s", state)
logger.debug("do_invite_join event: %s", event)
@@ -324,34 +372,50 @@ class FederationHandler(BaseHandler):
# FIXME
pass
+ for e in auth_chain:
+ e.outlier = True
+ try:
+ yield self._handle_new_event(e, fetch_missing=False)
+ except:
+ logger.exception(
+ "Failed to parse auth event %s",
+ e.event_id,
+ )
+
for e in state:
# FIXME: Auth these.
e.outlier = True
+ try:
+ yield self._handle_new_event(
+ e,
+ fetch_missing=True
+ )
+ except:
+ logger.exception(
+ "Failed to parse state event %s",
+ e.event_id,
+ )
- yield self.state_handler.annotate_event_with_state(
- e,
- )
-
- yield self.store.persist_event(
- e,
- backfilled=False,
- is_new_state=True
- )
-
- yield self.store.persist_event(
+ yield self._handle_new_event(
event,
- backfilled=False,
- is_new_state=True
+ state=state,
+ current_state=state,
)
+
+ yield self.notifier.on_new_room_event(
+ event, extra_users=[joinee]
+ )
+
+ logger.debug("Finished joining %s to %s", joinee, room_id)
finally:
room_queue = self.room_queues[room_id]
del self.room_queues[room_id]
- for p in room_queue:
+ for p, origin in room_queue:
try:
- yield self.on_receive_pdu(p, backfilled=False)
+ self.on_receive_pdu(origin, p, backfilled=False)
except:
- pass
+ logger.exception("Couldn't handle pdu")
defer.returnValue(True)
@@ -375,7 +439,7 @@ class FederationHandler(BaseHandler):
yield self.state_handler.annotate_event_with_state(event)
yield self.auth.add_auth_events(event)
- self.auth.check(event, raises=True)
+ self.auth.check(event, auth_events=event.old_state_events)
pdu = event
@@ -391,17 +455,7 @@ class FederationHandler(BaseHandler):
event.outlier = False
- state_handler = self.state_handler
- is_new_state = yield state_handler.annotate_event_with_state(event)
- self.auth.check(event, raises=True)
-
- # FIXME (erikj): All this is duplicated above :(
-
- yield self.store.persist_event(
- event,
- backfilled=False,
- is_new_state=is_new_state
- )
+ yield self._handle_new_event(event)
extra_users = []
if event.type == RoomMemberEvent.TYPE:
@@ -414,7 +468,7 @@ class FederationHandler(BaseHandler):
)
if event.type == RoomMemberEvent.TYPE:
- if event.membership == Membership.JOIN:
+ if event.content["membership"] == Membership.JOIN:
user = self.hs.parse_userid(event.state_key)
yield self.distributor.fire(
"user_joined_room", user=user, room_id=event.room_id
@@ -508,7 +562,17 @@ class FederationHandler(BaseHandler):
else:
del results[(event.type, event.state_key)]
- defer.returnValue(results.values())
+ res = results.values()
+ for event in res:
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
+ )
+
+ defer.returnValue(res)
else:
defer.returnValue([])
@@ -529,7 +593,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
- def get_persisted_pdu(self, origin, event_id):
+ def get_persisted_pdu(self, origin, event_id, do_auth=True):
""" Get a PDU from the database with given origin and id.
Returns:
@@ -541,12 +605,24 @@ class FederationHandler(BaseHandler):
)
if event:
- in_room = yield self.auth.check_host_in_room(
- event.room_id,
- origin
+ # FIXME: This is a temporary work around where we occasionally
+ # return events slightly differently than when they were
+ # originally signed
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
)
- if not in_room:
- raise AuthError(403, "Host not in room.")
+
+ if do_auth:
+ in_room = yield self.auth.check_host_in_room(
+ event.room_id,
+ origin
+ )
+ if not in_room:
+ raise AuthError(403, "Host not in room.")
defer.returnValue(event)
else:
@@ -564,3 +640,78 @@ class FederationHandler(BaseHandler):
)
while waiters:
waiters.pop().callback(None)
+
+ @defer.inlineCallbacks
+ def _handle_new_event(self, event, state=None, backfilled=False,
+ current_state=None, fetch_missing=True):
+ is_new_state = yield self.state_handler.annotate_event_with_state(
+ event,
+ old_state=state
+ )
+
+ if event.old_state_events:
+ known_ids = set(
+ [s.event_id for s in event.old_state_events.values()]
+ )
+ for e_id, _ in event.auth_events:
+ if e_id not in known_ids:
+ e = yield self.store.get_event(
+ e_id,
+ allow_none=True,
+ )
+
+ if not e:
+ # TODO: Do some conflict res to make sure that we're
+ # not the ones who are wrong.
+ logger.info(
+ "Rejecting %s as %s not in %s",
+ event.event_id, e_id, known_ids,
+ )
+ raise AuthError(403, "Auth events are stale")
+
+ auth_events = event.old_state_events
+ else:
+ # We need to get the auth events from somewhere.
+
+ # TODO: Don't just hit the DBs?
+
+ auth_events = {}
+ for e_id, _ in event.auth_events:
+ e = yield self.store.get_event(
+ e_id,
+ allow_none=True,
+ )
+
+ if not e:
+ e = yield self.replication_layer.get_pdu(
+ event.origin, e_id, outlier=True
+ )
+
+ if e and fetch_missing:
+ try:
+ yield self.on_receive_pdu(event.origin, e, False)
+ except:
+ logger.exception(
+ "Failed to parse auth event %s",
+ e_id,
+ )
+
+ if not e:
+ logger.warn("Can't find auth event %s.", e_id)
+
+ auth_events[(e.type, e.state_key)] = e
+
+ if event.type == RoomMemberEvent.TYPE and not event.auth_events:
+ if len(event.prev_events) == 1:
+ c = yield self.store.get_event(event.prev_events[0][0])
+ if c.type == RoomCreateEvent.TYPE:
+ auth_events[(c.type, c.state_key)] = c
+
+ self.auth.check(event, auth_events=auth_events)
+
+ yield self.store.persist_event(
+ event,
+ backfilled=backfilled,
+ is_new_state=(is_new_state and not backfilled),
+ current_state=current_state,
+ )
|