summary refs log tree commit diff
path: root/synapse/storage/data_stores
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-04-07 11:01:04 +0100
committerGitHub <noreply@github.com>2020-04-07 11:01:04 +0100
commitce72355d7f67a986d60a7d86489b1b40f93fb152 (patch)
tree9d562fd0a9d98d6ce1eb96c1e837008d43c61ec2 /synapse/storage/data_stores
parentMove server command handling out of TCP protocol (#7187) (diff)
downloadsynapse-ce72355d7f67a986d60a7d86489b1b40f93fb152.tar.xz
Fix race in replication (#7226)
Fixes a race between handling `POSITION` and `RDATA` commands. We do this by simply linearizing handling of them.
Diffstat (limited to 'synapse/storage/data_stores')
-rw-r--r--synapse/storage/data_stores/main/push_rule.py40
1 files changed, 20 insertions, 20 deletions
diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py
index 46f9bda773..b3faafa0a4 100644
--- a/synapse/storage/data_stores/main/push_rule.py
+++ b/synapse/storage/data_stores/main/push_rule.py
@@ -334,6 +334,26 @@ class PushRulesWorkerStore(
             results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
         return results
 
+    def get_all_push_rule_updates(self, last_id, current_id, limit):
+        """Get all the push rules changes that have happend on the server"""
+        if last_id == current_id:
+            return defer.succeed([])
+
+        def get_all_push_rule_updates_txn(txn):
+            sql = (
+                "SELECT stream_id, event_stream_ordering, user_id, rule_id,"
+                " op, priority_class, priority, conditions, actions"
+                " FROM push_rules_stream"
+                " WHERE ? < stream_id AND stream_id <= ?"
+                " ORDER BY stream_id ASC LIMIT ?"
+            )
+            txn.execute(sql, (last_id, current_id, limit))
+            return txn.fetchall()
+
+        return self.db.runInteraction(
+            "get_all_push_rule_updates", get_all_push_rule_updates_txn
+        )
+
 
 class PushRuleStore(PushRulesWorkerStore):
     @defer.inlineCallbacks
@@ -685,26 +705,6 @@ class PushRuleStore(PushRulesWorkerStore):
             self.push_rules_stream_cache.entity_has_changed, user_id, stream_id
         )
 
-    def get_all_push_rule_updates(self, last_id, current_id, limit):
-        """Get all the push rules changes that have happend on the server"""
-        if last_id == current_id:
-            return defer.succeed([])
-
-        def get_all_push_rule_updates_txn(txn):
-            sql = (
-                "SELECT stream_id, event_stream_ordering, user_id, rule_id,"
-                " op, priority_class, priority, conditions, actions"
-                " FROM push_rules_stream"
-                " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC LIMIT ?"
-            )
-            txn.execute(sql, (last_id, current_id, limit))
-            return txn.fetchall()
-
-        return self.db.runInteraction(
-            "get_all_push_rule_updates", get_all_push_rule_updates_txn
-        )
-
     def get_push_rules_stream_token(self):
         """Get the position of the push rules stream.
         Returns a pair of a stream id for the push_rules stream and the