diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index d53cd3df3e..15adc9dc2c 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -78,7 +78,7 @@ class BaseHandler(object):
if not suppress_auth:
logger.debug("Authing...")
- self.auth.check(event, raises=True)
+ self.auth.check(event, auth_events=event.old_state_events)
logger.debug("Authed")
else:
logger.debug("Suppressed auth.")
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index af4e7d49c8..3b37e49e6f 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -17,7 +17,7 @@
from twisted.internet import defer
from ._base import BaseHandler
-from synapse.api.errors import SynapseError
+from synapse.api.errors import SynapseError, Codes, CodeMessageException
from synapse.api.events.room import RoomAliasesEvent
import logging
@@ -84,22 +84,32 @@ class DirectoryHandler(BaseHandler):
room_id = result.room_id
servers = result.servers
else:
- result = yield self.federation.make_query(
- destination=room_alias.domain,
- query_type="directory",
- args={
- "room_alias": room_alias.to_string(),
- },
- retry_on_dns_fail=False,
- )
+ try:
+ result = yield self.federation.make_query(
+ destination=room_alias.domain,
+ query_type="directory",
+ args={
+ "room_alias": room_alias.to_string(),
+ },
+ retry_on_dns_fail=False,
+ )
+ except CodeMessageException as e:
+ logging.warn("Error retrieving alias")
+ if e.code == 404:
+ result = None
+ else:
+ raise
if result and "room_id" in result and "servers" in result:
room_id = result["room_id"]
servers = result["servers"]
if not room_id:
- defer.returnValue({})
- return
+ raise SynapseError(
+ 404,
+ "Room alias %r not found" % (room_alias.to_string(),),
+ Codes.NOT_FOUND
+ )
extra_servers = yield self.store.get_joined_hosts_for_room(room_id)
servers = list(set(extra_servers) | set(servers))
@@ -129,7 +139,9 @@ class DirectoryHandler(BaseHandler):
})
else:
raise SynapseError(
- 404, "Room alias \"%s\" not found" % (room_alias,)
+ 404,
+ "Room alias %r not found" % (room_alias.to_string(),),
+ Codes.NOT_FOUND
)
@defer.inlineCallbacks
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index d59221a4fb..02202692d4 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -53,8 +53,12 @@ class EventStreamHandler(BaseHandler):
if auth_user not in self._streams_per_user:
self._streams_per_user[auth_user] = 0
if auth_user in self._stop_timer_per_user:
- self.clock.cancel_call_later(
- self._stop_timer_per_user.pop(auth_user))
+ try:
+ self.clock.cancel_call_later(
+ self._stop_timer_per_user.pop(auth_user)
+ )
+ except:
+ logger.exception("Failed to cancel event timer")
else:
yield self.distributor.fire(
"started_user_eventstream", auth_user
@@ -95,10 +99,12 @@ class EventStreamHandler(BaseHandler):
logger.debug(
"_later stopped_user_eventstream %s", auth_user
)
+
+ self._stop_timer_per_user.pop(auth_user, None)
+
yield self.distributor.fire(
"stopped_user_eventstream", auth_user
)
- del self._stop_timer_per_user[auth_user]
logger.debug("Scheduling _later: for %s", auth_user)
self._stop_timer_per_user[auth_user] = (
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2e8b8a1f9a..925eb5376e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -18,13 +18,16 @@
from ._base import BaseHandler
from synapse.api.events.utils import prune_event
-from synapse.api.errors import AuthError, FederationError, SynapseError
-from synapse.api.events.room import RoomMemberEvent
+from synapse.api.errors import (
+ AuthError, FederationError, SynapseError, StoreError,
+)
+from synapse.api.events.room import RoomMemberEvent, RoomCreateEvent
from synapse.api.constants import Membership
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.crypto.event_signing import (
- compute_event_signature, check_event_content_hash
+ compute_event_signature, check_event_content_hash,
+ add_hashes_and_signatures,
)
from syutil.jsonutil import encode_canonical_json
@@ -98,7 +101,7 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
- def on_receive_pdu(self, pdu, backfilled, state=None):
+ def on_receive_pdu(self, origin, pdu, backfilled, state=None):
""" Called by the ReplicationLayer when we have a new pdu. We need to
do auth checks and put it through the StateHandler.
"""
@@ -109,7 +112,7 @@ class FederationHandler(BaseHandler):
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if event.room_id in self.room_queues:
- self.room_queues[event.room_id].append(pdu)
+ self.room_queues[event.room_id].append((pdu, origin))
return
logger.debug("Processing event: %s", event.event_id)
@@ -141,15 +144,62 @@ class FederationHandler(BaseHandler):
)
event = redacted_event
- is_new_state = yield self.state_handler.annotate_event_with_state(
- event,
- old_state=state
+ logger.debug("Event: %s", event)
+
+ # FIXME (erikj): Awful hack to make the case where we are not currently
+ # in the room work
+ current_state = None
+ is_in_room = yield self.auth.check_host_in_room(
+ event.room_id,
+ self.server_name
)
+ if not is_in_room and not event.outlier:
+ logger.debug("Got event for room we're not in.")
+
+ replication_layer = self.replication_layer
+ auth_chain = yield replication_layer.get_event_auth(
+ origin,
+ context=event.room_id,
+ event_id=event.event_id,
+ )
- logger.debug("Event: %s", event)
+ for e in auth_chain:
+ e.outlier = True
+ try:
+ yield self._handle_new_event(e, fetch_missing=False)
+ except:
+ logger.exception(
+ "Failed to parse auth event %s",
+ e.event_id,
+ )
+
+ if not state:
+ state = yield replication_layer.get_state_for_context(
+ origin,
+ context=event.room_id,
+ event_id=event.event_id,
+ )
+
+ current_state = state
+
+ if state:
+ for e in state:
+ e.outlier = True
+ try:
+ yield self._handle_new_event(e)
+ except:
+ logger.exception(
+ "Failed to parse state event %s",
+ e.event_id,
+ )
try:
- self.auth.check(event, raises=True)
+ yield self._handle_new_event(
+ event,
+ state=state,
+ backfilled=backfilled,
+ current_state=current_state,
+ )
except AuthError as e:
raise FederationError(
"ERROR",
@@ -158,43 +208,17 @@ class FederationHandler(BaseHandler):
affected=event.event_id,
)
- is_new_state = is_new_state and not backfilled
-
- # TODO: Implement something in federation that allows us to
- # respond to PDU.
-
- yield self.store.persist_event(
- event,
- backfilled,
- is_new_state=is_new_state
- )
-
room = yield self.store.get_room(event.room_id)
if not room:
- # Huh, let's try and get the current state
try:
- yield self.replication_layer.get_state_for_context(
- event.origin, event.room_id, event.event_id,
- )
-
- hosts = yield self.store.get_joined_hosts_for_room(
- event.room_id
- )
- if self.hs.hostname in hosts:
- try:
- yield self.store.store_room(
- room_id=event.room_id,
- room_creator_user_id="",
- is_public=False,
- )
- except:
- pass
- except:
- logger.exception(
- "Failed to get current state for room %s",
- event.room_id
+ yield self.store.store_room(
+ room_id=event.room_id,
+ room_creator_user_id="",
+ is_public=False,
)
+ except StoreError:
+ logger.exception("Failed to store room.")
if not backfilled:
extra_users = []
@@ -255,11 +279,23 @@ class FederationHandler(BaseHandler):
pdu=event
)
+
+
defer.returnValue(pdu)
@defer.inlineCallbacks
def on_event_auth(self, event_id):
auth = yield self.store.get_auth_chain(event_id)
+
+ for event in auth:
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
+ )
+
defer.returnValue([e for e in auth])
@log_function
@@ -276,6 +312,8 @@ class FederationHandler(BaseHandler):
We suspend processing of any received events from this room until we
have finished processing the join.
"""
+ logger.debug("Joining %s to %s", joinee, room_id)
+
pdu = yield self.replication_layer.make_join(
target_host,
room_id,
@@ -298,19 +336,29 @@ class FederationHandler(BaseHandler):
try:
event.event_id = self.event_factory.create_event_id()
+ event.origin = self.hs.hostname
event.content = content
- state = yield self.replication_layer.send_join(
+ if not hasattr(event, "signatures"):
+ event.signatures = {}
+
+ add_hashes_and_signatures(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0],
+ )
+
+ ret = yield self.replication_layer.send_join(
target_host,
event
)
- logger.debug("do_invite_join state: %s", state)
+ state = ret["state"]
+ auth_chain = ret["auth_chain"]
+ auth_chain.sort(key=lambda e: e.depth)
- yield self.state_handler.annotate_event_with_state(
- event,
- old_state=state
- )
+ logger.debug("do_invite_join auth_chain: %s", auth_chain)
+ logger.debug("do_invite_join state: %s", state)
logger.debug("do_invite_join event: %s", event)
@@ -324,34 +372,50 @@ class FederationHandler(BaseHandler):
# FIXME
pass
+ for e in auth_chain:
+ e.outlier = True
+ try:
+ yield self._handle_new_event(e, fetch_missing=False)
+ except:
+ logger.exception(
+ "Failed to parse auth event %s",
+ e.event_id,
+ )
+
for e in state:
# FIXME: Auth these.
e.outlier = True
+ try:
+ yield self._handle_new_event(
+ e,
+ fetch_missing=True
+ )
+ except:
+ logger.exception(
+ "Failed to parse state event %s",
+ e.event_id,
+ )
- yield self.state_handler.annotate_event_with_state(
- e,
- )
-
- yield self.store.persist_event(
- e,
- backfilled=False,
- is_new_state=True
- )
-
- yield self.store.persist_event(
+ yield self._handle_new_event(
event,
- backfilled=False,
- is_new_state=True
+ state=state,
+ current_state=state,
)
+
+ yield self.notifier.on_new_room_event(
+ event, extra_users=[joinee]
+ )
+
+ logger.debug("Finished joining %s to %s", joinee, room_id)
finally:
room_queue = self.room_queues[room_id]
del self.room_queues[room_id]
- for p in room_queue:
+ for p, origin in room_queue:
try:
- yield self.on_receive_pdu(p, backfilled=False)
+ self.on_receive_pdu(origin, p, backfilled=False)
except:
- pass
+ logger.exception("Couldn't handle pdu")
defer.returnValue(True)
@@ -375,7 +439,7 @@ class FederationHandler(BaseHandler):
yield self.state_handler.annotate_event_with_state(event)
yield self.auth.add_auth_events(event)
- self.auth.check(event, raises=True)
+ self.auth.check(event, auth_events=event.old_state_events)
pdu = event
@@ -391,17 +455,7 @@ class FederationHandler(BaseHandler):
event.outlier = False
- state_handler = self.state_handler
- is_new_state = yield state_handler.annotate_event_with_state(event)
- self.auth.check(event, raises=True)
-
- # FIXME (erikj): All this is duplicated above :(
-
- yield self.store.persist_event(
- event,
- backfilled=False,
- is_new_state=is_new_state
- )
+ yield self._handle_new_event(event)
extra_users = []
if event.type == RoomMemberEvent.TYPE:
@@ -414,7 +468,7 @@ class FederationHandler(BaseHandler):
)
if event.type == RoomMemberEvent.TYPE:
- if event.membership == Membership.JOIN:
+ if event.content["membership"] == Membership.JOIN:
user = self.hs.parse_userid(event.state_key)
yield self.distributor.fire(
"user_joined_room", user=user, room_id=event.room_id
@@ -508,7 +562,17 @@ class FederationHandler(BaseHandler):
else:
del results[(event.type, event.state_key)]
- defer.returnValue(results.values())
+ res = results.values()
+ for event in res:
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
+ )
+
+ defer.returnValue(res)
else:
defer.returnValue([])
@@ -529,7 +593,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
- def get_persisted_pdu(self, origin, event_id):
+ def get_persisted_pdu(self, origin, event_id, do_auth=True):
""" Get a PDU from the database with given origin and id.
Returns:
@@ -541,12 +605,24 @@ class FederationHandler(BaseHandler):
)
if event:
- in_room = yield self.auth.check_host_in_room(
- event.room_id,
- origin
+ # FIXME: This is a temporary work around where we occasionally
+ # return events slightly differently than when they were
+ # originally signed
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
)
- if not in_room:
- raise AuthError(403, "Host not in room.")
+
+ if do_auth:
+ in_room = yield self.auth.check_host_in_room(
+ event.room_id,
+ origin
+ )
+ if not in_room:
+ raise AuthError(403, "Host not in room.")
defer.returnValue(event)
else:
@@ -564,3 +640,78 @@ class FederationHandler(BaseHandler):
)
while waiters:
waiters.pop().callback(None)
+
+ @defer.inlineCallbacks
+ def _handle_new_event(self, event, state=None, backfilled=False,
+ current_state=None, fetch_missing=True):
+ is_new_state = yield self.state_handler.annotate_event_with_state(
+ event,
+ old_state=state
+ )
+
+ if event.old_state_events:
+ known_ids = set(
+ [s.event_id for s in event.old_state_events.values()]
+ )
+ for e_id, _ in event.auth_events:
+ if e_id not in known_ids:
+ e = yield self.store.get_event(
+ e_id,
+ allow_none=True,
+ )
+
+ if not e:
+ # TODO: Do some conflict res to make sure that we're
+ # not the ones who are wrong.
+ logger.info(
+ "Rejecting %s as %s not in %s",
+ event.event_id, e_id, known_ids,
+ )
+ raise AuthError(403, "Auth events are stale")
+
+ auth_events = event.old_state_events
+ else:
+ # We need to get the auth events from somewhere.
+
+ # TODO: Don't just hit the DBs?
+
+ auth_events = {}
+ for e_id, _ in event.auth_events:
+ e = yield self.store.get_event(
+ e_id,
+ allow_none=True,
+ )
+
+ if not e:
+ e = yield self.replication_layer.get_pdu(
+ event.origin, e_id, outlier=True
+ )
+
+ if e and fetch_missing:
+ try:
+ yield self.on_receive_pdu(event.origin, e, False)
+ except:
+ logger.exception(
+ "Failed to parse auth event %s",
+ e_id,
+ )
+
+ if not e:
+ logger.warn("Can't find auth event %s.", e_id)
+
+ auth_events[(e.type, e.state_key)] = e
+
+ if event.type == RoomMemberEvent.TYPE and not event.auth_events:
+ if len(event.prev_events) == 1:
+ c = yield self.store.get_event(event.prev_events[0][0])
+ if c.type == RoomCreateEvent.TYPE:
+ auth_events[(c.type, c.state_key)] = c
+
+ self.auth.check(event, auth_events=auth_events)
+
+ yield self.store.persist_event(
+ event,
+ backfilled=backfilled,
+ is_new_state=(is_new_state and not backfilled),
+ current_state=current_state,
+ )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 06a4e173f6..42dc4d46f3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -243,7 +243,7 @@ class MessageHandler(BaseHandler):
public_room_ids = [r["room_id"] for r in public_rooms]
limit = pagin_config.limit
- if not limit:
+ if limit is None:
limit = 10
for event in room_list:
@@ -306,7 +306,7 @@ class MessageHandler(BaseHandler):
auth_user = self.hs.parse_userid(user_id)
# TODO: These concurrently
- state_tuples = yield self.store.get_current_state(room_id)
+ state_tuples = yield self.state_handler.get_current_state(room_id)
state = [self.hs.serialize_event(x) for x in state_tuples]
member_event = (yield self.store.get_room_member(
|