summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/event_federation.py8
-rw-r--r--synapse/storage/event_push_actions.py7
-rw-r--r--synapse/storage/push_rule.py168
-rw-r--r--synapse/storage/pusher.py6
4 files changed, 96 insertions, 93 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index ce2c794025..3489315e0d 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -114,10 +114,10 @@ class EventFederationStore(SQLBaseStore):
             retcol="event_id",
         )
 
-    def get_latest_events_in_room(self, room_id):
+    def get_latest_event_ids_and_hashes_in_room(self, room_id):
         return self.runInteraction(
-            "get_latest_events_in_room",
-            self._get_latest_events_in_room,
+            "get_latest_event_ids_and_hashes_in_room",
+            self._get_latest_event_ids_and_hashes_in_room,
             room_id,
         )
 
@@ -132,7 +132,7 @@ class EventFederationStore(SQLBaseStore):
             desc="get_latest_event_ids_in_room",
         )
 
-    def _get_latest_events_in_room(self, txn, room_id):
+    def _get_latest_event_ids_and_hashes_in_room(self, txn, room_id):
         sql = (
             "SELECT e.event_id, e.depth FROM events as e "
             "INNER JOIN event_forward_extremities as f "
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index d77a817682..5820539a92 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -27,15 +27,14 @@ class EventPushActionsStore(SQLBaseStore):
     def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
         """
         :param event: the event set actions for
-        :param tuples: list of tuples of (user_id, profile_tag, actions)
+        :param tuples: list of tuples of (user_id, actions)
         """
         values = []
-        for uid, profile_tag, actions in tuples:
+        for uid, actions in tuples:
             values.append({
                 'room_id': event.room_id,
                 'event_id': event.event_id,
                 'user_id': uid,
-                'profile_tag': profile_tag,
                 'actions': json.dumps(actions),
                 'stream_ordering': event.internal_metadata.stream_ordering,
                 'topological_ordering': event.depth,
@@ -43,7 +42,7 @@ class EventPushActionsStore(SQLBaseStore):
                 'highlight': 1 if _action_has_highlight(actions) else 0,
             })
 
-        for uid, _, __ in tuples:
+        for uid, __ in tuples:
             txn.call_after(
                 self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
                 (event.room_id, uid)
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index f9a48171ba..e19a81e41f 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -99,38 +99,36 @@ class PushRuleStore(SQLBaseStore):
             results.setdefault(row['user_name'], {})[row['rule_id']] = row['enabled']
         defer.returnValue(results)
 
-    @defer.inlineCallbacks
-    def add_push_rule(self, before, after, **kwargs):
-        vals = kwargs
-        if 'conditions' in vals:
-            vals['conditions'] = json.dumps(vals['conditions'])
-        if 'actions' in vals:
-            vals['actions'] = json.dumps(vals['actions'])
-
-        # we could check the rest of the keys are valid column names
-        # but sqlite will do that anyway so I think it's just pointless.
-        vals.pop("id", None)
+    def add_push_rule(
+        self, user_id, rule_id, priority_class, conditions, actions,
+        before=None, after=None
+    ):
+        conditions_json = json.dumps(conditions)
+        actions_json = json.dumps(actions)
 
         if before or after:
-            ret = yield self.runInteraction(
+            return self.runInteraction(
                 "_add_push_rule_relative_txn",
                 self._add_push_rule_relative_txn,
-                before=before,
-                after=after,
-                **vals
+                user_id, rule_id, priority_class,
+                conditions_json, actions_json, before, after,
             )
-            defer.returnValue(ret)
         else:
-            ret = yield self.runInteraction(
+            return self.runInteraction(
                 "_add_push_rule_highest_priority_txn",
                 self._add_push_rule_highest_priority_txn,
-                **vals
+                user_id, rule_id, priority_class,
+                conditions_json, actions_json,
             )
-            defer.returnValue(ret)
 
-    def _add_push_rule_relative_txn(self, txn, user_id, **kwargs):
-        after = kwargs.pop("after", None)
-        before = kwargs.pop("before", None)
+    def _add_push_rule_relative_txn(
+        self, txn, user_id, rule_id, priority_class,
+        conditions_json, actions_json, before, after
+    ):
+        # Lock the table since otherwise we'll have annoying races between the
+        # SELECT here and the UPSERT below.
+        self.database_engine.lock_table(txn, "push_rules")
+
         relative_to_rule = before or after
 
         res = self._simple_select_one_txn(
@@ -149,69 +147,45 @@ class PushRuleStore(SQLBaseStore):
                 "before/after rule not found: %s" % (relative_to_rule,)
             )
 
-        priority_class = res["priority_class"]
+        base_priority_class = res["priority_class"]
         base_rule_priority = res["priority"]
 
-        if 'priority_class' in kwargs and kwargs['priority_class'] != priority_class:
+        if base_priority_class != priority_class:
             raise InconsistentRuleException(
                 "Given priority class does not match class of relative rule"
             )
 
-        new_rule = kwargs
-        new_rule.pop("before", None)
-        new_rule.pop("after", None)
-        new_rule['priority_class'] = priority_class
-        new_rule['user_name'] = user_id
-        new_rule['id'] = self._push_rule_id_gen.get_next_txn(txn)
-
-        # check if the priority before/after is free
-        new_rule_priority = base_rule_priority
-        if after:
-            new_rule_priority -= 1
+        if before:
+            # Higher priority rules are executed first, So adding a rule before
+            # a rule means giving it a higher priority than that rule.
+            new_rule_priority = base_rule_priority + 1
         else:
-            new_rule_priority += 1
-
-        new_rule['priority'] = new_rule_priority
+            # We increment the priority of the existing rules to make space for
+            # the new rule. Therefore if we want this rule to appear after
+            # an existing rule we give it the priority of the existing rule,
+            # and then increment the priority of the existing rule.
+            new_rule_priority = base_rule_priority
 
         sql = (
-            "SELECT COUNT(*) FROM push_rules"
-            " WHERE user_name = ? AND priority_class = ? AND priority = ?"
+            "UPDATE push_rules SET priority = priority + 1"
+            " WHERE user_name = ? AND priority_class = ? AND priority >= ?"
         )
+
         txn.execute(sql, (user_id, priority_class, new_rule_priority))
-        res = txn.fetchall()
-        num_conflicting = res[0][0]
-
-        # if there are conflicting rules, bump everything
-        if num_conflicting:
-            sql = "UPDATE push_rules SET priority = priority "
-            if after:
-                sql += "-1"
-            else:
-                sql += "+1"
-            sql += " WHERE user_name = ? AND priority_class = ? AND priority "
-            if after:
-                sql += "<= ?"
-            else:
-                sql += ">= ?"
-
-            txn.execute(sql, (user_id, priority_class, new_rule_priority))
 
-        txn.call_after(
-            self.get_push_rules_for_user.invalidate, (user_id,)
+        self._upsert_push_rule_txn(
+            txn, user_id, rule_id, priority_class, new_rule_priority,
+            conditions_json, actions_json,
         )
 
-        txn.call_after(
-            self.get_push_rules_enabled_for_user.invalidate, (user_id,)
-        )
+    def _add_push_rule_highest_priority_txn(
+        self, txn, user_id, rule_id, priority_class,
+        conditions_json, actions_json
+    ):
+        # Lock the table since otherwise we'll have annoying races between the
+        # SELECT here and the UPSERT below.
+        self.database_engine.lock_table(txn, "push_rules")
 
-        self._simple_insert_txn(
-            txn,
-            table="push_rules",
-            values=new_rule,
-        )
-
-    def _add_push_rule_highest_priority_txn(self, txn, user_id,
-                                            priority_class, **kwargs):
         # find the highest priority rule in that class
         sql = (
             "SELECT COUNT(*), MAX(priority) FROM push_rules"
@@ -225,12 +199,48 @@ class PushRuleStore(SQLBaseStore):
         if how_many > 0:
             new_prio = highest_prio + 1
 
-        # and insert the new rule
-        new_rule = kwargs
-        new_rule['id'] = self._push_rule_id_gen.get_next_txn(txn)
-        new_rule['user_name'] = user_id
-        new_rule['priority_class'] = priority_class
-        new_rule['priority'] = new_prio
+        self._upsert_push_rule_txn(
+            txn,
+            user_id, rule_id, priority_class, new_prio,
+            conditions_json, actions_json,
+        )
+
+    def _upsert_push_rule_txn(
+        self, txn, user_id, rule_id, priority_class,
+        priority, conditions_json, actions_json
+    ):
+        """Specialised version of _simple_upsert_txn that picks a push_rule_id
+        using the _push_rule_id_gen if it needs to insert the rule. It assumes
+        that the "push_rules" table is locked"""
+
+        sql = (
+            "UPDATE push_rules"
+            " SET priority_class = ?, priority = ?, conditions = ?, actions = ?"
+            " WHERE user_name = ? AND rule_id = ?"
+        )
+
+        txn.execute(sql, (
+            priority_class, priority, conditions_json, actions_json,
+            user_id, rule_id,
+        ))
+
+        if txn.rowcount == 0:
+            # We didn't update a row with the given rule_id so insert one
+            push_rule_id = self._push_rule_id_gen.get_next_txn(txn)
+
+            self._simple_insert_txn(
+                txn,
+                table="push_rules",
+                values={
+                    "id": push_rule_id,
+                    "user_name": user_id,
+                    "rule_id": rule_id,
+                    "priority_class": priority_class,
+                    "priority": priority,
+                    "conditions": conditions_json,
+                    "actions": actions_json,
+                },
+            )
 
         txn.call_after(
             self.get_push_rules_for_user.invalidate, (user_id,)
@@ -239,12 +249,6 @@ class PushRuleStore(SQLBaseStore):
             self.get_push_rules_enabled_for_user.invalidate, (user_id,)
         )
 
-        self._simple_insert_txn(
-            txn,
-            table="push_rules",
-            values=new_rule,
-        )
-
     @defer.inlineCallbacks
     def delete_push_rule(self, user_id, rule_id):
         """
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 8ec706178a..c23648cdbc 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -80,9 +80,9 @@ class PusherStore(SQLBaseStore):
         defer.returnValue(rows)
 
     @defer.inlineCallbacks
-    def add_pusher(self, user_id, access_token, profile_tag, kind, app_id,
+    def add_pusher(self, user_id, access_token, kind, app_id,
                    app_display_name, device_display_name,
-                   pushkey, pushkey_ts, lang, data):
+                   pushkey, pushkey_ts, lang, data, profile_tag=""):
         try:
             next_id = yield self._pushers_id_gen.get_next()
             yield self._simple_upsert(
@@ -95,12 +95,12 @@ class PusherStore(SQLBaseStore):
                 dict(
                     access_token=access_token,
                     kind=kind,
-                    profile_tag=profile_tag,
                     app_display_name=app_display_name,
                     device_display_name=device_display_name,
                     ts=pushkey_ts,
                     lang=lang,
                     data=encode_canonical_json(data),
+                    profile_tag=profile_tag,
                 ),
                 insertion_values=dict(
                     id=next_id,