diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 24c9ffdb20..39d7724778 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2017 - 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,173 +13,185 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
+
+import six
+from six import iteritems, itervalues, string_types
+
+from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
+from twisted.internet.defer import succeed
-from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
+from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
+from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
-from synapse.types import (
- UserID, RoomAlias, RoomStreamToken,
-)
-from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
-from synapse.util.logcontext import preserve_fn
+from synapse.replication.http.send_event import send_event_to_master
+from synapse.types import RoomAlias, UserID
+from synapse.util.async import Linearizer
+from synapse.util.frozenutils import frozendict_json_encoder
+from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
-from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
-from canonicaljson import encode_canonical_json
-
-import logging
-import random
-import ujson
-
logger = logging.getLogger(__name__)
-class MessageHandler(BaseHandler):
+class MessageHandler(object):
+ """Contains some read only APIs to get state about a room
+ """
def __init__(self, hs):
- super(MessageHandler, self).__init__(hs)
- self.hs = hs
- self.state = hs.get_state_handler()
+ self.auth = hs.get_auth()
self.clock = hs.get_clock()
- self.validator = EventValidator()
-
- self.pagination_lock = ReadWriteLock()
-
- # We arbitrarily limit concurrent event creation for a room to 5.
- # This is to stop us from diverging history *too* much.
- self.limiter = Limiter(max_count=5)
-
- self.action_generator = hs.get_action_generator()
+ self.state = hs.get_state_handler()
+ self.store = hs.get_datastore()
@defer.inlineCallbacks
- def purge_history(self, room_id, event_id):
- event = yield self.store.get_event(event_id)
+ def get_room_data(self, user_id=None, room_id=None,
+ event_type=None, state_key="", is_guest=False):
+ """ Get data from a room.
- if event.room_id != room_id:
- raise SynapseError(400, "Event is for wrong room.")
+ Args:
+ event : The room path event
+ Returns:
+ The path data content.
+ Raises:
+ SynapseError if something went wrong.
+ """
+ membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
+ room_id, user_id
+ )
- depth = event.depth
+ if membership == Membership.JOIN:
+ data = yield self.state.get_current_state(
+ room_id, event_type, state_key
+ )
+ elif membership == Membership.LEAVE:
+ key = (event_type, state_key)
+ room_state = yield self.store.get_state_for_events(
+ [membership_event_id], [key]
+ )
+ data = room_state[membership_event_id].get(key)
- with (yield self.pagination_lock.write(room_id)):
- yield self.store.delete_old_state(room_id, depth)
+ defer.returnValue(data)
@defer.inlineCallbacks
- def get_messages(self, requester, room_id=None, pagin_config=None,
- as_client_event=True, event_filter=None):
- """Get messages in a room.
+ def get_state_events(self, user_id, room_id, is_guest=False):
+ """Retrieve all state events for a given room. If the user is
+ joined to the room then return the current state. If the user has
+ left the room return the state events from when they left.
Args:
- requester (Requester): The user requesting messages.
- room_id (str): The room they want messages from.
- pagin_config (synapse.api.streams.PaginationConfig): The pagination
- config rules to apply, if any.
- as_client_event (bool): True to get events in client-server format.
- event_filter (Filter): Filter to apply to results or None
+ user_id(str): The user requesting state events.
+ room_id(str): The room ID to get all state events from.
Returns:
- dict: Pagination API results
+ A list of dicts representing state events. [{}, {}, {}]
"""
- user_id = requester.user.to_string()
+ membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
+ room_id, user_id
+ )
- if pagin_config.from_token:
- room_token = pagin_config.from_token.room_key
- else:
- pagin_config.from_token = (
- yield self.hs.get_event_sources().get_current_token_for_room(
- room_id=room_id
- )
+ if membership == Membership.JOIN:
+ room_state = yield self.state.get_current_state(room_id)
+ elif membership == Membership.LEAVE:
+ room_state = yield self.store.get_state_for_events(
+ [membership_event_id], None
)
- room_token = pagin_config.from_token.room_key
-
- room_token = RoomStreamToken.parse(room_token)
+ room_state = room_state[membership_event_id]
- pagin_config.from_token = pagin_config.from_token.copy_and_replace(
- "room_key", str(room_token)
+ now = self.clock.time_msec()
+ defer.returnValue(
+ [serialize_event(c, now) for c in room_state.values()]
)
- source_config = pagin_config.get_source_config("room")
+ @defer.inlineCallbacks
+ def get_joined_members(self, requester, room_id):
+ """Get all the joined members in the room and their profile information.
+
+ If the user has left the room return the state events from when they left.
- with (yield self.pagination_lock.read(room_id)):
- membership, member_event_id = yield self._check_in_room_or_world_readable(
+ Args:
+ requester(Requester): The user requesting state events.
+ room_id(str): The room ID to get all state events from.
+ Returns:
+ A dict of user_id to profile info
+ """
+ user_id = requester.user.to_string()
+ if not requester.app_service:
+ # We check AS auth after fetching the room membership, as it
+ # requires us to pull out all joined members anyway.
+ membership, _ = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
+ if membership != Membership.JOIN:
+ raise NotImplementedError(
+ "Getting joined members after leaving is not implemented"
+ )
- if source_config.direction == 'b':
- # if we're going backwards, we might need to backfill. This
- # requires that we have a topo token.
- if room_token.topological:
- max_topo = room_token.topological
- else:
- max_topo = yield self.store.get_max_topological_token(
- room_id, room_token.stream
- )
+ users_with_profile = yield self.state.get_current_user_in_room(room_id)
- if membership == Membership.LEAVE:
- # If they have left the room then clamp the token to be before
- # they left the room, to save the effort of loading from the
- # database.
- leave_token = yield self.store.get_topological_token_for_event(
- member_event_id
- )
- leave_token = RoomStreamToken.parse(leave_token)
- if leave_token.topological < max_topo:
- source_config.from_key = str(leave_token)
+ # If this is an AS, double check that they are allowed to see the members.
+ # This can either be because the AS user is in the room or because there
+ # is a user in the room that the AS is "interested in"
+ if requester.app_service and user_id not in users_with_profile:
+ for uid in users_with_profile:
+ if requester.app_service.is_interested_in_user(uid):
+ break
+ else:
+ # Loop fell through, AS has no interested users in room
+ raise AuthError(403, "Appservice not in room")
- yield self.hs.get_handlers().federation_handler.maybe_backfill(
- room_id, max_topo
- )
+ defer.returnValue({
+ user_id: {
+ "avatar_url": profile.avatar_url,
+ "display_name": profile.display_name,
+ }
+ for user_id, profile in iteritems(users_with_profile)
+ })
- events, next_key = yield self.store.paginate_room_events(
- room_id=room_id,
- from_key=source_config.from_key,
- to_key=source_config.to_key,
- direction=source_config.direction,
- limit=source_config.limit,
- event_filter=event_filter,
- )
- next_token = pagin_config.from_token.copy_and_replace(
- "room_key", next_key
- )
+class EventCreationHandler(object):
+ def __init__(self, hs):
+ self.hs = hs
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
+ self.state = hs.get_state_handler()
+ self.clock = hs.get_clock()
+ self.validator = EventValidator()
+ self.profile_handler = hs.get_profile_handler()
+ self.event_builder_factory = hs.get_event_builder_factory()
+ self.server_name = hs.hostname
+ self.ratelimiter = hs.get_ratelimiter()
+ self.notifier = hs.get_notifier()
+ self.config = hs.config
- if not events:
- defer.returnValue({
- "chunk": [],
- "start": pagin_config.from_token.to_string(),
- "end": next_token.to_string(),
- })
-
- if event_filter:
- events = event_filter.filter(events)
-
- events = yield filter_events_for_client(
- self.store,
- user_id,
- events,
- is_peeking=(member_event_id is None),
- )
+ self.http_client = hs.get_simple_http_client()
- time_now = self.clock.time_msec()
+ # This is only used to get at ratelimit function, and maybe_kick_guest_users
+ self.base_handler = BaseHandler(hs)
- chunk = {
- "chunk": [
- serialize_event(e, time_now, as_client_event)
- for e in events
- ],
- "start": pagin_config.from_token.to_string(),
- "end": next_token.to_string(),
- }
+ self.pusher_pool = hs.get_pusherpool()
- defer.returnValue(chunk)
+ # We arbitrarily limit concurrent event creation for a room to 5.
+ # This is to stop us from diverging history *too* much.
+ self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
+
+ self.action_generator = hs.get_action_generator()
+
+ self.spam_checker = hs.get_spam_checker()
+
+ if self.config.block_events_without_consent_error is not None:
+ self._consent_uri_builder = ConsentURIBuilder(self.config)
@defer.inlineCallbacks
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
- prev_event_ids=None):
+ prev_events_and_hashes=None):
"""
Given a dict from a client, create a new event.
@@ -192,50 +205,143 @@ class MessageHandler(BaseHandler):
event_dict (dict): An entire event
token_id (str)
txn_id (str)
- prev_event_ids (list): The prev event ids to use when creating the event
+
+ prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+ the forward extremities to use as the prev_events for the
+ new event. For each event, a tuple of (event_id, hashes, depth)
+ where *hashes* is a map from algorithm to hash.
+
+ If None, they will be requested from the database.
Returns:
Tuple of created event (FrozenEvent), Context
"""
builder = self.event_builder_factory.new(event_dict)
- with (yield self.limiter.queue(builder.room_id)):
- self.validator.validate_new(builder)
-
- if builder.type == EventTypes.Member:
- membership = builder.content.get("membership", None)
- target = UserID.from_string(builder.state_key)
-
- if membership in {Membership.JOIN, Membership.INVITE}:
- # If event doesn't include a display name, add one.
- profile = self.hs.get_handlers().profile_handler
- content = builder.content
-
- try:
- if "displayname" not in content:
- content["displayname"] = yield profile.get_displayname(target)
- if "avatar_url" not in content:
- content["avatar_url"] = yield profile.get_avatar_url(target)
- except Exception as e:
- logger.info(
- "Failed to get profile information for %r: %s",
- target, e
- )
+ self.validator.validate_new(builder)
+
+ if builder.type == EventTypes.Member:
+ membership = builder.content.get("membership", None)
+ target = UserID.from_string(builder.state_key)
+
+ if membership in {Membership.JOIN, Membership.INVITE}:
+ # If event doesn't include a display name, add one.
+ profile = self.profile_handler
+ content = builder.content
+
+ try:
+ if "displayname" not in content:
+ content["displayname"] = yield profile.get_displayname(target)
+ if "avatar_url" not in content:
+ content["avatar_url"] = yield profile.get_avatar_url(target)
+ except Exception as e:
+ logger.info(
+ "Failed to get profile information for %r: %s",
+ target, e
+ )
- if token_id is not None:
- builder.internal_metadata.token_id = token_id
+ is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
+ if not is_exempt:
+ yield self.assert_accepted_privacy_policy(requester)
- if txn_id is not None:
- builder.internal_metadata.txn_id = txn_id
+ if token_id is not None:
+ builder.internal_metadata.token_id = token_id
- event, context = yield self._create_new_client_event(
- builder=builder,
- requester=requester,
- prev_event_ids=prev_event_ids,
- )
+ if txn_id is not None:
+ builder.internal_metadata.txn_id = txn_id
+
+ event, context = yield self.create_new_client_event(
+ builder=builder,
+ requester=requester,
+ prev_events_and_hashes=prev_events_and_hashes,
+ )
defer.returnValue((event, context))
+ def _is_exempt_from_privacy_policy(self, builder, requester):
+ """"Determine if an event to be sent is exempt from having to consent
+ to the privacy policy
+
+ Args:
+ builder (synapse.events.builder.EventBuilder): event being created
+ requester (Requster): user requesting this event
+
+ Returns:
+ Deferred[bool]: true if the event can be sent without the user
+ consenting
+ """
+ # the only thing the user can do is join the server notices room.
+ if builder.type == EventTypes.Member:
+ membership = builder.content.get("membership", None)
+ if membership == Membership.JOIN:
+ return self._is_server_notices_room(builder.room_id)
+ elif membership == Membership.LEAVE:
+ # the user is always allowed to leave (but not kick people)
+ return builder.state_key == requester.user.to_string()
+ return succeed(False)
+
+ @defer.inlineCallbacks
+ def _is_server_notices_room(self, room_id):
+ if self.config.server_notices_mxid is None:
+ defer.returnValue(False)
+ user_ids = yield self.store.get_users_in_room(room_id)
+ defer.returnValue(self.config.server_notices_mxid in user_ids)
+
+ @defer.inlineCallbacks
+ def assert_accepted_privacy_policy(self, requester):
+ """Check if a user has accepted the privacy policy
+
+ Called when the given user is about to do something that requires
+ privacy consent. We see if the user is exempt and otherwise check that
+ they have given consent. If they have not, a ConsentNotGiven error is
+ raised.
+
+ Args:
+ requester (synapse.types.Requester):
+ The user making the request
+
+ Returns:
+ Deferred[None]: returns normally if the user has consented or is
+ exempt
+
+ Raises:
+ ConsentNotGivenError: if the user has not given consent yet
+ """
+ if self.config.block_events_without_consent_error is None:
+ return
+
+ # exempt AS users from needing consent
+ if requester.app_service is not None:
+ return
+
+ user_id = requester.user.to_string()
+
+ # exempt the system notices user
+ if (
+ self.config.server_notices_mxid is not None and
+ user_id == self.config.server_notices_mxid
+ ):
+ return
+
+ u = yield self.store.get_user_by_id(user_id)
+ assert u is not None
+ if u["appservice_id"] is not None:
+ # users registered by an appservice are exempt
+ return
+ if u["consent_version"] == self.config.user_consent_version:
+ return
+
+ consent_uri = self._consent_uri_builder.build_user_consent_uri(
+ requester.user.localpart,
+ )
+ msg = self.config.block_events_without_consent_error % {
+ 'consent_uri': consent_uri,
+ }
+ raise ConsentNotGivenError(
+ msg=msg,
+ consent_uri=consent_uri,
+ )
+
@defer.inlineCallbacks
def send_nonmember_event(self, requester, event, context, ratelimit=True):
"""
@@ -253,11 +359,6 @@ class MessageHandler(BaseHandler):
"Tried to send member event through non-member codepath"
)
- # We check here if we are currently being rate limited, so that we
- # don't do unnecessary work. We check again just before we actually
- # send the event.
- yield self.ratelimit(requester, update=False)
-
user = UserID.from_string(event.sender)
assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
@@ -274,12 +375,6 @@ class MessageHandler(BaseHandler):
ratelimit=ratelimit,
)
- if event.type == EventTypes.Message:
- presence = self.hs.get_presence_handler()
- # We don't want to block sending messages on any presence code. This
- # matters as sometimes presence code can take a while.
- preserve_fn(presence.bump_presence_active_time)(user)
-
@defer.inlineCallbacks
def deduplicate_state_event(self, event, context):
"""
@@ -288,7 +383,8 @@ class MessageHandler(BaseHandler):
If so, returns the version of the event in context.
Otherwise, returns None.
"""
- prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ prev_event_id = prev_state_ids.get((event.type, event.state_key))
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
if not prev_event:
return
@@ -313,145 +409,85 @@ class MessageHandler(BaseHandler):
See self.create_event and self.send_nonmember_event.
"""
- event, context = yield self.create_event(
- requester,
- event_dict,
- token_id=requester.access_token_id,
- txn_id=txn_id
- )
- yield self.send_nonmember_event(
- requester,
- event,
- context,
- ratelimit=ratelimit,
- )
- defer.returnValue(event)
- @defer.inlineCallbacks
- def get_room_data(self, user_id=None, room_id=None,
- event_type=None, state_key="", is_guest=False):
- """ Get data from a room.
-
- Args:
- event : The room path event
- Returns:
- The path data content.
- Raises:
- SynapseError if something went wrong.
- """
- membership, membership_event_id = yield self._check_in_room_or_world_readable(
- room_id, user_id
- )
-
- if membership == Membership.JOIN:
- data = yield self.state_handler.get_current_state(
- room_id, event_type, state_key
- )
- elif membership == Membership.LEAVE:
- key = (event_type, state_key)
- room_state = yield self.store.get_state_for_events(
- [membership_event_id], [key]
+ # We limit the number of concurrent event sends in a room so that we
+ # don't fork the DAG too much. If we don't limit then we can end up in
+ # a situation where event persistence can't keep up, causing
+ # extremities to pile up, which in turn leads to state resolution
+ # taking longer.
+ with (yield self.limiter.queue(event_dict["room_id"])):
+ event, context = yield self.create_event(
+ requester,
+ event_dict,
+ token_id=requester.access_token_id,
+ txn_id=txn_id
)
- data = room_state[membership_event_id].get(key)
- defer.returnValue(data)
+ spam_error = self.spam_checker.check_event_for_spam(event)
+ if spam_error:
+ if not isinstance(spam_error, string_types):
+ spam_error = "Spam is not permitted here"
+ raise SynapseError(
+ 403, spam_error, Codes.FORBIDDEN
+ )
- @defer.inlineCallbacks
- def _check_in_room_or_world_readable(self, room_id, user_id):
- try:
- # check_user_was_in_room will return the most recent membership
- # event for the user if:
- # * The user is a non-guest user, and was ever in the room
- # * The user is a guest user, and has joined the room
- # else it will throw.
- member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
- defer.returnValue((member_event.membership, member_event.event_id))
- return
- except AuthError:
- visibility = yield self.state_handler.get_current_state(
- room_id, EventTypes.RoomHistoryVisibility, ""
- )
- if (
- visibility and
- visibility.content["history_visibility"] == "world_readable"
- ):
- defer.returnValue((Membership.JOIN, None))
- return
- raise AuthError(
- 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
+ yield self.send_nonmember_event(
+ requester,
+ event,
+ context,
+ ratelimit=ratelimit,
)
+ defer.returnValue(event)
+ @measure_func("create_new_client_event")
@defer.inlineCallbacks
- def get_state_events(self, user_id, room_id, is_guest=False):
- """Retrieve all state events for a given room. If the user is
- joined to the room then return the current state. If the user has
- left the room return the state events from when they left.
+ def create_new_client_event(self, builder, requester=None,
+ prev_events_and_hashes=None):
+ """Create a new event for a local client
Args:
- user_id(str): The user requesting state events.
- room_id(str): The room ID to get all state events from.
+ builder (EventBuilder):
+
+ requester (synapse.types.Requester|None):
+
+ prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+ the forward extremities to use as the prev_events for the
+ new event. For each event, a tuple of (event_id, hashes, depth)
+ where *hashes* is a map from algorithm to hash.
+
+ If None, they will be requested from the database.
+
Returns:
- A list of dicts representing state events. [{}, {}, {}]
+ Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
"""
- membership, membership_event_id = yield self._check_in_room_or_world_readable(
- room_id, user_id
- )
- if membership == Membership.JOIN:
- room_state = yield self.state_handler.get_current_state(room_id)
- elif membership == Membership.LEAVE:
- room_state = yield self.store.get_state_for_events(
- [membership_event_id], None
+ if prev_events_and_hashes is not None:
+ assert len(prev_events_and_hashes) <= 10, \
+ "Attempting to create an event with %i prev_events" % (
+ len(prev_events_and_hashes),
)
- room_state = room_state[membership_event_id]
-
- now = self.clock.time_msec()
- defer.returnValue(
- [serialize_event(c, now) for c in room_state.values()]
- )
-
- @measure_func("_create_new_client_event")
- @defer.inlineCallbacks
- def _create_new_client_event(self, builder, requester=None, prev_event_ids=None):
- if prev_event_ids:
- prev_events = yield self.store.add_event_hashes(prev_event_ids)
- prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
- depth = prev_max_depth + 1
else:
- latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
- builder.room_id,
- )
-
- # We want to limit the max number of prev events we point to in our
- # new event
- if len(latest_ret) > 10:
- # Sort by reverse depth, so we point to the most recent.
- latest_ret.sort(key=lambda a: -a[2])
- new_latest_ret = latest_ret[:5]
-
- # We also randomly point to some of the older events, to make
- # sure that we don't completely ignore the older events.
- if latest_ret[5:]:
- sample_size = min(5, len(latest_ret[5:]))
- new_latest_ret.extend(random.sample(latest_ret[5:], sample_size))
- latest_ret = new_latest_ret
-
- if latest_ret:
- depth = max([d for _, _, d in latest_ret]) + 1
- else:
- depth = 1
+ prev_events_and_hashes = \
+ yield self.store.get_prev_events_for_room(builder.room_id)
+
+ if prev_events_and_hashes:
+ depth = max([d for _, _, d in prev_events_and_hashes]) + 1
+ # we cap depth of generated events, to ensure that they are not
+ # rejected by other servers (and so that they can be persisted in
+ # the db)
+ depth = min(depth, MAX_DEPTH)
+ else:
+ depth = 1
- prev_events = [
- (event_id, prev_hashes)
- for event_id, prev_hashes, _ in latest_ret
- ]
+ prev_events = [
+ (event_id, prev_hashes)
+ for event_id, prev_hashes, _ in prev_events_and_hashes
+ ]
builder.prev_events = prev_events
builder.depth = depth
- state_handler = self.state_handler
-
- context = yield state_handler.compute_event_context(builder)
+ context = yield self.state.compute_event_context(builder)
if requester:
context.app_service = requester.app_service
@@ -470,8 +506,8 @@ class MessageHandler(BaseHandler):
event = builder.build()
logger.debug(
- "Created event %s with state: %s",
- event.event_id, context.prev_state_ids,
+ "Created event %s",
+ event.event_id,
)
defer.returnValue(
@@ -486,12 +522,21 @@ class MessageHandler(BaseHandler):
event,
context,
ratelimit=True,
- extra_users=[]
+ extra_users=[],
):
- # We now need to go and hit out to wherever we need to hit out to.
+ """Processes a new event. This includes checking auth, persisting it,
+ notifying users, sending to remote servers, etc.
- if ratelimit:
- yield self.ratelimit(requester)
+ If called from a worker will hit out to the master process for final
+ processing.
+
+ Args:
+ requester (Requester)
+ event (FrozenEvent)
+ context (EventContext)
+ ratelimit (bool)
+ extra_users (list(UserID)): Any extra users to notify about event
+ """
try:
yield self.auth.check_from_context(event, context)
@@ -501,13 +546,72 @@ class MessageHandler(BaseHandler):
# Ensure that we can round trip before trying to persist in db
try:
- dump = ujson.dumps(event.content)
- ujson.loads(dump)
- except:
+ dump = frozendict_json_encoder.encode(event.content)
+ json.loads(dump)
+ except Exception:
logger.exception("Failed to encode content: %r", event.content)
raise
- yield self.maybe_kick_guest_users(event, context)
+ yield self.action_generator.handle_push_actions_for_event(
+ event, context
+ )
+
+ try:
+ # If we're a worker we need to hit out to the master.
+ if self.config.worker_app:
+ yield send_event_to_master(
+ clock=self.hs.get_clock(),
+ store=self.store,
+ client=self.http_client,
+ host=self.config.worker_replication_host,
+ port=self.config.worker_replication_http_port,
+ requester=requester,
+ event=event,
+ context=context,
+ ratelimit=ratelimit,
+ extra_users=extra_users,
+ )
+ return
+
+ yield self.persist_and_notify_client_event(
+ requester,
+ event,
+ context,
+ ratelimit=ratelimit,
+ extra_users=extra_users,
+ )
+ except: # noqa: E722, as we reraise the exception this is fine.
+ # Ensure that we actually remove the entries in the push actions
+ # staging area, if we calculated them.
+ tp, value, tb = sys.exc_info()
+
+ run_in_background(
+ self.store.remove_push_actions_from_staging,
+ event.event_id,
+ )
+
+ six.reraise(tp, value, tb)
+
+ @defer.inlineCallbacks
+ def persist_and_notify_client_event(
+ self,
+ requester,
+ event,
+ context,
+ ratelimit=True,
+ extra_users=[],
+ ):
+ """Called when we have fully built the event, have already
+ calculated the push actions for the event, and checked auth.
+
+ This should only be run on master.
+ """
+ assert not self.config.worker_app
+
+ if ratelimit:
+ yield self.base_handler.ratelimit(requester)
+
+ yield self.base_handler.maybe_kick_guest_users(event, context)
if event.type == EventTypes.CanonicalAlias:
# Check the alias is acually valid (at this time at least)
@@ -535,9 +639,11 @@ class MessageHandler(BaseHandler):
e.sender == event.sender
)
+ current_state_ids = yield context.get_current_state_ids(self.store)
+
state_to_include_ids = [
e_id
- for k, e_id in context.current_state_ids.iteritems()
+ for k, e_id in iteritems(current_state_ids)
if k[0] in self.hs.config.room_invite_state_types
or k == (EventTypes.Member, event.sender)
]
@@ -551,7 +657,7 @@ class MessageHandler(BaseHandler):
"content": e.content,
"sender": e.sender,
}
- for e in state_to_include.itervalues()
+ for e in itervalues(state_to_include)
]
invitee = UserID.from_string(event.state_key)
@@ -573,8 +679,9 @@ class MessageHandler(BaseHandler):
)
if event.type == EventTypes.Redaction:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -594,15 +701,13 @@ class MessageHandler(BaseHandler):
"You don't have permission to redact events"
)
- if event.type == EventTypes.Create and context.prev_state_ids:
- raise AuthError(
- 403,
- "Changing the room create event is forbidden",
- )
-
- yield self.action_generator.handle_push_actions_for_event(
- event, context
- )
+ if event.type == EventTypes.Create:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ if prev_state_ids:
+ raise AuthError(
+ 403,
+ "Changing the room create event is forbidden",
+ )
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
@@ -610,16 +715,31 @@ class MessageHandler(BaseHandler):
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
- preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+ run_in_background(
+ self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id
)
- @defer.inlineCallbacks
def _notify():
- yield run_on_reactor()
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=extra_users
- )
+ try:
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
+ )
+ except Exception:
+ logger.exception("Error notifying about new room event")
+
+ run_in_background(_notify)
- preserve_fn(_notify)()
+ if event.type == EventTypes.Message:
+ # We don't want to block sending messages on any presence code. This
+ # matters as sometimes presence code can take a while.
+ run_in_background(self._bump_active_time, requester.user)
+
+ @defer.inlineCallbacks
+ def _bump_active_time(self, user):
+ try:
+ presence = self.hs.get_presence_handler()
+ yield presence.bump_presence_active_time(user)
+ except Exception:
+ logger.exception("Error bumping presence active time")
|