diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/_base.py | 16 | ||||
-rw-r--r-- | synapse/handlers/directory.py | 5 | ||||
-rw-r--r-- | synapse/handlers/events.py | 8 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 192 | ||||
-rw-r--r-- | synapse/handlers/message.py | 19 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 315 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 20 | ||||
-rw-r--r-- | synapse/handlers/room.py | 13 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 2 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 6 |
10 files changed, 474 insertions, 122 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 4b3f4eadab..833ff41377 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -20,6 +20,8 @@ from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.api.constants import Membership, EventTypes from synapse.types import UserID +from synapse.util.logcontext import PreserveLoggingContext + import logging @@ -103,7 +105,9 @@ class BaseHandler(object): if not suppress_auth: self.auth.check(event, auth_events=context.current_state) - yield self.store.persist_event(event, context=context) + (event_stream_id, max_stream_id) = yield self.store.persist_event( + event, context=context + ) federation_handler = self.hs.get_handlers().federation_handler @@ -137,10 +141,12 @@ class BaseHandler(object): "Failed to get destination from event %s", s.event_id ) - # Don't block waiting on waking up all the listeners. - notify_d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + # Don't block waiting on waking up all the listeners. + notify_d = self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) def log_failure(f): logger.warn( diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index f76febee8f..e41a688836 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -22,6 +22,7 @@ from synapse.api.constants import EventTypes from synapse.types import RoomAlias import logging +import string logger = logging.getLogger(__name__) @@ -40,6 +41,10 @@ class DirectoryHandler(BaseHandler): def _create_association(self, room_alias, room_id, servers=None): # general association creation for both human users and app services + for wchar in string.whitespace: + if wchar in room_alias.localpart: + raise SynapseError(400, "Invalid characters in room alias") + if not self.hs.is_mine(room_alias): raise SynapseError(400, "Room alias must be local") # TODO(erikj): Change this. diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index f9f855213b..993d33ba47 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -15,7 +15,6 @@ from twisted.internet import defer -from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.types import UserID from synapse.events.utils import serialize_event @@ -81,10 +80,9 @@ class EventStreamHandler(BaseHandler): # thundering herds on restart. timeout = random.randint(int(timeout*0.9), int(timeout*1.1)) - with PreserveLoggingContext(): - events, tokens = yield self.notifier.get_events_for( - auth_user, room_ids, pagin_config, timeout - ) + events, tokens = yield self.notifier.get_events_for( + auth_user, room_ids, pagin_config, timeout + ) time_now = self.clock.time_msec() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 85e2757227..d35d9f603c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -18,9 +18,11 @@ from ._base import BaseHandler from synapse.api.errors import ( - AuthError, FederationError, StoreError, + AuthError, FederationError, StoreError, CodeMessageException, SynapseError, ) from synapse.api.constants import EventTypes, Membership, RejectedReason +from synapse.util import unwrapFirstError +from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.util.frozenutils import unfreeze @@ -29,6 +31,8 @@ from synapse.crypto.event_signing import ( ) from synapse.types import UserID +from synapse.util.retryutils import NotRetryingDestination + from twisted.internet import defer import itertools @@ -156,7 +160,7 @@ class FederationHandler(BaseHandler): ) try: - yield self._handle_new_event( + _, event_stream_id, max_stream_id = yield self._handle_new_event( origin, event, state=state, @@ -197,9 +201,11 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) def log_failure(f): logger.warn( @@ -218,10 +224,11 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def backfill(self, dest, room_id, limit): + def backfill(self, dest, room_id, limit, extremities=[]): """ Trigger a backfill request to `dest` for the given `room_id` """ - extremities = yield self.store.get_oldest_events_in_room(room_id) + if not extremities: + extremities = yield self.store.get_oldest_events_in_room(room_id) pdus = yield self.replication_layer.backfill( dest, @@ -249,6 +256,140 @@ class FederationHandler(BaseHandler): defer.returnValue(events) @defer.inlineCallbacks + def maybe_backfill(self, room_id, current_depth): + """Checks the database to see if we should backfill before paginating, + and if so do. + """ + extremities = yield self.store.get_oldest_events_with_depth_in_room( + room_id + ) + + if not extremities: + logger.debug("Not backfilling as no extremeties found.") + return + + # Check if we reached a point where we should start backfilling. + sorted_extremeties_tuple = sorted( + extremities.items(), + key=lambda e: -int(e[1]) + ) + max_depth = sorted_extremeties_tuple[0][1] + + if current_depth > max_depth: + logger.debug( + "Not backfilling as we don't need to. %d < %d", + max_depth, current_depth, + ) + return + + # Now we need to decide which hosts to hit first. + + # First we try hosts that are already in the room + # TODO: HEURISTIC ALERT. + + curr_state = yield self.state_handler.get_current_state(room_id) + + def get_domains_from_state(state): + joined_users = [ + (state_key, int(event.depth)) + for (e_type, state_key), event in state.items() + if e_type == EventTypes.Member + and event.membership == Membership.JOIN + ] + + joined_domains = {} + for u, d in joined_users: + try: + dom = UserID.from_string(u).domain + old_d = joined_domains.get(dom) + if old_d: + joined_domains[dom] = min(d, old_d) + else: + joined_domains[dom] = d + except: + pass + + return sorted(joined_domains.items(), key=lambda d: d[1]) + + curr_domains = get_domains_from_state(curr_state) + + likely_domains = [ + domain for domain, depth in curr_domains + if domain is not self.server_name + ] + + @defer.inlineCallbacks + def try_backfill(domains): + # TODO: Should we try multiple of these at a time? + for dom in domains: + try: + events = yield self.backfill( + dom, room_id, + limit=100, + extremities=[e for e in extremities.keys()] + ) + except SynapseError: + logger.info( + "Failed to backfill from %s because %s", + dom, e, + ) + continue + except CodeMessageException as e: + if 400 <= e.code < 500: + raise + + logger.info( + "Failed to backfill from %s because %s", + dom, e, + ) + continue + except NotRetryingDestination as e: + logger.info(e.message) + continue + except Exception as e: + logger.warn( + "Failed to backfill from %s because %s", + dom, e, + ) + continue + + if events: + defer.returnValue(True) + defer.returnValue(False) + + success = yield try_backfill(likely_domains) + if success: + defer.returnValue(True) + + # Huh, well *those* domains didn't work out. Lets try some domains + # from the time. + + tried_domains = set(likely_domains) + tried_domains.add(self.server_name) + + event_ids = list(extremities.keys()) + + states = yield defer.gatherResults([ + self.state_handler.resolve_state_groups([e]) + for e in event_ids + ]) + states = dict(zip(event_ids, [s[1] for s in states])) + + for e_id, _ in sorted_extremeties_tuple: + likely_domains = get_domains_from_state(states[e_id]) + + success = yield try_backfill([ + dom for dom in likely_domains + if dom not in tried_domains + ]) + if success: + defer.returnValue(True) + + tried_domains.update(likely_domains) + + defer.returnValue(False) + + @defer.inlineCallbacks def send_invite(self, target_host, event): """ Sends the invite to the remote server for signing. @@ -423,7 +564,7 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } - yield self._handle_new_event( + _, event_stream_id, max_stream_id = yield self._handle_new_event( origin, new_event, state=state, @@ -431,9 +572,11 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) - d = self.notifier.on_new_room_event( - new_event, extra_users=[joinee] - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + new_event, event_stream_id, max_stream_id, + extra_users=[joinee] + ) def log_failure(f): logger.warn( @@ -498,7 +641,9 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - context = yield self._handle_new_event(origin, event) + context, event_stream_id, max_stream_id = yield self._handle_new_event( + origin, event + ) logger.debug( "on_send_join_request: After _handle_new_event: %s, sigs: %s", @@ -512,9 +657,10 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, extra_users=extra_users + ) def log_failure(f): logger.warn( @@ -587,16 +733,18 @@ class FederationHandler(BaseHandler): context = yield self.state_handler.compute_event_context(event) - yield self.store.persist_event( + event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, backfilled=False, ) target_user = UserID.from_string(event.state_key) - d = self.notifier.on_new_room_event( - event, extra_users=[target_user], - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=[target_user], + ) def log_failure(f): logger.warn( @@ -773,7 +921,7 @@ class FederationHandler(BaseHandler): ) raise - yield self.store.persist_event( + event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, backfilled=backfilled, @@ -781,7 +929,7 @@ class FederationHandler(BaseHandler): current_state=current_state, ) - defer.returnValue(context) + defer.returnValue((context, event_stream_id, max_stream_id)) @defer.inlineCallbacks def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, @@ -921,7 +1069,7 @@ class FederationHandler(BaseHandler): if d in have_events and not have_events[d] ], consumeErrors=True - ) + ).addErrback(unwrapFirstError) if different_events: local_view = dict(auth_events) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 22e19af17f..867fdbefb0 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -20,8 +20,9 @@ from synapse.api.errors import RoomError, SynapseError from synapse.streams.config import PaginationConfig from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator +from synapse.util import unwrapFirstError from synapse.util.logcontext import PreserveLoggingContext -from synapse.types import UserID +from synapse.types import UserID, RoomStreamToken from ._base import BaseHandler @@ -89,9 +90,19 @@ class MessageHandler(BaseHandler): if not pagin_config.from_token: pagin_config.from_token = ( - yield self.hs.get_event_sources().get_current_token() + yield self.hs.get_event_sources().get_current_token( + direction='b' + ) ) + room_token = RoomStreamToken.parse(pagin_config.from_token.room_key) + if room_token.topological is None: + raise SynapseError(400, "Invalid token") + + yield self.hs.get_handlers().federation_handler.maybe_backfill( + room_id, room_token.topological + ) + user = UserID.from_string(user_id) events, next_key = yield data_source.get_pagination_rows( @@ -303,7 +314,7 @@ class MessageHandler(BaseHandler): event.room_id ), ] - ) + ).addErrback(unwrapFirstError) start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) @@ -328,7 +339,7 @@ class MessageHandler(BaseHandler): yield defer.gatherResults( [handle_room(e) for e in room_list], consumeErrors=True - ) + ).addErrback(unwrapFirstError) ret = { "rooms": rooms_ret, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9e15610401..40794187b1 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -18,8 +18,8 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError from synapse.api.constants import PresenceState -from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logutils import log_function from synapse.types import UserID import synapse.metrics @@ -278,15 +278,14 @@ class PresenceHandler(BaseHandler): now_online = state["presence"] != PresenceState.OFFLINE was_polling = target_user in self._user_cachemap - with PreserveLoggingContext(): - if now_online and not was_polling: - self.start_polling_presence(target_user, state=state) - elif not now_online and was_polling: - self.stop_polling_presence(target_user) + if now_online and not was_polling: + self.start_polling_presence(target_user, state=state) + elif not now_online and was_polling: + self.stop_polling_presence(target_user) - # TODO(paul): perform a presence push as part of start/stop poll so - # we don't have to do this all the time - self.changed_presencelike_data(target_user, state) + # TODO(paul): perform a presence push as part of start/stop poll so + # we don't have to do this all the time + self.changed_presencelike_data(target_user, state) def bump_presence_active_time(self, user, now=None): if now is None: @@ -318,6 +317,14 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def user_joined_room(self, user, room_id): + """Called via the distributor whenever a user joins a room. + Notifies the new member of the presence of the current members. + Notifies the current members of the room of the new member's presence. + + Args: + user(UserID): The user who joined the room. + room_id(str): The room id the user joined. + """ if self.hs.is_mine(user): statuscache = self._get_or_make_usercache(user) @@ -337,6 +344,8 @@ class PresenceHandler(BaseHandler): curr_users = yield rm_handler.get_room_members(room_id) for local_user in [c for c in curr_users if self.hs.is_mine(c)]: + statuscache = self._get_or_offline_usercache(local_user) + statuscache.update({}, serial=self._user_cachemap_latest_serial) self.push_update_to_local_and_remote( observed_user=local_user, users_to_push=[user], @@ -345,6 +354,7 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def send_invite(self, observer_user, observed_user): + """Request the presence of a local or remote user for a local user""" if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") @@ -379,6 +389,15 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def invite_presence(self, observed_user, observer_user): + """Handles a m.presence_invite EDU. A remote or local user has + requested presence updates for a local user. If the invite is accepted + then allow the local or remote user to see the presence of the local + user. + + Args: + observed_user(UserID): The local user whose presence is requested. + observer_user(UserID): The remote or local user requesting presence. + """ accept = yield self._should_accept_invite(observed_user, observer_user) if accept: @@ -405,16 +424,34 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def accept_presence(self, observed_user, observer_user): + """Handles a m.presence_accept EDU. Mark a presence invite from a + local or remote user as accepted in a local user's presence list. + Starts polling for presence updates from the local or remote user. + + Args: + observed_user(UserID): The user to update in the presence list. + observer_user(UserID): The owner of the presence list to update. + """ yield self.store.set_presence_list_accepted( observer_user.localpart, observed_user.to_string() ) - with PreserveLoggingContext(): - self.start_polling_presence( - observer_user, target_user=observed_user - ) + + self.start_polling_presence( + observer_user, target_user=observed_user + ) @defer.inlineCallbacks def deny_presence(self, observed_user, observer_user): + """Handle a m.presence_deny EDU. Removes a local or remote user from a + local user's presence list. + + Args: + observed_user(UserID): The local or remote user to remove from the + list. + observer_user(UserID): The local owner of the presence list. + Returns: + A Deferred. + """ yield self.store.del_presence_list( observer_user.localpart, observed_user.to_string() ) @@ -423,6 +460,16 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def drop(self, observed_user, observer_user): + """Remove a local or remote user from a local user's presence list and + unsubscribe the local user from updates that user. + + Args: + observed_user(UserId): The local or remote user to remove from the + list. + observer_user(UserId): The local owner of the presence list. + Returns: + A Deferred. + """ if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") @@ -430,13 +477,22 @@ class PresenceHandler(BaseHandler): observer_user.localpart, observed_user.to_string() ) - with PreserveLoggingContext(): - self.stop_polling_presence( - observer_user, target_user=observed_user - ) + self.stop_polling_presence( + observer_user, target_user=observed_user + ) @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): + """Get the presence list for a local user. The retured list includes + the current presence state for each user listed. + + Args: + observer_user(UserID): The local user whose presence list to fetch. + accepted(bool or None): If not none then only include users who + have or have not accepted the presence invite request. + Returns: + A Deferred list of presence state events. + """ if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") @@ -458,6 +514,23 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks @log_function def start_polling_presence(self, user, target_user=None, state=None): + """Subscribe a local user to presence updates from a local or remote + user. If no target_user is supplied then subscribe to all users stored + in the presence list for the local user. + + Additonally this pushes the current presence state of this user to all + target_users. That state can be provided directly or will be read from + the stored state for the local user. + + Also this attempts to notify the local user of the current state of + any local target users. + + Args: + user(UserID): The local user that whishes for presence updates. + target_user(UserID): The local or remote user whose updates are + wanted. + state(dict): Optional presence state for the local user. + """ logger.debug("Start polling for presence from %s", user) if target_user: @@ -498,9 +571,7 @@ class PresenceHandler(BaseHandler): # We want to tell the person that just came online # presence state of people they are interested in? self.push_update_to_clients( - observed_user=target_user, users_to_push=[user], - statuscache=self._get_or_offline_usercache(target_user), ) deferreds = [] @@ -517,6 +588,12 @@ class PresenceHandler(BaseHandler): yield defer.DeferredList(deferreds, consumeErrors=True) def _start_polling_local(self, user, target_user): + """Subscribe a local user to presence updates for a local user + + Args: + user(UserId): The local user that wishes for updates. + target_user(UserId): The local users whose updates are wanted. + """ target_localpart = target_user.localpart if target_localpart not in self._local_pushmap: @@ -525,6 +602,17 @@ class PresenceHandler(BaseHandler): self._local_pushmap[target_localpart].add(user) def _start_polling_remote(self, user, domain, remoteusers): + """Subscribe a local user to presence updates for remote users on a + given remote domain. + + Args: + user(UserID): The local user that wishes for updates. + domain(str): The remote server the local user wants updates from. + remoteusers(UserID): The remote users that local user wants to be + told about. + Returns: + A Deferred. + """ to_poll = set() for u in remoteusers: @@ -545,6 +633,17 @@ class PresenceHandler(BaseHandler): @log_function def stop_polling_presence(self, user, target_user=None): + """Unsubscribe a local user from presence updates from a local or + remote user. If no target user is supplied then unsubscribe the user + from all presence updates that the user had subscribed to. + + Args: + user(UserID): The local user that no longer wishes for updates. + target_user(UserID or None): The user whose updates are no longer + wanted. + Returns: + A Deferred. + """ logger.debug("Stop polling for presence from %s", user) if not target_user or self.hs.is_mine(target_user): @@ -573,6 +672,13 @@ class PresenceHandler(BaseHandler): return defer.DeferredList(deferreds, consumeErrors=True) def _stop_polling_local(self, user, target_user): + """Unsubscribe a local user from presence updates from a local user on + this server. + + Args: + user(UserID): The local user that no longer wishes for updates. + target_user(UserID): The user whose updates are no longer wanted. + """ for localpart in self._local_pushmap.keys(): if target_user and localpart != target_user.localpart: continue @@ -585,6 +691,17 @@ class PresenceHandler(BaseHandler): @log_function def _stop_polling_remote(self, user, domain, remoteusers): + """Unsubscribe a local user from presence updates from remote users on + a given domain. + + Args: + user(UserID): The local user that no longer wishes for updates. + domain(str): The remote server to unsubscribe from. + remoteusers([UserID]): The users on that remote server that the + local user no longer wishes to be updated about. + Returns: + A Deferred. + """ to_unpoll = set() for u in remoteusers: @@ -606,6 +723,19 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks @log_function def push_presence(self, user, statuscache): + """ + Notify local and remote users of a change in presence of a local user. + Pushes the update to local clients and remote domains that are directly + subscribed to the presence of the local user. + Also pushes that update to any local user or remote domain that shares + a room with the local user. + + Args: + user(UserID): The local user whose presence was updated. + statuscache(UserPresenceCache): Cache of the user's presence state + Returns: + A Deferred. + """ assert(self.hs.is_mine(user)) logger.debug("Pushing presence update from %s", user) @@ -633,44 +763,23 @@ class PresenceHandler(BaseHandler): yield self.distributor.fire("user_presence_changed", user, statuscache) @defer.inlineCallbacks - def _push_presence_remote(self, user, destination, state=None): - if state is None: - state = yield self.store.get_presence_state(user.localpart) - del state["mtime"] - state["presence"] = state.pop("state") - - if user in self._user_cachemap: - state["last_active"] = ( - self._user_cachemap[user].get_state()["last_active"] - ) - - yield self.distributor.fire( - "collect_presencelike_data", user, state - ) - - if "last_active" in state: - state = dict(state) - state["last_active_ago"] = int( - self.clock.time_msec() - state.pop("last_active") - ) - - user_state = { - "user_id": user.to_string(), - } - user_state.update(**state) - - yield self.federation.send_edu( - destination=destination, - edu_type="m.presence", - content={ - "push": [ - user_state, - ], - } - ) - - @defer.inlineCallbacks def incoming_presence(self, origin, content): + """Handle an incoming m.presence EDU. + For each presence update in the "push" list update our local cache and + notify the appropriate local clients. Only clients that share a room + or are directly subscribed to the presence for a user should be + notified of the update. + For each subscription request in the "poll" list start pushing presence + updates to the remote server. + For unsubscribe request in the "unpoll" list stop pushing presence + updates to the remote server. + + Args: + orgin(str): The source of this m.presence EDU. + content(dict): The content of this m.presence EDU. + Returns: + A Deferred. + """ deferreds = [] for push in content.get("push", []): @@ -714,10 +823,7 @@ class PresenceHandler(BaseHandler): continue self.push_update_to_clients( - observed_user=user, - users_to_push=observers, - room_ids=room_ids, - statuscache=statuscache, + users_to_push=observers, room_ids=room_ids ) user_id = user.to_string() @@ -766,13 +872,29 @@ class PresenceHandler(BaseHandler): if not self._remote_sendmap[user]: del self._remote_sendmap[user] - with PreserveLoggingContext(): - yield defer.DeferredList(deferreds, consumeErrors=True) + yield defer.DeferredList(deferreds, consumeErrors=True) @defer.inlineCallbacks def push_update_to_local_and_remote(self, observed_user, statuscache, users_to_push=[], room_ids=[], remote_domains=[]): + """Notify local clients and remote servers of a change in the presence + of a user. + + Args: + observed_user(UserID): The user to push the presence state for. + statuscache(UserPresenceCache): The cache for the presence state to + push. + users_to_push([UserID]): A list of local and remote users to + notify. + room_ids([str]): Notify the local and remote occupants of these + rooms. + remote_domains([str]): A list of remote servers to notify in + addition to those implied by the users_to_push and the + room_ids. + Returns: + A Deferred. + """ localusers, remoteusers = partitionbool( users_to_push, @@ -782,10 +904,7 @@ class PresenceHandler(BaseHandler): localusers = set(localusers) self.push_update_to_clients( - observed_user=observed_user, - users_to_push=localusers, - room_ids=room_ids, - statuscache=statuscache, + users_to_push=localusers, room_ids=room_ids ) remote_domains = set(remote_domains) @@ -810,11 +929,65 @@ class PresenceHandler(BaseHandler): defer.returnValue((localusers, remote_domains)) - def push_update_to_clients(self, observed_user, users_to_push=[], - room_ids=[], statuscache=None): - self.notifier.on_new_user_event( - users_to_push, - room_ids, + def push_update_to_clients(self, users_to_push=[], room_ids=[]): + """Notify clients of a new presence event. + + Args: + users_to_push([UserID]): List of users to notify. + room_ids([str]): List of room_ids to notify. + """ + with PreserveLoggingContext(): + self.notifier.on_new_user_event( + "presence_key", + self._user_cachemap_latest_serial, + users_to_push, + room_ids, + ) + + @defer.inlineCallbacks + def _push_presence_remote(self, user, destination, state=None): + """Push a user's presence to a remote server. If a presence state event + that event is sent. Otherwise a new state event is constructed from the + stored presence state. + The last_active is replaced with last_active_ago in case the wallclock + time on the remote server is different to the time on this server. + Sends an EDU to the remote server with the current presence state. + + Args: + user(UserID): The user to push the presence state for. + destination(str): The remote server to send state to. + state(dict): The state to push, or None to use the current stored + state. + Returns: + A Deferred. + """ + if state is None: + state = yield self.store.get_presence_state(user.localpart) + del state["mtime"] + state["presence"] = state.pop("state") + + if user in self._user_cachemap: + state["last_active"] = ( + self._user_cachemap[user].get_state()["last_active"] + ) + + yield self.distributor.fire( + "collect_presencelike_data", user, state + ) + + if "last_active" in state: + state = dict(state) + state["last_active_ago"] = int( + self.clock.time_msec() - state.pop("last_active") + ) + + user_state = {"user_id": user.to_string(), } + user_state.update(state) + + yield self.federation.send_edu( + destination=destination, + edu_type="m.presence", + content={"push": [user_state, ], } ) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index ee2732b848..799faffe53 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -17,8 +17,8 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.api.constants import EventTypes, Membership -from synapse.util.logcontext import PreserveLoggingContext from synapse.types import UserID +from synapse.util import unwrapFirstError from ._base import BaseHandler @@ -88,6 +88,9 @@ class ProfileHandler(BaseHandler): if target_user != auth_user: raise AuthError(400, "Cannot set another user's displayname") + if new_displayname == '': + new_displayname = None + yield self.store.set_profile_displayname( target_user.localpart, new_displayname ) @@ -154,14 +157,13 @@ class ProfileHandler(BaseHandler): if not self.hs.is_mine(user): defer.returnValue(None) - with PreserveLoggingContext(): - (displayname, avatar_url) = yield defer.gatherResults( - [ - self.store.get_profile_displayname(user.localpart), - self.store.get_profile_avatar_url(user.localpart), - ], - consumeErrors=True - ) + (displayname, avatar_url) = yield defer.gatherResults( + [ + self.store.get_profile_displayname(user.localpart), + self.store.get_profile_avatar_url(user.localpart), + ], + consumeErrors=True + ).addErrback(unwrapFirstError) state["displayname"] = displayname state["avatar_url"] = avatar_url diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3da08c147e..4bd027d9bb 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -21,11 +21,12 @@ from ._base import BaseHandler from synapse.types import UserID, RoomAlias, RoomID from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import StoreError, SynapseError -from synapse.util import stringutils +from synapse.util import stringutils, unwrapFirstError from synapse.util.async import run_on_reactor from synapse.events.utils import serialize_event import logging +import string logger = logging.getLogger(__name__) @@ -50,6 +51,10 @@ class RoomCreationHandler(BaseHandler): self.ratelimit(user_id) if "room_alias_name" in config: + for wchar in string.whitespace: + if wchar in config["room_alias_name"]: + raise SynapseError(400, "Invalid characters in room alias") + room_alias = RoomAlias.create( config["room_alias_name"], self.hs.hostname, @@ -535,7 +540,7 @@ class RoomListHandler(BaseHandler): for room in chunk ], consumeErrors=True, - ) + ).addErrback(unwrapFirstError) for i, room in enumerate(chunk): room["num_joined_members"] = len(results[i]) @@ -575,8 +580,8 @@ class RoomEventSource(object): defer.returnValue((events, end_key)) - def get_current_key(self): - return self.store.get_room_events_max_id() + def get_current_key(self, direction='f'): + return self.store.get_room_events_max_id(direction) @defer.inlineCallbacks def get_pagination_rows(self, user, config, key): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 35a62fda47..bd8c603681 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -92,7 +92,7 @@ class SyncHandler(BaseHandler): result = yield self.current_sync_for_user(sync_config, since_token) defer.returnValue(result) else: - def current_sync_callback(): + def current_sync_callback(before_token, after_token): return self.current_sync_for_user(sync_config, since_token) rm_handler = self.hs.get_handlers().room_member_handler diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index c0b2bd7db0..a9895292c2 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -18,6 +18,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.errors import SynapseError, AuthError +from synapse.util.logcontext import PreserveLoggingContext from synapse.types import UserID import logging @@ -216,7 +217,10 @@ class TypingNotificationHandler(BaseHandler): self._latest_room_serial += 1 self._room_serials[room_id] = self._latest_room_serial - self.notifier.on_new_user_event(rooms=[room_id]) + with PreserveLoggingContext(): + self.notifier.on_new_user_event( + "typing_key", self._latest_room_serial, rooms=[room_id] + ) class TypingNotificationEventSource(object): |