diff options
author | Mark Haines <mark.haines@matrix.org> | 2015-05-18 13:50:01 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2015-05-18 13:50:01 +0100 |
commit | 0b0033c40b29fd78384863bce932b31ebe281d95 (patch) | |
tree | 1e56a17d193b01f12c3bb5b897274f8753ed3531 /synapse | |
parent | Make sure the notifier stream token goes forward when it is updated. Sort the... (diff) | |
parent | Merge pull request #153 from matrix-org/markjh/presence_docstring (diff) | |
download | synapse-0b0033c40b29fd78384863bce932b31ebe281d95.tar.xz |
Merge branch 'develop' into notifier_performance
Diffstat (limited to 'synapse')
-rwxr-xr-x | synapse/app/homeserver.py | 21 | ||||
-rw-r--r-- | synapse/handlers/directory.py | 5 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 269 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 3 | ||||
-rw-r--r-- | synapse/handlers/room.py | 5 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/register.py | 5 | ||||
-rw-r--r-- | synapse/storage/_base.py | 1 | ||||
-rw-r--r-- | synapse/storage/push_rule.py | 14 |
8 files changed, 266 insertions, 57 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index c227265190..dfb5314ff7 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -32,9 +32,9 @@ from synapse.server import HomeServer from twisted.internet import reactor from twisted.application import service from twisted.enterprise import adbapi -from twisted.web.resource import Resource +from twisted.web.resource import Resource, EncodingResourceWrapper from twisted.web.static import File -from twisted.web.server import Site +from twisted.web.server import Site, GzipEncoderFactory from twisted.web.http import proxiedLogFormatter, combinedLogFormatter from synapse.http.server import JsonResource, RootRedirect from synapse.rest.media.v0.content_repository import ContentRepoResource @@ -69,16 +69,26 @@ import subprocess logger = logging.getLogger("synapse.app.homeserver") +class GzipFile(File): + def getChild(self, path, request): + child = File.getChild(self, path, request) + return EncodingResourceWrapper(child, [GzipEncoderFactory()]) + + +def gz_wrap(r): + return EncodingResourceWrapper(r, [GzipEncoderFactory()]) + + class SynapseHomeServer(HomeServer): def build_http_client(self): return MatrixFederationHttpClient(self) def build_resource_for_client(self): - return ClientV1RestResource(self) + return gz_wrap(ClientV1RestResource(self)) def build_resource_for_client_v2_alpha(self): - return ClientV2AlphaRestResource(self) + return gz_wrap(ClientV2AlphaRestResource(self)) def build_resource_for_federation(self): return JsonResource(self) @@ -87,9 +97,10 @@ class SynapseHomeServer(HomeServer): import syweb syweb_path = os.path.dirname(syweb.__file__) webclient_path = os.path.join(syweb_path, "webclient") - return File(webclient_path) # TODO configurable? + return GzipFile(webclient_path) # TODO configurable? def build_resource_for_static_content(self): + # This is old and should go away: not going to bother adding gzip return File("static") def build_resource_for_content_repo(self): diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index f76febee8f..e41a688836 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -22,6 +22,7 @@ from synapse.api.constants import EventTypes from synapse.types import RoomAlias import logging +import string logger = logging.getLogger(__name__) @@ -40,6 +41,10 @@ class DirectoryHandler(BaseHandler): def _create_association(self, room_alias, room_id, servers=None): # general association creation for both human users and app services + for wchar in string.whitespace: + if wchar in room_alias.localpart: + raise SynapseError(400, "Invalid characters in room alias") + if not self.hs.is_mine(room_alias): raise SynapseError(400, "Room alias must be local") # TODO(erikj): Change this. diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index afde49c004..40794187b1 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -317,6 +317,14 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def user_joined_room(self, user, room_id): + """Called via the distributor whenever a user joins a room. + Notifies the new member of the presence of the current members. + Notifies the current members of the room of the new member's presence. + + Args: + user(UserID): The user who joined the room. + room_id(str): The room id the user joined. + """ if self.hs.is_mine(user): statuscache = self._get_or_make_usercache(user) @@ -346,6 +354,7 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def send_invite(self, observer_user, observed_user): + """Request the presence of a local or remote user for a local user""" if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") @@ -380,6 +389,15 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def invite_presence(self, observed_user, observer_user): + """Handles a m.presence_invite EDU. A remote or local user has + requested presence updates for a local user. If the invite is accepted + then allow the local or remote user to see the presence of the local + user. + + Args: + observed_user(UserID): The local user whose presence is requested. + observer_user(UserID): The remote or local user requesting presence. + """ accept = yield self._should_accept_invite(observed_user, observer_user) if accept: @@ -406,6 +424,14 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def accept_presence(self, observed_user, observer_user): + """Handles a m.presence_accept EDU. Mark a presence invite from a + local or remote user as accepted in a local user's presence list. + Starts polling for presence updates from the local or remote user. + + Args: + observed_user(UserID): The user to update in the presence list. + observer_user(UserID): The owner of the presence list to update. + """ yield self.store.set_presence_list_accepted( observer_user.localpart, observed_user.to_string() ) @@ -416,6 +442,16 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def deny_presence(self, observed_user, observer_user): + """Handle a m.presence_deny EDU. Removes a local or remote user from a + local user's presence list. + + Args: + observed_user(UserID): The local or remote user to remove from the + list. + observer_user(UserID): The local owner of the presence list. + Returns: + A Deferred. + """ yield self.store.del_presence_list( observer_user.localpart, observed_user.to_string() ) @@ -424,6 +460,16 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def drop(self, observed_user, observer_user): + """Remove a local or remote user from a local user's presence list and + unsubscribe the local user from updates that user. + + Args: + observed_user(UserId): The local or remote user to remove from the + list. + observer_user(UserId): The local owner of the presence list. + Returns: + A Deferred. + """ if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") @@ -437,6 +483,16 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): + """Get the presence list for a local user. The retured list includes + the current presence state for each user listed. + + Args: + observer_user(UserID): The local user whose presence list to fetch. + accepted(bool or None): If not none then only include users who + have or have not accepted the presence invite request. + Returns: + A Deferred list of presence state events. + """ if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") @@ -458,6 +514,23 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks @log_function def start_polling_presence(self, user, target_user=None, state=None): + """Subscribe a local user to presence updates from a local or remote + user. If no target_user is supplied then subscribe to all users stored + in the presence list for the local user. + + Additonally this pushes the current presence state of this user to all + target_users. That state can be provided directly or will be read from + the stored state for the local user. + + Also this attempts to notify the local user of the current state of + any local target users. + + Args: + user(UserID): The local user that whishes for presence updates. + target_user(UserID): The local or remote user whose updates are + wanted. + state(dict): Optional presence state for the local user. + """ logger.debug("Start polling for presence from %s", user) if target_user: @@ -498,9 +571,7 @@ class PresenceHandler(BaseHandler): # We want to tell the person that just came online # presence state of people they are interested in? self.push_update_to_clients( - observed_user=target_user, users_to_push=[user], - statuscache=self._get_or_offline_usercache(target_user), ) deferreds = [] @@ -517,6 +588,12 @@ class PresenceHandler(BaseHandler): yield defer.DeferredList(deferreds, consumeErrors=True) def _start_polling_local(self, user, target_user): + """Subscribe a local user to presence updates for a local user + + Args: + user(UserId): The local user that wishes for updates. + target_user(UserId): The local users whose updates are wanted. + """ target_localpart = target_user.localpart if target_localpart not in self._local_pushmap: @@ -525,6 +602,17 @@ class PresenceHandler(BaseHandler): self._local_pushmap[target_localpart].add(user) def _start_polling_remote(self, user, domain, remoteusers): + """Subscribe a local user to presence updates for remote users on a + given remote domain. + + Args: + user(UserID): The local user that wishes for updates. + domain(str): The remote server the local user wants updates from. + remoteusers(UserID): The remote users that local user wants to be + told about. + Returns: + A Deferred. + """ to_poll = set() for u in remoteusers: @@ -545,6 +633,17 @@ class PresenceHandler(BaseHandler): @log_function def stop_polling_presence(self, user, target_user=None): + """Unsubscribe a local user from presence updates from a local or + remote user. If no target user is supplied then unsubscribe the user + from all presence updates that the user had subscribed to. + + Args: + user(UserID): The local user that no longer wishes for updates. + target_user(UserID or None): The user whose updates are no longer + wanted. + Returns: + A Deferred. + """ logger.debug("Stop polling for presence from %s", user) if not target_user or self.hs.is_mine(target_user): @@ -573,6 +672,13 @@ class PresenceHandler(BaseHandler): return defer.DeferredList(deferreds, consumeErrors=True) def _stop_polling_local(self, user, target_user): + """Unsubscribe a local user from presence updates from a local user on + this server. + + Args: + user(UserID): The local user that no longer wishes for updates. + target_user(UserID): The user whose updates are no longer wanted. + """ for localpart in self._local_pushmap.keys(): if target_user and localpart != target_user.localpart: continue @@ -585,6 +691,17 @@ class PresenceHandler(BaseHandler): @log_function def _stop_polling_remote(self, user, domain, remoteusers): + """Unsubscribe a local user from presence updates from remote users on + a given domain. + + Args: + user(UserID): The local user that no longer wishes for updates. + domain(str): The remote server to unsubscribe from. + remoteusers([UserID]): The users on that remote server that the + local user no longer wishes to be updated about. + Returns: + A Deferred. + """ to_unpoll = set() for u in remoteusers: @@ -606,6 +723,19 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks @log_function def push_presence(self, user, statuscache): + """ + Notify local and remote users of a change in presence of a local user. + Pushes the update to local clients and remote domains that are directly + subscribed to the presence of the local user. + Also pushes that update to any local user or remote domain that shares + a room with the local user. + + Args: + user(UserID): The local user whose presence was updated. + statuscache(UserPresenceCache): Cache of the user's presence state + Returns: + A Deferred. + """ assert(self.hs.is_mine(user)) logger.debug("Pushing presence update from %s", user) @@ -633,44 +763,23 @@ class PresenceHandler(BaseHandler): yield self.distributor.fire("user_presence_changed", user, statuscache) @defer.inlineCallbacks - def _push_presence_remote(self, user, destination, state=None): - if state is None: - state = yield self.store.get_presence_state(user.localpart) - del state["mtime"] - state["presence"] = state.pop("state") - - if user in self._user_cachemap: - state["last_active"] = ( - self._user_cachemap[user].get_state()["last_active"] - ) - - yield self.distributor.fire( - "collect_presencelike_data", user, state - ) - - if "last_active" in state: - state = dict(state) - state["last_active_ago"] = int( - self.clock.time_msec() - state.pop("last_active") - ) - - user_state = { - "user_id": user.to_string(), - } - user_state.update(**state) - - yield self.federation.send_edu( - destination=destination, - edu_type="m.presence", - content={ - "push": [ - user_state, - ], - } - ) - - @defer.inlineCallbacks def incoming_presence(self, origin, content): + """Handle an incoming m.presence EDU. + For each presence update in the "push" list update our local cache and + notify the appropriate local clients. Only clients that share a room + or are directly subscribed to the presence for a user should be + notified of the update. + For each subscription request in the "poll" list start pushing presence + updates to the remote server. + For unsubscribe request in the "unpoll" list stop pushing presence + updates to the remote server. + + Args: + orgin(str): The source of this m.presence EDU. + content(dict): The content of this m.presence EDU. + Returns: + A Deferred. + """ deferreds = [] for push in content.get("push", []): @@ -714,10 +823,7 @@ class PresenceHandler(BaseHandler): continue self.push_update_to_clients( - observed_user=user, - users_to_push=observers, - room_ids=room_ids, - statuscache=statuscache, + users_to_push=observers, room_ids=room_ids ) user_id = user.to_string() @@ -772,6 +878,23 @@ class PresenceHandler(BaseHandler): def push_update_to_local_and_remote(self, observed_user, statuscache, users_to_push=[], room_ids=[], remote_domains=[]): + """Notify local clients and remote servers of a change in the presence + of a user. + + Args: + observed_user(UserID): The user to push the presence state for. + statuscache(UserPresenceCache): The cache for the presence state to + push. + users_to_push([UserID]): A list of local and remote users to + notify. + room_ids([str]): Notify the local and remote occupants of these + rooms. + remote_domains([str]): A list of remote servers to notify in + addition to those implied by the users_to_push and the + room_ids. + Returns: + A Deferred. + """ localusers, remoteusers = partitionbool( users_to_push, @@ -781,10 +904,7 @@ class PresenceHandler(BaseHandler): localusers = set(localusers) self.push_update_to_clients( - observed_user=observed_user, - users_to_push=localusers, - room_ids=room_ids, - statuscache=statuscache, + users_to_push=localusers, room_ids=room_ids ) remote_domains = set(remote_domains) @@ -809,8 +929,13 @@ class PresenceHandler(BaseHandler): defer.returnValue((localusers, remote_domains)) - def push_update_to_clients(self, observed_user, users_to_push=[], - room_ids=[], statuscache=None): + def push_update_to_clients(self, users_to_push=[], room_ids=[]): + """Notify clients of a new presence event. + + Args: + users_to_push([UserID]): List of users to notify. + room_ids([str]): List of room_ids to notify. + """ with PreserveLoggingContext(): self.notifier.on_new_user_event( "presence_key", @@ -819,6 +944,52 @@ class PresenceHandler(BaseHandler): room_ids, ) + @defer.inlineCallbacks + def _push_presence_remote(self, user, destination, state=None): + """Push a user's presence to a remote server. If a presence state event + that event is sent. Otherwise a new state event is constructed from the + stored presence state. + The last_active is replaced with last_active_ago in case the wallclock + time on the remote server is different to the time on this server. + Sends an EDU to the remote server with the current presence state. + + Args: + user(UserID): The user to push the presence state for. + destination(str): The remote server to send state to. + state(dict): The state to push, or None to use the current stored + state. + Returns: + A Deferred. + """ + if state is None: + state = yield self.store.get_presence_state(user.localpart) + del state["mtime"] + state["presence"] = state.pop("state") + + if user in self._user_cachemap: + state["last_active"] = ( + self._user_cachemap[user].get_state()["last_active"] + ) + + yield self.distributor.fire( + "collect_presencelike_data", user, state + ) + + if "last_active" in state: + state = dict(state) + state["last_active_ago"] = int( + self.clock.time_msec() - state.pop("last_active") + ) + + user_state = {"user_id": user.to_string(), } + user_state.update(state) + + yield self.federation.send_edu( + destination=destination, + edu_type="m.presence", + content={"push": [user_state, ], } + ) + class PresenceEventSource(object): def __init__(self, hs): diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 71ff78ab23..799faffe53 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -88,6 +88,9 @@ class ProfileHandler(BaseHandler): if target_user != auth_user: raise AuthError(400, "Cannot set another user's displayname") + if new_displayname == '': + new_displayname = None + yield self.store.set_profile_displayname( target_user.localpart, new_displayname ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index dac683616a..401cc677d1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -26,6 +26,7 @@ from synapse.util.async import run_on_reactor from synapse.events.utils import serialize_event import logging +import string logger = logging.getLogger(__name__) @@ -50,6 +51,10 @@ class RoomCreationHandler(BaseHandler): self.ratelimit(user_id) if "room_alias_name" in config: + for wchar in string.whitespace: + if wchar in config["room_alias_name"]: + raise SynapseError(400, "Invalid characters in room alias") + room_alias = RoomAlias.create( config["room_alias_name"], self.hs.hostname, diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 3640fb4a29..72dfb876c5 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -82,8 +82,10 @@ class RegisterRestServlet(RestServlet): [LoginType.EMAIL_IDENTITY] ] + result = None if service: is_application_server = True + params = body elif 'mac' in body: # Check registration-specific shared secret auth if 'username' not in body: @@ -92,6 +94,7 @@ class RegisterRestServlet(RestServlet): body['username'], body['mac'] ) is_using_shared_secret = True + params = body else: authed, result, params = yield self.auth_handler.check_auth( flows, body, self.hs.get_ip_from_request(request) @@ -118,7 +121,7 @@ class RegisterRestServlet(RestServlet): password=new_password ) - if LoginType.EMAIL_IDENTITY in result: + if result and LoginType.EMAIL_IDENTITY in result: threepid = result[LoginType.EMAIL_IDENTITY] for reqd in ['medium', 'address', 'validated_at']: diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c9fe5a3555..81052409b7 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -309,6 +309,7 @@ class SQLBaseStore(object): self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self) self._pushers_id_gen = IdGenerator("pushers", "id", self) self._push_rule_id_gen = IdGenerator("push_rules", "id", self) + self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self) def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 34805e276e..e7988676ce 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -204,11 +204,21 @@ class PushRuleStore(SQLBaseStore): @defer.inlineCallbacks def set_push_rule_enabled(self, user_name, rule_id, enabled): - yield self._simple_upsert( + ret = yield self.runInteraction( + "_set_push_rule_enabled_txn", + self._set_push_rule_enabled_txn, + user_name, rule_id, enabled + ) + defer.returnValue(ret) + + def _set_push_rule_enabled_txn(self, txn, user_name, rule_id, enabled): + new_id = self._push_rules_enable_id_gen.get_next_txn(txn) + self._simple_upsert_txn( + txn, PushRuleEnableTable.table_name, {'user_name': user_name, 'rule_id': rule_id}, {'enabled': 1 if enabled else 0}, - desc="set_push_rule_enabled", + {'id': new_id}, ) |