diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index de4d23bbb3..30c6733063 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -14,7 +14,18 @@
# limitations under the License.
from twisted.internet import defer
+
from synapse.api.errors import LimitExceededError
+from synapse.util.async import run_on_reactor
+from synapse.crypto.event_signing import add_hashes_and_signatures
+from synapse.api.events.room import RoomMemberEvent
+from synapse.api.constants import Membership
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
class BaseHandler(object):
@@ -30,6 +41,9 @@ class BaseHandler(object):
self.clock = hs.get_clock()
self.hs = hs
+ self.signing_key = hs.config.signing_key[0]
+ self.server_name = hs.hostname
+
def ratelimit(self, user_id):
time_now = self.clock.time()
allowed, time_allowed = self.ratelimiter.send_message(
@@ -44,16 +58,58 @@ class BaseHandler(object):
@defer.inlineCallbacks
def _on_new_room_event(self, event, snapshot, extra_destinations=[],
- extra_users=[]):
+ extra_users=[], suppress_auth=False,
+ do_invite_host=None):
+ yield run_on_reactor()
+
snapshot.fill_out_prev_events(event)
+ yield self.state_handler.annotate_event_with_state(event)
+
+ yield self.auth.add_auth_events(event)
+
+ logger.debug("Signing event...")
+
+ add_hashes_and_signatures(
+ event, self.server_name, self.signing_key
+ )
+
+ logger.debug("Signed event.")
+
+ if not suppress_auth:
+ logger.debug("Authing...")
+ self.auth.check(event, raises=True)
+ logger.debug("Authed")
+ else:
+ logger.debug("Suppressed auth.")
+
+ if do_invite_host:
+ federation_handler = self.hs.get_handlers().federation_handler
+ invite_event = yield federation_handler.send_invite(
+ do_invite_host,
+ event
+ )
+
+ # FIXME: We need to check if the remote changed anything else
+ event.signatures = invite_event.signatures
+
yield self.store.persist_event(event)
destinations = set(extra_destinations)
# Send a PDU to all hosts who have joined the room.
- destinations.update((yield self.store.get_joined_hosts_for_room(
- event.room_id
- )))
+
+ for k, s in event.state_events.items():
+ try:
+ if k[0] == RoomMemberEvent.TYPE:
+ if s.content["membership"] == Membership.JOIN:
+ destinations.add(
+ self.hs.parse_userid(s.state_key).domain
+ )
+ except:
+ logger.warn(
+ "Failed to get destination from event %s", s.event_id
+ )
+
event.destinations = list(destinations)
self.notifier.on_new_room_event(event, extra_users=extra_users)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index a56830d520..164363cdc5 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -147,10 +147,8 @@ class DirectoryHandler(BaseHandler):
content={"aliases": aliases},
)
- snapshot = yield self.store.snapshot_room(
- room_id=room_id,
- user_id=user_id,
- )
+ snapshot = yield self.store.snapshot_room(event)
- yield self.state_handler.handle_new_event(event, snapshot)
- yield self._on_new_room_event(event, snapshot, extra_users=[user_id])
+ yield self._on_new_room_event(
+ event, snapshot, extra_users=[user_id], suppress_auth=True
+ )
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f52591d2a3..492005a170 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -17,13 +17,18 @@
from ._base import BaseHandler
-from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent
+from synapse.api.events.utils import prune_event
+from synapse.api.errors import AuthError, FederationError, SynapseError
+from synapse.api.events.room import RoomMemberEvent
from synapse.api.constants import Membership
from synapse.util.logutils import log_function
-from synapse.federation.pdu_codec import PduCodec
-from synapse.api.errors import SynapseError
+from synapse.util.async import run_on_reactor
+from synapse.crypto.event_signing import (
+ compute_event_signature, check_event_content_hash
+)
+from syutil.jsonutil import encode_canonical_json
-from twisted.internet import defer, reactor
+from twisted.internet import defer
import logging
@@ -38,6 +43,8 @@ class FederationHandler(BaseHandler):
of the home server (including auth and state conflict resoultion)
b) converting events that were produced by local clients that may need
to be sent to remote home servers.
+ c) doing the necessary dances to invite remote users and join remote
+ rooms.
"""
def __init__(self, hs):
@@ -55,12 +62,14 @@ class FederationHandler(BaseHandler):
self.state_handler = hs.get_state_handler()
# self.auth_handler = gs.get_auth_handler()
self.server_name = hs.hostname
+ self.keyring = hs.get_keyring()
self.lock_manager = hs.get_room_lock_manager()
self.replication_layer.set_handler(self)
- self.pdu_codec = PduCodec(hs)
+ # When joining a room we need to queue any events for that room up
+ self.room_queues = {}
@log_function
@defer.inlineCallbacks
@@ -78,7 +87,9 @@ class FederationHandler(BaseHandler):
processing.
"""
- pdu = self.pdu_codec.pdu_from_event(event)
+ yield run_on_reactor()
+
+ pdu = event
if not hasattr(pdu, "destinations") or not pdu.destinations:
pdu.destinations = []
@@ -87,98 +98,114 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
- def on_receive_pdu(self, pdu, backfilled):
+ def on_receive_pdu(self, pdu, backfilled, state=None):
""" Called by the ReplicationLayer when we have a new pdu. We need to
- do auth checks and put it throught the StateHandler.
+ do auth checks and put it through the StateHandler.
"""
- event = self.pdu_codec.event_from_pdu(pdu)
+ event = pdu
logger.debug("Got event: %s", event.event_id)
- with (yield self.lock_manager.lock(pdu.context)):
- if event.is_state and not backfilled:
- is_new_state = yield self.state_handler.handle_new_state(
- pdu
- )
- else:
- is_new_state = False
- # TODO: Implement something in federation that allows us to
- # respond to PDU.
+ # If we are currently in the process of joining this room, then we
+ # queue up events for later processing.
+ if event.room_id in self.room_queues:
+ self.room_queues[event.room_id].append(pdu)
+ return
+
+ logger.debug("Processing event: %s", event.event_id)
- target_is_mine = False
- if hasattr(event, "target_host"):
- target_is_mine = event.target_host == self.hs.hostname
-
- if event.type == InviteJoinEvent.TYPE:
- if not target_is_mine:
- logger.debug("Ignoring invite/join event %s", event)
- return
-
- # If we receive an invite/join event then we need to join the
- # sender to the given room.
- # TODO: We should probably auth this or some such
- content = event.content
- content.update({"membership": Membership.JOIN})
- new_event = self.event_factory.create_event(
- etype=RoomMemberEvent.TYPE,
- state_key=event.user_id,
- room_id=event.room_id,
- user_id=event.user_id,
- membership=Membership.JOIN,
- content=content
+ redacted_event = prune_event(event)
+
+ redacted_pdu_json = redacted_event.get_pdu_json()
+ try:
+ yield self.keyring.verify_json_for_server(
+ event.origin, redacted_pdu_json
+ )
+ except SynapseError as e:
+ logger.warn("Signature check failed for %s redacted to %s",
+ encode_canonical_json(pdu.get_pdu_json()),
+ encode_canonical_json(redacted_pdu_json),
+ )
+ raise FederationError(
+ "ERROR",
+ e.code,
+ e.msg,
+ affected=event.event_id,
)
- yield self.hs.get_handlers().room_member_handler.change_membership(
- new_event,
- do_auth=False,
+ if not check_event_content_hash(event):
+ logger.warn(
+ "Event content has been tampered, redacting %s, %s",
+ event.event_id, encode_canonical_json(event.get_full_dict())
)
+ event = redacted_event
- else:
- with (yield self.room_lock.lock(event.room_id)):
- yield self.store.persist_event(
- event,
- backfilled,
- is_new_state=is_new_state
- )
+ is_new_state = yield self.state_handler.annotate_event_with_state(
+ event,
+ old_state=state
+ )
- room = yield self.store.get_room(event.room_id)
+ logger.debug("Event: %s", event)
- if not room:
- # Huh, let's try and get the current state
- try:
- yield self.replication_layer.get_state_for_context(
- event.origin, event.room_id
- )
+ try:
+ self.auth.check(event, raises=True)
+ except AuthError as e:
+ raise FederationError(
+ "ERROR",
+ e.code,
+ e.msg,
+ affected=event.event_id,
+ )
- hosts = yield self.store.get_joined_hosts_for_room(
- event.room_id
- )
- if self.hs.hostname in hosts:
- try:
- yield self.store.store_room(
- room_id=event.room_id,
- room_creator_user_id="",
- is_public=False,
- )
- except:
- pass
- except:
- logger.exception(
- "Failed to get current state for room %s",
- event.room_id
- )
+ is_new_state = is_new_state and not backfilled
- if not backfilled:
- extra_users = []
- if event.type == RoomMemberEvent.TYPE:
- target_user_id = event.state_key
- target_user = self.hs.parse_userid(target_user_id)
- extra_users.append(target_user)
+ # TODO: Implement something in federation that allows us to
+ # respond to PDU.
+
+ yield self.store.persist_event(
+ event,
+ backfilled,
+ is_new_state=is_new_state
+ )
- yield self.notifier.on_new_room_event(
- event, extra_users=extra_users
+ room = yield self.store.get_room(event.room_id)
+
+ if not room:
+ # Huh, let's try and get the current state
+ try:
+ yield self.replication_layer.get_state_for_context(
+ event.origin, event.room_id, event.event_id,
+ )
+
+ hosts = yield self.store.get_joined_hosts_for_room(
+ event.room_id
+ )
+ if self.hs.hostname in hosts:
+ try:
+ yield self.store.store_room(
+ room_id=event.room_id,
+ room_creator_user_id="",
+ is_public=False,
+ )
+ except:
+ pass
+ except:
+ logger.exception(
+ "Failed to get current state for room %s",
+ event.room_id
)
+ if not backfilled:
+ extra_users = []
+ if event.type == RoomMemberEvent.TYPE:
+ target_user_id = event.state_key
+ target_user = self.hs.parse_userid(target_user_id)
+ extra_users.append(target_user)
+
+ yield self.notifier.on_new_room_event(
+ event, extra_users=extra_users
+ )
+
if event.type == RoomMemberEvent.TYPE:
if event.membership == Membership.JOIN:
user = self.hs.parse_userid(event.state_key)
@@ -189,79 +216,349 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
def backfill(self, dest, room_id, limit):
- pdus = yield self.replication_layer.backfill(dest, room_id, limit)
+ """ Trigger a backfill request to `dest` for the given `room_id`
+ """
+ extremities = yield self.store.get_oldest_events_in_room(room_id)
+
+ pdus = yield self.replication_layer.backfill(
+ dest,
+ room_id,
+ limit,
+ extremities=extremities,
+ )
events = []
for pdu in pdus:
- event = self.pdu_codec.event_from_pdu(pdu)
+ event = pdu
+
+ # FIXME (erikj): Not sure this actually works :/
+ yield self.state_handler.annotate_event_with_state(event)
+
events.append(event)
+
yield self.store.persist_event(event, backfilled=True)
defer.returnValue(events)
+ @defer.inlineCallbacks
+ def send_invite(self, target_host, event):
+ """ Sends the invite to the remote server for signing.
+
+ Invites must be signed by the invitee's server before distribution.
+ """
+ pdu = yield self.replication_layer.send_invite(
+ destination=target_host,
+ context=event.room_id,
+ event_id=event.event_id,
+ pdu=event
+ )
+
+ defer.returnValue(pdu)
+
+ @defer.inlineCallbacks
+ def on_event_auth(self, event_id):
+ auth = yield self.store.get_auth_chain(event_id)
+ defer.returnValue([e for e in auth])
+
@log_function
@defer.inlineCallbacks
def do_invite_join(self, target_host, room_id, joinee, content, snapshot):
+ """ Attempts to join the `joinee` to the room `room_id` via the
+ server `target_host`.
+
+ This first triggers a /make_join/ request that returns a partial
+ event that we can fill out and sign. This is then sent to the
+ remote server via /send_join/ which responds with the state at that
+ event and the auth_chains.
- hosts = yield self.store.get_joined_hosts_for_room(room_id)
- if self.hs.hostname in hosts:
- # We are already in the room.
- logger.debug("We're already in the room apparently")
- defer.returnValue(False)
+ We suspend processing of any received events from this room until we
+ have finished processing the join.
+ """
+ pdu = yield self.replication_layer.make_join(
+ target_host,
+ room_id,
+ joinee
+ )
+
+ logger.debug("Got response to make_join: %s", pdu)
+
+ event = pdu
+
+ # We should assert some things.
+ assert(event.type == RoomMemberEvent.TYPE)
+ assert(event.user_id == joinee)
+ assert(event.state_key == joinee)
+ assert(event.room_id == room_id)
+
+ event.outlier = False
+
+ self.room_queues[room_id] = []
- # First get current state to see if we are already joined.
try:
- yield self.replication_layer.get_state_for_context(
- target_host, room_id
+ event.event_id = self.event_factory.create_event_id()
+ event.content = content
+
+ state = yield self.replication_layer.send_join(
+ target_host,
+ event
)
- hosts = yield self.store.get_joined_hosts_for_room(room_id)
- if self.hs.hostname in hosts:
- # Oh, we were actually in the room already.
- logger.debug("We're already in the room apparently")
- defer.returnValue(False)
- except Exception:
- logger.exception("Failed to get current state")
-
- new_event = self.event_factory.create_event(
- etype=InviteJoinEvent.TYPE,
- target_host=target_host,
- room_id=room_id,
- user_id=joinee,
- content=content
- )
+ logger.debug("do_invite_join state: %s", state)
- new_event.destinations = [target_host]
+ yield self.state_handler.annotate_event_with_state(
+ event,
+ old_state=state
+ )
- snapshot.fill_out_prev_events(new_event)
- yield self.handle_new_event(new_event, snapshot)
+ logger.debug("do_invite_join event: %s", event)
- # TODO (erikj): Time out here.
- d = defer.Deferred()
- self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d)
- reactor.callLater(10, d.cancel)
+ try:
+ yield self.store.store_room(
+ room_id=room_id,
+ room_creator_user_id="",
+ is_public=False
+ )
+ except:
+ # FIXME
+ pass
- try:
- yield d
- except defer.CancelledError:
- raise SynapseError(500, "Unable to join remote room")
+ for e in state:
+ # FIXME: Auth these.
+ e.outlier = True
- try:
- yield self.store.store_room(
- room_id=room_id,
- room_creator_user_id="",
- is_public=False
+ yield self.state_handler.annotate_event_with_state(
+ e,
+ )
+
+ yield self.store.persist_event(
+ e,
+ backfilled=False,
+ is_new_state=True
+ )
+
+ yield self.store.persist_event(
+ event,
+ backfilled=False,
+ is_new_state=True
)
- except:
- pass
+ finally:
+ room_queue = self.room_queues[room_id]
+ del self.room_queues[room_id]
+ for p in room_queue:
+ try:
+ yield self.on_receive_pdu(p, backfilled=False)
+ except:
+ pass
defer.returnValue(True)
+ @defer.inlineCallbacks
+ @log_function
+ def on_make_join_request(self, context, user_id):
+ """ We've received a /make_join/ request, so we create a partial
+ join event for the room and return that. We don *not* persist or
+ process it until the other server has signed it and sent it back.
+ """
+ event = self.event_factory.create_event(
+ etype=RoomMemberEvent.TYPE,
+ content={"membership": Membership.JOIN},
+ room_id=context,
+ user_id=user_id,
+ state_key=user_id,
+ )
+
+ snapshot = yield self.store.snapshot_room(event)
+ snapshot.fill_out_prev_events(event)
+
+ yield self.state_handler.annotate_event_with_state(event)
+ yield self.auth.add_auth_events(event)
+ self.auth.check(event, raises=True)
+
+ pdu = event
+
+ defer.returnValue(pdu)
+
+ @defer.inlineCallbacks
+ @log_function
+ def on_send_join_request(self, origin, pdu):
+ """ We have received a join event for a room. Fully process it and
+ respond with the current state and auth chains.
+ """
+ event = pdu
+
+ event.outlier = False
+
+ is_new_state = yield self.state_handler.annotate_event_with_state(event)
+ self.auth.check(event, raises=True)
+
+ # FIXME (erikj): All this is duplicated above :(
+
+ yield self.store.persist_event(
+ event,
+ backfilled=False,
+ is_new_state=is_new_state
+ )
+
+ extra_users = []
+ if event.type == RoomMemberEvent.TYPE:
+ target_user_id = event.state_key
+ target_user = self.hs.parse_userid(target_user_id)
+ extra_users.append(target_user)
+
+ yield self.notifier.on_new_room_event(
+ event, extra_users=extra_users
+ )
+
+ if event.type == RoomMemberEvent.TYPE:
+ if event.membership == Membership.JOIN:
+ user = self.hs.parse_userid(event.state_key)
+ self.distributor.fire(
+ "user_joined_room", user=user, room_id=event.room_id
+ )
+
+ new_pdu = event
+
+ destinations = set()
+
+ for k, s in event.state_events.items():
+ try:
+ if k[0] == RoomMemberEvent.TYPE:
+ if s.content["membership"] == Membership.JOIN:
+ destinations.add(
+ self.hs.parse_userid(s.state_key).domain
+ )
+ except:
+ logger.warn(
+ "Failed to get destination from event %s", s.event_id
+ )
+
+ new_pdu.destinations = list(destinations)
+
+ yield self.replication_layer.send_pdu(new_pdu)
+
+ auth_chain = yield self.store.get_auth_chain(event.event_id)
+
+ defer.returnValue({
+ "state": event.state_events.values(),
+ "auth_chain": auth_chain,
+ })
+
+ @defer.inlineCallbacks
+ def on_invite_request(self, origin, pdu):
+ """ We've got an invite event. Process and persist it. Sign it.
+
+ Respond with the now signed event.
+ """
+ event = pdu
+
+ event.outlier = True
+
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
+ )
+
+ yield self.state_handler.annotate_event_with_state(event)
+
+ yield self.store.persist_event(
+ event,
+ backfilled=False,
+ )
+
+ target_user = self.hs.parse_userid(event.state_key)
+ yield self.notifier.on_new_room_event(
+ event, extra_users=[target_user],
+ )
+
+ defer.returnValue(event)
+
+ @defer.inlineCallbacks
+ def get_state_for_pdu(self, origin, room_id, event_id):
+ yield run_on_reactor()
+
+ in_room = yield self.auth.check_host_in_room(room_id, origin)
+ if not in_room:
+ raise AuthError(403, "Host not in room.")
+
+ state_groups = yield self.store.get_state_groups(
+ [event_id]
+ )
+
+ if state_groups:
+ _, state = state_groups.items().pop()
+ results = {
+ (e.type, e.state_key): e for e in state
+ }
+
+ event = yield self.store.get_event(event_id)
+ if hasattr(event, "state_key"):
+ # Get previous state
+ if hasattr(event, "replaces_state") and event.replaces_state:
+ prev_event = yield self.store.get_event(
+ event.replaces_state
+ )
+ results[(event.type, event.state_key)] = prev_event
+ else:
+ del results[(event.type, event.state_key)]
+
+ defer.returnValue(results.values())
+ else:
+ defer.returnValue([])
+
+ @defer.inlineCallbacks
+ @log_function
+ def on_backfill_request(self, origin, context, pdu_list, limit):
+ in_room = yield self.auth.check_host_in_room(context, origin)
+ if not in_room:
+ raise AuthError(403, "Host not in room.")
+
+ events = yield self.store.get_backfill_events(
+ context,
+ pdu_list,
+ limit
+ )
+
+ defer.returnValue(events)
+
+ @defer.inlineCallbacks
+ @log_function
+ def get_persisted_pdu(self, origin, event_id):
+ """ Get a PDU from the database with given origin and id.
+
+ Returns:
+ Deferred: Results in a `Pdu`.
+ """
+ event = yield self.store.get_event(
+ event_id,
+ allow_none=True,
+ )
+
+ if event:
+ in_room = yield self.auth.check_host_in_room(
+ event.room_id,
+ origin
+ )
+ if not in_room:
+ raise AuthError(403, "Host not in room.")
+
+ defer.returnValue(event)
+ else:
+ defer.returnValue(None)
+
+ @log_function
+ def get_min_depth_for_context(self, context):
+ return self.store.get_min_depth(context)
@log_function
def _on_user_joined(self, user, room_id):
- waiters = self.waiting_for_join_list.get((user.to_string(), room_id), [])
+ waiters = self.waiting_for_join_list.get(
+ (user.to_string(), room_id),
+ []
+ )
while waiters:
waiters.pop().callback(None)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 811e280c2d..b77d9d1644 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,7 +16,6 @@
from twisted.internet import defer
from synapse.api.constants import Membership
-from synapse.api.events.room import RoomTopicEvent
from synapse.api.errors import RoomError
from synapse.streams.config import PaginationConfig
from ._base import BaseHandler
@@ -26,7 +25,6 @@ import logging
logger = logging.getLogger(__name__)
-
class MessageHandler(BaseHandler):
def __init__(self, hs):
@@ -59,7 +57,8 @@ class MessageHandler(BaseHandler):
# user_id=sender_id
# )
- # TODO (erikj): Once we work out the correct c-s api we need to think on how to do this.
+ # TODO (erikj): Once we work out the correct c-s api we need to think
+ # on how to do this.
defer.returnValue(None)
@@ -81,12 +80,11 @@ class MessageHandler(BaseHandler):
user = self.hs.parse_userid(event.user_id)
assert user.is_mine, "User must be our own: %s" % (user,)
- snapshot = yield self.store.snapshot_room(event.room_id, event.user_id)
-
- if not suppress_auth:
- yield self.auth.check(event, snapshot, raises=True)
+ snapshot = yield self.store.snapshot_room(event)
- yield self._on_new_room_event(event, snapshot)
+ yield self._on_new_room_event(
+ event, snapshot, suppress_auth=suppress_auth
+ )
self.hs.get_handlers().presence_handler.bump_presence_active_time(
user
@@ -111,7 +109,9 @@ class MessageHandler(BaseHandler):
data_source = self.hs.get_event_sources().sources["room"]
if not pagin_config.from_token:
- pagin_config.from_token = yield self.hs.get_event_sources().get_current_token()
+ pagin_config.from_token = (
+ yield self.hs.get_event_sources().get_current_token()
+ )
user = self.hs.parse_userid(user_id)
@@ -142,66 +142,27 @@ class MessageHandler(BaseHandler):
SynapseError if something went wrong.
"""
- snapshot = yield self.store.snapshot_room(
- event.room_id,
- event.user_id,
- state_type=event.type,
- state_key=event.state_key,
- )
-
- yield self.auth.check(event, snapshot, raises=True)
-
- yield self.state_handler.handle_new_event(event, snapshot)
+ snapshot = yield self.store.snapshot_room(event)
yield self._on_new_room_event(event, snapshot)
@defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None,
- event_type=None, state_key="",
- public_room_rules=[],
- private_room_rules=["join"]):
+ event_type=None, state_key=""):
""" Get data from a room.
Args:
event : The room path event
- public_room_rules : A list of membership states the user can be in,
- in order to read this data IN A PUBLIC ROOM. An empty list means
- 'any state'.
- private_room_rules : A list of membership states the user can be
- in, in order to read this data IN A PRIVATE ROOM. An empty list
- means 'any state'.
Returns:
The path data content.
Raises:
SynapseError if something went wrong.
"""
- if event_type == RoomTopicEvent.TYPE:
- # anyone invited/joined can read the topic
- private_room_rules = ["invite", "join"]
-
- # does this room exist
- room = yield self.store.get_room(room_id)
- if not room:
- raise RoomError(403, "Room does not exist.")
-
- # does this user exist in this room
- member = yield self.store.get_room_member(
- room_id=room_id,
- user_id="" if not user_id else user_id)
-
- member_state = member.membership if member else None
-
- if room.is_public and public_room_rules:
- # make sure the user meets public room rules
- if member_state not in public_room_rules:
- raise RoomError(403, "Member does not meet public room rules.")
- elif not room.is_public and private_room_rules:
- # make sure the user meets private room rules
- if member_state not in private_room_rules:
- raise RoomError(
- 403, "Member does not meet private room rules.")
-
- data = yield self.store.get_current_state(
+ have_joined = yield self.auth.check_joined_room(room_id, user_id)
+ if not have_joined:
+ raise RoomError(403, "User not in room.")
+
+ data = yield self.state_handler.get_current_state(
room_id, event_type, state_key
)
defer.returnValue(data)
@@ -219,9 +180,7 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def send_feedback(self, event):
- snapshot = yield self.store.snapshot_room(event.room_id, event.user_id)
-
- yield self.auth.check(event, snapshot, raises=True)
+ snapshot = yield self.store.snapshot_room(event)
# store message in db
yield self._on_new_room_event(event, snapshot)
@@ -239,7 +198,7 @@ class MessageHandler(BaseHandler):
yield self.auth.check_joined_room(room_id, user_id)
# TODO: This is duplicating logic from snapshot_all_rooms
- current_state = yield self.store.get_current_state(room_id)
+ current_state = yield self.state_handler.get_current_state(room_id)
defer.returnValue([self.hs.serialize_event(c) for c in current_state])
@defer.inlineCallbacks
@@ -289,8 +248,10 @@ class MessageHandler(BaseHandler):
d = {
"room_id": event.room_id,
"membership": event.membership,
- "visibility": ("public" if event.room_id in
- public_room_ids else "private"),
+ "visibility": (
+ "public" if event.room_id in public_room_ids
+ else "private"
+ ),
}
if event.membership == Membership.INVITE:
@@ -316,10 +277,12 @@ class MessageHandler(BaseHandler):
"end": end_token.to_string(),
}
- current_state = yield self.store.get_current_state(
+ current_state = yield self.state_handler.get_current_state(
event.room_id
)
- d["state"] = [self.hs.serialize_event(c) for c in current_state]
+ d["state"] = [
+ self.hs.serialize_event(c) for c in current_state
+ ]
except:
logger.exception("Failed to get snapshot")
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index dab9b03f04..834b37f5f3 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -17,7 +17,6 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError, CodeMessageException
from synapse.api.constants import Membership
-from synapse.api.events.room import RoomMemberEvent
from ._base import BaseHandler
@@ -153,10 +152,13 @@ class ProfileHandler(BaseHandler):
if not user.is_mine:
defer.returnValue(None)
- (displayname, avatar_url) = yield defer.gatherResults([
- self.store.get_profile_displayname(user.localpart),
- self.store.get_profile_avatar_url(user.localpart),
- ])
+ (displayname, avatar_url) = yield defer.gatherResults(
+ [
+ self.store.get_profile_displayname(user.localpart),
+ self.store.get_profile_avatar_url(user.localpart),
+ ],
+ consumeErrors=True
+ )
state["displayname"] = displayname
state["avatar_url"] = avatar_url
@@ -196,10 +198,7 @@ class ProfileHandler(BaseHandler):
)
for j in joins:
- snapshot = yield self.store.snapshot_room(
- j.room_id, j.state_key, RoomMemberEvent.TYPE,
- j.state_key
- )
+ snapshot = yield self.store.snapshot_room(j)
content = {
"membership": j.content["membership"],
@@ -218,5 +217,6 @@ class ProfileHandler(BaseHandler):
user_id=j.state_key,
)
- yield self.state_handler.handle_new_event(new_event, snapshot)
- yield self._on_new_room_event(new_event, snapshot)
+ yield self._on_new_room_event(
+ new_event, snapshot, suppress_auth=True
+ )
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 81ce1a5907..cfe1061ed3 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -21,10 +21,10 @@ from synapse.api.constants import Membership, JoinRules
from synapse.api.errors import StoreError, SynapseError
from synapse.api.events.room import (
RoomMemberEvent, RoomCreateEvent, RoomPowerLevelsEvent,
- RoomJoinRulesEvent, RoomAddStateLevelEvent, RoomTopicEvent,
- RoomSendEventLevelEvent, RoomOpsPowerLevelsEvent, RoomNameEvent,
+ RoomTopicEvent, RoomNameEvent, RoomJoinRulesEvent,
)
from synapse.util import stringutils
+from synapse.util.async import run_on_reactor
from ._base import BaseHandler
import logging
@@ -111,26 +111,15 @@ class RoomCreationHandler(BaseHandler):
user, room_id, is_public=is_public
)
- if room_alias:
- directory_handler = self.hs.get_handlers().directory_handler
- yield directory_handler.create_association(
- user_id=user_id,
- room_id=room_id,
- room_alias=room_alias,
- servers=[self.hs.hostname],
- )
-
@defer.inlineCallbacks
def handle_event(event):
- snapshot = yield self.store.snapshot_room(
- room_id=room_id,
- user_id=user_id,
- )
+ snapshot = yield self.store.snapshot_room(event)
logger.debug("Event: %s", event)
- yield self.state_handler.handle_new_event(event, snapshot)
- yield self._on_new_room_event(event, snapshot, extra_users=[user])
+ yield self._on_new_room_event(
+ event, snapshot, extra_users=[user], suppress_auth=True
+ )
for event in creation_events:
yield handle_event(event)
@@ -141,7 +130,6 @@ class RoomCreationHandler(BaseHandler):
etype=RoomNameEvent.TYPE,
room_id=room_id,
user_id=user_id,
- required_power_level=50,
content={"name": name},
)
@@ -153,7 +141,6 @@ class RoomCreationHandler(BaseHandler):
etype=RoomTopicEvent.TYPE,
room_id=room_id,
user_id=user_id,
- required_power_level=50,
content={"topic": topic},
)
@@ -188,9 +175,18 @@ class RoomCreationHandler(BaseHandler):
join_event,
do_auth=False
)
+
result = {"room_id": room_id}
+
if room_alias:
result["room_alias"] = room_alias.to_string()
+ directory_handler = self.hs.get_handlers().directory_handler
+ yield directory_handler.create_association(
+ user_id=user_id,
+ room_id=room_id,
+ room_alias=room_alias,
+ servers=[self.hs.hostname],
+ )
defer.returnValue(result)
@@ -198,7 +194,6 @@ class RoomCreationHandler(BaseHandler):
event_keys = {
"room_id": room_id,
"user_id": creator.to_string(),
- "required_power_level": 100,
}
def create(etype, **content):
@@ -215,7 +210,21 @@ class RoomCreationHandler(BaseHandler):
power_levels_event = self.event_factory.create_event(
etype=RoomPowerLevelsEvent.TYPE,
- content={creator.to_string(): 100, "default": 0},
+ content={
+ "users": {
+ creator.to_string(): 100,
+ },
+ "users_default": 0,
+ "events": {
+ RoomNameEvent.TYPE: 100,
+ RoomPowerLevelsEvent.TYPE: 100,
+ },
+ "events_default": 0,
+ "state_default": 50,
+ "ban": 50,
+ "kick": 50,
+ "redact": 50
+ },
**event_keys
)
@@ -225,30 +234,10 @@ class RoomCreationHandler(BaseHandler):
join_rule=join_rule,
)
- add_state_event = create(
- etype=RoomAddStateLevelEvent.TYPE,
- level=100,
- )
-
- send_event = create(
- etype=RoomSendEventLevelEvent.TYPE,
- level=0,
- )
-
- ops = create(
- etype=RoomOpsPowerLevelsEvent.TYPE,
- ban_level=50,
- kick_level=50,
- redact_level=50,
- )
-
return [
creation_event,
power_levels_event,
join_rules_event,
- add_state_event,
- send_event,
- ops,
]
@@ -363,10 +352,8 @@ class RoomMemberHandler(BaseHandler):
"""
target_user_id = event.state_key
- snapshot = yield self.store.snapshot_room(
- event.room_id, event.user_id,
- RoomMemberEvent.TYPE, target_user_id
- )
+ snapshot = yield self.store.snapshot_room(event)
+
## TODO(markjh): get prev state from snapshot.
prev_state = yield self.store.get_room_member(
target_user_id, event.room_id
@@ -375,13 +362,6 @@ class RoomMemberHandler(BaseHandler):
if prev_state:
event.content["prev"] = prev_state.membership
-# if prev_state and prev_state.membership == event.membership:
-# # treat this event as a NOOP.
-# if do_auth: # This is mainly to fix a unit test.
-# yield self.auth.check(event, raises=True)
-# defer.returnValue({})
-# return
-
room_id = event.room_id
# If we're trying to join a room then we have to do this differently
@@ -391,29 +371,17 @@ class RoomMemberHandler(BaseHandler):
yield self._do_join(event, snapshot, do_auth=do_auth)
else:
# This is not a JOIN, so we can handle it normally.
- if do_auth:
- yield self.auth.check(event, snapshot, raises=True)
-
- # If we're banning someone, set a req power level
- if event.membership == Membership.BAN:
- if not hasattr(event, "required_power_level") or event.required_power_level is None:
- # Add some default required_power_level
- user_level = yield self.store.get_power_level(
- event.room_id,
- event.user_id,
- )
- event.required_power_level = user_level
if prev_state and prev_state.membership == event.membership:
# double same action, treat this event as a NOOP.
defer.returnValue({})
return
- yield self.state_handler.handle_new_event(event, snapshot)
yield self._do_local_membership_update(
event,
membership=event.content["membership"],
snapshot=snapshot,
+ do_auth=do_auth,
)
defer.returnValue({"room_id": room_id})
@@ -443,10 +411,7 @@ class RoomMemberHandler(BaseHandler):
content=content,
)
- snapshot = yield self.store.snapshot_room(
- room_id, joinee.to_string(), RoomMemberEvent.TYPE,
- joinee.to_string()
- )
+ snapshot = yield self.store.snapshot_room(new_event)
yield self._do_join(new_event, snapshot, room_host=host, do_auth=True)
@@ -468,9 +433,12 @@ class RoomMemberHandler(BaseHandler):
# that we are allowed to join when we decide whether or not we
# need to do the invite/join dance.
- hosts = yield self.store.get_joined_hosts_for_room(room_id)
+ is_host_in_room = yield self.auth.check_host_in_room(
+ event.room_id,
+ self.hs.hostname
+ )
- if self.hs.hostname in hosts:
+ if is_host_in_room:
should_do_dance = False
elif room_host:
should_do_dance = True
@@ -502,14 +470,11 @@ class RoomMemberHandler(BaseHandler):
if not have_joined:
logger.debug("Doing normal join")
- if do_auth:
- yield self.auth.check(event, snapshot, raises=True)
-
- yield self.state_handler.handle_new_event(event, snapshot)
yield self._do_local_membership_update(
event,
membership=event.content["membership"],
snapshot=snapshot,
+ do_auth=do_auth,
)
user = self.hs.parse_userid(event.user_id)
@@ -553,26 +518,29 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue([r.room_id for r in rooms])
- def _do_local_membership_update(self, event, membership, snapshot):
- destinations = []
+ @defer.inlineCallbacks
+ def _do_local_membership_update(self, event, membership, snapshot,
+ do_auth):
+ yield run_on_reactor()
# If we're inviting someone, then we should also send it to that
# HS.
target_user_id = event.state_key
target_user = self.hs.parse_userid(target_user_id)
- if membership == Membership.INVITE:
- host = target_user.domain
- destinations.append(host)
-
- # Always include target domain
- host = target_user.domain
- destinations.append(host)
-
- return self._on_new_room_event(
- event, snapshot, extra_destinations=destinations,
- extra_users=[target_user]
+ if membership == Membership.INVITE and not target_user.is_mine:
+ do_invite_host = target_user.domain
+ else:
+ do_invite_host = None
+
+ yield self._on_new_room_event(
+ event,
+ snapshot,
+ extra_users=[target_user],
+ suppress_auth=(not do_auth),
+ do_invite_host=do_invite_host,
)
+
class RoomListHandler(BaseHandler):
@defer.inlineCallbacks
|