diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index fded6e4009..92eab20c7c 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,6 +20,7 @@ from synapse.api.constants import Membership, EventTypes
from synapse.util import unwrapFirstError
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.metrics import Measure
+from synapse.push.clientformat import format_push_rules_for_user
from twisted.internet import defer
@@ -224,6 +225,10 @@ class SyncHandler(BaseHandler):
)
)
+ account_data['m.push_rules'] = yield self.push_rules_for_user(
+ sync_config.user
+ )
+
tags_by_room = yield self.store.get_tags_for_user(
sync_config.user.to_string()
)
@@ -322,6 +327,14 @@ class SyncHandler(BaseHandler):
defer.returnValue(room_sync)
+ @defer.inlineCallbacks
+ def push_rules_for_user(self, user):
+ user_id = user.to_string()
+ rawrules = yield self.store.get_push_rules_for_user(user_id)
+ enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id)
+ rules = format_push_rules_for_user(user, rawrules, enabled_map)
+ defer.returnValue(rules)
+
def account_data_for_user(self, account_data):
account_data_events = []
@@ -481,6 +494,15 @@ class SyncHandler(BaseHandler):
)
)
+ push_rules_changed = yield self.store.have_push_rules_changed_for_user(
+ user_id, int(since_token.push_rules_key)
+ )
+
+ if push_rules_changed:
+ account_data["m.push_rules"] = yield self.push_rules_for_user(
+ sync_config.user
+ )
+
# Get a list of membership change events that have happened.
rooms_changed = yield self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key
diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py
index edfe28c79b..981d7708db 100644
--- a/synapse/rest/client/v1/push_rule.py
+++ b/synapse/rest/client/v1/push_rule.py
@@ -156,7 +156,7 @@ class PushRuleRestServlet(ClientV1RestServlet):
return 200, {}
def notify_user(self, user_id):
- stream_id = self.store.get_push_rules_stream_token()
+ stream_id, _ = self.store.get_push_rules_stream_token()
self.notifier.on_new_event(
"push_rules_key", stream_id, users=[user_id]
)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index e2d7b52569..7b7b03d052 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -160,6 +160,11 @@ class DataStore(RoomMemberStore, RoomStore,
prefilled_cache=presence_cache_prefill
)
+ self.push_rules_stream_cache = StreamChangeCache(
+ "PushRulesStreamChangeCache",
+ self._push_rules_stream_id_gen.get_max_token()[0],
+ )
+
super(DataStore, self).__init__(hs)
def take_presence_startup_info(self):
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index e034024108..792fcbdf5b 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -244,15 +244,10 @@ class PushRuleStore(SQLBaseStore):
)
if update_stream:
- self._simple_insert_txn(
- txn,
- table="push_rules_stream",
- values={
- "stream_id": stream_id,
- "stream_ordering": stream_ordering,
- "user_id": user_id,
- "rule_id": rule_id,
- "op": "ADD",
+ self._insert_push_rules_update_txn(
+ txn, stream_id, stream_ordering, user_id, rule_id,
+ op="ADD",
+ data={
"priority_class": priority_class,
"priority": priority,
"conditions": conditions_json,
@@ -260,13 +255,6 @@ class PushRuleStore(SQLBaseStore):
}
)
- txn.call_after(
- self.get_push_rules_for_user.invalidate, (user_id,)
- )
- txn.call_after(
- self.get_push_rules_enabled_for_user.invalidate, (user_id,)
- )
-
@defer.inlineCallbacks
def delete_push_rule(self, user_id, rule_id):
"""
@@ -284,22 +272,10 @@ class PushRuleStore(SQLBaseStore):
"push_rules",
{'user_name': user_id, 'rule_id': rule_id},
)
- self._simple_insert_txn(
- txn,
- table="push_rules_stream",
- values={
- "stream_id": stream_id,
- "stream_ordering": stream_ordering,
- "user_id": user_id,
- "rule_id": rule_id,
- "op": "DELETE",
- }
- )
- txn.call_after(
- self.get_push_rules_for_user.invalidate, (user_id,)
- )
- txn.call_after(
- self.get_push_rules_enabled_for_user.invalidate, (user_id,)
+
+ self._insert_push_rules_update_txn(
+ txn, stream_id, stream_ordering, user_id, rule_id,
+ op="DELETE"
)
with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering):
@@ -328,23 +304,9 @@ class PushRuleStore(SQLBaseStore):
{'id': new_id},
)
- self._simple_insert_txn(
- txn,
- "push_rules_stream",
- values={
- "stream_id": stream_id,
- "stream_ordering": stream_ordering,
- "user_id": user_id,
- "rule_id": rule_id,
- "op": "ENABLE" if enabled else "DISABLE",
- }
- )
-
- txn.call_after(
- self.get_push_rules_for_user.invalidate, (user_id,)
- )
- txn.call_after(
- self.get_push_rules_enabled_for_user.invalidate, (user_id,)
+ self._insert_push_rules_update_txn(
+ txn, stream_id, stream_ordering, user_id, rule_id,
+ op="ENABLE" if enabled else "DISABLE"
)
@defer.inlineCallbacks
@@ -370,24 +332,9 @@ class PushRuleStore(SQLBaseStore):
{'actions': actions_json},
)
- self._simple_insert_txn(
- txn,
- "push_rules_stream",
- values={
- "stream_id": stream_id,
- "stream_ordering": stream_ordering,
- "user_id": user_id,
- "rule_id": rule_id,
- "op": "ACTIONS",
- "actions": actions_json,
- }
- )
-
- txn.call_after(
- self.get_push_rules_for_user.invalidate, (user_id,)
- )
- txn.call_after(
- self.get_push_rules_enabled_for_user.invalidate, (user_id,)
+ self._insert_push_rules_update_txn(
+ txn, stream_id, stream_ordering, user_id, rule_id,
+ op="ACTIONS", data={"actions": actions_json}
)
with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering):
@@ -396,6 +343,31 @@ class PushRuleStore(SQLBaseStore):
stream_id, stream_ordering
)
+ def _insert_push_rules_update_txn(
+ self, txn, stream_id, stream_ordering, user_id, rule_id, op, data=None
+ ):
+ values = {
+ "stream_id": stream_id,
+ "stream_ordering": stream_ordering,
+ "user_id": user_id,
+ "rule_id": rule_id,
+ "op": op,
+ }
+ if data is not None:
+ values.update(data)
+
+ self._simple_insert_txn(txn, "push_rules_stream", values=values)
+
+ txn.call_after(
+ self.get_push_rules_for_user.invalidate, (user_id,)
+ )
+ txn.call_after(
+ self.get_push_rules_enabled_for_user.invalidate, (user_id,)
+ )
+ txn.call_after(
+ self.push_rules_stream_cache.entity_has_changed, user_id, stream_id
+ )
+
def get_all_push_rule_updates(self, last_id, current_id, limit):
"""Get all the push rules changes that have happend on the server"""
def get_all_push_rule_updates_txn(txn):
@@ -403,7 +375,7 @@ class PushRuleStore(SQLBaseStore):
"SELECT stream_id, stream_ordering, user_id, rule_id,"
" op, priority_class, priority, conditions, actions"
" FROM push_rules_stream"
- " WHERE ? < stream_id and stream_id <= ?"
+ " WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC LIMIT ?"
)
txn.execute(sql, (last_id, current_id, limit))
@@ -418,6 +390,23 @@ class PushRuleStore(SQLBaseStore):
room stream ordering it corresponds to."""
return self._push_rules_stream_id_gen.get_max_token()
+ def have_push_rules_changed_for_user(self, user_id, last_id):
+ if not self.push_rules_stream_cache.has_entity_changed(user_id, last_id):
+ logger.error("FNARG")
+ return defer.succeed(False)
+ else:
+ def have_push_rules_changed_txn(txn):
+ sql = (
+ "SELECT COUNT(stream_id) FROM push_rules_stream"
+ " WHERE user_id = ? AND ? < stream_id"
+ )
+ txn.execute(sql, (user_id, last_id))
+ count, = txn.fetchone()
+ return bool(count)
+ return self.runInteraction(
+ "have_push_rules_changed", have_push_rules_changed_txn
+ )
+
class RuleNotFoundException(Exception):
pass
|