summary refs log tree commit diff
path: root/synapse/storage/push_rule.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/push_rule.py')
-rw-r--r--synapse/storage/push_rule.py193
1 files changed, 142 insertions, 51 deletions
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 56e69495b1..9dbad2fd5f 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -99,30 +99,32 @@ class PushRuleStore(SQLBaseStore):
             results.setdefault(row['user_name'], {})[row['rule_id']] = row['enabled']
         defer.returnValue(results)
 
+    @defer.inlineCallbacks
     def add_push_rule(
         self, user_id, rule_id, priority_class, conditions, actions,
         before=None, after=None
     ):
         conditions_json = json.dumps(conditions)
         actions_json = json.dumps(actions)
-
-        if before or after:
-            return self.runInteraction(
-                "_add_push_rule_relative_txn",
-                self._add_push_rule_relative_txn,
-                user_id, rule_id, priority_class,
-                conditions_json, actions_json, before, after,
-            )
-        else:
-            return self.runInteraction(
-                "_add_push_rule_highest_priority_txn",
-                self._add_push_rule_highest_priority_txn,
-                user_id, rule_id, priority_class,
-                conditions_json, actions_json,
-            )
+        with self._push_rules_stream_id_gen.get_next() as ids:
+            stream_id, event_stream_ordering = ids
+            if before or after:
+                yield self.runInteraction(
+                    "_add_push_rule_relative_txn",
+                    self._add_push_rule_relative_txn,
+                    stream_id, event_stream_ordering, user_id, rule_id, priority_class,
+                    conditions_json, actions_json, before, after,
+                )
+            else:
+                yield self.runInteraction(
+                    "_add_push_rule_highest_priority_txn",
+                    self._add_push_rule_highest_priority_txn,
+                    stream_id, event_stream_ordering, user_id, rule_id, priority_class,
+                    conditions_json, actions_json,
+                )
 
     def _add_push_rule_relative_txn(
-        self, txn, user_id, rule_id, priority_class,
+        self, txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class,
         conditions_json, actions_json, before, after
     ):
         # Lock the table since otherwise we'll have annoying races between the
@@ -174,12 +176,12 @@ class PushRuleStore(SQLBaseStore):
         txn.execute(sql, (user_id, priority_class, new_rule_priority))
 
         self._upsert_push_rule_txn(
-            txn, user_id, rule_id, priority_class, new_rule_priority,
-            conditions_json, actions_json,
+            txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class,
+            new_rule_priority, conditions_json, actions_json,
         )
 
     def _add_push_rule_highest_priority_txn(
-        self, txn, user_id, rule_id, priority_class,
+        self, txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class,
         conditions_json, actions_json
     ):
         # Lock the table since otherwise we'll have annoying races between the
@@ -201,13 +203,13 @@ class PushRuleStore(SQLBaseStore):
 
         self._upsert_push_rule_txn(
             txn,
-            user_id, rule_id, priority_class, new_prio,
+            stream_id, event_stream_ordering, user_id, rule_id, priority_class, new_prio,
             conditions_json, actions_json,
         )
 
     def _upsert_push_rule_txn(
-        self, txn, user_id, rule_id, priority_class,
-        priority, conditions_json, actions_json
+        self, txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class,
+        priority, conditions_json, actions_json, update_stream=True
     ):
         """Specialised version of _simple_upsert_txn that picks a push_rule_id
         using the _push_rule_id_gen if it needs to insert the rule. It assumes
@@ -242,12 +244,17 @@ class PushRuleStore(SQLBaseStore):
                 },
             )
 
-        txn.call_after(
-            self.get_push_rules_for_user.invalidate, (user_id,)
-        )
-        txn.call_after(
-            self.get_push_rules_enabled_for_user.invalidate, (user_id,)
-        )
+        if update_stream:
+            self._insert_push_rules_update_txn(
+                txn, stream_id, event_stream_ordering, user_id, rule_id,
+                op="ADD",
+                data={
+                    "priority_class": priority_class,
+                    "priority": priority,
+                    "conditions": conditions_json,
+                    "actions": actions_json,
+                }
+            )
 
     @defer.inlineCallbacks
     def delete_push_rule(self, user_id, rule_id):
@@ -260,25 +267,37 @@ class PushRuleStore(SQLBaseStore):
             user_id (str): The matrix ID of the push rule owner
             rule_id (str): The rule_id of the rule to be deleted
         """
-        yield self._simple_delete_one(
-            "push_rules",
-            {'user_name': user_id, 'rule_id': rule_id},
-            desc="delete_push_rule",
-        )
+        def delete_push_rule_txn(txn, stream_id, event_stream_ordering):
+            self._simple_delete_one_txn(
+                txn,
+                "push_rules",
+                {'user_name': user_id, 'rule_id': rule_id},
+            )
 
-        self.get_push_rules_for_user.invalidate((user_id,))
-        self.get_push_rules_enabled_for_user.invalidate((user_id,))
+            self._insert_push_rules_update_txn(
+                txn, stream_id, event_stream_ordering, user_id, rule_id,
+                op="DELETE"
+            )
+
+        with self._push_rules_stream_id_gen.get_next() as ids:
+            stream_id, event_stream_ordering = ids
+            yield self.runInteraction(
+                "delete_push_rule", delete_push_rule_txn, stream_id, event_stream_ordering
+            )
 
     @defer.inlineCallbacks
     def set_push_rule_enabled(self, user_id, rule_id, enabled):
-        ret = yield self.runInteraction(
-            "_set_push_rule_enabled_txn",
-            self._set_push_rule_enabled_txn,
-            user_id, rule_id, enabled
-        )
-        defer.returnValue(ret)
+        with self._push_rules_stream_id_gen.get_next() as ids:
+            stream_id, event_stream_ordering = ids
+            yield self.runInteraction(
+                "_set_push_rule_enabled_txn",
+                self._set_push_rule_enabled_txn,
+                stream_id, event_stream_ordering, user_id, rule_id, enabled
+            )
 
-    def _set_push_rule_enabled_txn(self, txn, user_id, rule_id, enabled):
+    def _set_push_rule_enabled_txn(
+        self, txn, stream_id, event_stream_ordering, user_id, rule_id, enabled
+    ):
         new_id = self._push_rules_enable_id_gen.get_next()
         self._simple_upsert_txn(
             txn,
@@ -287,25 +306,26 @@ class PushRuleStore(SQLBaseStore):
             {'enabled': 1 if enabled else 0},
             {'id': new_id},
         )
-        txn.call_after(
-            self.get_push_rules_for_user.invalidate, (user_id,)
-        )
-        txn.call_after(
-            self.get_push_rules_enabled_for_user.invalidate, (user_id,)
+
+        self._insert_push_rules_update_txn(
+            txn, stream_id, event_stream_ordering, user_id, rule_id,
+            op="ENABLE" if enabled else "DISABLE"
         )
 
+    @defer.inlineCallbacks
     def set_push_rule_actions(self, user_id, rule_id, actions, is_default_rule):
         actions_json = json.dumps(actions)
 
-        def set_push_rule_actions_txn(txn):
+        def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering):
             if is_default_rule:
                 # Add a dummy rule to the rules table with the user specified
                 # actions.
                 priority_class = -1
                 priority = 1
                 self._upsert_push_rule_txn(
-                    txn, user_id, rule_id, priority_class, priority,
-                    "[]", actions_json
+                    txn, stream_id, event_stream_ordering, user_id, rule_id,
+                    priority_class, priority, "[]", actions_json,
+                    update_stream=False
                 )
             else:
                 self._simple_update_one_txn(
@@ -315,10 +335,81 @@ class PushRuleStore(SQLBaseStore):
                     {'actions': actions_json},
                 )
 
+            self._insert_push_rules_update_txn(
+                txn, stream_id, event_stream_ordering, user_id, rule_id,
+                op="ACTIONS", data={"actions": actions_json}
+            )
+
+        with self._push_rules_stream_id_gen.get_next() as ids:
+            stream_id, event_stream_ordering = ids
+            yield self.runInteraction(
+                "set_push_rule_actions", set_push_rule_actions_txn,
+                stream_id, event_stream_ordering
+            )
+
+    def _insert_push_rules_update_txn(
+        self, txn, stream_id, event_stream_ordering, user_id, rule_id, op, data=None
+    ):
+        values = {
+            "stream_id": stream_id,
+            "event_stream_ordering": event_stream_ordering,
+            "user_id": user_id,
+            "rule_id": rule_id,
+            "op": op,
+        }
+        if data is not None:
+            values.update(data)
+
+        self._simple_insert_txn(txn, "push_rules_stream", values=values)
+
+        txn.call_after(
+            self.get_push_rules_for_user.invalidate, (user_id,)
+        )
+        txn.call_after(
+            self.get_push_rules_enabled_for_user.invalidate, (user_id,)
+        )
+        txn.call_after(
+            self.push_rules_stream_cache.entity_has_changed, user_id, stream_id
+        )
+
+    def get_all_push_rule_updates(self, last_id, current_id, limit):
+        """Get all the push rules changes that have happend on the server"""
+        def get_all_push_rule_updates_txn(txn):
+            sql = (
+                "SELECT stream_id, event_stream_ordering, user_id, rule_id,"
+                " op, priority_class, priority, conditions, actions"
+                " FROM push_rules_stream"
+                " WHERE ? < stream_id AND stream_id <= ?"
+                " ORDER BY stream_id ASC LIMIT ?"
+            )
+            txn.execute(sql, (last_id, current_id, limit))
+            return txn.fetchall()
         return self.runInteraction(
-            "set_push_rule_actions", set_push_rule_actions_txn,
+            "get_all_push_rule_updates", get_all_push_rule_updates_txn
         )
 
+    def get_push_rules_stream_token(self):
+        """Get the position of the push rules stream.
+        Returns a pair of a stream id for the push_rules stream and the
+        room stream ordering it corresponds to."""
+        return self._push_rules_stream_id_gen.get_max_token()
+
+    def have_push_rules_changed_for_user(self, user_id, last_id):
+        if not self.push_rules_stream_cache.has_entity_changed(user_id, last_id):
+            return defer.succeed(False)
+        else:
+            def have_push_rules_changed_txn(txn):
+                sql = (
+                    "SELECT COUNT(stream_id) FROM push_rules_stream"
+                    " WHERE user_id = ? AND ? < stream_id"
+                )
+                txn.execute(sql, (user_id, last_id))
+                count, = txn.fetchone()
+                return bool(count)
+            return self.runInteraction(
+                "have_push_rules_changed", have_push_rules_changed_txn
+            )
+
 
 class RuleNotFoundException(Exception):
     pass