diff --git a/CHANGES.rst b/CHANGES.rst
index 207f1e4d74..6779a36f72 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,11 @@
+Changes in synapse 0.5.3 (2014-11-27)
+=====================================
+
+ * Fix bug that caused joining a remote room to fail if a single event was not
+ signed correctly.
+ * Fix bug that caused servers to continuously try to fetch events from other
+ servers.
+
Changes in synapse 0.5.2 (2014-11-26)
=====================================
diff --git a/VERSION b/VERSION
index cb0c939a93..be14282b7f 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.5.2
+0.5.3
diff --git a/synapse/__init__.py b/synapse/__init__.py
index d5c2f25484..c99cd96537 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a synapse home server.
"""
-__version__ = "0.5.2"
+__version__ = "0.5.3"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index fb911e51a6..2b0475543d 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -202,7 +202,10 @@ class Auth(object):
# Invites are valid iff caller is in the room and target isn't.
if not caller_in_room: # caller isn't joined
- raise AuthError(403, "You are not in room %s." % event.room_id)
+ raise AuthError(
+ 403,
+ "%s not in room %s." % (event.user_id, event.room_id,)
+ )
elif target_in_room: # the target is already in the room.
raise AuthError(403, "%s is already in the room." %
target_user_id)
@@ -225,7 +228,10 @@ class Auth(object):
# TODO (erikj): Implement kicks.
if not caller_in_room: # trying to leave a room you aren't joined
- raise AuthError(403, "You are not in room %s." % event.room_id)
+ raise AuthError(
+ 403,
+ "%s not in room %s." % (target_user_id, event.room_id,)
+ )
elif target_user_id != event.user_id:
if kick_level:
kick_level = int(kick_level)
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 6bfb30b42d..312d69fcaa 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -283,6 +283,22 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
+ def get_event_auth(self, destination, context, event_id):
+ res = yield self.transport_layer.get_event_auth(
+ destination, context, event_id,
+ )
+
+ auth_chain = [
+ self.event_from_pdu_json(p, outlier=True)
+ for p in res["auth_chain"]
+ ]
+
+ auth_chain.sort(key=lambda e: e.depth)
+
+ defer.returnValue(auth_chain)
+
+ @defer.inlineCallbacks
+ @log_function
def on_backfill_request(self, origin, context, versions, limit):
pdus = yield self.handler.on_backfill_request(
origin, context, versions, limit
@@ -549,34 +565,34 @@ class ReplicationLayer(object):
state = None
# We need to make sure we have all the auth events.
- for e_id, _ in pdu.auth_events:
- exists = yield self._get_persisted_pdu(
- origin,
- e_id,
- do_auth=False
- )
-
- if not exists:
- try:
- logger.debug(
- "_handle_new_pdu fetch missing auth event %s from %s",
- e_id,
- origin,
- )
-
- yield self.get_pdu(
- origin,
- event_id=e_id,
- outlier=True,
- )
-
- logger.debug("Processed pdu %s", e_id)
- except:
- logger.warn(
- "Failed to get auth event %s from %s",
- e_id,
- origin
- )
+ # for e_id, _ in pdu.auth_events:
+ # exists = yield self._get_persisted_pdu(
+ # origin,
+ # e_id,
+ # do_auth=False
+ # )
+ #
+ # if not exists:
+ # try:
+ # logger.debug(
+ # "_handle_new_pdu fetch missing auth event %s from %s",
+ # e_id,
+ # origin,
+ # )
+ #
+ # yield self.get_pdu(
+ # origin,
+ # event_id=e_id,
+ # outlier=True,
+ # )
+ #
+ # logger.debug("Processed pdu %s", e_id)
+ # except:
+ # logger.warn(
+ # "Failed to get auth event %s from %s",
+ # e_id,
+ # origin
+ # )
# Get missing pdus if necessary.
if not pdu.outlier:
@@ -626,6 +642,7 @@ class ReplicationLayer(object):
if not backfilled:
ret = yield self.handler.on_receive_pdu(
+ origin,
pdu,
backfilled=backfilled,
state=state,
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index d59221a4fb..02202692d4 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -53,8 +53,12 @@ class EventStreamHandler(BaseHandler):
if auth_user not in self._streams_per_user:
self._streams_per_user[auth_user] = 0
if auth_user in self._stop_timer_per_user:
- self.clock.cancel_call_later(
- self._stop_timer_per_user.pop(auth_user))
+ try:
+ self.clock.cancel_call_later(
+ self._stop_timer_per_user.pop(auth_user)
+ )
+ except:
+ logger.exception("Failed to cancel event timer")
else:
yield self.distributor.fire(
"started_user_eventstream", auth_user
@@ -95,10 +99,12 @@ class EventStreamHandler(BaseHandler):
logger.debug(
"_later stopped_user_eventstream %s", auth_user
)
+
+ self._stop_timer_per_user.pop(auth_user, None)
+
yield self.distributor.fire(
"stopped_user_eventstream", auth_user
)
- del self._stop_timer_per_user[auth_user]
logger.debug("Scheduling _later: for %s", auth_user)
self._stop_timer_per_user[auth_user] = (
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index fcef602055..925eb5376e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -101,7 +101,7 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
- def on_receive_pdu(self, pdu, backfilled, state=None):
+ def on_receive_pdu(self, origin, pdu, backfilled, state=None):
""" Called by the ReplicationLayer when we have a new pdu. We need to
do auth checks and put it through the StateHandler.
"""
@@ -112,7 +112,7 @@ class FederationHandler(BaseHandler):
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if event.room_id in self.room_queues:
- self.room_queues[event.room_id].append(pdu)
+ self.room_queues[event.room_id].append((pdu, origin))
return
logger.debug("Processing event: %s", event.event_id)
@@ -149,14 +149,49 @@ class FederationHandler(BaseHandler):
# FIXME (erikj): Awful hack to make the case where we are not currently
# in the room work
current_state = None
- if state:
- is_in_room = yield self.auth.check_host_in_room(
- event.room_id,
- self.server_name
+ is_in_room = yield self.auth.check_host_in_room(
+ event.room_id,
+ self.server_name
+ )
+ if not is_in_room and not event.outlier:
+ logger.debug("Got event for room we're not in.")
+
+ replication_layer = self.replication_layer
+ auth_chain = yield replication_layer.get_event_auth(
+ origin,
+ context=event.room_id,
+ event_id=event.event_id,
)
- if not is_in_room:
- logger.debug("Got event for room we're not in.")
- current_state = state
+
+ for e in auth_chain:
+ e.outlier = True
+ try:
+ yield self._handle_new_event(e, fetch_missing=False)
+ except:
+ logger.exception(
+ "Failed to parse auth event %s",
+ e.event_id,
+ )
+
+ if not state:
+ state = yield replication_layer.get_state_for_context(
+ origin,
+ context=event.room_id,
+ event_id=event.event_id,
+ )
+
+ current_state = state
+
+ if state:
+ for e in state:
+ e.outlier = True
+ try:
+ yield self._handle_new_event(e)
+ except:
+ logger.exception(
+ "Failed to parse state event %s",
+ e.event_id,
+ )
try:
yield self._handle_new_event(
@@ -251,6 +286,16 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def on_event_auth(self, event_id):
auth = yield self.store.get_auth_chain(event_id)
+
+ for event in auth:
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
+ )
+
defer.returnValue([e for e in auth])
@log_function
@@ -310,6 +355,7 @@ class FederationHandler(BaseHandler):
state = ret["state"]
auth_chain = ret["auth_chain"]
+ auth_chain.sort(key=lambda e: e.depth)
logger.debug("do_invite_join auth_chain: %s", auth_chain)
logger.debug("do_invite_join state: %s", state)
@@ -328,23 +374,32 @@ class FederationHandler(BaseHandler):
for e in auth_chain:
e.outlier = True
- yield self._handle_new_event(e)
- yield self.notifier.on_new_room_event(
- e, extra_users=[joinee]
- )
+ try:
+ yield self._handle_new_event(e, fetch_missing=False)
+ except:
+ logger.exception(
+ "Failed to parse auth event %s",
+ e.event_id,
+ )
for e in state:
# FIXME: Auth these.
e.outlier = True
- yield self._handle_new_event(e)
- yield self.notifier.on_new_room_event(
- e, extra_users=[joinee]
- )
+ try:
+ yield self._handle_new_event(
+ e,
+ fetch_missing=True
+ )
+ except:
+ logger.exception(
+ "Failed to parse state event %s",
+ e.event_id,
+ )
yield self._handle_new_event(
event,
state=state,
- current_state=state
+ current_state=state,
)
yield self.notifier.on_new_room_event(
@@ -356,9 +411,9 @@ class FederationHandler(BaseHandler):
room_queue = self.room_queues[room_id]
del self.room_queues[room_id]
- for p in room_queue:
+ for p, origin in room_queue:
try:
- self.on_receive_pdu(p, backfilled=False)
+ self.on_receive_pdu(origin, p, backfilled=False)
except:
logger.exception("Couldn't handle pdu")
@@ -507,7 +562,17 @@ class FederationHandler(BaseHandler):
else:
del results[(event.type, event.state_key)]
- defer.returnValue(results.values())
+ res = results.values()
+ for event in res:
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
+ )
+
+ defer.returnValue(res)
else:
defer.returnValue([])
@@ -540,6 +605,17 @@ class FederationHandler(BaseHandler):
)
if event:
+ # FIXME: This is a temporary work around where we occasionally
+ # return events slightly differently than when they were
+ # originally signed
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
+ )
+
if do_auth:
in_room = yield self.auth.check_host_in_room(
event.room_id,
@@ -567,11 +643,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _handle_new_event(self, event, state=None, backfilled=False,
- current_state=None):
- if state:
- for s in state:
- yield self._handle_new_event(s)
-
+ current_state=None, fetch_missing=True):
is_new_state = yield self.state_handler.annotate_event_with_state(
event,
old_state=state
@@ -611,11 +683,22 @@ class FederationHandler(BaseHandler):
)
if not e:
- raise AuthError(
- 403,
- "Can't find auth event %s." % (e_id, )
+ e = yield self.replication_layer.get_pdu(
+ event.origin, e_id, outlier=True
)
+ if e and fetch_missing:
+ try:
+ yield self.on_receive_pdu(event.origin, e, False)
+ except:
+ logger.exception(
+ "Failed to parse auth event %s",
+ e_id,
+ )
+
+ if not e:
+ logger.warn("Can't find auth event %s.", e_id)
+
auth_events[(e.type, e.state_key)] = e
if event.type == RoomMemberEvent.TYPE and not event.auth_events:
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index b84735e61c..3405cb365e 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -283,7 +283,7 @@ class StreamStore(SQLBaseStore):
sql = (
"SELECT *, (%(redacted)s) AS redacted FROM events "
- "WHERE room_id = ? AND stream_ordering <= ? "
+ "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
"ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
) % {
"redacted": del_sql,
diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py
index 98cfbe50b3..33016c16ef 100644
--- a/tests/handlers/test_federation.py
+++ b/tests/handlers/test_federation.py
@@ -42,6 +42,7 @@ class FederationTestCase(unittest.TestCase):
self.auth = NonCallableMock(spec_set=[
"check",
+ "check_host_in_room",
])
self.hostname = "test"
@@ -89,13 +90,16 @@ class FederationTestCase(unittest.TestCase):
self.datastore.persist_event.return_value = defer.succeed(None)
self.datastore.get_room.return_value = defer.succeed(True)
+ self.auth.check_host_in_room.return_value = defer.succeed(True)
def annotate(ev, old_state=None):
ev.old_state_events = []
return defer.succeed(False)
self.state_handler.annotate_event_with_state.side_effect = annotate
- yield self.handlers.federation_handler.on_receive_pdu(pdu, False)
+ yield self.handlers.federation_handler.on_receive_pdu(
+ "fo", pdu, False
+ )
self.datastore.persist_event.assert_called_once_with(
ANY, is_new_state=False, backfilled=False, current_state=None
|