diff options
author | Mark Haines <mark.haines@matrix.org> | 2016-03-01 13:35:37 +0000 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2016-03-01 18:16:37 +0000 |
commit | a1cf9e3bf343c3e5adb8dce7923726aa9b09115e (patch) | |
tree | 808fa29d72b775837a520d1a3916f21e105b411f /synapse/storage/push_rule.py | |
parent | Merge pull request #489 from matrix-org/markjh/replication (diff) | |
download | synapse-a1cf9e3bf343c3e5adb8dce7923726aa9b09115e.tar.xz |
Add a stream for push rule updates
Diffstat (limited to 'synapse/storage/push_rule.py')
-rw-r--r-- | synapse/storage/push_rule.py | 173 |
1 files changed, 133 insertions, 40 deletions
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 56e69495b1..f3ebd49492 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -99,30 +99,31 @@ class PushRuleStore(SQLBaseStore): results.setdefault(row['user_name'], {})[row['rule_id']] = row['enabled'] defer.returnValue(results) + @defer.inlineCallbacks def add_push_rule( self, user_id, rule_id, priority_class, conditions, actions, before=None, after=None ): conditions_json = json.dumps(conditions) actions_json = json.dumps(actions) - - if before or after: - return self.runInteraction( - "_add_push_rule_relative_txn", - self._add_push_rule_relative_txn, - user_id, rule_id, priority_class, - conditions_json, actions_json, before, after, - ) - else: - return self.runInteraction( - "_add_push_rule_highest_priority_txn", - self._add_push_rule_highest_priority_txn, - user_id, rule_id, priority_class, - conditions_json, actions_json, - ) + with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering): + if before or after: + yield self.runInteraction( + "_add_push_rule_relative_txn", + self._add_push_rule_relative_txn, + stream_id, stream_ordering, user_id, rule_id, priority_class, + conditions_json, actions_json, before, after, + ) + else: + yield self.runInteraction( + "_add_push_rule_highest_priority_txn", + self._add_push_rule_highest_priority_txn, + stream_id, stream_ordering, user_id, rule_id, priority_class, + conditions_json, actions_json, + ) def _add_push_rule_relative_txn( - self, txn, user_id, rule_id, priority_class, + self, txn, stream_id, stream_ordering, user_id, rule_id, priority_class, conditions_json, actions_json, before, after ): # Lock the table since otherwise we'll have annoying races between the @@ -174,12 +175,12 @@ class PushRuleStore(SQLBaseStore): txn.execute(sql, (user_id, priority_class, new_rule_priority)) self._upsert_push_rule_txn( - txn, user_id, rule_id, priority_class, new_rule_priority, - conditions_json, actions_json, + txn, stream_id, stream_ordering, user_id, rule_id, priority_class, + new_rule_priority, conditions_json, actions_json, ) def _add_push_rule_highest_priority_txn( - self, txn, user_id, rule_id, priority_class, + self, txn, stream_id, stream_ordering, user_id, rule_id, priority_class, conditions_json, actions_json ): # Lock the table since otherwise we'll have annoying races between the @@ -201,13 +202,13 @@ class PushRuleStore(SQLBaseStore): self._upsert_push_rule_txn( txn, - user_id, rule_id, priority_class, new_prio, + stream_id, stream_ordering, user_id, rule_id, priority_class, new_prio, conditions_json, actions_json, ) def _upsert_push_rule_txn( - self, txn, user_id, rule_id, priority_class, - priority, conditions_json, actions_json + self, txn, stream_id, stream_ordering, user_id, rule_id, priority_class, + priority, conditions_json, actions_json, update_stream=True ): """Specialised version of _simple_upsert_txn that picks a push_rule_id using the _push_rule_id_gen if it needs to insert the rule. It assumes @@ -242,6 +243,23 @@ class PushRuleStore(SQLBaseStore): }, ) + if update_stream: + self._simple_insert_txn( + txn, + table="push_rules_stream", + values={ + "stream_id": stream_id, + "stream_ordering": stream_ordering, + "user_id": user_id, + "rule_id": rule_id, + "op": "ADD", + "priority_class": priority_class, + "priority": priority, + "conditions": conditions_json, + "actions": actions_json, + } + ) + txn.call_after( self.get_push_rules_for_user.invalidate, (user_id,) ) @@ -260,25 +278,47 @@ class PushRuleStore(SQLBaseStore): user_id (str): The matrix ID of the push rule owner rule_id (str): The rule_id of the rule to be deleted """ - yield self._simple_delete_one( - "push_rules", - {'user_name': user_id, 'rule_id': rule_id}, - desc="delete_push_rule", - ) + def delete_push_rule_txn(txn, stream_id, stream_ordering): + self._simple_delete_one_txn( + txn, + "push_rules", + {'user_name': user_id, 'rule_id': rule_id}, + ) + self._simple_insert_txn( + txn, + table="push_rules_stream", + values={ + "stream_id": stream_id, + "stream_ordering": stream_ordering, + "user_id": user_id, + "rule_id": rule_id, + "op": "DELETE", + } + ) + txn.call_after( + self.get_push_rules_for_user.invalidate, (user_id,) + ) + txn.call_after( + self.get_push_rules_enabled_for_user.invalidate, (user_id,) + ) - self.get_push_rules_for_user.invalidate((user_id,)) - self.get_push_rules_enabled_for_user.invalidate((user_id,)) + with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering): + yield self.runInteraction( + "delete_push_rule", delete_push_rule_txn, stream_id, stream_ordering + ) @defer.inlineCallbacks def set_push_rule_enabled(self, user_id, rule_id, enabled): - ret = yield self.runInteraction( - "_set_push_rule_enabled_txn", - self._set_push_rule_enabled_txn, - user_id, rule_id, enabled - ) - defer.returnValue(ret) + with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering): + yield self.runInteraction( + "_set_push_rule_enabled_txn", + self._set_push_rule_enabled_txn, + stream_id, stream_ordering, user_id, rule_id, enabled + ) - def _set_push_rule_enabled_txn(self, txn, user_id, rule_id, enabled): + def _set_push_rule_enabled_txn( + self, txn, stream_id, stream_ordering, user_id, rule_id, enabled + ): new_id = self._push_rules_enable_id_gen.get_next() self._simple_upsert_txn( txn, @@ -287,6 +327,19 @@ class PushRuleStore(SQLBaseStore): {'enabled': 1 if enabled else 0}, {'id': new_id}, ) + + self._simple_insert_txn( + txn, + "push_rules_stream", + values={ + "stream_id": stream_id, + "stream_ordering": stream_ordering, + "user_id": user_id, + "rule_id": rule_id, + "op": "ENABLE" if enabled else "DISABLE", + } + ) + txn.call_after( self.get_push_rules_for_user.invalidate, (user_id,) ) @@ -294,18 +347,20 @@ class PushRuleStore(SQLBaseStore): self.get_push_rules_enabled_for_user.invalidate, (user_id,) ) + @defer.inlineCallbacks def set_push_rule_actions(self, user_id, rule_id, actions, is_default_rule): actions_json = json.dumps(actions) - def set_push_rule_actions_txn(txn): + def set_push_rule_actions_txn(txn, stream_id, stream_ordering): if is_default_rule: # Add a dummy rule to the rules table with the user specified # actions. priority_class = -1 priority = 1 self._upsert_push_rule_txn( - txn, user_id, rule_id, priority_class, priority, - "[]", actions_json + txn, stream_id, stream_ordering, user_id, rule_id, + priority_class, priority, "[]", actions_json, + update_stream=False ) else: self._simple_update_one_txn( @@ -315,8 +370,46 @@ class PushRuleStore(SQLBaseStore): {'actions': actions_json}, ) + self._simple_insert_txn( + txn, + "push_rules_stream", + values={ + "stream_id": stream_id, + "stream_ordering": stream_ordering, + "user_id": user_id, + "rule_id": rule_id, + "op": "ACTIONS", + "actions": actions_json, + } + ) + + txn.call_after( + self.get_push_rules_for_user.invalidate, (user_id,) + ) + txn.call_after( + self.get_push_rules_enabled_for_user.invalidate, (user_id,) + ) + + with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering): + yield self.runInteraction( + "set_push_rule_actions", set_push_rule_actions_txn, + stream_id, stream_ordering + ) + + def get_all_push_rule_updates(self, last_id, current_id, limit): + """Get all the push rules changes that have happend on the server""" + def get_all_push_rule_updates_txn(txn): + sql = ( + "SELECT stream_id, stream_ordering, user_id, rule_id," + " op, priority_class, priority, conditions, actions" + " FROM push_rules_stream" + " WHERE ? < stream_id and stream_id <= ?" + " ORDER BY stream_id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + return txn.fetchall() return self.runInteraction( - "set_push_rule_actions", set_push_rule_actions_txn, + "get_all_push_rule_updates", get_all_push_rule_updates_txn ) |