diff options
author | Daniel Wagner-Hall <daniel@matrix.org> | 2016-03-08 17:35:09 +0000 |
---|---|---|
committer | Daniel Wagner-Hall <daniel@matrix.org> | 2016-03-08 17:35:09 +0000 |
commit | 3b97797c8d60bf9b6e5e09396620144ee9a8bc83 (patch) | |
tree | 3f4d437943e53a3f4be2d6626f0585d471c0cb7f /synapse | |
parent | Idempotent-ise schema update script (diff) | |
parent | Merge pull request #630 from matrix-org/dbkr/post_urlencoded_encode_params (diff) | |
download | synapse-3b97797c8d60bf9b6e5e09396620144ee9a8bc83.tar.xz |
Merge branch 'develop' into daniel/ick
Diffstat (limited to 'synapse')
37 files changed, 802 insertions, 299 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 183245443c..3038df4ab8 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -534,7 +534,7 @@ class Auth(object): ) access_token = request.args["access_token"][0] - user_info = yield self._get_user_by_access_token(access_token) + user_info = yield self.get_user_by_access_token(access_token) user = user_info["user"] token_id = user_info["token_id"] is_guest = user_info["is_guest"] @@ -595,7 +595,7 @@ class Auth(object): defer.returnValue(user_id) @defer.inlineCallbacks - def _get_user_by_access_token(self, token): + def get_user_by_access_token(self, token): """ Get a registered user's ID. Args: diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 021dc1d610..fcdc8e6e10 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -722,7 +722,7 @@ def run(hs): if hs.config.daemonize: if hs.config.print_pidfile: - print hs.config.pid_file + print (hs.config.pid_file) daemon = Daemonize( app="synapse-homeserver", diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index 9249e36d82..ab3a31d7b7 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -29,13 +29,13 @@ NORMAL = "\x1b[m" def start(configfile): - print "Starting ...", + print ("Starting ...") args = SYNAPSE args.extend(["--daemonize", "-c", configfile]) try: subprocess.check_call(args) - print GREEN + "started" + NORMAL + print (GREEN + "started" + NORMAL) except subprocess.CalledProcessError as e: print ( RED + @@ -48,7 +48,7 @@ def stop(pidfile): if os.path.exists(pidfile): pid = int(open(pidfile).read()) os.kill(pid, signal.SIGTERM) - print GREEN + "stopped" + NORMAL + print (GREEN + "stopped" + NORMAL) def main(): diff --git a/synapse/config/__main__.py b/synapse/config/__main__.py index 0a3b70e11f..58c97a70af 100644 --- a/synapse/config/__main__.py +++ b/synapse/config/__main__.py @@ -28,7 +28,7 @@ if __name__ == "__main__": sys.stderr.write("\n" + e.message + "\n") sys.exit(1) - print getattr(config, key) + print (getattr(config, key)) sys.exit(0) else: sys.stderr.write("Unknown command %r\n" % (action,)) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 15d78ff33a..7449f36491 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -104,7 +104,7 @@ class Config(object): dir_path = cls.abspath(dir_path) try: os.makedirs(dir_path) - except OSError, e: + except OSError as e: if e.errno != errno.EEXIST: raise if not os.path.isdir(dir_path): diff --git a/synapse/config/api.py b/synapse/config/api.py new file mode 100644 index 0000000000..20ba33226a --- /dev/null +++ b/synapse/config/api.py @@ -0,0 +1,40 @@ +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import Config + +from synapse.api.constants import EventTypes + + +class ApiConfig(Config): + + def read_config(self, config): + self.room_invite_state_types = config.get("room_invite_state_types", [ + EventTypes.JoinRules, + EventTypes.CanonicalAlias, + EventTypes.RoomAvatar, + EventTypes.Name, + ]) + + def default_config(cls, **kwargs): + return """\ + ## API Configuration ## + + # A list of event types that will be included in the room_invite_state + room_invite_state_types: + - "{JoinRules}" + - "{CanonicalAlias}" + - "{RoomAvatar}" + - "{Name}" + """.format(**vars(EventTypes)) diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 3c333b4172..a08c170f1d 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -23,6 +23,7 @@ from .captcha import CaptchaConfig from .voip import VoipConfig from .registration import RegistrationConfig from .metrics import MetricsConfig +from .api import ApiConfig from .appservice import AppServiceConfig from .key import KeyConfig from .saml2 import SAML2Config @@ -32,7 +33,7 @@ from .password import PasswordConfig class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, - VoipConfig, RegistrationConfig, MetricsConfig, + VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig, AppServiceConfig, KeyConfig, SAML2Config, CasConfig, PasswordConfig,): pass diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index e30e2da58d..83c1f46586 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -114,7 +114,7 @@ class FederationClient(FederationBase): @log_function def make_query(self, destination, query_type, args, - retry_on_dns_fail=True): + retry_on_dns_fail=False): """Sends a federation Query to a remote homeserver of the given type and arguments. diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 2b5d40ea7f..2237e3413c 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -160,6 +160,7 @@ class TransportLayerClient(object): path=path, args=args, retry_on_dns_fail=retry_on_dns_fail, + timeout=10000, ) defer.returnValue(content) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index bdade98bf7..90eabb6eb7 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -29,6 +29,14 @@ import logging logger = logging.getLogger(__name__) +VISIBILITY_PRIORITY = ( + "world_readable", + "shared", + "invited", + "joined", +) + + class BaseHandler(object): """ Common base class for the event handlers. @@ -85,10 +93,28 @@ class BaseHandler(object): else: visibility = "shared" + if visibility not in VISIBILITY_PRIORITY: + visibility = "shared" + # if it was world_readable, it's easy: everyone can read it if visibility == "world_readable": return True + # Always allow history visibility events on boundaries. This is done + # by setting the effective visibility to the least restrictive + # of the old vs new. + if event.type == EventTypes.RoomHistoryVisibility: + prev_content = event.unsigned.get("prev_content", {}) + prev_visibility = prev_content.get("history_visibility", None) + + if prev_visibility not in VISIBILITY_PRIORITY: + prev_visibility = "shared" + + new_priority = VISIBILITY_PRIORITY.index(visibility) + old_priority = VISIBILITY_PRIORITY.index(prev_visibility) + if old_priority < new_priority: + visibility = prev_visibility + # get the user's membership at the time of the event. (or rather, # just *after* the event. Which means that people can see their # own join events, but not (currently) their own leave events.) @@ -160,10 +186,10 @@ class BaseHandler(object): ) defer.returnValue(res.get(user_id, [])) - def ratelimit(self, user_id): + def ratelimit(self, requester): time_now = self.clock.time() allowed, time_allowed = self.ratelimiter.send_message( - user_id, time_now, + requester.user.to_string(), time_now, msg_rate_hz=self.hs.config.rc_messages_per_second, burst_count=self.hs.config.rc_message_burst_count, ) @@ -199,8 +225,7 @@ class BaseHandler(object): # events in the room, because we don't know enough about the graph # fragment we received to treat it like a graph, so the above returned # no relevant events. It may have returned some events (if we have - # joined and left the room), but not useful ones, like the invite. So we - # forcibly set our context to the invite we received over federation. + # joined and left the room), but not useful ones, like the invite. if ( not self.is_host_in_room(context.current_state) and builder.type == EventTypes.Member @@ -208,7 +233,27 @@ class BaseHandler(object): prev_member_event = yield self.store.get_room_member( builder.sender, builder.room_id ) - if prev_member_event: + + # The prev_member_event may already be in context.current_state, + # despite us not being present in the room; in particular, if + # inviting user, and all other local users, have already left. + # + # In that case, we have all the information we need, and we don't + # want to drop "context" - not least because we may need to handle + # the invite locally, which will require us to have the whole + # context (not just prev_member_event) to auth it. + # + context_event_ids = ( + e.event_id for e in context.current_state.values() + ) + + if ( + prev_member_event and + prev_member_event.event_id not in context_event_ids + ): + # The prev_member_event is missing from context, so it must + # have arrived over federation and is an outlier. We forcibly + # set our context to the invite we received over federation builder.prev_events = ( prev_member_event.event_id, prev_member_event.prev_events @@ -263,11 +308,18 @@ class BaseHandler(object): return False @defer.inlineCallbacks - def handle_new_client_event(self, event, context, ratelimit=True, extra_users=[]): + def handle_new_client_event( + self, + requester, + event, + context, + ratelimit=True, + extra_users=[] + ): # We now need to go and hit out to wherever we need to hit out to. if ratelimit: - self.ratelimit(event.sender) + self.ratelimit(requester) self.auth.check(event, auth_events=context.current_state) @@ -307,12 +359,8 @@ class BaseHandler(object): "sender": e.sender, } for k, e in context.current_state.items() - if e.type in ( - EventTypes.JoinRules, - EventTypes.CanonicalAlias, - EventTypes.RoomAvatar, - EventTypes.Name, - ) or is_inviter_member_event(e) + if e.type in self.hs.config.room_invite_state_types + or is_inviter_member_event(e) ] invitee = UserID.from_string(event.state_key) @@ -348,6 +396,12 @@ class BaseHandler(object): "You don't have permission to redact events" ) + if event.type == EventTypes.Create and context.current_state: + raise AuthError( + 403, + "Changing the room create event is forbidden", + ) + action_generator = ActionGenerator(self.hs) yield action_generator.handle_push_actions_for_event( event, context, self diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 62e82a2570..7a4afe446d 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -477,4 +477,4 @@ class AuthHandler(BaseHandler): Returns: Whether self.hash(password) == stored_hash (bool). """ - return bcrypt.checkpw(password, stored_hash) + return bcrypt.hashpw(password, stored_hash) == stored_hash diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index e0a778e7ff..c4aaa11918 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -17,9 +17,9 @@ from twisted.internet import defer from ._base import BaseHandler -from synapse.api.errors import SynapseError, Codes, CodeMessageException +from synapse.api.errors import SynapseError, Codes, CodeMessageException, AuthError from synapse.api.constants import EventTypes -from synapse.types import RoomAlias +from synapse.types import RoomAlias, UserID import logging import string @@ -38,7 +38,7 @@ class DirectoryHandler(BaseHandler): ) @defer.inlineCallbacks - def _create_association(self, room_alias, room_id, servers=None): + def _create_association(self, room_alias, room_id, servers=None, creator=None): # general association creation for both human users and app services for wchar in string.whitespace: @@ -60,7 +60,8 @@ class DirectoryHandler(BaseHandler): yield self.store.create_room_alias_association( room_alias, room_id, - servers + servers, + creator=creator, ) @defer.inlineCallbacks @@ -77,7 +78,7 @@ class DirectoryHandler(BaseHandler): 400, "This alias is reserved by an application service.", errcode=Codes.EXCLUSIVE ) - yield self._create_association(room_alias, room_id, servers) + yield self._create_association(room_alias, room_id, servers, creator=user_id) @defer.inlineCallbacks def create_appservice_association(self, service, room_alias, room_id, @@ -95,7 +96,11 @@ class DirectoryHandler(BaseHandler): def delete_association(self, user_id, room_alias): # association deletion for human users - # TODO Check if server admin + can_delete = yield self._user_can_delete_alias(room_alias, user_id) + if not can_delete: + raise AuthError( + 403, "You don't have permission to delete the alias.", + ) can_delete = yield self.can_modify_alias( room_alias, @@ -212,17 +217,21 @@ class DirectoryHandler(BaseHandler): ) @defer.inlineCallbacks - def send_room_alias_update_event(self, user_id, room_id): + def send_room_alias_update_event(self, requester, user_id, room_id): aliases = yield self.store.get_aliases_for_room(room_id) msg_handler = self.hs.get_handlers().message_handler - yield msg_handler.create_and_send_nonmember_event({ - "type": EventTypes.Aliases, - "state_key": self.hs.hostname, - "room_id": room_id, - "sender": user_id, - "content": {"aliases": aliases}, - }, ratelimit=False) + yield msg_handler.create_and_send_nonmember_event( + requester, + { + "type": EventTypes.Aliases, + "state_key": self.hs.hostname, + "room_id": room_id, + "sender": user_id, + "content": {"aliases": aliases}, + }, + ratelimit=False + ) @defer.inlineCallbacks def get_association_from_room_alias(self, room_alias): @@ -257,3 +266,13 @@ class DirectoryHandler(BaseHandler): return # either no interested services, or no service with an exclusive lock defer.returnValue(True) + + @defer.inlineCallbacks + def _user_can_delete_alias(self, alias, user_id): + creator = yield self.store.get_room_alias_creator(alias.to_string()) + + if creator and creator == user_id: + defer.returnValue(True) + + is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id)) + defer.returnValue(is_admin) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3655b9e5e2..27f2b40bfe 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -472,7 +472,7 @@ class FederationHandler(BaseHandler): limit=100, extremities=[e for e in extremities.keys()] ) - except SynapseError: + except SynapseError as e: logger.info( "Failed to backfill from %s because %s", dom, e, @@ -1657,7 +1657,7 @@ class FederationHandler(BaseHandler): self.auth.check(event, context.current_state) yield self._check_signature(event, auth_events=context.current_state) member_handler = self.hs.get_handlers().room_member_handler - yield member_handler.send_membership_event(event, context, from_client=False) + yield member_handler.send_membership_event(None, event, context) else: destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id)) yield self.replication_layer.forward_third_party_invite( @@ -1686,7 +1686,7 @@ class FederationHandler(BaseHandler): # TODO: Make sure the signatures actually are correct. event.signatures.update(returned_invite.signatures) member_handler = self.hs.get_handlers().room_member_handler - yield member_handler.send_membership_event(event, context, from_client=False) + yield member_handler.send_membership_event(None, event, context) @defer.inlineCallbacks def add_display_name_to_third_party_invite(self, event_dict, event, context): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index afa7c9c36c..5c50c611ba 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -196,12 +196,25 @@ class MessageHandler(BaseHandler): if builder.type == EventTypes.Member: membership = builder.content.get("membership", None) + target = UserID.from_string(builder.state_key) + if membership == Membership.JOIN: - joinee = UserID.from_string(builder.state_key) # If event doesn't include a display name, add one. yield collect_presencelike_data( - self.distributor, joinee, builder.content + self.distributor, target, builder.content ) + elif membership == Membership.INVITE: + profile = self.hs.get_handlers().profile_handler + content = builder.content + + try: + content["displayname"] = yield profile.get_displayname(target) + content["avatar_url"] = yield profile.get_avatar_url(target) + except Exception as e: + logger.info( + "Failed to get profile information for %r: %s", + target, e + ) if token_id is not None: builder.internal_metadata.token_id = token_id @@ -215,7 +228,7 @@ class MessageHandler(BaseHandler): defer.returnValue((event, context)) @defer.inlineCallbacks - def send_nonmember_event(self, event, context, ratelimit=True): + def send_nonmember_event(self, requester, event, context, ratelimit=True): """ Persists and notifies local clients and federation of an event. @@ -241,6 +254,7 @@ class MessageHandler(BaseHandler): defer.returnValue(prev_state) yield self.handle_new_client_event( + requester=requester, event=event, context=context, ratelimit=ratelimit, @@ -268,9 +282,9 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def create_and_send_nonmember_event( self, + requester, event_dict, ratelimit=True, - token_id=None, txn_id=None ): """ @@ -280,10 +294,11 @@ class MessageHandler(BaseHandler): """ event, context = yield self.create_event( event_dict, - token_id=token_id, + token_id=requester.access_token_id, txn_id=txn_id ) yield self.send_nonmember_event( + requester, event, context, ratelimit=ratelimit, @@ -647,8 +662,8 @@ class MessageHandler(BaseHandler): user_id, messages, is_peeking=is_peeking ) - start_token = StreamToken(token[0], 0, 0, 0, 0) - end_token = StreamToken(token[1], 0, 0, 0, 0) + start_token = StreamToken.START.copy_and_replace("room_key", token[0]) + end_token = StreamToken.START.copy_and_replace("room_key", token[1]) time_now = self.clock.time_msec() diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index c9ad5944e6..b45eafbb49 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -89,13 +89,13 @@ class ProfileHandler(BaseHandler): defer.returnValue(result["displayname"]) @defer.inlineCallbacks - def set_displayname(self, target_user, auth_user, new_displayname): + def set_displayname(self, target_user, requester, new_displayname): """target_user is the user whose displayname is to be changed; auth_user is the user attempting to make this change.""" if not self.hs.is_mine(target_user): raise SynapseError(400, "User is not hosted on this Home Server") - if target_user != auth_user: + if target_user != requester.user: raise AuthError(400, "Cannot set another user's displayname") if new_displayname == '': @@ -109,7 +109,7 @@ class ProfileHandler(BaseHandler): "displayname": new_displayname, }) - yield self._update_join_states(target_user) + yield self._update_join_states(requester) @defer.inlineCallbacks def get_avatar_url(self, target_user): @@ -139,13 +139,13 @@ class ProfileHandler(BaseHandler): defer.returnValue(result["avatar_url"]) @defer.inlineCallbacks - def set_avatar_url(self, target_user, auth_user, new_avatar_url): + def set_avatar_url(self, target_user, requester, new_avatar_url): """target_user is the user whose avatar_url is to be changed; auth_user is the user attempting to make this change.""" if not self.hs.is_mine(target_user): raise SynapseError(400, "User is not hosted on this Home Server") - if target_user != auth_user: + if target_user != requester.user: raise AuthError(400, "Cannot set another user's avatar_url") yield self.store.set_profile_avatar_url( @@ -156,7 +156,7 @@ class ProfileHandler(BaseHandler): "avatar_url": new_avatar_url, }) - yield self._update_join_states(target_user) + yield self._update_join_states(requester) @defer.inlineCallbacks def collect_presencelike_data(self, user, state): @@ -199,11 +199,12 @@ class ProfileHandler(BaseHandler): defer.returnValue(response) @defer.inlineCallbacks - def _update_join_states(self, user): + def _update_join_states(self, requester): + user = requester.user if not self.hs.is_mine(user): return - self.ratelimit(user.to_string()) + self.ratelimit(requester) joins = yield self.store.get_rooms_for_user( user.to_string(), diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 6d155d57e7..e2ace6a4e5 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -157,6 +157,7 @@ class RegistrationHandler(BaseHandler): ) except SynapseError: # if user id is taken, just generate another + user = None user_id = None token = None attempts += 1 @@ -240,7 +241,7 @@ class RegistrationHandler(BaseHandler): password_hash=None ) yield registered_user(self.distributor, user) - except Exception, e: + except Exception as e: yield self.store.add_access_token_to_user(user_id, token) # Ignore Registration errors logger.exception(e) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index d2de23a6cc..57113ae4a5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -18,7 +18,7 @@ from twisted.internet import defer from ._base import BaseHandler -from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken +from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken, Requester from synapse.api.constants import ( EventTypes, Membership, JoinRules, RoomCreationPreset, ) @@ -90,7 +90,7 @@ class RoomCreationHandler(BaseHandler): """ user_id = requester.user.to_string() - self.ratelimit(user_id) + self.ratelimit(requester) if "room_alias_name" in config: for wchar in string.whitespace: @@ -185,26 +185,32 @@ class RoomCreationHandler(BaseHandler): if "name" in config: name = config["name"] - yield msg_handler.create_and_send_nonmember_event({ - "type": EventTypes.Name, - "room_id": room_id, - "sender": user_id, - "state_key": "", - "content": {"name": name}, - }, ratelimit=False) + yield msg_handler.create_and_send_nonmember_event( + requester, + { + "type": EventTypes.Name, + "room_id": room_id, + "sender": user_id, + "state_key": "", + "content": {"name": name}, + }, + ratelimit=False) if "topic" in config: topic = config["topic"] - yield msg_handler.create_and_send_nonmember_event({ - "type": EventTypes.Topic, - "room_id": room_id, - "sender": user_id, - "state_key": "", - "content": {"topic": topic}, - }, ratelimit=False) + yield msg_handler.create_and_send_nonmember_event( + requester, + { + "type": EventTypes.Topic, + "room_id": room_id, + "sender": user_id, + "state_key": "", + "content": {"topic": topic}, + }, + ratelimit=False) for invitee in invite_list: - room_member_handler.update_membership( + yield room_member_handler.update_membership( requester, UserID.from_string(invitee), room_id, @@ -231,7 +237,7 @@ class RoomCreationHandler(BaseHandler): if room_alias: result["room_alias"] = room_alias.to_string() yield directory_handler.send_room_alias_update_event( - user_id, room_id + requester, user_id, room_id ) defer.returnValue(result) @@ -263,7 +269,11 @@ class RoomCreationHandler(BaseHandler): @defer.inlineCallbacks def send(etype, content, **kwargs): event = create(etype, content, **kwargs) - yield msg_handler.create_and_send_nonmember_event(event, ratelimit=False) + yield msg_handler.create_and_send_nonmember_event( + creator, + event, + ratelimit=False + ) config = RoomCreationHandler.PRESETS_DICT[preset_config] @@ -454,12 +464,11 @@ class RoomMemberHandler(BaseHandler): member_handler = self.hs.get_handlers().room_member_handler yield member_handler.send_membership_event( + requester, event, context, - is_guest=requester.is_guest, ratelimit=ratelimit, remote_room_hosts=remote_room_hosts, - from_client=True, ) if action == "forget": @@ -468,17 +477,19 @@ class RoomMemberHandler(BaseHandler): @defer.inlineCallbacks def send_membership_event( self, + requester, event, context, - is_guest=False, remote_room_hosts=None, ratelimit=True, - from_client=True, ): """ Change the membership status of a user in a room. Args: + requester (Requester): The local user who requested the membership + event. If None, certain checks, like whether this homeserver can + act as the sender, will be skipped. event (SynapseEvent): The membership event. context: The context of the event. is_guest (bool): Whether the sender is a guest. @@ -486,19 +497,23 @@ class RoomMemberHandler(BaseHandler): the room, and could be danced with in order to join this homeserver for the first time. ratelimit (bool): Whether to rate limit this request. - from_client (bool): Whether this request is the result of a local - client request (rather than over federation). If so, we will - perform extra checks, like that this homeserver can act as this - client. Raises: SynapseError if there was a problem changing the membership. """ + remote_room_hosts = remote_room_hosts or [] + target_user = UserID.from_string(event.state_key) room_id = event.room_id - if from_client: + if requester is not None: sender = UserID.from_string(event.sender) + assert sender == requester.user, ( + "Sender (%s) must be same as requester (%s)" % + (sender, requester.user) + ) assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,) + else: + requester = Requester(target_user, None, False) message_handler = self.hs.get_handlers().message_handler prev_event = message_handler.deduplicate_state_event(event, context) @@ -508,7 +523,7 @@ class RoomMemberHandler(BaseHandler): action = "send" if event.membership == Membership.JOIN: - if is_guest and not self._can_guest_join(context.current_state): + if requester.is_guest and not self._can_guest_join(context.current_state): # This should be an auth check, but guests are a local concept, # so don't really fit into the general auth process. raise AuthError(403, "Guest access not allowed") @@ -521,8 +536,24 @@ class RoomMemberHandler(BaseHandler): action = "remote_join" elif event.membership == Membership.LEAVE: is_host_in_room = self.is_host_in_room(context.current_state) + if not is_host_in_room: - action = "remote_reject" + # perhaps we've been invited + inviter = self.get_inviter(target_user.to_string(), context.current_state) + if not inviter: + raise SynapseError(404, "Not a known room") + + if self.hs.is_mine(inviter): + # the inviter was on our server, but has now left. Carry on + # with the normal rejection codepath. + # + # This is a bit of a hack, because the room might still be + # active on other servers. + pass + else: + # send the rejection to the inviter's HS. + remote_room_hosts = remote_room_hosts + [inviter.domain] + action = "remote_reject" federation_handler = self.hs.get_handlers().federation_handler @@ -541,16 +572,14 @@ class RoomMemberHandler(BaseHandler): event.content, ) elif action == "remote_reject": - inviter = self.get_inviter(target_user.to_string(), context.current_state) - if not inviter: - raise SynapseError(404, "No known servers") yield federation_handler.do_remotely_reject_invite( - [inviter.domain], + remote_room_hosts, room_id, event.user_id ) else: yield self.handle_new_client_event( + requester, event, context, extra_users=[target_user], @@ -669,12 +698,12 @@ class RoomMemberHandler(BaseHandler): ) else: yield self._make_and_store_3pid_invite( + requester, id_server, medium, address, room_id, inviter, - requester.access_token_id, txn_id=txn_id ) @@ -732,12 +761,12 @@ class RoomMemberHandler(BaseHandler): @defer.inlineCallbacks def _make_and_store_3pid_invite( self, + requester, id_server, medium, address, room_id, user, - token_id, txn_id ): room_state = yield self.hs.get_state_handler().get_current_state(room_id) @@ -787,6 +816,7 @@ class RoomMemberHandler(BaseHandler): msg_handler = self.hs.get_handlers().message_handler yield msg_handler.create_and_send_nonmember_event( + requester, { "type": EventTypes.ThirdPartyInvite, "content": { @@ -801,7 +831,6 @@ class RoomMemberHandler(BaseHandler): "sender": user.to_string(), "state_key": token, }, - token_id=token_id, txn_id=txn_id, ) @@ -855,6 +884,10 @@ class RoomMemberHandler(BaseHandler): inviter_user_id=inviter_user_id, ) + guest_user_info = yield self.hs.get_auth().get_user_by_access_token( + guest_access_token + ) + is_url = "%s%s/_matrix/identity/api/v1/store-invite" % ( id_server_scheme, id_server, ) @@ -871,6 +904,7 @@ class RoomMemberHandler(BaseHandler): "sender": inviter_user_id, "sender_display_name": inviter_display_name, "sender_avatar_url": inviter_avatar_url, + "guest_user_id": guest_user_info["user"].to_string(), "guest_access_token": guest_access_token, } ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index fded6e4009..1f6fde8e8a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -20,6 +20,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.util import unwrapFirstError from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.metrics import Measure +from synapse.push.clientformat import format_push_rules_for_user from twisted.internet import defer @@ -209,9 +210,9 @@ class SyncHandler(BaseHandler): key=None ) - membership_list = (Membership.INVITE, Membership.JOIN) - if sync_config.filter_collection.include_leave: - membership_list += (Membership.LEAVE, Membership.BAN) + membership_list = ( + Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN + ) room_list = yield self.store.get_rooms_for_user_where_membership_is( user_id=sync_config.user.to_string(), @@ -224,6 +225,10 @@ class SyncHandler(BaseHandler): ) ) + account_data['m.push_rules'] = yield self.push_rules_for_user( + sync_config.user + ) + tags_by_room = yield self.store.get_tags_for_user( sync_config.user.to_string() ) @@ -257,6 +262,12 @@ class SyncHandler(BaseHandler): invite=invite, )) elif event.membership in (Membership.LEAVE, Membership.BAN): + # Always send down rooms we were banned or kicked from. + if not sync_config.filter_collection.include_leave: + if event.membership == Membership.LEAVE: + if sync_config.user.to_string() == event.sender: + continue + leave_token = now_token.copy_and_replace( "room_key", "s%d" % (event.stream_ordering,) ) @@ -322,6 +333,14 @@ class SyncHandler(BaseHandler): defer.returnValue(room_sync) + @defer.inlineCallbacks + def push_rules_for_user(self, user): + user_id = user.to_string() + rawrules = yield self.store.get_push_rules_for_user(user_id) + enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id) + rules = format_push_rules_for_user(user, rawrules, enabled_map) + defer.returnValue(rules) + def account_data_for_user(self, account_data): account_data_events = [] @@ -481,6 +500,15 @@ class SyncHandler(BaseHandler): ) ) + push_rules_changed = yield self.store.have_push_rules_changed_for_user( + user_id, int(since_token.push_rules_key) + ) + + if push_rules_changed: + account_data["m.push_rules"] = yield self.push_rules_for_user( + sync_config.user + ) + # Get a list of membership change events that have happened. rooms_changed = yield self.store.get_membership_changes_for_user( user_id, since_token.room_key, now_token.room_key diff --git a/synapse/http/client.py b/synapse/http/client.py index fdd90b1c3c..cbd45b2bbe 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -103,7 +103,7 @@ class SimpleHttpClient(object): # TODO: Do we ever want to log message contents? logger.debug("post_urlencoded_get_json args: %s", args) - query_bytes = urllib.urlencode(args, True) + query_bytes = urllib.urlencode(encode_urlencode_args(args), True) response = yield self.request( "POST", @@ -249,7 +249,7 @@ class CaptchaServerHttpClient(SimpleHttpClient): @defer.inlineCallbacks def post_urlencoded_get_raw(self, url, args={}): - query_bytes = urllib.urlencode(args, True) + query_bytes = urllib.urlencode(encode_urlencode_args(args), True) response = yield self.request( "POST", @@ -269,6 +269,19 @@ class CaptchaServerHttpClient(SimpleHttpClient): defer.returnValue(e.response) +def encode_urlencode_args(args): + return {k: encode_urlencode_arg(v) for k, v in args.items()} + + +def encode_urlencode_arg(arg): + if isinstance(arg, unicode): + return arg.encode('utf-8') + elif isinstance(arg, list): + return [encode_urlencode_arg(i) for i in arg] + else: + return arg + + def _print_ex(e): if hasattr(e, "reasons") and e.reasons: for ex in e.reasons: diff --git a/synapse/notifier.py b/synapse/notifier.py index 3c36a20868..9b69b0333a 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -284,7 +284,7 @@ class Notifier(object): @defer.inlineCallbacks def wait_for_events(self, user_id, timeout, callback, room_ids=None, - from_token=StreamToken("s0", "0", "0", "0", "0")): + from_token=StreamToken.START): """Wait until the callback returns a non empty response or the timeout fires. """ diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py new file mode 100644 index 0000000000..ae9db9ec2f --- /dev/null +++ b/synapse/push/clientformat.py @@ -0,0 +1,112 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.push.baserules import list_with_base_rules + +from synapse.push.rulekinds import ( + PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP +) + +import copy +import simplejson as json + + +def format_push_rules_for_user(user, rawrules, enabled_map): + """Converts a list of rawrules and a enabled map into nested dictionaries + to match the Matrix client-server format for push rules""" + + ruleslist = [] + for rawrule in rawrules: + rule = dict(rawrule) + rule["conditions"] = json.loads(rawrule["conditions"]) + rule["actions"] = json.loads(rawrule["actions"]) + ruleslist.append(rule) + + # We're going to be mutating this a lot, so do a deep copy + ruleslist = copy.deepcopy(list_with_base_rules(ruleslist)) + + rules = {'global': {}, 'device': {}} + + rules['global'] = _add_empty_priority_class_arrays(rules['global']) + + for r in ruleslist: + rulearray = None + + template_name = _priority_class_to_template_name(r['priority_class']) + + # Remove internal stuff. + for c in r["conditions"]: + c.pop("_id", None) + + pattern_type = c.pop("pattern_type", None) + if pattern_type == "user_id": + c["pattern"] = user.to_string() + elif pattern_type == "user_localpart": + c["pattern"] = user.localpart + + rulearray = rules['global'][template_name] + + template_rule = _rule_to_template(r) + if template_rule: + if r['rule_id'] in enabled_map: + template_rule['enabled'] = enabled_map[r['rule_id']] + elif 'enabled' in r: + template_rule['enabled'] = r['enabled'] + else: + template_rule['enabled'] = True + rulearray.append(template_rule) + + return rules + + +def _add_empty_priority_class_arrays(d): + for pc in PRIORITY_CLASS_MAP.keys(): + d[pc] = [] + return d + + +def _rule_to_template(rule): + unscoped_rule_id = None + if 'rule_id' in rule: + unscoped_rule_id = _rule_id_from_namespaced(rule['rule_id']) + + template_name = _priority_class_to_template_name(rule['priority_class']) + if template_name in ['override', 'underride']: + templaterule = {k: rule[k] for k in ["conditions", "actions"]} + elif template_name in ["sender", "room"]: + templaterule = {'actions': rule['actions']} + unscoped_rule_id = rule['conditions'][0]['pattern'] + elif template_name == 'content': + if len(rule["conditions"]) != 1: + return None + thecond = rule["conditions"][0] + if "pattern" not in thecond: + return None + templaterule = {'actions': rule['actions']} + templaterule["pattern"] = thecond["pattern"] + + if unscoped_rule_id: + templaterule['rule_id'] = unscoped_rule_id + if 'default' in rule: + templaterule['default'] = rule['default'] + return templaterule + + +def _rule_id_from_namespaced(in_rule_id): + return in_rule_id.split('/')[-1] + + +def _priority_class_to_template_name(pc): + return PRIORITY_CLASS_INVERSE_MAP[pc] diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index e0d039518d..adc1eb1d0b 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -36,6 +36,7 @@ STREAM_NAMES = ( ("receipts",), ("user_account_data", "room_account_data", "tag_account_data",), ("backfill",), + ("push_rules",), ) @@ -63,6 +64,7 @@ class ReplicationResource(Resource): * "room_account_data: Per room per user account data. * "tag_account_data": Per room per user tags. * "backfill": Old events that have been backfilled from other servers. + * "push_rules": Per user changes to push rules. The API takes two additional query parameters: @@ -117,14 +119,16 @@ class ReplicationResource(Resource): def current_replication_token(self): stream_token = yield self.sources.get_current_token() backfill_token = yield self.store.get_current_backfill_token() + push_rules_token, room_stream_token = self.store.get_push_rules_stream_token() defer.returnValue(_ReplicationToken( - stream_token.room_stream_id, + room_stream_token, int(stream_token.presence_key), int(stream_token.typing_key), int(stream_token.receipt_key), int(stream_token.account_data_key), backfill_token, + push_rules_token, )) @request_handler @@ -146,6 +150,7 @@ class ReplicationResource(Resource): yield self.presence(writer, current_token) # TODO: implement limit yield self.typing(writer, current_token) # TODO: implement limit yield self.receipts(writer, current_token, limit) + yield self.push_rules(writer, current_token, limit) self.streams(writer, current_token) logger.info("Replicated %d rows", writer.total) @@ -277,6 +282,21 @@ class ReplicationResource(Resource): "position", "user_id", "room_id", "tags" )) + @defer.inlineCallbacks + def push_rules(self, writer, current_token, limit): + current_position = current_token.push_rules + + push_rules = parse_integer(writer.request, "push_rules") + + if push_rules is not None: + rows = yield self.store.get_all_push_rule_updates( + push_rules, current_position, limit + ) + writer.write_header_and_rows("push_rules", rows, ( + "position", "event_stream_ordering", "user_id", "rule_id", "op", + "priority_class", "priority", "conditions", "actions" + )) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" @@ -307,12 +327,16 @@ class _Writer(object): class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", + "push_rules" ))): __slots__ = [] def __new__(cls, *args): if len(args) == 1: - return cls(*(int(value) for value in args[0].split("_"))) + streams = [int(value) for value in args[0].split("_")] + if len(streams) < len(cls._fields): + streams.extend([0] * (len(cls._fields) - len(streams))) + return cls(*streams) else: return super(_ReplicationToken, cls).__new__(cls, *args) diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index 74ec1e50e0..8bfe9fdea8 100644 --- a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -75,7 +75,11 @@ class ClientDirectoryServer(ClientV1RestServlet): yield dir_handler.create_association( user_id, room_alias, room_id, servers ) - yield dir_handler.send_room_alias_update_event(user_id, room_id) + yield dir_handler.send_room_alias_update_event( + requester, + user_id, + room_id + ) except SynapseError as e: raise e except: @@ -118,9 +122,6 @@ class ClientDirectoryServer(ClientV1RestServlet): requester = yield self.auth.get_user_by_req(request) user = requester.user - is_admin = yield self.auth.is_server_admin(user) - if not is_admin: - raise AuthError(403, "You need to be a server admin") room_alias = RoomAlias.from_string(room_alias) diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index f13272da8e..c14e8af00e 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -252,7 +252,7 @@ class SAML2RestServlet(ClientV1RestServlet): SP = Saml2Client(conf) saml2_auth = SP.parse_authn_request_response( request.args['SAMLResponse'][0], BINDING_HTTP_POST) - except Exception, e: # Not authenticated + except Exception as e: # Not authenticated logger.exception(e) if saml2_auth and saml2_auth.status_ok() and not saml2_auth.not_signed: username = saml2_auth.name_id.text diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py index 3c5a212920..953764bd8e 100644 --- a/synapse/rest/client/v1/profile.py +++ b/synapse/rest/client/v1/profile.py @@ -51,7 +51,7 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet): defer.returnValue((400, "Unable to parse name")) yield self.handlers.profile_handler.set_displayname( - user, requester.user, new_name) + user, requester, new_name) defer.returnValue((200, {})) @@ -88,7 +88,7 @@ class ProfileAvatarURLRestServlet(ClientV1RestServlet): defer.returnValue((400, "Unable to parse name")) yield self.handlers.profile_handler.set_avatar_url( - user, requester.user, new_name) + user, requester, new_name) defer.returnValue((200, {})) diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index 970a019223..981d7708db 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -22,12 +22,10 @@ from .base import ClientV1RestServlet, client_path_patterns from synapse.storage.push_rule import ( InconsistentRuleException, RuleNotFoundException ) -from synapse.push.baserules import list_with_base_rules, BASE_RULE_IDS -from synapse.push.rulekinds import ( - PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP -) +from synapse.push.clientformat import format_push_rules_for_user +from synapse.push.baserules import BASE_RULE_IDS +from synapse.push.rulekinds import PRIORITY_CLASS_MAP -import copy import simplejson as json @@ -36,6 +34,11 @@ class PushRuleRestServlet(ClientV1RestServlet): SLIGHTLY_PEDANTIC_TRAILING_SLASH_ERROR = ( "Unrecognised request: You probably wanted a trailing slash") + def __init__(self, hs): + super(PushRuleRestServlet, self).__init__(hs) + self.store = hs.get_datastore() + self.notifier = hs.get_notifier() + @defer.inlineCallbacks def on_PUT(self, request): spec = _rule_spec_from_path(request.postpath) @@ -51,8 +54,11 @@ class PushRuleRestServlet(ClientV1RestServlet): content = _parse_json(request) + user_id = requester.user.to_string() + if 'attr' in spec: - yield self.set_rule_attr(requester.user.to_string(), spec, content) + yield self.set_rule_attr(user_id, spec, content) + self.notify_user(user_id) defer.returnValue((200, {})) if spec['rule_id'].startswith('.'): @@ -77,8 +83,8 @@ class PushRuleRestServlet(ClientV1RestServlet): after = _namespaced_rule_id(spec, after[0]) try: - yield self.hs.get_datastore().add_push_rule( - user_id=requester.user.to_string(), + yield self.store.add_push_rule( + user_id=user_id, rule_id=_namespaced_rule_id_from_spec(spec), priority_class=priority_class, conditions=conditions, @@ -86,6 +92,7 @@ class PushRuleRestServlet(ClientV1RestServlet): before=before, after=after ) + self.notify_user(user_id) except InconsistentRuleException as e: raise SynapseError(400, e.message) except RuleNotFoundException as e: @@ -98,13 +105,15 @@ class PushRuleRestServlet(ClientV1RestServlet): spec = _rule_spec_from_path(request.postpath) requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() namespaced_rule_id = _namespaced_rule_id_from_spec(spec) try: - yield self.hs.get_datastore().delete_push_rule( - requester.user.to_string(), namespaced_rule_id + yield self.store.delete_push_rule( + user_id, namespaced_rule_id ) + self.notify_user(user_id) defer.returnValue((200, {})) except StoreError as e: if e.code == 404: @@ -115,58 +124,16 @@ class PushRuleRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request): requester = yield self.auth.get_user_by_req(request) - user = requester.user + user_id = requester.user.to_string() # we build up the full structure and then decide which bits of it # to send which means doing unnecessary work sometimes but is # is probably not going to make a whole lot of difference - rawrules = yield self.hs.get_datastore().get_push_rules_for_user( - user.to_string() - ) - - ruleslist = [] - for rawrule in rawrules: - rule = dict(rawrule) - rule["conditions"] = json.loads(rawrule["conditions"]) - rule["actions"] = json.loads(rawrule["actions"]) - ruleslist.append(rule) - - # We're going to be mutating this a lot, so do a deep copy - ruleslist = copy.deepcopy(list_with_base_rules(ruleslist)) - - rules = {'global': {}, 'device': {}} - - rules['global'] = _add_empty_priority_class_arrays(rules['global']) - - enabled_map = yield self.hs.get_datastore().\ - get_push_rules_enabled_for_user(user.to_string()) - - for r in ruleslist: - rulearray = None - - template_name = _priority_class_to_template_name(r['priority_class']) - - # Remove internal stuff. - for c in r["conditions"]: - c.pop("_id", None) - - pattern_type = c.pop("pattern_type", None) - if pattern_type == "user_id": - c["pattern"] = user.to_string() - elif pattern_type == "user_localpart": - c["pattern"] = user.localpart + rawrules = yield self.store.get_push_rules_for_user(user_id) - rulearray = rules['global'][template_name] + enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id) - template_rule = _rule_to_template(r) - if template_rule: - if r['rule_id'] in enabled_map: - template_rule['enabled'] = enabled_map[r['rule_id']] - elif 'enabled' in r: - template_rule['enabled'] = r['enabled'] - else: - template_rule['enabled'] = True - rulearray.append(template_rule) + rules = format_push_rules_for_user(requester.user, rawrules, enabled_map) path = request.postpath[1:] @@ -188,6 +155,12 @@ class PushRuleRestServlet(ClientV1RestServlet): def on_OPTIONS(self, _): return 200, {} + def notify_user(self, user_id): + stream_id, _ = self.store.get_push_rules_stream_token() + self.notifier.on_new_event( + "push_rules_key", stream_id, users=[user_id] + ) + def set_rule_attr(self, user_id, spec, val): if spec['attr'] == 'enabled': if isinstance(val, dict) and "enabled" in val: @@ -198,7 +171,7 @@ class PushRuleRestServlet(ClientV1RestServlet): # bools directly, so let's not break them. raise SynapseError(400, "Value for 'enabled' must be boolean") namespaced_rule_id = _namespaced_rule_id_from_spec(spec) - return self.hs.get_datastore().set_push_rule_enabled( + return self.store.set_push_rule_enabled( user_id, namespaced_rule_id, val ) elif spec['attr'] == 'actions': @@ -210,7 +183,7 @@ class PushRuleRestServlet(ClientV1RestServlet): if is_default_rule: if namespaced_rule_id not in BASE_RULE_IDS: raise SynapseError(404, "Unknown rule %r" % (namespaced_rule_id,)) - return self.hs.get_datastore().set_push_rule_actions( + return self.store.set_push_rule_actions( user_id, namespaced_rule_id, actions, is_default_rule ) else: @@ -308,12 +281,6 @@ def _check_actions(actions): raise InvalidRuleException("Unrecognised action") -def _add_empty_priority_class_arrays(d): - for pc in PRIORITY_CLASS_MAP.keys(): - d[pc] = [] - return d - - def _filter_ruleset_with_path(ruleset, path): if path == []: raise UnrecognizedRequestError( @@ -362,37 +329,6 @@ def _priority_class_from_spec(spec): return pc -def _priority_class_to_template_name(pc): - return PRIORITY_CLASS_INVERSE_MAP[pc] - - -def _rule_to_template(rule): - unscoped_rule_id = None - if 'rule_id' in rule: - unscoped_rule_id = _rule_id_from_namespaced(rule['rule_id']) - - template_name = _priority_class_to_template_name(rule['priority_class']) - if template_name in ['override', 'underride']: - templaterule = {k: rule[k] for k in ["conditions", "actions"]} - elif template_name in ["sender", "room"]: - templaterule = {'actions': rule['actions']} - unscoped_rule_id = rule['conditions'][0]['pattern'] - elif template_name == 'content': - if len(rule["conditions"]) != 1: - return None - thecond = rule["conditions"][0] - if "pattern" not in thecond: - return None - templaterule = {'actions': rule['actions']} - templaterule["pattern"] = thecond["pattern"] - - if unscoped_rule_id: - templaterule['rule_id'] = unscoped_rule_id - if 'default' in rule: - templaterule['default'] = rule['default'] - return templaterule - - def _namespaced_rule_id_from_spec(spec): return _namespaced_rule_id(spec, spec['rule_id']) @@ -401,10 +337,6 @@ def _namespaced_rule_id(spec, rule_id): return "global/%s/%s" % (spec['template'], rule_id) -def _rule_id_from_namespaced(in_rule_id): - return in_rule_id.split('/')[-1] - - class InvalidRuleException(Exception): pass diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index f5ed4f7302..cbf3673eff 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -158,12 +158,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet): if event_type == EventTypes.Member: yield self.handlers.room_member_handler.send_membership_event( + requester, event, context, - is_guest=requester.is_guest, ) else: - yield msg_handler.send_nonmember_event(event, context) + yield msg_handler.send_nonmember_event(requester, event, context) defer.returnValue((200, {"event_id": event.event_id})) @@ -183,13 +183,13 @@ class RoomSendEventRestServlet(ClientV1RestServlet): msg_handler = self.handlers.message_handler event = yield msg_handler.create_and_send_nonmember_event( + requester, { "type": event_type, "content": content, "room_id": room_id, "sender": requester.user.to_string(), }, - token_id=requester.access_token_id, txn_id=txn_id, ) @@ -504,6 +504,7 @@ class RoomRedactEventRestServlet(ClientV1RestServlet): msg_handler = self.handlers.message_handler event = yield msg_handler.create_and_send_nonmember_event( + requester, { "type": EventTypes.Redaction, "content": content, @@ -511,7 +512,6 @@ class RoomRedactEventRestServlet(ClientV1RestServlet): "sender": requester.user.to_string(), "redacts": event_id, }, - token_id=requester.access_token_id, txn_id=txn_id, ) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f257721ea3..6f37a85d09 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -45,7 +45,7 @@ from .search import SearchStore from .tags import TagsStore from .account_data import AccountDataStore -from util.id_generators import IdGenerator, StreamIdGenerator +from util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator from synapse.api.constants import PresenceState from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -122,6 +122,9 @@ class DataStore(RoomMemberStore, RoomStore, self._pushers_id_gen = IdGenerator(db_conn, "pushers", "id") self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id") self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id") + self._push_rules_stream_id_gen = ChainedIdGenerator( + self._stream_id_gen, db_conn, "push_rules_stream", "stream_id" + ) events_max = self._stream_id_gen.get_max_token() event_cache_prefill, min_event_val = self._get_cache_dict( @@ -157,6 +160,18 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=presence_cache_prefill ) + push_rules_prefill, push_rules_id = self._get_cache_dict( + db_conn, "push_rules_stream", + entity_column="user_id", + stream_column="stream_id", + max_value=self._push_rules_stream_id_gen.get_max_token()[0], + ) + + self.push_rules_stream_cache = StreamChangeCache( + "PushRulesStreamChangeCache", push_rules_id, + prefilled_cache=push_rules_prefill, + ) + super(DataStore, self).__init__(hs) def take_presence_startup_info(self): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2e97ac84a8..7dc67ecd57 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -770,18 +770,29 @@ class SQLBaseStore(object): table : string giving the table name keyvalues : dict of column names and values to select the row with """ + return self.runInteraction( + desc, self._simple_delete_one_txn, table, keyvalues + ) + + @staticmethod + def _simple_delete_one_txn(txn, table, keyvalues): + """Executes a DELETE query on the named table, expecting to delete a + single row. + + Args: + table : string giving the table name + keyvalues : dict of column names and values to select the row with + """ sql = "DELETE FROM %s WHERE %s" % ( table, " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) - def func(txn): - txn.execute(sql, keyvalues.values()) - if txn.rowcount == 0: - raise StoreError(404, "No row found") - if txn.rowcount > 1: - raise StoreError(500, "more than one row matched") - return self.runInteraction(desc, func) + txn.execute(sql, keyvalues.values()) + if txn.rowcount == 0: + raise StoreError(404, "No row found") + if txn.rowcount > 1: + raise StoreError(500, "more than one row matched") @staticmethod def _simple_delete_txn(txn, table, keyvalues): diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 1556619d5e..012a0b414a 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -70,13 +70,14 @@ class DirectoryStore(SQLBaseStore): ) @defer.inlineCallbacks - def create_room_alias_association(self, room_alias, room_id, servers): + def create_room_alias_association(self, room_alias, room_id, servers, creator=None): """ Creates an associatin between a room alias and room_id/servers Args: room_alias (RoomAlias) room_id (str) servers (list) + creator (str): Optional user_id of creator. Returns: Deferred @@ -87,6 +88,7 @@ class DirectoryStore(SQLBaseStore): { "room_alias": room_alias.to_string(), "room_id": room_id, + "creator": creator, }, desc="create_room_alias_association", ) @@ -107,6 +109,17 @@ class DirectoryStore(SQLBaseStore): ) self.get_aliases_for_room.invalidate((room_id,)) + def get_room_alias_creator(self, room_alias): + return self._simple_select_one_onecol( + table="room_aliases", + keyvalues={ + "room_alias": room_alias, + }, + retcol="creator", + desc="get_room_alias_creator", + allow_none=True + ) + @defer.inlineCallbacks def delete_room_alias(self, room_alias): room_id = yield self.runInteraction( diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 56e69495b1..9dbad2fd5f 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -99,30 +99,32 @@ class PushRuleStore(SQLBaseStore): results.setdefault(row['user_name'], {})[row['rule_id']] = row['enabled'] defer.returnValue(results) + @defer.inlineCallbacks def add_push_rule( self, user_id, rule_id, priority_class, conditions, actions, before=None, after=None ): conditions_json = json.dumps(conditions) actions_json = json.dumps(actions) - - if before or after: - return self.runInteraction( - "_add_push_rule_relative_txn", - self._add_push_rule_relative_txn, - user_id, rule_id, priority_class, - conditions_json, actions_json, before, after, - ) - else: - return self.runInteraction( - "_add_push_rule_highest_priority_txn", - self._add_push_rule_highest_priority_txn, - user_id, rule_id, priority_class, - conditions_json, actions_json, - ) + with self._push_rules_stream_id_gen.get_next() as ids: + stream_id, event_stream_ordering = ids + if before or after: + yield self.runInteraction( + "_add_push_rule_relative_txn", + self._add_push_rule_relative_txn, + stream_id, event_stream_ordering, user_id, rule_id, priority_class, + conditions_json, actions_json, before, after, + ) + else: + yield self.runInteraction( + "_add_push_rule_highest_priority_txn", + self._add_push_rule_highest_priority_txn, + stream_id, event_stream_ordering, user_id, rule_id, priority_class, + conditions_json, actions_json, + ) def _add_push_rule_relative_txn( - self, txn, user_id, rule_id, priority_class, + self, txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class, conditions_json, actions_json, before, after ): # Lock the table since otherwise we'll have annoying races between the @@ -174,12 +176,12 @@ class PushRuleStore(SQLBaseStore): txn.execute(sql, (user_id, priority_class, new_rule_priority)) self._upsert_push_rule_txn( - txn, user_id, rule_id, priority_class, new_rule_priority, - conditions_json, actions_json, + txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class, + new_rule_priority, conditions_json, actions_json, ) def _add_push_rule_highest_priority_txn( - self, txn, user_id, rule_id, priority_class, + self, txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class, conditions_json, actions_json ): # Lock the table since otherwise we'll have annoying races between the @@ -201,13 +203,13 @@ class PushRuleStore(SQLBaseStore): self._upsert_push_rule_txn( txn, - user_id, rule_id, priority_class, new_prio, + stream_id, event_stream_ordering, user_id, rule_id, priority_class, new_prio, conditions_json, actions_json, ) def _upsert_push_rule_txn( - self, txn, user_id, rule_id, priority_class, - priority, conditions_json, actions_json + self, txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class, + priority, conditions_json, actions_json, update_stream=True ): """Specialised version of _simple_upsert_txn that picks a push_rule_id using the _push_rule_id_gen if it needs to insert the rule. It assumes @@ -242,12 +244,17 @@ class PushRuleStore(SQLBaseStore): }, ) - txn.call_after( - self.get_push_rules_for_user.invalidate, (user_id,) - ) - txn.call_after( - self.get_push_rules_enabled_for_user.invalidate, (user_id,) - ) + if update_stream: + self._insert_push_rules_update_txn( + txn, stream_id, event_stream_ordering, user_id, rule_id, + op="ADD", + data={ + "priority_class": priority_class, + "priority": priority, + "conditions": conditions_json, + "actions": actions_json, + } + ) @defer.inlineCallbacks def delete_push_rule(self, user_id, rule_id): @@ -260,25 +267,37 @@ class PushRuleStore(SQLBaseStore): user_id (str): The matrix ID of the push rule owner rule_id (str): The rule_id of the rule to be deleted """ - yield self._simple_delete_one( - "push_rules", - {'user_name': user_id, 'rule_id': rule_id}, - desc="delete_push_rule", - ) + def delete_push_rule_txn(txn, stream_id, event_stream_ordering): + self._simple_delete_one_txn( + txn, + "push_rules", + {'user_name': user_id, 'rule_id': rule_id}, + ) - self.get_push_rules_for_user.invalidate((user_id,)) - self.get_push_rules_enabled_for_user.invalidate((user_id,)) + self._insert_push_rules_update_txn( + txn, stream_id, event_stream_ordering, user_id, rule_id, + op="DELETE" + ) + + with self._push_rules_stream_id_gen.get_next() as ids: + stream_id, event_stream_ordering = ids + yield self.runInteraction( + "delete_push_rule", delete_push_rule_txn, stream_id, event_stream_ordering + ) @defer.inlineCallbacks def set_push_rule_enabled(self, user_id, rule_id, enabled): - ret = yield self.runInteraction( - "_set_push_rule_enabled_txn", - self._set_push_rule_enabled_txn, - user_id, rule_id, enabled - ) - defer.returnValue(ret) + with self._push_rules_stream_id_gen.get_next() as ids: + stream_id, event_stream_ordering = ids + yield self.runInteraction( + "_set_push_rule_enabled_txn", + self._set_push_rule_enabled_txn, + stream_id, event_stream_ordering, user_id, rule_id, enabled + ) - def _set_push_rule_enabled_txn(self, txn, user_id, rule_id, enabled): + def _set_push_rule_enabled_txn( + self, txn, stream_id, event_stream_ordering, user_id, rule_id, enabled + ): new_id = self._push_rules_enable_id_gen.get_next() self._simple_upsert_txn( txn, @@ -287,25 +306,26 @@ class PushRuleStore(SQLBaseStore): {'enabled': 1 if enabled else 0}, {'id': new_id}, ) - txn.call_after( - self.get_push_rules_for_user.invalidate, (user_id,) - ) - txn.call_after( - self.get_push_rules_enabled_for_user.invalidate, (user_id,) + + self._insert_push_rules_update_txn( + txn, stream_id, event_stream_ordering, user_id, rule_id, + op="ENABLE" if enabled else "DISABLE" ) + @defer.inlineCallbacks def set_push_rule_actions(self, user_id, rule_id, actions, is_default_rule): actions_json = json.dumps(actions) - def set_push_rule_actions_txn(txn): + def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering): if is_default_rule: # Add a dummy rule to the rules table with the user specified # actions. priority_class = -1 priority = 1 self._upsert_push_rule_txn( - txn, user_id, rule_id, priority_class, priority, - "[]", actions_json + txn, stream_id, event_stream_ordering, user_id, rule_id, + priority_class, priority, "[]", actions_json, + update_stream=False ) else: self._simple_update_one_txn( @@ -315,10 +335,81 @@ class PushRuleStore(SQLBaseStore): {'actions': actions_json}, ) + self._insert_push_rules_update_txn( + txn, stream_id, event_stream_ordering, user_id, rule_id, + op="ACTIONS", data={"actions": actions_json} + ) + + with self._push_rules_stream_id_gen.get_next() as ids: + stream_id, event_stream_ordering = ids + yield self.runInteraction( + "set_push_rule_actions", set_push_rule_actions_txn, + stream_id, event_stream_ordering + ) + + def _insert_push_rules_update_txn( + self, txn, stream_id, event_stream_ordering, user_id, rule_id, op, data=None + ): + values = { + "stream_id": stream_id, + "event_stream_ordering": event_stream_ordering, + "user_id": user_id, + "rule_id": rule_id, + "op": op, + } + if data is not None: + values.update(data) + + self._simple_insert_txn(txn, "push_rules_stream", values=values) + + txn.call_after( + self.get_push_rules_for_user.invalidate, (user_id,) + ) + txn.call_after( + self.get_push_rules_enabled_for_user.invalidate, (user_id,) + ) + txn.call_after( + self.push_rules_stream_cache.entity_has_changed, user_id, stream_id + ) + + def get_all_push_rule_updates(self, last_id, current_id, limit): + """Get all the push rules changes that have happend on the server""" + def get_all_push_rule_updates_txn(txn): + sql = ( + "SELECT stream_id, event_stream_ordering, user_id, rule_id," + " op, priority_class, priority, conditions, actions" + " FROM push_rules_stream" + " WHERE ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + return txn.fetchall() return self.runInteraction( - "set_push_rule_actions", set_push_rule_actions_txn, + "get_all_push_rule_updates", get_all_push_rule_updates_txn ) + def get_push_rules_stream_token(self): + """Get the position of the push rules stream. + Returns a pair of a stream id for the push_rules stream and the + room stream ordering it corresponds to.""" + return self._push_rules_stream_id_gen.get_max_token() + + def have_push_rules_changed_for_user(self, user_id, last_id): + if not self.push_rules_stream_cache.has_entity_changed(user_id, last_id): + return defer.succeed(False) + else: + def have_push_rules_changed_txn(txn): + sql = ( + "SELECT COUNT(stream_id) FROM push_rules_stream" + " WHERE user_id = ? AND ? < stream_id" + ) + txn.execute(sql, (user_id, last_id)) + count, = txn.fetchone() + return bool(count) + return self.runInteraction( + "have_push_rules_changed", have_push_rules_changed_txn + ) + class RuleNotFoundException(Exception): pass diff --git a/synapse/storage/schema/delta/30/alias_creator.sql b/synapse/storage/schema/delta/30/alias_creator.sql new file mode 100644 index 0000000000..c9d0dde638 --- /dev/null +++ b/synapse/storage/schema/delta/30/alias_creator.sql @@ -0,0 +1,16 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE room_aliases ADD COLUMN creator TEXT; diff --git a/synapse/storage/schema/delta/30/push_rule_stream.sql b/synapse/storage/schema/delta/30/push_rule_stream.sql new file mode 100644 index 0000000000..735aa8d5f6 --- /dev/null +++ b/synapse/storage/schema/delta/30/push_rule_stream.sql @@ -0,0 +1,38 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +CREATE TABLE push_rules_stream( + stream_id BIGINT NOT NULL, + event_stream_ordering BIGINT NOT NULL, + user_id TEXT NOT NULL, + rule_id TEXT NOT NULL, + op TEXT NOT NULL, -- One of "ENABLE", "DISABLE", "ACTIONS", "ADD", "DELETE" + priority_class SMALLINT, + priority INTEGER, + conditions TEXT, + actions TEXT +); + +-- The extra data for each operation is: +-- * ENABLE, DISABLE, DELETE: [] +-- * ACTIONS: ["actions"] +-- * ADD: ["priority_class", "priority", "actions", "conditions"] + +-- Index for replication queries. +CREATE INDEX push_rules_stream_id ON push_rules_stream(stream_id); +-- Index for /sync queries. +CREATE INDEX push_rules_stream_user_stream_id on push_rules_stream(user_id, stream_id); diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index efe3f68e6e..af425ba9a4 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -20,23 +20,21 @@ import threading class IdGenerator(object): def __init__(self, db_conn, table, column): - self.table = table - self.column = column self._lock = threading.Lock() - cur = db_conn.cursor() - self._next_id = self._load_next_id(cur) - cur.close() - - def _load_next_id(self, txn): - txn.execute("SELECT MAX(%s) FROM %s" % (self.column, self.table,)) - val, = txn.fetchone() - return val + 1 if val else 1 + self._next_id = _load_max_id(db_conn, table, column) def get_next(self): with self._lock: - i = self._next_id self._next_id += 1 - return i + return self._next_id + + +def _load_max_id(db_conn, table, column): + cur = db_conn.cursor() + cur.execute("SELECT MAX(%s) FROM %s" % (column, table,)) + val, = cur.fetchone() + cur.close() + return val if val else 1 class StreamIdGenerator(object): @@ -52,23 +50,10 @@ class StreamIdGenerator(object): # ... persist event ... """ def __init__(self, db_conn, table, column): - self.table = table - self.column = column - self._lock = threading.Lock() - - cur = db_conn.cursor() - self._current_max = self._load_current_max(cur) - cur.close() - + self._current_max = _load_max_id(db_conn, table, column) self._unfinished_ids = deque() - def _load_current_max(self, txn): - txn.execute("SELECT MAX(%s) FROM %s" % (self.column, self.table)) - rows = txn.fetchall() - val, = rows[0] - return int(val) if val else 1 - def get_next(self): """ Usage: @@ -124,3 +109,50 @@ class StreamIdGenerator(object): return self._unfinished_ids[0] - 1 return self._current_max + + +class ChainedIdGenerator(object): + """Used to generate new stream ids where the stream must be kept in sync + with another stream. It generates pairs of IDs, the first element is an + integer ID for this stream, the second element is the ID for the stream + that this stream needs to be kept in sync with.""" + + def __init__(self, chained_generator, db_conn, table, column): + self.chained_generator = chained_generator + self._lock = threading.Lock() + self._current_max = _load_max_id(db_conn, table, column) + self._unfinished_ids = deque() + + def get_next(self): + """ + Usage: + with stream_id_gen.get_next() as (stream_id, chained_id): + # ... persist event ... + """ + with self._lock: + self._current_max += 1 + next_id = self._current_max + chained_id = self.chained_generator.get_max_token() + + self._unfinished_ids.append((next_id, chained_id)) + + @contextlib.contextmanager + def manager(): + try: + yield (next_id, chained_id) + finally: + with self._lock: + self._unfinished_ids.remove((next_id, chained_id)) + + return manager() + + def get_max_token(self): + """Returns the maximum stream id such that all stream ids less than or + equal to it have been successfully persisted. + """ + with self._lock: + if self._unfinished_ids: + stream_id, chained_id = self._unfinished_ids[0] + return (stream_id - 1, chained_id) + + return (self._current_max, self.chained_generator.get_max_token()) diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 5ddf4e988b..d4c0bb6732 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -38,9 +38,12 @@ class EventSources(object): name: cls(hs) for name, cls in EventSources.SOURCE_TYPES.items() } + self.store = hs.get_datastore() @defer.inlineCallbacks def get_current_token(self, direction='f'): + push_rules_key, _ = self.store.get_push_rules_stream_token() + token = StreamToken( room_key=( yield self.sources["room"].get_current_key(direction) @@ -57,5 +60,6 @@ class EventSources(object): account_data_key=( yield self.sources["account_data"].get_current_key() ), + push_rules_key=push_rules_key, ) defer.returnValue(token) diff --git a/synapse/types.py b/synapse/types.py index d5bd95cbd3..5b166835bd 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -115,6 +115,7 @@ class StreamToken( "typing_key", "receipt_key", "account_data_key", + "push_rules_key", )) ): _SEPARATOR = "_" @@ -150,6 +151,7 @@ class StreamToken( or (int(other.typing_key) < int(self.typing_key)) or (int(other.receipt_key) < int(self.receipt_key)) or (int(other.account_data_key) < int(self.account_data_key)) + or (int(other.push_rules_key) < int(self.push_rules_key)) ) def copy_and_advance(self, key, new_value): @@ -174,6 +176,11 @@ class StreamToken( return StreamToken(**d) +StreamToken.START = StreamToken( + *(["s0"] + ["0"] * (len(StreamToken._fields) - 1)) +) + + class RoomStreamToken(namedtuple("_StreamToken", "topological stream")): """Tokens are positions between events. The token "s1" comes after event 1. diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index e863a8f8a9..2b68c1ac93 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -69,7 +69,7 @@ class ExpiringCache(object): if self._max_len and len(self._cache.keys()) > self._max_len: sorted_entries = sorted( self._cache.items(), - key=lambda (k, v): v.time, + key=lambda item: item[1].time, ) for k, _ in sorted_entries[self._max_len:]: |