diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7d9e3cf364..45d3d47fc1 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -17,13 +17,18 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.streams.config import PaginationConfig
+from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
+from synapse.push.action_generator import ActionGenerator
+from synapse.streams.config import PaginationConfig
+from synapse.types import (
+ UserID, RoomAlias, RoomStreamToken, StreamToken, get_domian_from_id
+)
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.types import UserID, RoomStreamToken, StreamToken
+from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
from ._base import BaseHandler
@@ -43,6 +48,7 @@ class MessageHandler(BaseHandler):
self.clock = hs.get_clock()
self.validator = EventValidator()
self.snapshot_cache = SnapshotCache()
+ self.signing_key = hs.config.signing_key[0]
@defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
@@ -724,3 +730,192 @@ class MessageHandler(BaseHandler):
ret["membership"] = membership
defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def _create_new_client_event(self, builder, prev_event_ids=None):
+ if prev_event_ids:
+ prev_events = yield self.store.add_event_hashes(prev_event_ids)
+ prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
+ depth = prev_max_depth + 1
+ else:
+ latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
+ builder.room_id,
+ )
+
+ if latest_ret:
+ depth = max([d for _, _, d in latest_ret]) + 1
+ else:
+ depth = 1
+
+ prev_events = [
+ (event_id, prev_hashes)
+ for event_id, prev_hashes, _ in latest_ret
+ ]
+
+ builder.prev_events = prev_events
+ builder.depth = depth
+
+ state_handler = self.state_handler
+
+ context = yield state_handler.compute_event_context(builder)
+
+ if builder.is_state():
+ builder.prev_state = yield self.store.add_event_hashes(
+ context.prev_state_events
+ )
+
+ yield self.auth.add_auth_events(builder, context)
+
+ add_hashes_and_signatures(
+ builder, self.server_name, self.signing_key
+ )
+
+ event = builder.build()
+
+ logger.debug(
+ "Created event %s with current state: %s",
+ event.event_id, context.current_state,
+ )
+
+ defer.returnValue(
+ (event, context,)
+ )
+
+ @defer.inlineCallbacks
+ def handle_new_client_event(
+ self,
+ requester,
+ event,
+ context,
+ ratelimit=True,
+ extra_users=[]
+ ):
+ # We now need to go and hit out to wherever we need to hit out to.
+
+ if ratelimit:
+ self.ratelimit(requester)
+
+ try:
+ self.auth.check(event, auth_events=context.current_state)
+ except AuthError as err:
+ logger.warn("Denying new event %r because %s", event, err)
+ raise err
+
+ yield self.maybe_kick_guest_users(event, context.current_state.values())
+
+ if event.type == EventTypes.CanonicalAlias:
+ # Check the alias is acually valid (at this time at least)
+ room_alias_str = event.content.get("alias", None)
+ if room_alias_str:
+ room_alias = RoomAlias.from_string(room_alias_str)
+ directory_handler = self.hs.get_handlers().directory_handler
+ mapping = yield directory_handler.get_association(room_alias)
+
+ if mapping["room_id"] != event.room_id:
+ raise SynapseError(
+ 400,
+ "Room alias %s does not point to the room" % (
+ room_alias_str,
+ )
+ )
+
+ federation_handler = self.hs.get_handlers().federation_handler
+
+ if event.type == EventTypes.Member:
+ if event.content["membership"] == Membership.INVITE:
+ def is_inviter_member_event(e):
+ return (
+ e.type == EventTypes.Member and
+ e.sender == event.sender
+ )
+
+ event.unsigned["invite_room_state"] = [
+ {
+ "type": e.type,
+ "state_key": e.state_key,
+ "content": e.content,
+ "sender": e.sender,
+ }
+ for k, e in context.current_state.items()
+ if e.type in self.hs.config.room_invite_state_types
+ or is_inviter_member_event(e)
+ ]
+
+ invitee = UserID.from_string(event.state_key)
+ if not self.hs.is_mine(invitee):
+ # TODO: Can we add signature from remote server in a nicer
+ # way? If we have been invited by a remote server, we need
+ # to get them to sign the event.
+
+ returned_invite = yield federation_handler.send_invite(
+ invitee.domain,
+ event,
+ )
+
+ event.unsigned.pop("room_state", None)
+
+ # TODO: Make sure the signatures actually are correct.
+ event.signatures.update(
+ returned_invite.signatures
+ )
+
+ if event.type == EventTypes.Redaction:
+ if self.auth.check_redaction(event, auth_events=context.current_state):
+ original_event = yield self.store.get_event(
+ event.redacts,
+ check_redacted=False,
+ get_prev_content=False,
+ allow_rejected=False,
+ allow_none=False
+ )
+ if event.user_id != original_event.user_id:
+ raise AuthError(
+ 403,
+ "You don't have permission to redact events"
+ )
+
+ if event.type == EventTypes.Create and context.current_state:
+ raise AuthError(
+ 403,
+ "Changing the room create event is forbidden",
+ )
+
+ action_generator = ActionGenerator(self.hs)
+ yield action_generator.handle_push_actions_for_event(
+ event, context, self
+ )
+
+ (event_stream_id, max_stream_id) = yield self.store.persist_event(
+ event, context=context
+ )
+
+ # this intentionally does not yield: we don't care about the result
+ # and don't need to wait for it.
+ preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+ event_stream_id, max_stream_id
+ )
+
+ destinations = set()
+ for k, s in context.current_state.items():
+ try:
+ if k[0] == EventTypes.Member:
+ if s.content["membership"] == Membership.JOIN:
+ destinations.add(get_domian_from_id(s.state_key))
+ except SynapseError:
+ logger.warn(
+ "Failed to get destination from event %s", s.event_id
+ )
+
+ with PreserveLoggingContext():
+ # Don't block waiting on waking up all the listeners.
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
+ )
+
+ # If invite, remove room_state from unsigned before sending.
+ event.unsigned.pop("invite_room_state", None)
+
+ federation_handler.handle_new_event(
+ event, destinations=destinations,
+ )
|