summary refs log tree commit diff
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2016-03-04 14:44:01 +0000
committerMark Haines <mark.haines@matrix.org>2016-03-04 14:44:01 +0000
commit1b4f4a936fb416d81203fcd66be690f9a04b2b62 (patch)
treee9d7be97688385f7d221822495bbf6ee19401600
parentMove the code for formatting push rules into a separate function (diff)
downloadsynapse-1b4f4a936fb416d81203fcd66be690f9a04b2b62.tar.xz
Hook up the push rules stream to account_data in /sync
-rw-r--r--synapse/handlers/sync.py22
-rw-r--r--synapse/rest/client/v1/push_rule.py2
-rw-r--r--synapse/storage/__init__.py5
-rw-r--r--synapse/storage/push_rule.py125
4 files changed, 85 insertions, 69 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index fded6e4009..92eab20c7c 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,6 +20,7 @@ from synapse.api.constants import Membership, EventTypes
 from synapse.util import unwrapFirstError
 from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.metrics import Measure
+from synapse.push.clientformat import format_push_rules_for_user
 
 from twisted.internet import defer
 
@@ -224,6 +225,10 @@ class SyncHandler(BaseHandler):
             )
         )
 
+        account_data['m.push_rules'] = yield self.push_rules_for_user(
+            sync_config.user
+        )
+
         tags_by_room = yield self.store.get_tags_for_user(
             sync_config.user.to_string()
         )
@@ -322,6 +327,14 @@ class SyncHandler(BaseHandler):
 
         defer.returnValue(room_sync)
 
+    @defer.inlineCallbacks
+    def push_rules_for_user(self, user):
+        user_id = user.to_string()
+        rawrules = yield self.store.get_push_rules_for_user(user_id)
+        enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id)
+        rules = format_push_rules_for_user(user, rawrules, enabled_map)
+        defer.returnValue(rules)
+
     def account_data_for_user(self, account_data):
         account_data_events = []
 
@@ -481,6 +494,15 @@ class SyncHandler(BaseHandler):
             )
         )
 
+        push_rules_changed = yield self.store.have_push_rules_changed_for_user(
+            user_id, int(since_token.push_rules_key)
+        )
+
+        if push_rules_changed:
+            account_data["m.push_rules"] = yield self.push_rules_for_user(
+                sync_config.user
+            )
+
         # Get a list of membership change events that have happened.
         rooms_changed = yield self.store.get_membership_changes_for_user(
             user_id, since_token.room_key, now_token.room_key
diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py
index edfe28c79b..981d7708db 100644
--- a/synapse/rest/client/v1/push_rule.py
+++ b/synapse/rest/client/v1/push_rule.py
@@ -156,7 +156,7 @@ class PushRuleRestServlet(ClientV1RestServlet):
         return 200, {}
 
     def notify_user(self, user_id):
-        stream_id = self.store.get_push_rules_stream_token()
+        stream_id, _ = self.store.get_push_rules_stream_token()
         self.notifier.on_new_event(
             "push_rules_key", stream_id, users=[user_id]
         )
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index e2d7b52569..7b7b03d052 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -160,6 +160,11 @@ class DataStore(RoomMemberStore, RoomStore,
             prefilled_cache=presence_cache_prefill
         )
 
+        self.push_rules_stream_cache = StreamChangeCache(
+            "PushRulesStreamChangeCache",
+            self._push_rules_stream_id_gen.get_max_token()[0],
+        )
+
         super(DataStore, self).__init__(hs)
 
     def take_presence_startup_info(self):
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index e034024108..792fcbdf5b 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -244,15 +244,10 @@ class PushRuleStore(SQLBaseStore):
             )
 
         if update_stream:
-            self._simple_insert_txn(
-                txn,
-                table="push_rules_stream",
-                values={
-                    "stream_id": stream_id,
-                    "stream_ordering": stream_ordering,
-                    "user_id": user_id,
-                    "rule_id": rule_id,
-                    "op": "ADD",
+            self._insert_push_rules_update_txn(
+                txn, stream_id, stream_ordering, user_id, rule_id,
+                op="ADD",
+                data={
                     "priority_class": priority_class,
                     "priority": priority,
                     "conditions": conditions_json,
@@ -260,13 +255,6 @@ class PushRuleStore(SQLBaseStore):
                 }
             )
 
-        txn.call_after(
-            self.get_push_rules_for_user.invalidate, (user_id,)
-        )
-        txn.call_after(
-            self.get_push_rules_enabled_for_user.invalidate, (user_id,)
-        )
-
     @defer.inlineCallbacks
     def delete_push_rule(self, user_id, rule_id):
         """
@@ -284,22 +272,10 @@ class PushRuleStore(SQLBaseStore):
                 "push_rules",
                 {'user_name': user_id, 'rule_id': rule_id},
             )
-            self._simple_insert_txn(
-                txn,
-                table="push_rules_stream",
-                values={
-                    "stream_id": stream_id,
-                    "stream_ordering": stream_ordering,
-                    "user_id": user_id,
-                    "rule_id": rule_id,
-                    "op": "DELETE",
-                }
-            )
-            txn.call_after(
-                self.get_push_rules_for_user.invalidate, (user_id,)
-            )
-            txn.call_after(
-                self.get_push_rules_enabled_for_user.invalidate, (user_id,)
+
+            self._insert_push_rules_update_txn(
+                txn, stream_id, stream_ordering, user_id, rule_id,
+                op="DELETE"
             )
 
         with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering):
@@ -328,23 +304,9 @@ class PushRuleStore(SQLBaseStore):
             {'id': new_id},
         )
 
-        self._simple_insert_txn(
-            txn,
-            "push_rules_stream",
-            values={
-                "stream_id": stream_id,
-                "stream_ordering": stream_ordering,
-                "user_id": user_id,
-                "rule_id": rule_id,
-                "op": "ENABLE" if enabled else "DISABLE",
-            }
-        )
-
-        txn.call_after(
-            self.get_push_rules_for_user.invalidate, (user_id,)
-        )
-        txn.call_after(
-            self.get_push_rules_enabled_for_user.invalidate, (user_id,)
+        self._insert_push_rules_update_txn(
+            txn, stream_id, stream_ordering, user_id, rule_id,
+            op="ENABLE" if enabled else "DISABLE"
         )
 
     @defer.inlineCallbacks
@@ -370,24 +332,9 @@ class PushRuleStore(SQLBaseStore):
                     {'actions': actions_json},
                 )
 
-            self._simple_insert_txn(
-                txn,
-                "push_rules_stream",
-                values={
-                    "stream_id": stream_id,
-                    "stream_ordering": stream_ordering,
-                    "user_id": user_id,
-                    "rule_id": rule_id,
-                    "op": "ACTIONS",
-                    "actions": actions_json,
-                }
-            )
-
-            txn.call_after(
-                self.get_push_rules_for_user.invalidate, (user_id,)
-            )
-            txn.call_after(
-                self.get_push_rules_enabled_for_user.invalidate, (user_id,)
+            self._insert_push_rules_update_txn(
+                txn, stream_id, stream_ordering, user_id, rule_id,
+                op="ACTIONS", data={"actions": actions_json}
             )
 
         with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering):
@@ -396,6 +343,31 @@ class PushRuleStore(SQLBaseStore):
                 stream_id, stream_ordering
             )
 
+    def _insert_push_rules_update_txn(
+        self, txn, stream_id, stream_ordering, user_id, rule_id, op, data=None
+    ):
+        values = {
+            "stream_id": stream_id,
+            "stream_ordering": stream_ordering,
+            "user_id": user_id,
+            "rule_id": rule_id,
+            "op": op,
+        }
+        if data is not None:
+            values.update(data)
+
+        self._simple_insert_txn(txn, "push_rules_stream", values=values)
+
+        txn.call_after(
+            self.get_push_rules_for_user.invalidate, (user_id,)
+        )
+        txn.call_after(
+            self.get_push_rules_enabled_for_user.invalidate, (user_id,)
+        )
+        txn.call_after(
+            self.push_rules_stream_cache.entity_has_changed, user_id, stream_id
+        )
+
     def get_all_push_rule_updates(self, last_id, current_id, limit):
         """Get all the push rules changes that have happend on the server"""
         def get_all_push_rule_updates_txn(txn):
@@ -403,7 +375,7 @@ class PushRuleStore(SQLBaseStore):
                 "SELECT stream_id, stream_ordering, user_id, rule_id,"
                 " op, priority_class, priority, conditions, actions"
                 " FROM push_rules_stream"
-                " WHERE ? < stream_id and stream_id <= ?"
+                " WHERE ? < stream_id AND stream_id <= ?"
                 " ORDER BY stream_id ASC LIMIT ?"
             )
             txn.execute(sql, (last_id, current_id, limit))
@@ -418,6 +390,23 @@ class PushRuleStore(SQLBaseStore):
         room stream ordering it corresponds to."""
         return self._push_rules_stream_id_gen.get_max_token()
 
+    def have_push_rules_changed_for_user(self, user_id, last_id):
+        if not self.push_rules_stream_cache.has_entity_changed(user_id, last_id):
+            logger.error("FNARG")
+            return defer.succeed(False)
+        else:
+            def have_push_rules_changed_txn(txn):
+                sql = (
+                    "SELECT COUNT(stream_id) FROM push_rules_stream"
+                    " WHERE user_id = ? AND ? < stream_id"
+                )
+                txn.execute(sql, (user_id, last_id))
+                count, = txn.fetchone()
+                return bool(count)
+            return self.runInteraction(
+                "have_push_rules_changed", have_push_rules_changed_txn
+            )
+
 
 class RuleNotFoundException(Exception):
     pass