diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/appservice.py | 37 | ||||
-rw-r--r-- | synapse/handlers/message.py | 104 | ||||
-rw-r--r-- | synapse/handlers/register.py | 36 |
3 files changed, 123 insertions, 54 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 3dd3fa2a27..0245197c02 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -18,7 +18,9 @@ from twisted.internet import defer import synapse from synapse.api.constants import EventTypes from synapse.util.metrics import Measure -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import ( + make_deferred_yieldable, preserve_fn, run_in_background, +) import logging @@ -84,11 +86,16 @@ class ApplicationServicesHandler(object): if not events: break + events_by_room = {} for event in events: + events_by_room.setdefault(event.room_id, []).append(event) + + @defer.inlineCallbacks + def handle_event(event): # Gather interested services services = yield self._get_services_for_event(event) if len(services) == 0: - continue # no services need notifying + return # no services need notifying # Do we know this user exists? If not, poke the user # query API for all services which match that user regex. @@ -108,9 +115,33 @@ class ApplicationServicesHandler(object): service, event ) - events_processed_counter.inc_by(len(events)) + @defer.inlineCallbacks + def handle_room_events(events): + for event in events: + yield handle_event(event) + + yield make_deferred_yieldable(defer.gatherResults([ + run_in_background(handle_room_events, evs) + for evs in events_by_room.itervalues() + ], consumeErrors=True)) yield self.store.set_appservice_last_pos(upper_bound) + + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_positions.set( + upper_bound, "appservice_sender", + ) + + events_processed_counter.inc_by(len(events)) + + synapse.metrics.event_processing_lag.set( + now - ts, "appservice_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "appservice_sender", + ) finally: self.is_processing = False diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 6de6e13b7b..54cd691f91 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -454,40 +454,39 @@ class EventCreationHandler(object): """ builder = self.event_builder_factory.new(event_dict) - with (yield self.limiter.queue(builder.room_id)): - self.validator.validate_new(builder) - - if builder.type == EventTypes.Member: - membership = builder.content.get("membership", None) - target = UserID.from_string(builder.state_key) - - if membership in {Membership.JOIN, Membership.INVITE}: - # If event doesn't include a display name, add one. - profile = self.profile_handler - content = builder.content - - try: - if "displayname" not in content: - content["displayname"] = yield profile.get_displayname(target) - if "avatar_url" not in content: - content["avatar_url"] = yield profile.get_avatar_url(target) - except Exception as e: - logger.info( - "Failed to get profile information for %r: %s", - target, e - ) + self.validator.validate_new(builder) + + if builder.type == EventTypes.Member: + membership = builder.content.get("membership", None) + target = UserID.from_string(builder.state_key) + + if membership in {Membership.JOIN, Membership.INVITE}: + # If event doesn't include a display name, add one. + profile = self.profile_handler + content = builder.content + + try: + if "displayname" not in content: + content["displayname"] = yield profile.get_displayname(target) + if "avatar_url" not in content: + content["avatar_url"] = yield profile.get_avatar_url(target) + except Exception as e: + logger.info( + "Failed to get profile information for %r: %s", + target, e + ) - if token_id is not None: - builder.internal_metadata.token_id = token_id + if token_id is not None: + builder.internal_metadata.token_id = token_id - if txn_id is not None: - builder.internal_metadata.txn_id = txn_id + if txn_id is not None: + builder.internal_metadata.txn_id = txn_id - event, context = yield self.create_new_client_event( - builder=builder, - requester=requester, - prev_event_ids=prev_event_ids, - ) + event, context = yield self.create_new_client_event( + builder=builder, + requester=requester, + prev_event_ids=prev_event_ids, + ) defer.returnValue((event, context)) @@ -557,27 +556,34 @@ class EventCreationHandler(object): See self.create_event and self.send_nonmember_event. """ - event, context = yield self.create_event( - requester, - event_dict, - token_id=requester.access_token_id, - txn_id=txn_id - ) - spam_error = self.spam_checker.check_event_for_spam(event) - if spam_error: - if not isinstance(spam_error, basestring): - spam_error = "Spam is not permitted here" - raise SynapseError( - 403, spam_error, Codes.FORBIDDEN + # We limit the number of concurrent event sends in a room so that we + # don't fork the DAG too much. If we don't limit then we can end up in + # a situation where event persistence can't keep up, causing + # extremities to pile up, which in turn leads to state resolution + # taking longer. + with (yield self.limiter.queue(event_dict["room_id"])): + event, context = yield self.create_event( + requester, + event_dict, + token_id=requester.access_token_id, + txn_id=txn_id ) - yield self.send_nonmember_event( - requester, - event, - context, - ratelimit=ratelimit, - ) + spam_error = self.spam_checker.check_event_for_spam(event) + if spam_error: + if not isinstance(spam_error, basestring): + spam_error = "Spam is not permitted here" + raise SynapseError( + 403, spam_error, Codes.FORBIDDEN + ) + + yield self.send_nonmember_event( + requester, + event, + context, + ratelimit=ratelimit, + ) defer.returnValue(event) @measure_func("create_new_client_event") diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index dd03705279..f83c6b3cf8 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -23,7 +23,7 @@ from synapse.api.errors import ( ) from synapse.http.client import CaptchaServerHttpClient from synapse import types -from synapse.types import UserID +from synapse.types import UserID, create_requester, RoomID, RoomAlias from synapse.util.async import run_on_reactor, Linearizer from synapse.util.threepids import check_3pid_allowed from ._base import BaseHandler @@ -205,10 +205,17 @@ class RegistrationHandler(BaseHandler): token = None attempts += 1 + # auto-join the user to any rooms we're supposed to dump them into + fake_requester = create_requester(user_id) + for r in self.hs.config.auto_join_rooms: + try: + yield self._join_user_to_room(fake_requester, r) + except Exception as e: + logger.error("Failed to join new user to %r: %r", r, e) + # We used to generate default identicons here, but nowadays # we want clients to generate their own as part of their branding # rather than there being consistent matrix-wide ones, so we don't. - defer.returnValue((user_id, token)) @defer.inlineCallbacks @@ -483,3 +490,28 @@ class RegistrationHandler(BaseHandler): ) defer.returnValue((user_id, access_token)) + + @defer.inlineCallbacks + def _join_user_to_room(self, requester, room_identifier): + room_id = None + room_member_handler = self.hs.get_room_member_handler() + if RoomID.is_valid(room_identifier): + room_id = room_identifier + elif RoomAlias.is_valid(room_identifier): + room_alias = RoomAlias.from_string(room_identifier) + room_id, remote_room_hosts = ( + yield room_member_handler.lookup_room_alias(room_alias) + ) + room_id = room_id.to_string() + else: + raise SynapseError(400, "%s was not legal room ID or room alias" % ( + room_identifier, + )) + + yield room_member_handler.update_membership( + requester=requester, + target=requester.user, + room_id=room_id, + remote_room_hosts=remote_room_hosts, + action="join", + ) |