From cf81375b94c4763766440471e632fc4b103450ab Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Fri, 12 Feb 2016 15:11:49 +0000 Subject: Merge two of the room join codepaths There's at least one more to merge in. Side-effects: * Stop reporting None as displayname and avatar_url in some cases * Joining a room by alias populates guest-ness in join event * Remove unspec'd PUT version of /join/ which has not been called on matrix.org according to logs * Stop recording access_token_id on /join/room_id - currently we don't record it on /join/room_alias; I can try to thread it through at some point. --- synapse/types.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) (limited to 'synapse/types.py') diff --git a/synapse/types.py b/synapse/types.py index 2095837ba6..0be8384e18 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.api.errors import SynapseError +from synapse.api.errors import SynapseError, BadIdentifierError from collections import namedtuple @@ -51,13 +51,13 @@ class DomainSpecificString( def from_string(cls, s): """Parse the string given by 's' into a structure object.""" if len(s) < 1 or s[0] != cls.SIGIL: - raise SynapseError(400, "Expected %s string to start with '%s'" % ( + raise BadIdentifierError(400, "Expected %s string to start with '%s'" % ( cls.__name__, cls.SIGIL, )) parts = s[1:].split(':', 1) if len(parts) != 2: - raise SynapseError( + raise BadIdentifierError( 400, "Expected %s of the form '%slocalname:domain'" % ( cls.__name__, cls.SIGIL, ) @@ -69,6 +69,14 @@ class DomainSpecificString( # names on one HS return cls(localpart=parts[0], domain=domain) + @classmethod + def is_valid(cls, s): + try: + cls.from_string(s) + return True + except: + return False + def to_string(self): """Return a string encoding the fields of the structure object.""" return "%s%s:%s" % (self.SIGIL, self.localpart, self.domain) -- cgit 1.5.1 From 4de08a4672c62eebda2ad3ee89643c2c32242cbf Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Fri, 12 Feb 2016 16:17:24 +0000 Subject: Revert "Merge two of the room join codepaths" This reverts commit cf81375b94c4763766440471e632fc4b103450ab. It subtly violates a guest joining auth check --- synapse/api/errors.py | 5 ---- synapse/handlers/profile.py | 11 ++----- synapse/handlers/room.py | 44 ++++++--------------------- synapse/rest/client/v1/room.py | 68 ++++++++++++++++++++++++++++++++++-------- synapse/types.py | 14 ++------- 5 files changed, 69 insertions(+), 73 deletions(-) (limited to 'synapse/types.py') diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 0c7858f78d..b106fbed6d 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -84,11 +84,6 @@ class RegistrationError(SynapseError): pass -class BadIdentifierError(SynapseError): - """An error indicating an identifier couldn't be parsed.""" - pass - - class UnrecognizedRequestError(SynapseError): """An error indicating we don't understand the request you're trying to make""" def __init__(self, *args, **kwargs): diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 32af622733..629e6e3594 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -169,15 +169,8 @@ class ProfileHandler(BaseHandler): consumeErrors=True ).addErrback(unwrapFirstError) - if displayname is None: - del state["displayname"] - else: - state["displayname"] = displayname - - if avatar_url is None: - del state["avatar_url"] - else: - state["avatar_url"] = avatar_url + state["displayname"] = displayname + state["avatar_url"] = avatar_url defer.returnValue(None) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 2950ed14e4..b2de2cd0c0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -527,17 +527,7 @@ class RoomMemberHandler(BaseHandler): defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def lookup_room_alias(self, room_alias): - """ - Gets the room ID for an alias. - - Args: - room_alias (str): The room alias to look up. - Returns: - A tuple of the room ID (str) and the hosts hosting the room ([str]) - Raises: - SynapseError if the room couldn't be looked up. - """ + def join_room_alias(self, joinee, room_alias, content={}): directory_handler = self.hs.get_handlers().directory_handler mapping = yield directory_handler.get_association(room_alias) @@ -549,40 +539,24 @@ class RoomMemberHandler(BaseHandler): if not hosts: raise SynapseError(404, "No known servers") - defer.returnValue((room_id, hosts)) - - @defer.inlineCallbacks - def do_join(self, requester, room_id, hosts=None): - """ - Joins requester to room_id. - - Args: - requester (Requester): The user joining the room. - room_id (str): The room ID (not alias) being joined. - hosts ([str]): A list of hosts which are hopefully in the room. - Raises: - SynapseError if the room couldn't be joined. - """ - hosts = hosts or [] - - content = {"membership": Membership.JOIN} - if requester.is_guest: - content["kind"] = "guest" - - yield collect_presencelike_data(self.distributor, requester.user, content) + # If event doesn't include a display name, add one. + yield collect_presencelike_data(self.distributor, joinee, content) + content.update({"membership": Membership.JOIN}) builder = self.event_builder_factory.new({ "type": EventTypes.Member, - "state_key": requester.user.to_string(), + "state_key": joinee.to_string(), "room_id": room_id, - "sender": requester.user.to_string(), - "membership": Membership.JOIN, # For backwards compatibility + "sender": joinee.to_string(), + "membership": Membership.JOIN, "content": content, }) event, context = yield self._create_new_client_event(builder) yield self._do_join(event, context, room_hosts=hosts) + defer.returnValue({"room_id": room_id}) + @defer.inlineCallbacks def _do_join(self, event, context, room_hosts=None): room_id = event.room_id diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 1dd33b0a56..81bfe377bd 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -216,7 +216,11 @@ class RoomSendEventRestServlet(ClientV1RestServlet): # TODO: Needs unit testing for room ID + alias joins class JoinRoomAliasServlet(ClientV1RestServlet): - PATTERNS = client_path_patterns("/join/(?P[^/]*)$") + + def register(self, http_server): + # /join/$room_identifier[/$txn_id] + PATTERNS = ("/join/(?P[^/]*)") + register_txn_path(self, PATTERNS, http_server) @defer.inlineCallbacks def on_POST(self, request, room_identifier, txn_id=None): @@ -225,22 +229,60 @@ class JoinRoomAliasServlet(ClientV1RestServlet): allow_guest=True, ) - handler = self.handlers.room_member_handler + # the identifier could be a room alias or a room id. Try one then the + # other if it fails to parse, without swallowing other valid + # SynapseErrors. - room_id = None - hosts = [] - if RoomAlias.is_valid(room_identifier): - room_alias = RoomAlias.from_string(room_identifier) - room_id, hosts = yield handler.lookup_room_alias(room_alias) - else: - room_id = RoomID.from_string(room_identifier).to_string() + identifier = None + is_room_alias = False + try: + identifier = RoomAlias.from_string(room_identifier) + is_room_alias = True + except SynapseError: + identifier = RoomID.from_string(room_identifier) # TODO: Support for specifying the home server to join with? - yield handler.do_join( - requester, room_id, hosts=hosts - ) - defer.returnValue((200, {"room_id": room_id})) + if is_room_alias: + handler = self.handlers.room_member_handler + ret_dict = yield handler.join_room_alias( + requester.user, + identifier, + ) + defer.returnValue((200, ret_dict)) + else: # room id + msg_handler = self.handlers.message_handler + content = {"membership": Membership.JOIN} + if requester.is_guest: + content["kind"] = "guest" + yield msg_handler.create_and_send_event( + { + "type": EventTypes.Member, + "content": content, + "room_id": identifier.to_string(), + "sender": requester.user.to_string(), + "state_key": requester.user.to_string(), + }, + token_id=requester.access_token_id, + txn_id=txn_id, + is_guest=requester.is_guest, + ) + + defer.returnValue((200, {"room_id": identifier.to_string()})) + + @defer.inlineCallbacks + def on_PUT(self, request, room_identifier, txn_id): + try: + defer.returnValue( + self.txns.get_client_transaction(request, txn_id) + ) + except KeyError: + pass + + response = yield self.on_POST(request, room_identifier, txn_id) + + self.txns.store_client_transaction(request, txn_id, response) + defer.returnValue(response) # TODO: Needs unit testing diff --git a/synapse/types.py b/synapse/types.py index 0be8384e18..2095837ba6 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.api.errors import SynapseError, BadIdentifierError +from synapse.api.errors import SynapseError from collections import namedtuple @@ -51,13 +51,13 @@ class DomainSpecificString( def from_string(cls, s): """Parse the string given by 's' into a structure object.""" if len(s) < 1 or s[0] != cls.SIGIL: - raise BadIdentifierError(400, "Expected %s string to start with '%s'" % ( + raise SynapseError(400, "Expected %s string to start with '%s'" % ( cls.__name__, cls.SIGIL, )) parts = s[1:].split(':', 1) if len(parts) != 2: - raise BadIdentifierError( + raise SynapseError( 400, "Expected %s of the form '%slocalname:domain'" % ( cls.__name__, cls.SIGIL, ) @@ -69,14 +69,6 @@ class DomainSpecificString( # names on one HS return cls(localpart=parts[0], domain=domain) - @classmethod - def is_valid(cls, s): - try: - cls.from_string(s) - return True - except: - return False - def to_string(self): """Return a string encoding the fields of the structure object.""" return "%s%s:%s" % (self.SIGIL, self.localpart, self.domain) -- cgit 1.5.1 From e71095801fc376aac30ff9408ae7f0203684024d Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Mon, 15 Feb 2016 15:39:16 +0000 Subject: Merge implementation of /join by alias or ID This code is kind of rough (passing the remote servers down a long chain), but is a step towards improvement. --- synapse/handlers/_base.py | 5 +++- synapse/handlers/message.py | 20 ++++++++----- synapse/handlers/room.py | 40 ++++++++++--------------- synapse/rest/client/v1/room.py | 68 +++++++++++++++++++----------------------- synapse/types.py | 8 +++++ 5 files changed, 71 insertions(+), 70 deletions(-) (limited to 'synapse/types.py') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 064e8723c8..8508ecdd49 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -188,9 +188,12 @@ class BaseHandler(object): ) @defer.inlineCallbacks - def handle_new_client_event(self, event, context, extra_users=[]): + def handle_new_client_event(self, event, context, ratelimit=True, extra_users=[]): # We now need to go and hit out to wherever we need to hit out to. + if ratelimit: + self.ratelimit(event.sender) + self.auth.check(event, auth_events=context.current_state) yield self.maybe_kick_guest_users(event, context.current_state.values()) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 82c8cb5f0c..a94fad1735 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -216,7 +216,7 @@ class MessageHandler(BaseHandler): defer.returnValue((event, context)) @defer.inlineCallbacks - def send_event(self, event, context, ratelimit=True, is_guest=False): + def send_event(self, event, context, ratelimit=True, is_guest=False, room_hosts=None): """ Persists and notifies local clients and federation of an event. @@ -230,9 +230,6 @@ class MessageHandler(BaseHandler): assert self.hs.is_mine(user), "User must be our own: %s" % (user,) - if ratelimit: - self.ratelimit(event.sender) - if event.is_state(): prev_state = context.current_state.get((event.type, event.state_key)) if prev_state and event.user_id == prev_state.user_id: @@ -245,11 +242,18 @@ class MessageHandler(BaseHandler): if event.type == EventTypes.Member: member_handler = self.hs.get_handlers().room_member_handler - yield member_handler.send_membership_event(event, context, is_guest=is_guest) + yield member_handler.send_membership_event( + event, + context, + is_guest=is_guest, + ratelimit=ratelimit, + room_hosts=room_hosts + ) else: yield self.handle_new_client_event( event=event, context=context, + ratelimit=ratelimit, ) if event.type == EventTypes.Message: @@ -259,7 +263,8 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def create_and_send_event(self, event_dict, ratelimit=True, - token_id=None, txn_id=None, is_guest=False): + token_id=None, txn_id=None, is_guest=False, + room_hosts=None): """ Creates an event, then sends it. @@ -274,7 +279,8 @@ class MessageHandler(BaseHandler): event, context, ratelimit=ratelimit, - is_guest=is_guest + is_guest=is_guest, + room_hosts=room_hosts, ) defer.returnValue(event) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 89695cc0cf..b748e81d20 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -455,7 +455,9 @@ class RoomMemberHandler(BaseHandler): yield self.forget(requester.user, room_id) @defer.inlineCallbacks - def send_membership_event(self, event, context, is_guest=False, room_hosts=None): + def send_membership_event( + self, event, context, is_guest=False, room_hosts=None, ratelimit=True + ): """ Change the membership status of a user in a room. Args: @@ -527,8 +529,17 @@ class RoomMemberHandler(BaseHandler): defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def join_room_alias(self, requester, room_alias, content={}): - joinee = requester.user + def lookup_room_alias(self, room_alias): + """ + Get the room ID associated with a room alias. + + Args: + room_alias (RoomAlias): The alias to look up. + Returns: + The room ID as a RoomID object. + Raises: + SynapseError if room alias could not be found. + """ directory_handler = self.hs.get_handlers().directory_handler mapping = yield directory_handler.get_association(room_alias) @@ -540,28 +551,7 @@ class RoomMemberHandler(BaseHandler): if not hosts: raise SynapseError(404, "No known servers") - # If event doesn't include a display name, add one. - yield collect_presencelike_data(self.distributor, joinee, content) - - content.update({"membership": Membership.JOIN}) - builder = self.event_builder_factory.new({ - "type": EventTypes.Member, - "state_key": joinee.to_string(), - "room_id": room_id, - "sender": joinee.to_string(), - "membership": Membership.JOIN, - "content": content, - }) - event, context = yield self._create_new_client_event(builder) - - yield self.send_membership_event( - event, - context, - is_guest=requester.is_guest, - room_hosts=hosts - ) - - defer.returnValue({"room_id": room_id}) + defer.returnValue((RoomID.from_string(room_id), hosts)) @defer.inlineCallbacks def _do_join(self, event, context, room_hosts=None): diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 76025213dc..340c24635d 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -229,46 +229,40 @@ class JoinRoomAliasServlet(ClientV1RestServlet): allow_guest=True, ) - # the identifier could be a room alias or a room id. Try one then the - # other if it fails to parse, without swallowing other valid - # SynapseErrors. - - identifier = None - is_room_alias = False - try: - identifier = RoomAlias.from_string(room_identifier) - is_room_alias = True - except SynapseError: - identifier = RoomID.from_string(room_identifier) + if RoomID.is_valid(room_identifier): + room_id = room_identifier + room_hosts = None + elif RoomAlias.is_valid(room_identifier): + handler = self.handlers.room_member_handler + room_alias = RoomAlias.from_string(room_identifier) + room_id, room_hosts = yield handler.lookup_room_alias(room_alias) + room_id = room_id.to_string() + else: + raise SynapseError(400, "%s was not legal room ID or room alias" % ( + room_identifier, + )) - # TODO: Support for specifying the home server to join with? + msg_handler = self.handlers.message_handler + content = {"membership": Membership.JOIN} + if requester.is_guest: + content["kind"] = "guest" + yield msg_handler.create_and_send_event( + { + "type": EventTypes.Member, + "content": content, + "room_id": room_id, + "sender": requester.user.to_string(), + "state_key": requester.user.to_string(), - if is_room_alias: - handler = self.handlers.room_member_handler - ret_dict = yield handler.join_room_alias( - requester, - identifier, - ) - defer.returnValue((200, ret_dict)) - else: # room id - msg_handler = self.handlers.message_handler - content = {"membership": Membership.JOIN} - if requester.is_guest: - content["kind"] = "guest" - yield msg_handler.create_and_send_event( - { - "type": EventTypes.Member, - "content": content, - "room_id": identifier.to_string(), - "sender": requester.user.to_string(), - "state_key": requester.user.to_string(), - }, - token_id=requester.access_token_id, - txn_id=txn_id, - is_guest=requester.is_guest, - ) + "membership": Membership.JOIN, # For backwards compatibility + }, + token_id=requester.access_token_id, + txn_id=txn_id, + is_guest=requester.is_guest, + room_hosts=room_hosts, + ) - defer.returnValue((200, {"room_id": identifier.to_string()})) + defer.returnValue((200, {"room_id": room_id})) @defer.inlineCallbacks def on_PUT(self, request, room_identifier, txn_id): diff --git a/synapse/types.py b/synapse/types.py index 2095837ba6..d5bd95cbd3 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -73,6 +73,14 @@ class DomainSpecificString( """Return a string encoding the fields of the structure object.""" return "%s%s:%s" % (self.SIGIL, self.localpart, self.domain) + @classmethod + def is_valid(cls, s): + try: + cls.from_string(s) + return True + except: + return False + __str__ = to_string @classmethod -- cgit 1.5.1 From ddf9e7b3027eee61086ebfb447c5fa33e9b640fe Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 3 Mar 2016 14:57:45 +0000 Subject: Hook up the push rules to the notifier --- synapse/handlers/message.py | 4 ++-- synapse/notifier.py | 2 +- synapse/rest/client/v1/push_rule.py | 44 ++++++++++++++++++++++++------------- synapse/streams/events.py | 4 ++++ synapse/types.py | 7 ++++++ 5 files changed, 43 insertions(+), 18 deletions(-) (limited to 'synapse/types.py') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index afa7c9c36c..2fa12c8f2b 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -647,8 +647,8 @@ class MessageHandler(BaseHandler): user_id, messages, is_peeking=is_peeking ) - start_token = StreamToken(token[0], 0, 0, 0, 0) - end_token = StreamToken(token[1], 0, 0, 0, 0) + start_token = StreamToken.START.copy_and_replace("room_key", token[0]) + end_token = StreamToken.START.copy_and_replace("room_key", token[1]) time_now = self.clock.time_msec() diff --git a/synapse/notifier.py b/synapse/notifier.py index 3c36a20868..9b69b0333a 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -284,7 +284,7 @@ class Notifier(object): @defer.inlineCallbacks def wait_for_events(self, user_id, timeout, callback, room_ids=None, - from_token=StreamToken("s0", "0", "0", "0", "0")): + from_token=StreamToken.START): """Wait until the callback returns a non empty response or the timeout fires. """ diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index 970a019223..cf68725ca1 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -36,6 +36,11 @@ class PushRuleRestServlet(ClientV1RestServlet): SLIGHTLY_PEDANTIC_TRAILING_SLASH_ERROR = ( "Unrecognised request: You probably wanted a trailing slash") + def __init__(self, hs): + super(PushRuleRestServlet, self).__init__(hs) + self.store = hs.get_datastore() + self.notifier = hs.get_notifier() + @defer.inlineCallbacks def on_PUT(self, request): spec = _rule_spec_from_path(request.postpath) @@ -51,8 +56,11 @@ class PushRuleRestServlet(ClientV1RestServlet): content = _parse_json(request) + user_id = requester.user.to_string() + if 'attr' in spec: - yield self.set_rule_attr(requester.user.to_string(), spec, content) + yield self.set_rule_attr(user_id, spec, content) + self.notify_user(user_id) defer.returnValue((200, {})) if spec['rule_id'].startswith('.'): @@ -77,8 +85,8 @@ class PushRuleRestServlet(ClientV1RestServlet): after = _namespaced_rule_id(spec, after[0]) try: - yield self.hs.get_datastore().add_push_rule( - user_id=requester.user.to_string(), + yield self.store.add_push_rule( + user_id=user_id, rule_id=_namespaced_rule_id_from_spec(spec), priority_class=priority_class, conditions=conditions, @@ -86,6 +94,7 @@ class PushRuleRestServlet(ClientV1RestServlet): before=before, after=after ) + self.notify_user(user_id) except InconsistentRuleException as e: raise SynapseError(400, e.message) except RuleNotFoundException as e: @@ -98,13 +107,15 @@ class PushRuleRestServlet(ClientV1RestServlet): spec = _rule_spec_from_path(request.postpath) requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() namespaced_rule_id = _namespaced_rule_id_from_spec(spec) try: - yield self.hs.get_datastore().delete_push_rule( - requester.user.to_string(), namespaced_rule_id + yield self.store.delete_push_rule( + user_id, namespaced_rule_id ) + self.notify_user(user_id) defer.returnValue((200, {})) except StoreError as e: if e.code == 404: @@ -115,14 +126,12 @@ class PushRuleRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request): requester = yield self.auth.get_user_by_req(request) - user = requester.user + user_id = requester.user.to_string() # we build up the full structure and then decide which bits of it # to send which means doing unnecessary work sometimes but is # is probably not going to make a whole lot of difference - rawrules = yield self.hs.get_datastore().get_push_rules_for_user( - user.to_string() - ) + rawrules = yield self.store.get_push_rules_for_user(user_id) ruleslist = [] for rawrule in rawrules: @@ -138,8 +147,7 @@ class PushRuleRestServlet(ClientV1RestServlet): rules['global'] = _add_empty_priority_class_arrays(rules['global']) - enabled_map = yield self.hs.get_datastore().\ - get_push_rules_enabled_for_user(user.to_string()) + enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id) for r in ruleslist: rulearray = None @@ -152,9 +160,9 @@ class PushRuleRestServlet(ClientV1RestServlet): pattern_type = c.pop("pattern_type", None) if pattern_type == "user_id": - c["pattern"] = user.to_string() + c["pattern"] = user_id elif pattern_type == "user_localpart": - c["pattern"] = user.localpart + c["pattern"] = requester.user.localpart rulearray = rules['global'][template_name] @@ -188,6 +196,12 @@ class PushRuleRestServlet(ClientV1RestServlet): def on_OPTIONS(self, _): return 200, {} + def notify_user(self, user_id): + stream_id = self.store.get_push_rules_stream_token() + self.notifier.on_new_event( + "push_rules_key", stream_id, users=[user_id] + ) + def set_rule_attr(self, user_id, spec, val): if spec['attr'] == 'enabled': if isinstance(val, dict) and "enabled" in val: @@ -198,7 +212,7 @@ class PushRuleRestServlet(ClientV1RestServlet): # bools directly, so let's not break them. raise SynapseError(400, "Value for 'enabled' must be boolean") namespaced_rule_id = _namespaced_rule_id_from_spec(spec) - return self.hs.get_datastore().set_push_rule_enabled( + return self.store.set_push_rule_enabled( user_id, namespaced_rule_id, val ) elif spec['attr'] == 'actions': @@ -210,7 +224,7 @@ class PushRuleRestServlet(ClientV1RestServlet): if is_default_rule: if namespaced_rule_id not in BASE_RULE_IDS: raise SynapseError(404, "Unknown rule %r" % (namespaced_rule_id,)) - return self.hs.get_datastore().set_push_rule_actions( + return self.store.set_push_rule_actions( user_id, namespaced_rule_id, actions, is_default_rule ) else: diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 5ddf4e988b..d4c0bb6732 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -38,9 +38,12 @@ class EventSources(object): name: cls(hs) for name, cls in EventSources.SOURCE_TYPES.items() } + self.store = hs.get_datastore() @defer.inlineCallbacks def get_current_token(self, direction='f'): + push_rules_key, _ = self.store.get_push_rules_stream_token() + token = StreamToken( room_key=( yield self.sources["room"].get_current_key(direction) @@ -57,5 +60,6 @@ class EventSources(object): account_data_key=( yield self.sources["account_data"].get_current_key() ), + push_rules_key=push_rules_key, ) defer.returnValue(token) diff --git a/synapse/types.py b/synapse/types.py index d5bd95cbd3..5b166835bd 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -115,6 +115,7 @@ class StreamToken( "typing_key", "receipt_key", "account_data_key", + "push_rules_key", )) ): _SEPARATOR = "_" @@ -150,6 +151,7 @@ class StreamToken( or (int(other.typing_key) < int(self.typing_key)) or (int(other.receipt_key) < int(self.receipt_key)) or (int(other.account_data_key) < int(self.account_data_key)) + or (int(other.push_rules_key) < int(self.push_rules_key)) ) def copy_and_advance(self, key, new_value): @@ -174,6 +176,11 @@ class StreamToken( return StreamToken(**d) +StreamToken.START = StreamToken( + *(["s0"] + ["0"] * (len(StreamToken._fields) - 1)) +) + + class RoomStreamToken(namedtuple("_StreamToken", "topological stream")): """Tokens are positions between events. The token "s1" comes after event 1. -- cgit 1.5.1