diff --git a/synapse/__init__.py b/synapse/__init__.py
index 14564e735e..658574dab9 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a synapse home server.
"""
-__version__ = "0.5.0"
+__version__ = "0.5.3a"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index cbf3ae0ca4..2b0475543d 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -38,79 +38,66 @@ class Auth(object):
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
- def check(self, event, raises=False):
+ def check(self, event, auth_events):
""" Checks if this event is correctly authed.
Returns:
True if the auth checks pass.
- Raises:
- AuthError if there was a problem authorising this event. This will
- be raised only if raises=True.
"""
try:
- if hasattr(event, "room_id"):
- if event.old_state_events is None:
- # Oh, we don't know what the state of the room was, so we
- # are trusting that this is allowed (at least for now)
- logger.warn("Trusting event: %s", event.event_id)
- return True
-
- if hasattr(event, "outlier") and event.outlier is True:
- # TODO (erikj): Auth for outliers is done differently.
- return True
+ if not hasattr(event, "room_id"):
+ raise AuthError(500, "Event has no room_id: %s" % event)
+ if auth_events is None:
+ # Oh, we don't know what the state of the room was, so we
+ # are trusting that this is allowed (at least for now)
+ logger.warn("Trusting event: %s", event.event_id)
+ return True
- if event.type == RoomCreateEvent.TYPE:
- # FIXME
- return True
+ if event.type == RoomCreateEvent.TYPE:
+ # FIXME
+ return True
- # FIXME: Temp hack
- if event.type == RoomAliasesEvent.TYPE:
- return True
+ # FIXME: Temp hack
+ if event.type == RoomAliasesEvent.TYPE:
+ return True
- if event.type == RoomMemberEvent.TYPE:
- allowed = self.is_membership_change_allowed(event)
- if allowed:
- logger.debug("Allowing! %s", event)
- else:
- logger.debug("Denying! %s", event)
- return allowed
+ if event.type == RoomMemberEvent.TYPE:
+ allowed = self.is_membership_change_allowed(
+ event, auth_events
+ )
+ if allowed:
+ logger.debug("Allowing! %s", event)
+ else:
+ logger.debug("Denying! %s", event)
+ return allowed
- self.check_event_sender_in_room(event)
- self._can_send_event(event)
+ self.check_event_sender_in_room(event, auth_events)
+ self._can_send_event(event, auth_events)
- if event.type == RoomPowerLevelsEvent.TYPE:
- self._check_power_levels(event)
+ if event.type == RoomPowerLevelsEvent.TYPE:
+ self._check_power_levels(event, auth_events)
- if event.type == RoomRedactionEvent.TYPE:
- self._check_redaction(event)
+ if event.type == RoomRedactionEvent.TYPE:
+ self._check_redaction(event, auth_events)
- logger.debug("Allowing! %s", event)
- return True
- else:
- raise AuthError(500, "Unknown event: %s" % event)
+ logger.debug("Allowing! %s", event)
except AuthError as e:
logger.info(
"Event auth check failed on event %s with msg: %s",
event, e.msg
)
logger.info("Denying! %s", event)
- if raises:
- raise
-
- return False
+ raise
@defer.inlineCallbacks
def check_joined_room(self, room_id, user_id):
- try:
- member = yield self.store.get_room_member(
- room_id=room_id,
- user_id=user_id
- )
- self._check_joined_room(member, user_id, room_id)
- defer.returnValue(member)
- except AttributeError:
- pass
- defer.returnValue(None)
+ member = yield self.state.get_current_state(
+ room_id=room_id,
+ event_type=RoomMemberEvent.TYPE,
+ state_key=user_id
+ )
+ self._check_joined_room(member, user_id, room_id)
+ defer.returnValue(member)
@defer.inlineCallbacks
def check_host_in_room(self, room_id, host):
@@ -130,9 +117,9 @@ class Auth(object):
defer.returnValue(False)
- def check_event_sender_in_room(self, event):
+ def check_event_sender_in_room(self, event, auth_events):
key = (RoomMemberEvent.TYPE, event.user_id, )
- member_event = event.state_events.get(key)
+ member_event = auth_events.get(key)
return self._check_joined_room(
member_event,
@@ -147,15 +134,15 @@ class Auth(object):
))
@log_function
- def is_membership_change_allowed(self, event):
+ def is_membership_change_allowed(self, event, auth_events):
membership = event.content["membership"]
# Check if this is the room creator joining:
if len(event.prev_events) == 1 and Membership.JOIN == membership:
# Get room creation event:
key = (RoomCreateEvent.TYPE, "", )
- create = event.old_state_events.get(key)
- if event.prev_events[0][0] == create.event_id:
+ create = auth_events.get(key)
+ if create and event.prev_events[0][0] == create.event_id:
if create.content["creator"] == event.state_key:
return True
@@ -163,19 +150,19 @@ class Auth(object):
# get info about the caller
key = (RoomMemberEvent.TYPE, event.user_id, )
- caller = event.old_state_events.get(key)
+ caller = auth_events.get(key)
caller_in_room = caller and caller.membership == Membership.JOIN
caller_invited = caller and caller.membership == Membership.INVITE
# get info about the target
key = (RoomMemberEvent.TYPE, target_user_id, )
- target = event.old_state_events.get(key)
+ target = auth_events.get(key)
target_in_room = target and target.membership == Membership.JOIN
key = (RoomJoinRulesEvent.TYPE, "", )
- join_rule_event = event.old_state_events.get(key)
+ join_rule_event = auth_events.get(key)
if join_rule_event:
join_rule = join_rule_event.content.get(
"join_rule", JoinRules.INVITE
@@ -186,11 +173,13 @@ class Auth(object):
user_level = self._get_power_level_from_event_state(
event,
event.user_id,
+ auth_events,
)
ban_level, kick_level, redact_level = (
self._get_ops_level_from_event_state(
- event
+ event,
+ auth_events,
)
)
@@ -213,7 +202,10 @@ class Auth(object):
# Invites are valid iff caller is in the room and target isn't.
if not caller_in_room: # caller isn't joined
- raise AuthError(403, "You are not in room %s." % event.room_id)
+ raise AuthError(
+ 403,
+ "%s not in room %s." % (event.user_id, event.room_id,)
+ )
elif target_in_room: # the target is already in the room.
raise AuthError(403, "%s is already in the room." %
target_user_id)
@@ -236,7 +228,10 @@ class Auth(object):
# TODO (erikj): Implement kicks.
if not caller_in_room: # trying to leave a room you aren't joined
- raise AuthError(403, "You are not in room %s." % event.room_id)
+ raise AuthError(
+ 403,
+ "%s not in room %s." % (target_user_id, event.room_id,)
+ )
elif target_user_id != event.user_id:
if kick_level:
kick_level = int(kick_level)
@@ -260,9 +255,9 @@ class Auth(object):
return True
- def _get_power_level_from_event_state(self, event, user_id):
+ def _get_power_level_from_event_state(self, event, user_id, auth_events):
key = (RoomPowerLevelsEvent.TYPE, "", )
- power_level_event = event.old_state_events.get(key)
+ power_level_event = auth_events.get(key)
level = None
if power_level_event:
level = power_level_event.content.get("users", {}).get(user_id)
@@ -270,16 +265,16 @@ class Auth(object):
level = power_level_event.content.get("users_default", 0)
else:
key = (RoomCreateEvent.TYPE, "", )
- create_event = event.old_state_events.get(key)
+ create_event = auth_events.get(key)
if (create_event is not None and
create_event.content["creator"] == user_id):
return 100
return level
- def _get_ops_level_from_event_state(self, event):
+ def _get_ops_level_from_event_state(self, event, auth_events):
key = (RoomPowerLevelsEvent.TYPE, "", )
- power_level_event = event.old_state_events.get(key)
+ power_level_event = auth_events.get(key)
if power_level_event:
return (
@@ -375,6 +370,11 @@ class Auth(object):
key = (RoomMemberEvent.TYPE, event.user_id, )
member_event = event.old_state_events.get(key)
+ key = (RoomCreateEvent.TYPE, "", )
+ create_event = event.old_state_events.get(key)
+ if create_event:
+ auth_events.append(create_event.event_id)
+
if join_rule_event:
join_rule = join_rule_event.content.get("join_rule")
is_public = join_rule == JoinRules.PUBLIC if join_rule else False
@@ -406,9 +406,9 @@ class Auth(object):
event.auth_events = zip(auth_events, hashes)
@log_function
- def _can_send_event(self, event):
+ def _can_send_event(self, event, auth_events):
key = (RoomPowerLevelsEvent.TYPE, "", )
- send_level_event = event.old_state_events.get(key)
+ send_level_event = auth_events.get(key)
send_level = None
if send_level_event:
send_level = send_level_event.content.get("events", {}).get(
@@ -432,6 +432,7 @@ class Auth(object):
user_level = self._get_power_level_from_event_state(
event,
event.user_id,
+ auth_events,
)
if user_level:
@@ -468,14 +469,16 @@ class Auth(object):
return True
- def _check_redaction(self, event):
+ def _check_redaction(self, event, auth_events):
user_level = self._get_power_level_from_event_state(
event,
event.user_id,
+ auth_events,
)
_, _, redact_level = self._get_ops_level_from_event_state(
- event
+ event,
+ auth_events,
)
if user_level < redact_level:
@@ -484,7 +487,7 @@ class Auth(object):
"You don't have permission to redact events"
)
- def _check_power_levels(self, event):
+ def _check_power_levels(self, event, auth_events):
user_list = event.content.get("users", {})
# Validate users
for k, v in user_list.items():
@@ -499,7 +502,7 @@ class Auth(object):
raise SynapseError(400, "Not a valid power level: %s" % (v,))
key = (event.type, event.state_key, )
- current_state = event.old_state_events.get(key)
+ current_state = auth_events.get(key)
if not current_state:
return
@@ -507,6 +510,7 @@ class Auth(object):
user_level = self._get_power_level_from_event_state(
event,
event.user_id,
+ auth_events,
)
# Check other levels:
diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py
index 8a35b4cb7d..22939d011a 100644
--- a/synapse/api/events/__init__.py
+++ b/synapse/api/events/__init__.py
@@ -125,6 +125,7 @@ class SynapseEvent(JsonEncodedObject):
pdu_json.pop("outlier", None)
pdu_json.pop("replaces_state", None)
pdu_json.pop("redacted", None)
+ pdu_json.pop("prev_content", None)
state_hash = pdu_json.pop("state_hash", None)
if state_hash is not None:
pdu_json.setdefault("unsigned", {})["state_hash"] = state_hash
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 124dc31225..312d69fcaa 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -283,6 +283,22 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
+ def get_event_auth(self, destination, context, event_id):
+ res = yield self.transport_layer.get_event_auth(
+ destination, context, event_id,
+ )
+
+ auth_chain = [
+ self.event_from_pdu_json(p, outlier=True)
+ for p in res["auth_chain"]
+ ]
+
+ auth_chain.sort(key=lambda e: e.depth)
+
+ defer.returnValue(auth_chain)
+
+ @defer.inlineCallbacks
+ @log_function
def on_backfill_request(self, origin, context, versions, limit):
pdus = yield self.handler.on_backfill_request(
origin, context, versions, limit
@@ -481,11 +497,17 @@ class ReplicationLayer(object):
# FIXME: We probably want to do something with the auth_chain given
# to us
- # auth_chain = [
- # Pdu(outlier=True, **p) for p in content.get("auth_chain", [])
- # ]
+ auth_chain = [
+ self.event_from_pdu_json(p, outlier=True)
+ for p in content.get("auth_chain", [])
+ ]
+
+ auth_chain.sort(key=lambda e: e.depth)
- defer.returnValue(state)
+ defer.returnValue({
+ "state": state,
+ "auth_chain": auth_chain,
+ })
@defer.inlineCallbacks
def send_invite(self, destination, context, event_id, pdu):
@@ -504,13 +526,15 @@ class ReplicationLayer(object):
defer.returnValue(self.event_from_pdu_json(pdu_dict))
@log_function
- def _get_persisted_pdu(self, origin, event_id):
+ def _get_persisted_pdu(self, origin, event_id, do_auth=True):
""" Get a PDU from the database with given origin and id.
Returns:
Deferred: Results in a `Pdu`.
"""
- return self.handler.get_persisted_pdu(origin, event_id)
+ return self.handler.get_persisted_pdu(
+ origin, event_id, do_auth=do_auth
+ )
def _transaction_from_pdus(self, pdu_list):
"""Returns a new Transaction containing the given PDUs suitable for
@@ -529,7 +553,9 @@ class ReplicationLayer(object):
@log_function
def _handle_new_pdu(self, origin, pdu, backfilled=False):
# We reprocess pdus when we have seen them only as outliers
- existing = yield self._get_persisted_pdu(origin, pdu.event_id)
+ existing = yield self._get_persisted_pdu(
+ origin, pdu.event_id, do_auth=False
+ )
if existing and (not existing.outlier or pdu.outlier):
logger.debug("Already seen pdu %s", pdu.event_id)
@@ -538,6 +564,36 @@ class ReplicationLayer(object):
state = None
+ # We need to make sure we have all the auth events.
+ # for e_id, _ in pdu.auth_events:
+ # exists = yield self._get_persisted_pdu(
+ # origin,
+ # e_id,
+ # do_auth=False
+ # )
+ #
+ # if not exists:
+ # try:
+ # logger.debug(
+ # "_handle_new_pdu fetch missing auth event %s from %s",
+ # e_id,
+ # origin,
+ # )
+ #
+ # yield self.get_pdu(
+ # origin,
+ # event_id=e_id,
+ # outlier=True,
+ # )
+ #
+ # logger.debug("Processed pdu %s", e_id)
+ # except:
+ # logger.warn(
+ # "Failed to get auth event %s from %s",
+ # e_id,
+ # origin
+ # )
+
# Get missing pdus if necessary.
if not pdu.outlier:
# We only backfill backwards to the min depth.
@@ -545,16 +601,28 @@ class ReplicationLayer(object):
pdu.room_id
)
+ logger.debug(
+ "_handle_new_pdu min_depth for %s: %d",
+ pdu.room_id, min_depth
+ )
+
if min_depth and pdu.depth > min_depth:
for event_id, hashes in pdu.prev_events:
- exists = yield self._get_persisted_pdu(origin, event_id)
+ exists = yield self._get_persisted_pdu(
+ origin,
+ event_id,
+ do_auth=False
+ )
if not exists:
- logger.debug("Requesting pdu %s", event_id)
+ logger.debug(
+ "_handle_new_pdu requesting pdu %s",
+ event_id
+ )
try:
yield self.get_pdu(
- pdu.origin,
+ origin,
event_id=event_id,
)
logger.debug("Processed pdu %s", event_id)
@@ -564,12 +632,17 @@ class ReplicationLayer(object):
else:
# We need to get the state at this event, since we have reached
# a backward extremity edge.
+ logger.debug(
+ "_handle_new_pdu getting state for %s",
+ pdu.room_id
+ )
state = yield self.get_state_for_context(
origin, pdu.room_id, pdu.event_id,
)
if not backfilled:
ret = yield self.handler.on_receive_pdu(
+ origin,
pdu,
backfilled=backfilled,
state=state,
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index d53cd3df3e..15adc9dc2c 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -78,7 +78,7 @@ class BaseHandler(object):
if not suppress_auth:
logger.debug("Authing...")
- self.auth.check(event, raises=True)
+ self.auth.check(event, auth_events=event.old_state_events)
logger.debug("Authed")
else:
logger.debug("Suppressed auth.")
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index af4e7d49c8..3b37e49e6f 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -17,7 +17,7 @@
from twisted.internet import defer
from ._base import BaseHandler
-from synapse.api.errors import SynapseError
+from synapse.api.errors import SynapseError, Codes, CodeMessageException
from synapse.api.events.room import RoomAliasesEvent
import logging
@@ -84,22 +84,32 @@ class DirectoryHandler(BaseHandler):
room_id = result.room_id
servers = result.servers
else:
- result = yield self.federation.make_query(
- destination=room_alias.domain,
- query_type="directory",
- args={
- "room_alias": room_alias.to_string(),
- },
- retry_on_dns_fail=False,
- )
+ try:
+ result = yield self.federation.make_query(
+ destination=room_alias.domain,
+ query_type="directory",
+ args={
+ "room_alias": room_alias.to_string(),
+ },
+ retry_on_dns_fail=False,
+ )
+ except CodeMessageException as e:
+ logging.warn("Error retrieving alias")
+ if e.code == 404:
+ result = None
+ else:
+ raise
if result and "room_id" in result and "servers" in result:
room_id = result["room_id"]
servers = result["servers"]
if not room_id:
- defer.returnValue({})
- return
+ raise SynapseError(
+ 404,
+ "Room alias %r not found" % (room_alias.to_string(),),
+ Codes.NOT_FOUND
+ )
extra_servers = yield self.store.get_joined_hosts_for_room(room_id)
servers = list(set(extra_servers) | set(servers))
@@ -129,7 +139,9 @@ class DirectoryHandler(BaseHandler):
})
else:
raise SynapseError(
- 404, "Room alias \"%s\" not found" % (room_alias,)
+ 404,
+ "Room alias %r not found" % (room_alias.to_string(),),
+ Codes.NOT_FOUND
)
@defer.inlineCallbacks
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index d59221a4fb..02202692d4 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -53,8 +53,12 @@ class EventStreamHandler(BaseHandler):
if auth_user not in self._streams_per_user:
self._streams_per_user[auth_user] = 0
if auth_user in self._stop_timer_per_user:
- self.clock.cancel_call_later(
- self._stop_timer_per_user.pop(auth_user))
+ try:
+ self.clock.cancel_call_later(
+ self._stop_timer_per_user.pop(auth_user)
+ )
+ except:
+ logger.exception("Failed to cancel event timer")
else:
yield self.distributor.fire(
"started_user_eventstream", auth_user
@@ -95,10 +99,12 @@ class EventStreamHandler(BaseHandler):
logger.debug(
"_later stopped_user_eventstream %s", auth_user
)
+
+ self._stop_timer_per_user.pop(auth_user, None)
+
yield self.distributor.fire(
"stopped_user_eventstream", auth_user
)
- del self._stop_timer_per_user[auth_user]
logger.debug("Scheduling _later: for %s", auth_user)
self._stop_timer_per_user[auth_user] = (
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2e8b8a1f9a..925eb5376e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -18,13 +18,16 @@
from ._base import BaseHandler
from synapse.api.events.utils import prune_event
-from synapse.api.errors import AuthError, FederationError, SynapseError
-from synapse.api.events.room import RoomMemberEvent
+from synapse.api.errors import (
+ AuthError, FederationError, SynapseError, StoreError,
+)
+from synapse.api.events.room import RoomMemberEvent, RoomCreateEvent
from synapse.api.constants import Membership
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.crypto.event_signing import (
- compute_event_signature, check_event_content_hash
+ compute_event_signature, check_event_content_hash,
+ add_hashes_and_signatures,
)
from syutil.jsonutil import encode_canonical_json
@@ -98,7 +101,7 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
- def on_receive_pdu(self, pdu, backfilled, state=None):
+ def on_receive_pdu(self, origin, pdu, backfilled, state=None):
""" Called by the ReplicationLayer when we have a new pdu. We need to
do auth checks and put it through the StateHandler.
"""
@@ -109,7 +112,7 @@ class FederationHandler(BaseHandler):
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if event.room_id in self.room_queues:
- self.room_queues[event.room_id].append(pdu)
+ self.room_queues[event.room_id].append((pdu, origin))
return
logger.debug("Processing event: %s", event.event_id)
@@ -141,15 +144,62 @@ class FederationHandler(BaseHandler):
)
event = redacted_event
- is_new_state = yield self.state_handler.annotate_event_with_state(
- event,
- old_state=state
+ logger.debug("Event: %s", event)
+
+ # FIXME (erikj): Awful hack to make the case where we are not currently
+ # in the room work
+ current_state = None
+ is_in_room = yield self.auth.check_host_in_room(
+ event.room_id,
+ self.server_name
)
+ if not is_in_room and not event.outlier:
+ logger.debug("Got event for room we're not in.")
+
+ replication_layer = self.replication_layer
+ auth_chain = yield replication_layer.get_event_auth(
+ origin,
+ context=event.room_id,
+ event_id=event.event_id,
+ )
- logger.debug("Event: %s", event)
+ for e in auth_chain:
+ e.outlier = True
+ try:
+ yield self._handle_new_event(e, fetch_missing=False)
+ except:
+ logger.exception(
+ "Failed to parse auth event %s",
+ e.event_id,
+ )
+
+ if not state:
+ state = yield replication_layer.get_state_for_context(
+ origin,
+ context=event.room_id,
+ event_id=event.event_id,
+ )
+
+ current_state = state
+
+ if state:
+ for e in state:
+ e.outlier = True
+ try:
+ yield self._handle_new_event(e)
+ except:
+ logger.exception(
+ "Failed to parse state event %s",
+ e.event_id,
+ )
try:
- self.auth.check(event, raises=True)
+ yield self._handle_new_event(
+ event,
+ state=state,
+ backfilled=backfilled,
+ current_state=current_state,
+ )
except AuthError as e:
raise FederationError(
"ERROR",
@@ -158,43 +208,17 @@ class FederationHandler(BaseHandler):
affected=event.event_id,
)
- is_new_state = is_new_state and not backfilled
-
- # TODO: Implement something in federation that allows us to
- # respond to PDU.
-
- yield self.store.persist_event(
- event,
- backfilled,
- is_new_state=is_new_state
- )
-
room = yield self.store.get_room(event.room_id)
if not room:
- # Huh, let's try and get the current state
try:
- yield self.replication_layer.get_state_for_context(
- event.origin, event.room_id, event.event_id,
- )
-
- hosts = yield self.store.get_joined_hosts_for_room(
- event.room_id
- )
- if self.hs.hostname in hosts:
- try:
- yield self.store.store_room(
- room_id=event.room_id,
- room_creator_user_id="",
- is_public=False,
- )
- except:
- pass
- except:
- logger.exception(
- "Failed to get current state for room %s",
- event.room_id
+ yield self.store.store_room(
+ room_id=event.room_id,
+ room_creator_user_id="",
+ is_public=False,
)
+ except StoreError:
+ logger.exception("Failed to store room.")
if not backfilled:
extra_users = []
@@ -255,11 +279,23 @@ class FederationHandler(BaseHandler):
pdu=event
)
+
+
defer.returnValue(pdu)
@defer.inlineCallbacks
def on_event_auth(self, event_id):
auth = yield self.store.get_auth_chain(event_id)
+
+ for event in auth:
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
+ )
+
defer.returnValue([e for e in auth])
@log_function
@@ -276,6 +312,8 @@ class FederationHandler(BaseHandler):
We suspend processing of any received events from this room until we
have finished processing the join.
"""
+ logger.debug("Joining %s to %s", joinee, room_id)
+
pdu = yield self.replication_layer.make_join(
target_host,
room_id,
@@ -298,19 +336,29 @@ class FederationHandler(BaseHandler):
try:
event.event_id = self.event_factory.create_event_id()
+ event.origin = self.hs.hostname
event.content = content
- state = yield self.replication_layer.send_join(
+ if not hasattr(event, "signatures"):
+ event.signatures = {}
+
+ add_hashes_and_signatures(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0],
+ )
+
+ ret = yield self.replication_layer.send_join(
target_host,
event
)
- logger.debug("do_invite_join state: %s", state)
+ state = ret["state"]
+ auth_chain = ret["auth_chain"]
+ auth_chain.sort(key=lambda e: e.depth)
- yield self.state_handler.annotate_event_with_state(
- event,
- old_state=state
- )
+ logger.debug("do_invite_join auth_chain: %s", auth_chain)
+ logger.debug("do_invite_join state: %s", state)
logger.debug("do_invite_join event: %s", event)
@@ -324,34 +372,50 @@ class FederationHandler(BaseHandler):
# FIXME
pass
+ for e in auth_chain:
+ e.outlier = True
+ try:
+ yield self._handle_new_event(e, fetch_missing=False)
+ except:
+ logger.exception(
+ "Failed to parse auth event %s",
+ e.event_id,
+ )
+
for e in state:
# FIXME: Auth these.
e.outlier = True
+ try:
+ yield self._handle_new_event(
+ e,
+ fetch_missing=True
+ )
+ except:
+ logger.exception(
+ "Failed to parse state event %s",
+ e.event_id,
+ )
- yield self.state_handler.annotate_event_with_state(
- e,
- )
-
- yield self.store.persist_event(
- e,
- backfilled=False,
- is_new_state=True
- )
-
- yield self.store.persist_event(
+ yield self._handle_new_event(
event,
- backfilled=False,
- is_new_state=True
+ state=state,
+ current_state=state,
)
+
+ yield self.notifier.on_new_room_event(
+ event, extra_users=[joinee]
+ )
+
+ logger.debug("Finished joining %s to %s", joinee, room_id)
finally:
room_queue = self.room_queues[room_id]
del self.room_queues[room_id]
- for p in room_queue:
+ for p, origin in room_queue:
try:
- yield self.on_receive_pdu(p, backfilled=False)
+ self.on_receive_pdu(origin, p, backfilled=False)
except:
- pass
+ logger.exception("Couldn't handle pdu")
defer.returnValue(True)
@@ -375,7 +439,7 @@ class FederationHandler(BaseHandler):
yield self.state_handler.annotate_event_with_state(event)
yield self.auth.add_auth_events(event)
- self.auth.check(event, raises=True)
+ self.auth.check(event, auth_events=event.old_state_events)
pdu = event
@@ -391,17 +455,7 @@ class FederationHandler(BaseHandler):
event.outlier = False
- state_handler = self.state_handler
- is_new_state = yield state_handler.annotate_event_with_state(event)
- self.auth.check(event, raises=True)
-
- # FIXME (erikj): All this is duplicated above :(
-
- yield self.store.persist_event(
- event,
- backfilled=False,
- is_new_state=is_new_state
- )
+ yield self._handle_new_event(event)
extra_users = []
if event.type == RoomMemberEvent.TYPE:
@@ -414,7 +468,7 @@ class FederationHandler(BaseHandler):
)
if event.type == RoomMemberEvent.TYPE:
- if event.membership == Membership.JOIN:
+ if event.content["membership"] == Membership.JOIN:
user = self.hs.parse_userid(event.state_key)
yield self.distributor.fire(
"user_joined_room", user=user, room_id=event.room_id
@@ -508,7 +562,17 @@ class FederationHandler(BaseHandler):
else:
del results[(event.type, event.state_key)]
- defer.returnValue(results.values())
+ res = results.values()
+ for event in res:
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
+ )
+
+ defer.returnValue(res)
else:
defer.returnValue([])
@@ -529,7 +593,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
- def get_persisted_pdu(self, origin, event_id):
+ def get_persisted_pdu(self, origin, event_id, do_auth=True):
""" Get a PDU from the database with given origin and id.
Returns:
@@ -541,12 +605,24 @@ class FederationHandler(BaseHandler):
)
if event:
- in_room = yield self.auth.check_host_in_room(
- event.room_id,
- origin
+ # FIXME: This is a temporary work around where we occasionally
+ # return events slightly differently than when they were
+ # originally signed
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
)
- if not in_room:
- raise AuthError(403, "Host not in room.")
+
+ if do_auth:
+ in_room = yield self.auth.check_host_in_room(
+ event.room_id,
+ origin
+ )
+ if not in_room:
+ raise AuthError(403, "Host not in room.")
defer.returnValue(event)
else:
@@ -564,3 +640,78 @@ class FederationHandler(BaseHandler):
)
while waiters:
waiters.pop().callback(None)
+
+ @defer.inlineCallbacks
+ def _handle_new_event(self, event, state=None, backfilled=False,
+ current_state=None, fetch_missing=True):
+ is_new_state = yield self.state_handler.annotate_event_with_state(
+ event,
+ old_state=state
+ )
+
+ if event.old_state_events:
+ known_ids = set(
+ [s.event_id for s in event.old_state_events.values()]
+ )
+ for e_id, _ in event.auth_events:
+ if e_id not in known_ids:
+ e = yield self.store.get_event(
+ e_id,
+ allow_none=True,
+ )
+
+ if not e:
+ # TODO: Do some conflict res to make sure that we're
+ # not the ones who are wrong.
+ logger.info(
+ "Rejecting %s as %s not in %s",
+ event.event_id, e_id, known_ids,
+ )
+ raise AuthError(403, "Auth events are stale")
+
+ auth_events = event.old_state_events
+ else:
+ # We need to get the auth events from somewhere.
+
+ # TODO: Don't just hit the DBs?
+
+ auth_events = {}
+ for e_id, _ in event.auth_events:
+ e = yield self.store.get_event(
+ e_id,
+ allow_none=True,
+ )
+
+ if not e:
+ e = yield self.replication_layer.get_pdu(
+ event.origin, e_id, outlier=True
+ )
+
+ if e and fetch_missing:
+ try:
+ yield self.on_receive_pdu(event.origin, e, False)
+ except:
+ logger.exception(
+ "Failed to parse auth event %s",
+ e_id,
+ )
+
+ if not e:
+ logger.warn("Can't find auth event %s.", e_id)
+
+ auth_events[(e.type, e.state_key)] = e
+
+ if event.type == RoomMemberEvent.TYPE and not event.auth_events:
+ if len(event.prev_events) == 1:
+ c = yield self.store.get_event(event.prev_events[0][0])
+ if c.type == RoomCreateEvent.TYPE:
+ auth_events[(c.type, c.state_key)] = c
+
+ self.auth.check(event, auth_events=auth_events)
+
+ yield self.store.persist_event(
+ event,
+ backfilled=backfilled,
+ is_new_state=(is_new_state and not backfilled),
+ current_state=current_state,
+ )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 06a4e173f6..42dc4d46f3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -243,7 +243,7 @@ class MessageHandler(BaseHandler):
public_room_ids = [r["room_id"] for r in public_rooms]
limit = pagin_config.limit
- if not limit:
+ if limit is None:
limit = 10
for event in room_list:
@@ -306,7 +306,7 @@ class MessageHandler(BaseHandler):
auth_user = self.hs.parse_userid(user_id)
# TODO: These concurrently
- state_tuples = yield self.store.get_current_state(room_id)
+ state_tuples = yield self.state_handler.get_current_state(room_id)
state = [self.hs.serialize_event(x) for x in state_tuples]
member_event = (yield self.store.get_room_member(
diff --git a/synapse/http/content_repository.py b/synapse/http/content_repository.py
index 7e046dfe49..64ecb5346e 100644
--- a/synapse/http/content_repository.py
+++ b/synapse/http/content_repository.py
@@ -181,7 +181,7 @@ class ContentRepoResource(resource.Resource):
fname = yield self.map_request_to_name(request)
- # TODO I have a suspcious feeling this is just going to block
+ # TODO I have a suspicious feeling this is just going to block
with open(fname, "wb") as f:
f.write(request.content.read())
@@ -190,7 +190,7 @@ class ContentRepoResource(resource.Resource):
# FIXME: we can't assume what the repo's public mounted path is
# ...plus self-signed SSL won't work to remote clients anyway
# ...and we can't assume that it's SSL anyway, as we might want to
- # server it via the non-SSL listener...
+ # serve it via the non-SSL listener...
url = "%s/_matrix/content/%s" % (
self.external_addr, file_name
)
diff --git a/synapse/rest/room.py b/synapse/rest/room.py
index 4f6d039b61..cc6ffb9aff 100644
--- a/synapse/rest/room.py
+++ b/synapse/rest/room.py
@@ -148,7 +148,7 @@ class RoomStateEventRestServlet(RestServlet):
content = _parse_json(request)
event = self.event_factory.create_event(
- etype=urllib.unquote(event_type),
+ etype=event_type, # already urldecoded
content=content,
room_id=urllib.unquote(room_id),
user_id=user.to_string(),
diff --git a/synapse/state.py b/synapse/state.py
index 1c999e4d79..430665f7ba 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -82,7 +82,7 @@ class StateHandler(object):
if hasattr(event, "outlier") and event.outlier:
event.state_group = None
event.old_state_events = None
- event.state_events = {}
+ event.state_events = None
defer.returnValue(False)
return
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index e91fcc9789..642e5e289e 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -69,7 +69,7 @@ SCHEMAS = [
# Remember to update this number every time an incompatible change is made to
# database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 7
+SCHEMA_VERSION = 8
class _RollbackButIsFineException(Exception):
@@ -95,7 +95,8 @@ class DataStore(RoomMemberStore, RoomStore,
@defer.inlineCallbacks
@log_function
- def persist_event(self, event, backfilled=False, is_new_state=True):
+ def persist_event(self, event, backfilled=False, is_new_state=True,
+ current_state=None):
stream_ordering = None
if backfilled:
if not self.min_token_deferred.called:
@@ -111,6 +112,7 @@ class DataStore(RoomMemberStore, RoomStore,
backfilled=backfilled,
stream_ordering=stream_ordering,
is_new_state=is_new_state,
+ current_state=current_state,
)
except _RollbackButIsFineException:
pass
@@ -139,7 +141,7 @@ class DataStore(RoomMemberStore, RoomStore,
@log_function
def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None,
- is_new_state=True):
+ is_new_state=True, current_state=None):
if event.type == RoomMemberEvent.TYPE:
self._store_room_member_txn(txn, event)
elif event.type == FeedbackEvent.TYPE:
@@ -208,8 +210,27 @@ class DataStore(RoomMemberStore, RoomStore,
self._store_state_groups_txn(txn, event)
+ if current_state:
+ txn.execute(
+ "DELETE FROM current_state_events WHERE room_id = ?",
+ (event.room_id,)
+ )
+
+ for s in current_state:
+ self._simple_insert_txn(
+ txn,
+ "current_state_events",
+ {
+ "event_id": s.event_id,
+ "room_id": s.room_id,
+ "type": s.type,
+ "state_key": s.state_key,
+ },
+ or_replace=True,
+ )
+
is_state = hasattr(event, "state_key") and event.state_key is not None
- if is_new_state and is_state:
+ if is_state:
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
@@ -227,17 +248,18 @@ class DataStore(RoomMemberStore, RoomStore,
or_replace=True,
)
- self._simple_insert_txn(
- txn,
- "current_state_events",
- {
- "event_id": event.event_id,
- "room_id": event.room_id,
- "type": event.type,
- "state_key": event.state_key,
- },
- or_replace=True,
- )
+ if is_new_state:
+ self._simple_insert_txn(
+ txn,
+ "current_state_events",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ },
+ or_replace=True,
+ )
for e_id, h in event.prev_state:
self._simple_insert_txn(
@@ -314,7 +336,12 @@ class DataStore(RoomMemberStore, RoomStore,
txn, event.event_id, ref_alg, ref_hash_bytes
)
- self._update_min_depth_for_room_txn(txn, event.room_id, event.depth)
+ if not outlier:
+ self._update_min_depth_for_room_txn(
+ txn,
+ event.room_id,
+ event.depth
+ )
def _store_redaction(self, txn, event):
txn.execute(
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index fd5b2affad..4881f03368 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -246,7 +246,10 @@ class SQLBaseStore(object):
raise StoreError(404, "No row found")
def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol):
- sql = "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" % {
+ sql = (
+ "SELECT %(retcol)s FROM %(table)s WHERE %(where)s "
+ "ORDER BY rowid asc"
+ ) % {
"retcol": retcol,
"table": table,
"where": " AND ".join("%s = ?" % k for k in keyvalues.keys()),
@@ -299,7 +302,7 @@ class SQLBaseStore(object):
keyvalues : dict of column names and values to select the rows with
retcols : list of strings giving the names of the columns to return
"""
- sql = "SELECT %s FROM %s WHERE %s" % (
+ sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
", ".join(retcols),
table,
" AND ".join("%s = ?" % (k, ) for k in keyvalues)
@@ -334,7 +337,7 @@ class SQLBaseStore(object):
retcols=None, allow_none=False):
""" Combined SELECT then UPDATE."""
if retcols:
- select_sql = "SELECT %s FROM %s WHERE %s" % (
+ select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
", ".join(retcols),
table,
" AND ".join("%s = ?" % (k) for k in keyvalues)
@@ -461,7 +464,7 @@ class SQLBaseStore(object):
def _get_events_txn(self, txn, event_ids):
# FIXME (erikj): This should be batched?
- sql = "SELECT * FROM events WHERE event_id = ?"
+ sql = "SELECT * FROM events WHERE event_id = ? ORDER BY rowid asc"
event_rows = []
for e_id in event_ids:
@@ -478,7 +481,9 @@ class SQLBaseStore(object):
def _parse_events_txn(self, txn, rows):
events = [self._parse_event_from_row(r) for r in rows]
- select_event_sql = "SELECT * FROM events WHERE event_id = ?"
+ select_event_sql = (
+ "SELECT * FROM events WHERE event_id = ? ORDER BY rowid asc"
+ )
for i, ev in enumerate(events):
signatures = self._get_event_signatures_txn(
diff --git a/synapse/storage/schema/delta/v8.sql b/synapse/storage/schema/delta/v8.sql
new file mode 100644
index 0000000000..daf6646ed5
--- /dev/null
+++ b/synapse/storage/schema/delta/v8.sql
@@ -0,0 +1,34 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ CREATE TABLE IF NOT EXISTS event_signatures_2 (
+ event_id TEXT,
+ signature_name TEXT,
+ key_id TEXT,
+ signature BLOB,
+ CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id)
+);
+
+INSERT INTO event_signatures_2 (event_id, signature_name, key_id, signature)
+SELECT event_id, signature_name, key_id, signature FROM event_signatures;
+
+DROP TABLE event_signatures;
+ALTER TABLE event_signatures_2 RENAME TO event_signatures;
+
+CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
+ event_id
+);
+
+PRAGMA user_version = 8;
\ No newline at end of file
diff --git a/synapse/storage/schema/event_signatures.sql b/synapse/storage/schema/event_signatures.sql
index 4efa8a3e63..b6b56b47a2 100644
--- a/synapse/storage/schema/event_signatures.sql
+++ b/synapse/storage/schema/event_signatures.sql
@@ -42,7 +42,7 @@ CREATE TABLE IF NOT EXISTS event_signatures (
signature_name TEXT,
key_id TEXT,
signature BLOB,
- CONSTRAINT uniqueness UNIQUE (event_id, key_id)
+ CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id)
);
CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 55ea567793..e0f44b3e59 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -87,7 +87,7 @@ class StateStore(SQLBaseStore):
)
def _store_state_groups_txn(self, txn, event):
- if not event.state_events:
+ if event.state_events is None:
return
state_group = event.state_group
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index b84735e61c..3405cb365e 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -283,7 +283,7 @@ class StreamStore(SQLBaseStore):
sql = (
"SELECT *, (%(redacted)s) AS redacted FROM events "
- "WHERE room_id = ? AND stream_ordering <= ? "
+ "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
"ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
) % {
"redacted": del_sql,
diff --git a/synapse/streams/config.py b/synapse/streams/config.py
index 527507e5cd..0317e78c08 100644
--- a/synapse/streams/config.py
+++ b/synapse/streams/config.py
@@ -28,11 +28,11 @@ class SourcePaginationConfig(object):
specific event source."""
def __init__(self, from_key=None, to_key=None, direction='f',
- limit=0):
+ limit=None):
self.from_key = from_key
self.to_key = to_key
self.direction = 'f' if direction == 'f' else 'b'
- self.limit = int(limit)
+ self.limit = int(limit) if limit is not None else None
class PaginationConfig(object):
@@ -40,11 +40,11 @@ class PaginationConfig(object):
"""A configuration object which stores pagination parameters."""
def __init__(self, from_token=None, to_token=None, direction='f',
- limit=0):
+ limit=None):
self.from_token = from_token
self.to_token = to_token
self.direction = 'f' if direction == 'f' else 'b'
- self.limit = int(limit)
+ self.limit = int(limit) if limit is not None else None
@classmethod
def from_request(cls, request, raise_invalid_params=True):
@@ -80,8 +80,8 @@ class PaginationConfig(object):
except:
raise SynapseError(400, "'to' paramater is invalid")
- limit = get_param("limit", "0")
- if not limit.isdigit():
+ limit = get_param("limit", None)
+ if limit is not None and not limit.isdigit():
raise SynapseError(400, "'limit' parameter must be an integer.")
try:
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 23b3decb45..7d85018d97 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -55,11 +55,14 @@ class LoggingContext(object):
None to avoid suppressing any exeptions that were thrown.
"""
if self.thread_local.current_context is not self:
- logger.error(
- "Current logging context %s is not the expected context %s",
- self.thread_local.current_context,
- self
- )
+ if self.thread_local.current_context is self.sentinel:
+ logger.debug("Expected logging context %s has been lost", self)
+ else:
+ logger.warn(
+ "Current logging context %s is not expected context %s",
+ self.thread_local.current_context,
+ self
+ )
self.thread_local.current_context = self.parent_context
self.parent_context = None
|