diff options
author | Erik Johnston <erik@matrix.org> | 2020-04-07 11:01:04 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-07 11:01:04 +0100 |
commit | ce72355d7f67a986d60a7d86489b1b40f93fb152 (patch) | |
tree | 9d562fd0a9d98d6ce1eb96c1e837008d43c61ec2 /synapse/storage/data_stores | |
parent | Move server command handling out of TCP protocol (#7187) (diff) | |
download | synapse-ce72355d7f67a986d60a7d86489b1b40f93fb152.tar.xz |
Fix race in replication (#7226)
Fixes a race between handling `POSITION` and `RDATA` commands. We do this by simply linearizing handling of them.
Diffstat (limited to 'synapse/storage/data_stores')
-rw-r--r-- | synapse/storage/data_stores/main/push_rule.py | 40 |
1 files changed, 20 insertions, 20 deletions
diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py index 46f9bda773..b3faafa0a4 100644 --- a/synapse/storage/data_stores/main/push_rule.py +++ b/synapse/storage/data_stores/main/push_rule.py @@ -334,6 +334,26 @@ class PushRulesWorkerStore( results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled return results + def get_all_push_rule_updates(self, last_id, current_id, limit): + """Get all the push rules changes that have happend on the server""" + if last_id == current_id: + return defer.succeed([]) + + def get_all_push_rule_updates_txn(txn): + sql = ( + "SELECT stream_id, event_stream_ordering, user_id, rule_id," + " op, priority_class, priority, conditions, actions" + " FROM push_rules_stream" + " WHERE ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + return txn.fetchall() + + return self.db.runInteraction( + "get_all_push_rule_updates", get_all_push_rule_updates_txn + ) + class PushRuleStore(PushRulesWorkerStore): @defer.inlineCallbacks @@ -685,26 +705,6 @@ class PushRuleStore(PushRulesWorkerStore): self.push_rules_stream_cache.entity_has_changed, user_id, stream_id ) - def get_all_push_rule_updates(self, last_id, current_id, limit): - """Get all the push rules changes that have happend on the server""" - if last_id == current_id: - return defer.succeed([]) - - def get_all_push_rule_updates_txn(txn): - sql = ( - "SELECT stream_id, event_stream_ordering, user_id, rule_id," - " op, priority_class, priority, conditions, actions" - " FROM push_rules_stream" - " WHERE ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC LIMIT ?" - ) - txn.execute(sql, (last_id, current_id, limit)) - return txn.fetchall() - - return self.db.runInteraction( - "get_all_push_rule_updates", get_all_push_rule_updates_txn - ) - def get_push_rules_stream_token(self): """Get the position of the push rules stream. Returns a pair of a stream id for the push_rules stream and the |