diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/_base.py | 16 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 7 | ||||
-rw-r--r-- | synapse/handlers/directory.py | 5 | ||||
-rw-r--r-- | synapse/handlers/events.py | 8 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 342 | ||||
-rw-r--r-- | synapse/handlers/message.py | 19 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 498 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 20 | ||||
-rw-r--r-- | synapse/handlers/room.py | 13 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 2 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 6 |
11 files changed, 704 insertions, 232 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 4b3f4eadab..833ff41377 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -20,6 +20,8 @@ from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.api.constants import Membership, EventTypes from synapse.types import UserID +from synapse.util.logcontext import PreserveLoggingContext + import logging @@ -103,7 +105,9 @@ class BaseHandler(object): if not suppress_auth: self.auth.check(event, auth_events=context.current_state) - yield self.store.persist_event(event, context=context) + (event_stream_id, max_stream_id) = yield self.store.persist_event( + event, context=context + ) federation_handler = self.hs.get_handlers().federation_handler @@ -137,10 +141,12 @@ class BaseHandler(object): "Failed to get destination from event %s", s.event_id ) - # Don't block waiting on waking up all the listeners. - notify_d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + # Don't block waiting on waking up all the listeners. + notify_d = self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) def log_failure(f): logger.warn( diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 355ab317df..8269482e47 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -15,7 +15,7 @@ from twisted.internet import defer -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventTypes from synapse.appservice import ApplicationService from synapse.types import UserID @@ -147,10 +147,7 @@ class ApplicationServicesHandler(object): ) # We need to know the members associated with this event.room_id, # if any. - member_list = yield self.store.get_room_members( - room_id=event.room_id, - membership=Membership.JOIN - ) + member_list = yield self.store.get_users_in_room(event.room_id) services = yield self.store.get_app_services() interested_list = [ diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index f76febee8f..e41a688836 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -22,6 +22,7 @@ from synapse.api.constants import EventTypes from synapse.types import RoomAlias import logging +import string logger = logging.getLogger(__name__) @@ -40,6 +41,10 @@ class DirectoryHandler(BaseHandler): def _create_association(self, room_alias, room_id, servers=None): # general association creation for both human users and app services + for wchar in string.whitespace: + if wchar in room_alias.localpart: + raise SynapseError(400, "Invalid characters in room alias") + if not self.hs.is_mine(room_alias): raise SynapseError(400, "Room alias must be local") # TODO(erikj): Change this. diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index f9f855213b..993d33ba47 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -15,7 +15,6 @@ from twisted.internet import defer -from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.types import UserID from synapse.events.utils import serialize_event @@ -81,10 +80,9 @@ class EventStreamHandler(BaseHandler): # thundering herds on restart. timeout = random.randint(int(timeout*0.9), int(timeout*1.1)) - with PreserveLoggingContext(): - events, tokens = yield self.notifier.get_events_for( - auth_user, room_ids, pagin_config, timeout - ) + events, tokens = yield self.notifier.get_events_for( + auth_user, room_ids, pagin_config, timeout + ) time_now = self.clock.time_msec() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 85e2757227..46ce3699d7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -18,9 +18,11 @@ from ._base import BaseHandler from synapse.api.errors import ( - AuthError, FederationError, StoreError, + AuthError, FederationError, StoreError, CodeMessageException, SynapseError, ) from synapse.api.constants import EventTypes, Membership, RejectedReason +from synapse.util import unwrapFirstError +from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.util.frozenutils import unfreeze @@ -29,6 +31,8 @@ from synapse.crypto.event_signing import ( ) from synapse.types import UserID +from synapse.util.retryutils import NotRetryingDestination + from twisted.internet import defer import itertools @@ -156,7 +160,7 @@ class FederationHandler(BaseHandler): ) try: - yield self._handle_new_event( + _, event_stream_id, max_stream_id = yield self._handle_new_event( origin, event, state=state, @@ -197,9 +201,11 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) def log_failure(f): logger.warn( @@ -218,37 +224,210 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def backfill(self, dest, room_id, limit): + def backfill(self, dest, room_id, limit, extremities=[]): """ Trigger a backfill request to `dest` for the given `room_id` """ - extremities = yield self.store.get_oldest_events_in_room(room_id) + if not extremities: + extremities = yield self.store.get_oldest_events_in_room(room_id) - pdus = yield self.replication_layer.backfill( + events = yield self.replication_layer.backfill( dest, room_id, - limit, + limit=limit, extremities=extremities, ) - events = [] + event_map = {e.event_id: e for e in events} - for pdu in pdus: - event = pdu + event_ids = set(e.event_id for e in events) - # FIXME (erikj): Not sure this actually works :/ - context = yield self.state_handler.compute_event_context(event) + edges = [ + ev.event_id + for ev in events + if set(e_id for e_id, _ in ev.prev_events) - event_ids + ] - events.append((event, context)) + # For each edge get the current state. - yield self.store.persist_event( - event, - context=context, - backfilled=True + auth_events = {} + events_to_state = {} + for e_id in edges: + state, auth = yield self.replication_layer.get_state_for_room( + destination=dest, + room_id=room_id, + event_id=e_id + ) + auth_events.update({a.event_id: a for a in auth}) + events_to_state[e_id] = state + + yield defer.gatherResults( + [ + self._handle_new_event(dest, a) + for a in auth_events.values() + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) + + yield defer.gatherResults( + [ + self._handle_new_event( + dest, event_map[e_id], + state=events_to_state[e_id], + backfilled=True, + ) + for e_id in events_to_state + ], + consumeErrors=True + ).addErrback(unwrapFirstError) + + events.sort(key=lambda e: e.depth) + + for event in events: + if event in events_to_state: + continue + + yield self._handle_new_event( + dest, event, + backfilled=True, ) defer.returnValue(events) @defer.inlineCallbacks + def maybe_backfill(self, room_id, current_depth): + """Checks the database to see if we should backfill before paginating, + and if so do. + """ + extremities = yield self.store.get_oldest_events_with_depth_in_room( + room_id + ) + + if not extremities: + logger.debug("Not backfilling as no extremeties found.") + return + + # Check if we reached a point where we should start backfilling. + sorted_extremeties_tuple = sorted( + extremities.items(), + key=lambda e: -int(e[1]) + ) + max_depth = sorted_extremeties_tuple[0][1] + + if current_depth > max_depth: + logger.debug( + "Not backfilling as we don't need to. %d < %d", + max_depth, current_depth, + ) + return + + # Now we need to decide which hosts to hit first. + + # First we try hosts that are already in the room + # TODO: HEURISTIC ALERT. + + curr_state = yield self.state_handler.get_current_state(room_id) + + def get_domains_from_state(state): + joined_users = [ + (state_key, int(event.depth)) + for (e_type, state_key), event in state.items() + if e_type == EventTypes.Member + and event.membership == Membership.JOIN + ] + + joined_domains = {} + for u, d in joined_users: + try: + dom = UserID.from_string(u).domain + old_d = joined_domains.get(dom) + if old_d: + joined_domains[dom] = min(d, old_d) + else: + joined_domains[dom] = d + except: + pass + + return sorted(joined_domains.items(), key=lambda d: d[1]) + + curr_domains = get_domains_from_state(curr_state) + + likely_domains = [ + domain for domain, depth in curr_domains + if domain is not self.server_name + ] + + @defer.inlineCallbacks + def try_backfill(domains): + # TODO: Should we try multiple of these at a time? + for dom in domains: + try: + events = yield self.backfill( + dom, room_id, + limit=100, + extremities=[e for e in extremities.keys()] + ) + except SynapseError: + logger.info( + "Failed to backfill from %s because %s", + dom, e, + ) + continue + except CodeMessageException as e: + if 400 <= e.code < 500: + raise + + logger.info( + "Failed to backfill from %s because %s", + dom, e, + ) + continue + except NotRetryingDestination as e: + logger.info(e.message) + continue + except Exception as e: + logger.exception( + "Failed to backfill from %s because %s", + dom, e, + ) + continue + + if events: + defer.returnValue(True) + defer.returnValue(False) + + success = yield try_backfill(likely_domains) + if success: + defer.returnValue(True) + + # Huh, well *those* domains didn't work out. Lets try some domains + # from the time. + + tried_domains = set(likely_domains) + tried_domains.add(self.server_name) + + event_ids = list(extremities.keys()) + + states = yield defer.gatherResults([ + self.state_handler.resolve_state_groups([e]) + for e in event_ids + ]) + states = dict(zip(event_ids, [s[1] for s in states])) + + for e_id, _ in sorted_extremeties_tuple: + likely_domains = get_domains_from_state(states[e_id]) + + success = yield try_backfill([ + dom for dom in likely_domains + if dom not in tried_domains + ]) + if success: + defer.returnValue(True) + + tried_domains.update(likely_domains) + + defer.returnValue(False) + + @defer.inlineCallbacks def send_invite(self, target_host, event): """ Sends the invite to the remote server for signing. @@ -376,30 +555,14 @@ class FederationHandler(BaseHandler): # FIXME pass - for e in auth_chain: - e.internal_metadata.outlier = True - - if e.event_id == event.event_id: - continue - - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - yield self._handle_new_event( - origin, e, auth_events=auth - ) - except: - logger.exception( - "Failed to handle auth event %s", - e.event_id, - ) + yield self._handle_auth_events( + origin, [e for e in auth_chain if e.event_id != event.event_id] + ) - for e in state: + @defer.inlineCallbacks + def handle_state(e): if e.event_id == event.event_id: - continue + return e.internal_metadata.outlier = True try: @@ -417,13 +580,15 @@ class FederationHandler(BaseHandler): e.event_id, ) + yield defer.DeferredList([handle_state(e) for e in state]) + auth_ids = [e_id for e_id, _ in event.auth_events] auth_events = { (e.type, e.state_key): e for e in auth_chain if e.event_id in auth_ids } - yield self._handle_new_event( + _, event_stream_id, max_stream_id = yield self._handle_new_event( origin, new_event, state=state, @@ -431,9 +596,11 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) - d = self.notifier.on_new_room_event( - new_event, extra_users=[joinee] - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + new_event, event_stream_id, max_stream_id, + extra_users=[joinee] + ) def log_failure(f): logger.warn( @@ -498,7 +665,9 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - context = yield self._handle_new_event(origin, event) + context, event_stream_id, max_stream_id = yield self._handle_new_event( + origin, event + ) logger.debug( "on_send_join_request: After _handle_new_event: %s, sigs: %s", @@ -512,9 +681,10 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, extra_users=extra_users + ) def log_failure(f): logger.warn( @@ -587,16 +757,18 @@ class FederationHandler(BaseHandler): context = yield self.state_handler.compute_event_context(event) - yield self.store.persist_event( + event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, backfilled=False, ) target_user = UserID.from_string(event.state_key) - d = self.notifier.on_new_room_event( - event, extra_users=[target_user], - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=[target_user], + ) def log_failure(f): logger.warn( @@ -745,9 +917,12 @@ class FederationHandler(BaseHandler): # This is a hack to fix some old rooms where the initial join event # didn't reference the create event in its auth events. if event.type == EventTypes.Member and not event.auth_events: - if len(event.prev_events) == 1: - c = yield self.store.get_event(event.prev_events[0][0]) - if c.type == EventTypes.Create: + if len(event.prev_events) == 1 and event.depth < 5: + c = yield self.store.get_event( + event.prev_events[0][0], + allow_none=True, + ) + if c and c.type == EventTypes.Create: auth_events[(c.type, c.state_key)] = c try: @@ -773,7 +948,7 @@ class FederationHandler(BaseHandler): ) raise - yield self.store.persist_event( + event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, backfilled=backfilled, @@ -781,7 +956,7 @@ class FederationHandler(BaseHandler): current_state=current_state, ) - defer.returnValue(context) + defer.returnValue((context, event_stream_id, max_stream_id)) @defer.inlineCallbacks def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, @@ -921,7 +1096,7 @@ class FederationHandler(BaseHandler): if d in have_events and not have_events[d] ], consumeErrors=True - ) + ).addErrback(unwrapFirstError) if different_events: local_view = dict(auth_events) @@ -1166,3 +1341,52 @@ class FederationHandler(BaseHandler): }, "missing": [e.event_id for e in missing_locals], }) + + @defer.inlineCallbacks + def _handle_auth_events(self, origin, auth_events): + auth_ids_to_deferred = {} + + def process_auth_ev(ev): + auth_ids = [e_id for e_id, _ in ev.auth_events] + + prev_ds = [ + auth_ids_to_deferred[i] + for i in auth_ids + if i in auth_ids_to_deferred + ] + + d = defer.Deferred() + + auth_ids_to_deferred[ev.event_id] = d + + @defer.inlineCallbacks + def f(*_): + ev.internal_metadata.outlier = True + + try: + auth = { + (e.type, e.state_key): e for e in auth_events + if e.event_id in auth_ids + } + + yield self._handle_new_event( + origin, ev, auth_events=auth + ) + except: + logger.exception( + "Failed to handle auth event %s", + ev.event_id, + ) + + d.callback(None) + + if prev_ds: + dx = defer.DeferredList(prev_ds) + dx.addBoth(f) + else: + f() + + for e in auth_events: + process_auth_ev(e) + + yield defer.DeferredList(auth_ids_to_deferred.values()) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 22e19af17f..867fdbefb0 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -20,8 +20,9 @@ from synapse.api.errors import RoomError, SynapseError from synapse.streams.config import PaginationConfig from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator +from synapse.util import unwrapFirstError from synapse.util.logcontext import PreserveLoggingContext -from synapse.types import UserID +from synapse.types import UserID, RoomStreamToken from ._base import BaseHandler @@ -89,9 +90,19 @@ class MessageHandler(BaseHandler): if not pagin_config.from_token: pagin_config.from_token = ( - yield self.hs.get_event_sources().get_current_token() + yield self.hs.get_event_sources().get_current_token( + direction='b' + ) ) + room_token = RoomStreamToken.parse(pagin_config.from_token.room_key) + if room_token.topological is None: + raise SynapseError(400, "Invalid token") + + yield self.hs.get_handlers().federation_handler.maybe_backfill( + room_id, room_token.topological + ) + user = UserID.from_string(user_id) events, next_key = yield data_source.get_pagination_rows( @@ -303,7 +314,7 @@ class MessageHandler(BaseHandler): event.room_id ), ] - ) + ).addErrback(unwrapFirstError) start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) @@ -328,7 +339,7 @@ class MessageHandler(BaseHandler): yield defer.gatherResults( [handle_room(e) for e in room_list], consumeErrors=True - ) + ).addErrback(unwrapFirstError) ret = { "rooms": rooms_ret, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9e15610401..023ad33ab0 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -18,8 +18,8 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError from synapse.api.constants import PresenceState -from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logutils import log_function from synapse.types import UserID import synapse.metrics @@ -146,6 +146,10 @@ class PresenceHandler(BaseHandler): self._user_cachemap = {} self._user_cachemap_latest_serial = 0 + # map room_ids to the latest presence serial for a member of that + # room + self._room_serials = {} + metrics.register_callback( "userCachemap:size", lambda: len(self._user_cachemap), @@ -278,15 +282,14 @@ class PresenceHandler(BaseHandler): now_online = state["presence"] != PresenceState.OFFLINE was_polling = target_user in self._user_cachemap - with PreserveLoggingContext(): - if now_online and not was_polling: - self.start_polling_presence(target_user, state=state) - elif not now_online and was_polling: - self.stop_polling_presence(target_user) + if now_online and not was_polling: + self.start_polling_presence(target_user, state=state) + elif not now_online and was_polling: + self.stop_polling_presence(target_user) - # TODO(paul): perform a presence push as part of start/stop poll so - # we don't have to do this all the time - self.changed_presencelike_data(target_user, state) + # TODO(paul): perform a presence push as part of start/stop poll so + # we don't have to do this all the time + self.changed_presencelike_data(target_user, state) def bump_presence_active_time(self, user, now=None): if now is None: @@ -298,13 +301,34 @@ class PresenceHandler(BaseHandler): self.changed_presencelike_data(user, {"last_active": now}) - def changed_presencelike_data(self, user, state): - statuscache = self._get_or_make_usercache(user) + def get_joined_rooms_for_user(self, user): + """Get the list of rooms a user is joined to. - self._user_cachemap_latest_serial += 1 - statuscache.update(state, serial=self._user_cachemap_latest_serial) + Args: + user(UserID): The user. + Returns: + A Deferred of a list of room id strings. + """ + rm_handler = self.homeserver.get_handlers().room_member_handler + return rm_handler.get_joined_rooms_for_user(user) - return self.push_presence(user, statuscache=statuscache) + def get_joined_users_for_room_id(self, room_id): + rm_handler = self.homeserver.get_handlers().room_member_handler + return rm_handler.get_room_members(room_id) + + @defer.inlineCallbacks + def changed_presencelike_data(self, user, state): + """Updates the presence state of a local user. + + Args: + user(UserID): The user being updated. + state(dict): The new presence state for the user. + Returns: + A Deferred + """ + self._user_cachemap_latest_serial += 1 + statuscache = yield self.update_presence_cache(user, state) + yield self.push_presence(user, statuscache=statuscache) @log_function def started_user_eventstream(self, user): @@ -318,14 +342,21 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def user_joined_room(self, user, room_id): + """Called via the distributor whenever a user joins a room. + Notifies the new member of the presence of the current members. + Notifies the current members of the room of the new member's presence. + + Args: + user(UserID): The user who joined the room. + room_id(str): The room id the user joined. + """ if self.hs.is_mine(user): - statuscache = self._get_or_make_usercache(user) - # No actual update but we need to bump the serial anyway for the # event source self._user_cachemap_latest_serial += 1 - statuscache.update({}, serial=self._user_cachemap_latest_serial) - + statuscache = yield self.update_presence_cache( + user, room_ids=[room_id] + ) self.push_update_to_local_and_remote( observed_user=user, room_ids=[room_id], @@ -333,18 +364,22 @@ class PresenceHandler(BaseHandler): ) # We also want to tell them about current presence of people. - rm_handler = self.homeserver.get_handlers().room_member_handler - curr_users = yield rm_handler.get_room_members(room_id) + curr_users = yield self.get_joined_users_for_room_id(room_id) for local_user in [c for c in curr_users if self.hs.is_mine(c)]: + statuscache = yield self.update_presence_cache( + local_user, room_ids=[room_id], add_to_cache=False + ) + self.push_update_to_local_and_remote( observed_user=local_user, users_to_push=[user], - statuscache=self._get_or_offline_usercache(local_user), + statuscache=statuscache, ) @defer.inlineCallbacks def send_invite(self, observer_user, observed_user): + """Request the presence of a local or remote user for a local user""" if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") @@ -379,6 +414,15 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def invite_presence(self, observed_user, observer_user): + """Handles a m.presence_invite EDU. A remote or local user has + requested presence updates for a local user. If the invite is accepted + then allow the local or remote user to see the presence of the local + user. + + Args: + observed_user(UserID): The local user whose presence is requested. + observer_user(UserID): The remote or local user requesting presence. + """ accept = yield self._should_accept_invite(observed_user, observer_user) if accept: @@ -405,16 +449,34 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def accept_presence(self, observed_user, observer_user): + """Handles a m.presence_accept EDU. Mark a presence invite from a + local or remote user as accepted in a local user's presence list. + Starts polling for presence updates from the local or remote user. + + Args: + observed_user(UserID): The user to update in the presence list. + observer_user(UserID): The owner of the presence list to update. + """ yield self.store.set_presence_list_accepted( observer_user.localpart, observed_user.to_string() ) - with PreserveLoggingContext(): - self.start_polling_presence( - observer_user, target_user=observed_user - ) + + self.start_polling_presence( + observer_user, target_user=observed_user + ) @defer.inlineCallbacks def deny_presence(self, observed_user, observer_user): + """Handle a m.presence_deny EDU. Removes a local or remote user from a + local user's presence list. + + Args: + observed_user(UserID): The local or remote user to remove from the + list. + observer_user(UserID): The local owner of the presence list. + Returns: + A Deferred. + """ yield self.store.del_presence_list( observer_user.localpart, observed_user.to_string() ) @@ -423,6 +485,16 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def drop(self, observed_user, observer_user): + """Remove a local or remote user from a local user's presence list and + unsubscribe the local user from updates that user. + + Args: + observed_user(UserId): The local or remote user to remove from the + list. + observer_user(UserId): The local owner of the presence list. + Returns: + A Deferred. + """ if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") @@ -430,34 +502,66 @@ class PresenceHandler(BaseHandler): observer_user.localpart, observed_user.to_string() ) - with PreserveLoggingContext(): - self.stop_polling_presence( - observer_user, target_user=observed_user - ) + self.stop_polling_presence( + observer_user, target_user=observed_user + ) @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): + """Get the presence list for a local user. The retured list includes + the current presence state for each user listed. + + Args: + observer_user(UserID): The local user whose presence list to fetch. + accepted(bool or None): If not none then only include users who + have or have not accepted the presence invite request. + Returns: + A Deferred list of presence state events. + """ if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") - presence = yield self.store.get_presence_list( + presence_list = yield self.store.get_presence_list( observer_user.localpart, accepted=accepted ) - for p in presence: - observed_user = UserID.from_string(p.pop("observed_user_id")) - p["observed_user"] = observed_user - p.update(self._get_or_offline_usercache(observed_user).get_state()) - if "last_active" in p: - p["last_active_ago"] = int( - self.clock.time_msec() - p.pop("last_active") + results = [] + for row in presence_list: + observed_user = UserID.from_string(row["observed_user_id"]) + result = { + "observed_user": observed_user, "accepted": row["accepted"] + } + result.update( + self._get_or_offline_usercache(observed_user).get_state() + ) + if "last_active" in result: + result["last_active_ago"] = int( + self.clock.time_msec() - result.pop("last_active") ) + results.append(result) - defer.returnValue(presence) + defer.returnValue(results) @defer.inlineCallbacks @log_function def start_polling_presence(self, user, target_user=None, state=None): + """Subscribe a local user to presence updates from a local or remote + user. If no target_user is supplied then subscribe to all users stored + in the presence list for the local user. + + Additonally this pushes the current presence state of this user to all + target_users. That state can be provided directly or will be read from + the stored state for the local user. + + Also this attempts to notify the local user of the current state of + any local target users. + + Args: + user(UserID): The local user that whishes for presence updates. + target_user(UserID): The local or remote user whose updates are + wanted. + state(dict): Optional presence state for the local user. + """ logger.debug("Start polling for presence from %s", user) if target_user: @@ -473,8 +577,7 @@ class PresenceHandler(BaseHandler): # Also include people in all my rooms - rm_handler = self.homeserver.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user(user) + room_ids = yield self.get_joined_rooms_for_user(user) if state is None: state = yield self.store.get_presence_state(user.localpart) @@ -498,9 +601,7 @@ class PresenceHandler(BaseHandler): # We want to tell the person that just came online # presence state of people they are interested in? self.push_update_to_clients( - observed_user=target_user, users_to_push=[user], - statuscache=self._get_or_offline_usercache(target_user), ) deferreds = [] @@ -517,6 +618,12 @@ class PresenceHandler(BaseHandler): yield defer.DeferredList(deferreds, consumeErrors=True) def _start_polling_local(self, user, target_user): + """Subscribe a local user to presence updates for a local user + + Args: + user(UserId): The local user that wishes for updates. + target_user(UserId): The local users whose updates are wanted. + """ target_localpart = target_user.localpart if target_localpart not in self._local_pushmap: @@ -525,6 +632,17 @@ class PresenceHandler(BaseHandler): self._local_pushmap[target_localpart].add(user) def _start_polling_remote(self, user, domain, remoteusers): + """Subscribe a local user to presence updates for remote users on a + given remote domain. + + Args: + user(UserID): The local user that wishes for updates. + domain(str): The remote server the local user wants updates from. + remoteusers(UserID): The remote users that local user wants to be + told about. + Returns: + A Deferred. + """ to_poll = set() for u in remoteusers: @@ -545,6 +663,17 @@ class PresenceHandler(BaseHandler): @log_function def stop_polling_presence(self, user, target_user=None): + """Unsubscribe a local user from presence updates from a local or + remote user. If no target user is supplied then unsubscribe the user + from all presence updates that the user had subscribed to. + + Args: + user(UserID): The local user that no longer wishes for updates. + target_user(UserID or None): The user whose updates are no longer + wanted. + Returns: + A Deferred. + """ logger.debug("Stop polling for presence from %s", user) if not target_user or self.hs.is_mine(target_user): @@ -573,6 +702,13 @@ class PresenceHandler(BaseHandler): return defer.DeferredList(deferreds, consumeErrors=True) def _stop_polling_local(self, user, target_user): + """Unsubscribe a local user from presence updates from a local user on + this server. + + Args: + user(UserID): The local user that no longer wishes for updates. + target_user(UserID): The user whose updates are no longer wanted. + """ for localpart in self._local_pushmap.keys(): if target_user and localpart != target_user.localpart: continue @@ -585,6 +721,17 @@ class PresenceHandler(BaseHandler): @log_function def _stop_polling_remote(self, user, domain, remoteusers): + """Unsubscribe a local user from presence updates from remote users on + a given domain. + + Args: + user(UserID): The local user that no longer wishes for updates. + domain(str): The remote server to unsubscribe from. + remoteusers([UserID]): The users on that remote server that the + local user no longer wishes to be updated about. + Returns: + A Deferred. + """ to_unpoll = set() for u in remoteusers: @@ -606,6 +753,19 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks @log_function def push_presence(self, user, statuscache): + """ + Notify local and remote users of a change in presence of a local user. + Pushes the update to local clients and remote domains that are directly + subscribed to the presence of the local user. + Also pushes that update to any local user or remote domain that shares + a room with the local user. + + Args: + user(UserID): The local user whose presence was updated. + statuscache(UserPresenceCache): Cache of the user's presence state + Returns: + A Deferred. + """ assert(self.hs.is_mine(user)) logger.debug("Pushing presence update from %s", user) @@ -617,8 +777,7 @@ class PresenceHandler(BaseHandler): # and also user is informed of server-forced pushes localusers.add(user) - rm_handler = self.homeserver.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user(user) + room_ids = yield self.get_joined_rooms_for_user(user) if not localusers and not room_ids: defer.returnValue(None) @@ -633,44 +792,23 @@ class PresenceHandler(BaseHandler): yield self.distributor.fire("user_presence_changed", user, statuscache) @defer.inlineCallbacks - def _push_presence_remote(self, user, destination, state=None): - if state is None: - state = yield self.store.get_presence_state(user.localpart) - del state["mtime"] - state["presence"] = state.pop("state") - - if user in self._user_cachemap: - state["last_active"] = ( - self._user_cachemap[user].get_state()["last_active"] - ) - - yield self.distributor.fire( - "collect_presencelike_data", user, state - ) - - if "last_active" in state: - state = dict(state) - state["last_active_ago"] = int( - self.clock.time_msec() - state.pop("last_active") - ) - - user_state = { - "user_id": user.to_string(), - } - user_state.update(**state) - - yield self.federation.send_edu( - destination=destination, - edu_type="m.presence", - content={ - "push": [ - user_state, - ], - } - ) - - @defer.inlineCallbacks def incoming_presence(self, origin, content): + """Handle an incoming m.presence EDU. + For each presence update in the "push" list update our local cache and + notify the appropriate local clients. Only clients that share a room + or are directly subscribed to the presence for a user should be + notified of the update. + For each subscription request in the "poll" list start pushing presence + updates to the remote server. + For unsubscribe request in the "unpoll" list stop pushing presence + updates to the remote server. + + Args: + orgin(str): The source of this m.presence EDU. + content(dict): The content of this m.presence EDU. + Returns: + A Deferred. + """ deferreds = [] for push in content.get("push", []): @@ -684,8 +822,7 @@ class PresenceHandler(BaseHandler): " | %d interested local observers %r", len(observers), observers ) - rm_handler = self.homeserver.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user(user) + room_ids = yield self.get_joined_rooms_for_user(user) if room_ids: logger.debug(" | %d interested room IDs %r", len(room_ids), room_ids) @@ -704,20 +841,15 @@ class PresenceHandler(BaseHandler): self.clock.time_msec() - state.pop("last_active_ago") ) - statuscache = self._get_or_make_usercache(user) - self._user_cachemap_latest_serial += 1 - statuscache.update(state, serial=self._user_cachemap_latest_serial) + yield self.update_presence_cache(user, state, room_ids=room_ids) if not observers and not room_ids: logger.debug(" | no interested observers or room IDs") continue self.push_update_to_clients( - observed_user=user, - users_to_push=observers, - room_ids=room_ids, - statuscache=statuscache, + users_to_push=observers, room_ids=room_ids ) user_id = user.to_string() @@ -766,13 +898,58 @@ class PresenceHandler(BaseHandler): if not self._remote_sendmap[user]: del self._remote_sendmap[user] - with PreserveLoggingContext(): - yield defer.DeferredList(deferreds, consumeErrors=True) + yield defer.DeferredList(deferreds, consumeErrors=True) + + @defer.inlineCallbacks + def update_presence_cache(self, user, state={}, room_ids=None, + add_to_cache=True): + """Update the presence cache for a user with a new state and bump the + serial to the latest value. + + Args: + user(UserID): The user being updated + state(dict): The presence state being updated + room_ids(None or list of str): A list of room_ids to update. If + room_ids is None then fetch the list of room_ids the user is + joined to. + add_to_cache: Whether to add an entry to the presence cache if the + user isn't already in the cache. + Returns: + A Deferred UserPresenceCache for the user being updated. + """ + if room_ids is None: + room_ids = yield self.get_joined_rooms_for_user(user) + + for room_id in room_ids: + self._room_serials[room_id] = self._user_cachemap_latest_serial + if add_to_cache: + statuscache = self._get_or_make_usercache(user) + else: + statuscache = self._get_or_offline_usercache(user) + statuscache.update(state, serial=self._user_cachemap_latest_serial) + defer.returnValue(statuscache) @defer.inlineCallbacks def push_update_to_local_and_remote(self, observed_user, statuscache, users_to_push=[], room_ids=[], remote_domains=[]): + """Notify local clients and remote servers of a change in the presence + of a user. + + Args: + observed_user(UserID): The user to push the presence state for. + statuscache(UserPresenceCache): The cache for the presence state to + push. + users_to_push([UserID]): A list of local and remote users to + notify. + room_ids([str]): Notify the local and remote occupants of these + rooms. + remote_domains([str]): A list of remote servers to notify in + addition to those implied by the users_to_push and the + room_ids. + Returns: + A Deferred. + """ localusers, remoteusers = partitionbool( users_to_push, @@ -782,10 +959,7 @@ class PresenceHandler(BaseHandler): localusers = set(localusers) self.push_update_to_clients( - observed_user=observed_user, - users_to_push=localusers, - room_ids=room_ids, - statuscache=statuscache, + users_to_push=localusers, room_ids=room_ids ) remote_domains = set(remote_domains) @@ -810,52 +984,78 @@ class PresenceHandler(BaseHandler): defer.returnValue((localusers, remote_domains)) - def push_update_to_clients(self, observed_user, users_to_push=[], - room_ids=[], statuscache=None): - self.notifier.on_new_user_event( - users_to_push, - room_ids, - ) - + def push_update_to_clients(self, users_to_push=[], room_ids=[]): + """Notify clients of a new presence event. -class PresenceEventSource(object): - def __init__(self, hs): - self.hs = hs - self.clock = hs.get_clock() + Args: + users_to_push([UserID]): List of users to notify. + room_ids([str]): List of room_ids to notify. + """ + with PreserveLoggingContext(): + self.notifier.on_new_user_event( + "presence_key", + self._user_cachemap_latest_serial, + users_to_push, + room_ids, + ) @defer.inlineCallbacks - def is_visible(self, observer_user, observed_user): - if observer_user == observed_user: - defer.returnValue(True) - - presence = self.hs.get_handlers().presence_handler - - if (yield presence.store.user_rooms_intersect( - [u.to_string() for u in observer_user, observed_user])): - defer.returnValue(True) + def _push_presence_remote(self, user, destination, state=None): + """Push a user's presence to a remote server. If a presence state event + that event is sent. Otherwise a new state event is constructed from the + stored presence state. + The last_active is replaced with last_active_ago in case the wallclock + time on the remote server is different to the time on this server. + Sends an EDU to the remote server with the current presence state. + + Args: + user(UserID): The user to push the presence state for. + destination(str): The remote server to send state to. + state(dict): The state to push, or None to use the current stored + state. + Returns: + A Deferred. + """ + if state is None: + state = yield self.store.get_presence_state(user.localpart) + del state["mtime"] + state["presence"] = state.pop("state") - if self.hs.is_mine(observed_user): - pushmap = presence._local_pushmap + if user in self._user_cachemap: + state["last_active"] = ( + self._user_cachemap[user].get_state()["last_active"] + ) - defer.returnValue( - observed_user.localpart in pushmap and - observer_user in pushmap[observed_user.localpart] + yield self.distributor.fire( + "collect_presencelike_data", user, state ) - else: - recvmap = presence._remote_recvmap - defer.returnValue( - observed_user in recvmap and - observer_user in recvmap[observed_user] + if "last_active" in state: + state = dict(state) + state["last_active_ago"] = int( + self.clock.time_msec() - state.pop("last_active") ) + user_state = {"user_id": user.to_string(), } + user_state.update(state) + + yield self.federation.send_edu( + destination=destination, + edu_type="m.presence", + content={"push": [user_state, ], } + ) + + +class PresenceEventSource(object): + def __init__(self, hs): + self.hs = hs + self.clock = hs.get_clock() + @defer.inlineCallbacks @log_function def get_new_events_for_user(self, user, from_key, limit): from_key = int(from_key) - observer_user = user - presence = self.hs.get_handlers().presence_handler cachemap = presence._user_cachemap @@ -864,17 +1064,27 @@ class PresenceEventSource(object): clock = self.clock latest_serial = 0 + user_ids_to_check = {user} + presence_list = yield presence.store.get_presence_list( + user.localpart, accepted=True + ) + if presence_list is not None: + user_ids_to_check |= set( + UserID.from_string(p["observed_user_id"]) for p in presence_list + ) + room_ids = yield presence.get_joined_rooms_for_user(user) + for room_id in set(room_ids) & set(presence._room_serials): + if presence._room_serials[room_id] > from_key: + joined = yield presence.get_joined_users_for_room_id(room_id) + user_ids_to_check |= set(joined) + updates = [] - # TODO(paul): use a DeferredList ? How to limit concurrency. - for observed_user in cachemap.keys(): + for observed_user in user_ids_to_check & set(cachemap): cached = cachemap[observed_user] if cached.serial <= from_key or cached.serial > max_serial: continue - if not (yield self.is_visible(observer_user, observed_user)): - continue - latest_serial = max(cached.serial, latest_serial) updates.append(cached.make_event(user=observed_user, clock=clock)) @@ -911,8 +1121,6 @@ class PresenceEventSource(object): def get_pagination_rows(self, user, pagination_config, key): # TODO (erikj): Does this make sense? Ordering? - observer_user = user - from_key = int(pagination_config.from_key) if pagination_config.to_key: @@ -923,14 +1131,26 @@ class PresenceEventSource(object): presence = self.hs.get_handlers().presence_handler cachemap = presence._user_cachemap + user_ids_to_check = {user} + presence_list = yield presence.store.get_presence_list( + user.localpart, accepted=True + ) + if presence_list is not None: + user_ids_to_check |= set( + UserID.from_string(p["observed_user_id"]) for p in presence_list + ) + room_ids = yield presence.get_joined_rooms_for_user(user) + for room_id in set(room_ids) & set(presence._room_serials): + if presence._room_serials[room_id] >= from_key: + joined = yield presence.get_joined_users_for_room_id(room_id) + user_ids_to_check |= set(joined) + updates = [] - # TODO(paul): use a DeferredList ? How to limit concurrency. - for observed_user in cachemap.keys(): + for observed_user in user_ids_to_check & set(cachemap): if not (to_key < cachemap[observed_user].serial <= from_key): continue - if (yield self.is_visible(observer_user, observed_user)): - updates.append((observed_user, cachemap[observed_user])) + updates.append((observed_user, cachemap[observed_user])) # TODO(paul): limit diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index ee2732b848..799faffe53 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -17,8 +17,8 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.api.constants import EventTypes, Membership -from synapse.util.logcontext import PreserveLoggingContext from synapse.types import UserID +from synapse.util import unwrapFirstError from ._base import BaseHandler @@ -88,6 +88,9 @@ class ProfileHandler(BaseHandler): if target_user != auth_user: raise AuthError(400, "Cannot set another user's displayname") + if new_displayname == '': + new_displayname = None + yield self.store.set_profile_displayname( target_user.localpart, new_displayname ) @@ -154,14 +157,13 @@ class ProfileHandler(BaseHandler): if not self.hs.is_mine(user): defer.returnValue(None) - with PreserveLoggingContext(): - (displayname, avatar_url) = yield defer.gatherResults( - [ - self.store.get_profile_displayname(user.localpart), - self.store.get_profile_avatar_url(user.localpart), - ], - consumeErrors=True - ) + (displayname, avatar_url) = yield defer.gatherResults( + [ + self.store.get_profile_displayname(user.localpart), + self.store.get_profile_avatar_url(user.localpart), + ], + consumeErrors=True + ).addErrback(unwrapFirstError) state["displayname"] = displayname state["avatar_url"] = avatar_url diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3da08c147e..4bd027d9bb 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -21,11 +21,12 @@ from ._base import BaseHandler from synapse.types import UserID, RoomAlias, RoomID from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import StoreError, SynapseError -from synapse.util import stringutils +from synapse.util import stringutils, unwrapFirstError from synapse.util.async import run_on_reactor from synapse.events.utils import serialize_event import logging +import string logger = logging.getLogger(__name__) @@ -50,6 +51,10 @@ class RoomCreationHandler(BaseHandler): self.ratelimit(user_id) if "room_alias_name" in config: + for wchar in string.whitespace: + if wchar in config["room_alias_name"]: + raise SynapseError(400, "Invalid characters in room alias") + room_alias = RoomAlias.create( config["room_alias_name"], self.hs.hostname, @@ -535,7 +540,7 @@ class RoomListHandler(BaseHandler): for room in chunk ], consumeErrors=True, - ) + ).addErrback(unwrapFirstError) for i, room in enumerate(chunk): room["num_joined_members"] = len(results[i]) @@ -575,8 +580,8 @@ class RoomEventSource(object): defer.returnValue((events, end_key)) - def get_current_key(self): - return self.store.get_room_events_max_id() + def get_current_key(self, direction='f'): + return self.store.get_room_events_max_id(direction) @defer.inlineCallbacks def get_pagination_rows(self, user, config, key): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 35a62fda47..bd8c603681 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -92,7 +92,7 @@ class SyncHandler(BaseHandler): result = yield self.current_sync_for_user(sync_config, since_token) defer.returnValue(result) else: - def current_sync_callback(): + def current_sync_callback(before_token, after_token): return self.current_sync_for_user(sync_config, since_token) rm_handler = self.hs.get_handlers().room_member_handler diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index c0b2bd7db0..a9895292c2 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -18,6 +18,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.errors import SynapseError, AuthError +from synapse.util.logcontext import PreserveLoggingContext from synapse.types import UserID import logging @@ -216,7 +217,10 @@ class TypingNotificationHandler(BaseHandler): self._latest_room_serial += 1 self._room_serials[room_id] = self._latest_room_serial - self.notifier.on_new_user_event(rooms=[room_id]) + with PreserveLoggingContext(): + self.notifier.on_new_user_event( + "typing_key", self._latest_room_serial, rooms=[room_id] + ) class TypingNotificationEventSource(object): |