diff options
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r-- | synapse/handlers/message.py | 354 |
1 files changed, 241 insertions, 113 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4f97c8db79..81cff0870e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -13,11 +13,23 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import simplejson +import sys + +from canonicaljson import encode_canonical_json +import six +from six import string_types, itervalues, iteritems from twisted.internet import defer, reactor +from twisted.internet.defer import succeed from twisted.python.failure import Failure -from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import AuthError, Codes, SynapseError +from synapse.api.constants import EventTypes, Membership, MAX_DEPTH +from synapse.api.errors import ( + AuthError, Codes, SynapseError, + ConsentNotGivenError, +) +from synapse.api.urls import ConsentURIBuilder from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator @@ -25,21 +37,15 @@ from synapse.types import ( UserID, RoomAlias, RoomStreamToken, ) from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter -from synapse.util.logcontext import preserve_fn, run_in_background +from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func -from synapse.util.frozenutils import unfreeze +from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client from synapse.replication.http.send_event import send_event_to_master from ._base import BaseHandler -from canonicaljson import encode_canonical_json - -import logging -import random -import ujson - logger = logging.getLogger(__name__) @@ -86,14 +92,14 @@ class MessageHandler(BaseHandler): # map from purge id to PurgeStatus self._purges_by_id = {} - def start_purge_history(self, room_id, topological_ordering, + def start_purge_history(self, room_id, token, delete_local_events=False): """Start off a history purge on a room. Args: room_id (str): The room to purge from - topological_ordering (int): minimum topo ordering to preserve + token (str): topological token to delete events before delete_local_events (bool): True to delete local events as well as remote ones @@ -115,19 +121,19 @@ class MessageHandler(BaseHandler): self._purges_by_id[purge_id] = PurgeStatus() run_in_background( self._purge_history, - purge_id, room_id, topological_ordering, delete_local_events, + purge_id, room_id, token, delete_local_events, ) return purge_id @defer.inlineCallbacks - def _purge_history(self, purge_id, room_id, topological_ordering, + def _purge_history(self, purge_id, room_id, token, delete_local_events): """Carry out a history purge on a room. Args: purge_id (str): The id for this purge room_id (str): The room to purge from - topological_ordering (int): minimum topo ordering to preserve + token (str): topological token to delete events before delete_local_events (bool): True to delete local events as well as remote ones @@ -138,7 +144,7 @@ class MessageHandler(BaseHandler): try: with (yield self.pagination_lock.write(room_id)): yield self.store.purge_history( - room_id, topological_ordering, delete_local_events, + room_id, token, delete_local_events, ) logger.info("[purge] complete") self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE @@ -397,7 +403,7 @@ class MessageHandler(BaseHandler): "avatar_url": profile.avatar_url, "display_name": profile.display_name, } - for user_id, profile in users_with_profile.iteritems() + for user_id, profile in iteritems(users_with_profile) }) @@ -431,9 +437,12 @@ class EventCreationHandler(object): self.spam_checker = hs.get_spam_checker() + if self.config.block_events_without_consent_error is not None: + self._consent_uri_builder = ConsentURIBuilder(self.config) + @defer.inlineCallbacks def create_event(self, requester, event_dict, token_id=None, txn_id=None, - prev_event_ids=None): + prev_events_and_hashes=None): """ Given a dict from a client, create a new event. @@ -447,50 +456,136 @@ class EventCreationHandler(object): event_dict (dict): An entire event token_id (str) txn_id (str) - prev_event_ids (list): The prev event ids to use when creating the event + + prev_events_and_hashes (list[(str, dict[str, str], int)]|None): + the forward extremities to use as the prev_events for the + new event. For each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. + + If None, they will be requested from the database. Returns: Tuple of created event (FrozenEvent), Context """ builder = self.event_builder_factory.new(event_dict) - with (yield self.limiter.queue(builder.room_id)): - self.validator.validate_new(builder) - - if builder.type == EventTypes.Member: - membership = builder.content.get("membership", None) - target = UserID.from_string(builder.state_key) - - if membership in {Membership.JOIN, Membership.INVITE}: - # If event doesn't include a display name, add one. - profile = self.profile_handler - content = builder.content - - try: - if "displayname" not in content: - content["displayname"] = yield profile.get_displayname(target) - if "avatar_url" not in content: - content["avatar_url"] = yield profile.get_avatar_url(target) - except Exception as e: - logger.info( - "Failed to get profile information for %r: %s", - target, e - ) + self.validator.validate_new(builder) + + if builder.type == EventTypes.Member: + membership = builder.content.get("membership", None) + target = UserID.from_string(builder.state_key) + + if membership in {Membership.JOIN, Membership.INVITE}: + # If event doesn't include a display name, add one. + profile = self.profile_handler + content = builder.content + + try: + if "displayname" not in content: + content["displayname"] = yield profile.get_displayname(target) + if "avatar_url" not in content: + content["avatar_url"] = yield profile.get_avatar_url(target) + except Exception as e: + logger.info( + "Failed to get profile information for %r: %s", + target, e + ) - if token_id is not None: - builder.internal_metadata.token_id = token_id + is_exempt = yield self._is_exempt_from_privacy_policy(builder) + if not is_exempt: + yield self.assert_accepted_privacy_policy(requester) - if txn_id is not None: - builder.internal_metadata.txn_id = txn_id + if token_id is not None: + builder.internal_metadata.token_id = token_id - event, context = yield self.create_new_client_event( - builder=builder, - requester=requester, - prev_event_ids=prev_event_ids, - ) + if txn_id is not None: + builder.internal_metadata.txn_id = txn_id + + event, context = yield self.create_new_client_event( + builder=builder, + requester=requester, + prev_events_and_hashes=prev_events_and_hashes, + ) defer.returnValue((event, context)) + def _is_exempt_from_privacy_policy(self, builder): + """"Determine if an event to be sent is exempt from having to consent + to the privacy policy + + Args: + builder (synapse.events.builder.EventBuilder): event being created + + Returns: + Deferred[bool]: true if the event can be sent without the user + consenting + """ + # the only thing the user can do is join the server notices room. + if builder.type == EventTypes.Member: + membership = builder.content.get("membership", None) + if membership == Membership.JOIN: + return self._is_server_notices_room(builder.room_id) + return succeed(False) + + @defer.inlineCallbacks + def _is_server_notices_room(self, room_id): + if self.config.server_notices_mxid is None: + defer.returnValue(False) + user_ids = yield self.store.get_users_in_room(room_id) + defer.returnValue(self.config.server_notices_mxid in user_ids) + + @defer.inlineCallbacks + def assert_accepted_privacy_policy(self, requester): + """Check if a user has accepted the privacy policy + + Called when the given user is about to do something that requires + privacy consent. We see if the user is exempt and otherwise check that + they have given consent. If they have not, a ConsentNotGiven error is + raised. + + Args: + requester (synapse.types.Requester): + The user making the request + + Returns: + Deferred[None]: returns normally if the user has consented or is + exempt + + Raises: + ConsentNotGivenError: if the user has not given consent yet + """ + if self.config.block_events_without_consent_error is None: + return + + # exempt AS users from needing consent + if requester.app_service is not None: + return + + user_id = requester.user.to_string() + + # exempt the system notices user + if ( + self.config.server_notices_mxid is not None and + user_id == self.config.server_notices_mxid + ): + return + + u = yield self.store.get_user_by_id(user_id) + assert u is not None + if u["consent_version"] == self.config.user_consent_version: + return + + consent_uri = self._consent_uri_builder.build_user_consent_uri( + requester.user.localpart, + ) + msg = self.config.block_events_without_consent_error % { + 'consent_uri': consent_uri, + } + raise ConsentNotGivenError( + msg=msg, + consent_uri=consent_uri, + ) + @defer.inlineCallbacks def send_nonmember_event(self, requester, event, context, ratelimit=True): """ @@ -557,64 +652,80 @@ class EventCreationHandler(object): See self.create_event and self.send_nonmember_event. """ - event, context = yield self.create_event( - requester, - event_dict, - token_id=requester.access_token_id, - txn_id=txn_id - ) - spam_error = self.spam_checker.check_event_for_spam(event) - if spam_error: - if not isinstance(spam_error, basestring): - spam_error = "Spam is not permitted here" - raise SynapseError( - 403, spam_error, Codes.FORBIDDEN + # We limit the number of concurrent event sends in a room so that we + # don't fork the DAG too much. If we don't limit then we can end up in + # a situation where event persistence can't keep up, causing + # extremities to pile up, which in turn leads to state resolution + # taking longer. + with (yield self.limiter.queue(event_dict["room_id"])): + event, context = yield self.create_event( + requester, + event_dict, + token_id=requester.access_token_id, + txn_id=txn_id ) - yield self.send_nonmember_event( - requester, - event, - context, - ratelimit=ratelimit, - ) + spam_error = self.spam_checker.check_event_for_spam(event) + if spam_error: + if not isinstance(spam_error, string_types): + spam_error = "Spam is not permitted here" + raise SynapseError( + 403, spam_error, Codes.FORBIDDEN + ) + + yield self.send_nonmember_event( + requester, + event, + context, + ratelimit=ratelimit, + ) defer.returnValue(event) @measure_func("create_new_client_event") @defer.inlineCallbacks - def create_new_client_event(self, builder, requester=None, prev_event_ids=None): - if prev_event_ids: - prev_events = yield self.store.add_event_hashes(prev_event_ids) - prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) - depth = prev_max_depth + 1 - else: - latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room( - builder.room_id, - ) + def create_new_client_event(self, builder, requester=None, + prev_events_and_hashes=None): + """Create a new event for a local client - # We want to limit the max number of prev events we point to in our - # new event - if len(latest_ret) > 10: - # Sort by reverse depth, so we point to the most recent. - latest_ret.sort(key=lambda a: -a[2]) - new_latest_ret = latest_ret[:5] - - # We also randomly point to some of the older events, to make - # sure that we don't completely ignore the older events. - if latest_ret[5:]: - sample_size = min(5, len(latest_ret[5:])) - new_latest_ret.extend(random.sample(latest_ret[5:], sample_size)) - latest_ret = new_latest_ret - - if latest_ret: - depth = max([d for _, _, d in latest_ret]) + 1 - else: - depth = 1 + Args: + builder (EventBuilder): + + requester (synapse.types.Requester|None): + + prev_events_and_hashes (list[(str, dict[str, str], int)]|None): + the forward extremities to use as the prev_events for the + new event. For each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. - prev_events = [ - (event_id, prev_hashes) - for event_id, prev_hashes, _ in latest_ret - ] + If None, they will be requested from the database. + + Returns: + Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)] + """ + + if prev_events_and_hashes is not None: + assert len(prev_events_and_hashes) <= 10, \ + "Attempting to create an event with %i prev_events" % ( + len(prev_events_and_hashes), + ) + else: + prev_events_and_hashes = \ + yield self.store.get_prev_events_for_room(builder.room_id) + + if prev_events_and_hashes: + depth = max([d for _, _, d in prev_events_and_hashes]) + 1 + # we cap depth of generated events, to ensure that they are not + # rejected by other servers (and so that they can be persisted in + # the db) + depth = min(depth, MAX_DEPTH) + else: + depth = 1 + + prev_events = [ + (event_id, prev_hashes) + for event_id, prev_hashes, _ in prev_events_and_hashes + ] builder.prev_events = prev_events builder.depth = depth @@ -678,8 +789,8 @@ class EventCreationHandler(object): # Ensure that we can round trip before trying to persist in db try: - dump = ujson.dumps(unfreeze(event.content)) - ujson.loads(dump) + dump = frozendict_json_encoder.encode(event.content) + simplejson.loads(dump) except Exception: logger.exception("Failed to encode content: %r", event.content) raise @@ -713,8 +824,14 @@ class EventCreationHandler(object): except: # noqa: E722, as we reraise the exception this is fine. # Ensure that we actually remove the entries in the push actions # staging area, if we calculated them. - preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id) - raise + tp, value, tb = sys.exc_info() + + run_in_background( + self.store.remove_push_actions_from_staging, + event.event_id, + ) + + six.reraise(tp, value, tb) @defer.inlineCallbacks def persist_and_notify_client_event( @@ -765,7 +882,7 @@ class EventCreationHandler(object): state_to_include_ids = [ e_id - for k, e_id in context.current_state_ids.iteritems() + for k, e_id in iteritems(context.current_state_ids) if k[0] in self.hs.config.room_invite_state_types or k == (EventTypes.Member, event.sender) ] @@ -779,7 +896,7 @@ class EventCreationHandler(object): "content": e.content, "sender": e.sender, } - for e in state_to_include.itervalues() + for e in itervalues(state_to_include) ] invitee = UserID.from_string(event.state_key) @@ -834,22 +951,33 @@ class EventCreationHandler(object): # this intentionally does not yield: we don't care about the result # and don't need to wait for it. - preserve_fn(self.pusher_pool.on_new_notifications)( + run_in_background( + self.pusher_pool.on_new_notifications, event_stream_id, max_stream_id ) @defer.inlineCallbacks def _notify(): yield run_on_reactor() - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) + try: + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) + except Exception: + logger.exception("Error notifying about new room event") - preserve_fn(_notify)() + run_in_background(_notify) if event.type == EventTypes.Message: - presence = self.hs.get_presence_handler() # We don't want to block sending messages on any presence code. This # matters as sometimes presence code can take a while. - preserve_fn(presence.bump_presence_active_time)(requester.user) + run_in_background(self._bump_active_time, requester.user) + + @defer.inlineCallbacks + def _bump_active_time(self, user): + try: + presence = self.hs.get_presence_handler() + yield presence.bump_presence_active_time(user) + except Exception: + logger.exception("Error bumping presence active time") |