1 files changed, 10 insertions, 13 deletions
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index 21ceb0213a..83e880fdd2 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -50,18 +50,15 @@ class SlavedPushRuleStore(SlavedEventStore):
result["push_rules"] = self._push_rules_stream_id_gen.get_current_token()
return result
- def process_replication(self, result):
- stream = result.get("push_rules")
- if stream:
- for row in stream["rows"]:
- position = row[0]
- user_id = row[2]
- self.get_push_rules_for_user.invalidate((user_id,))
- self.get_push_rules_enabled_for_user.invalidate((user_id,))
+ def process_replication_rows(self, stream_name, token, rows):
+ if stream_name == "push_rules":
+ self._push_rules_stream_id_gen.advance(token)
+ for row in rows:
+ self.get_push_rules_for_user.invalidate((row.user_id,))
+ self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
self.push_rules_stream_cache.entity_has_changed(
- user_id, position
+ row.user_id, token
)
-
- self._push_rules_stream_id_gen.advance(int(stream["position"]))
-
- return super(SlavedPushRuleStore, self).process_replication(result)
+ return super(SlavedPushRuleStore, self).process_replication_rows(
+ stream_name, token, rows
+ )
|