summary refs log tree commit diff
path: root/synapse/storage/pusher.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-02-27 12:01:36 +0000
committerErik Johnston <erik@matrix.org>2018-02-27 13:58:16 +0000
commit493e25d5545389264f696be0e07544bf82a0818a (patch)
tree15b19197bf78f9f0382bb901cfea771712fc176f /synapse/storage/pusher.py
parentMerge pull request #2904 from matrix-org/erikj/receipt_cache_invalidation (diff)
downloadsynapse-493e25d5545389264f696be0e07544bf82a0818a.tar.xz
Move storage functions for push calculations
This will allow push actions for an event to be calculated on workers.
Diffstat (limited to '')
-rw-r--r--synapse/storage/pusher.py22
1 files changed, 13 insertions, 9 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index f4af3e4caa..307660b99a 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -175,11 +175,6 @@ class PusherWorkerStore(SQLBaseStore):
             "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
         )
 
-
-class PusherStore(PusherWorkerStore):
-    def get_pushers_stream_token(self):
-        return self._pushers_id_gen.get_current_token()
-
     @cachedInlineCallbacks(num_args=1, max_entries=15000)
     def get_if_user_has_pusher(self, user_id):
         # This only exists for the cachedList decorator
@@ -201,6 +196,11 @@ class PusherStore(PusherWorkerStore):
 
         defer.returnValue(result)
 
+
+class PusherStore(PusherWorkerStore):
+    def get_pushers_stream_token(self):
+        return self._pushers_id_gen.get_current_token()
+
     @defer.inlineCallbacks
     def add_pusher(self, user_id, access_token, kind, app_id,
                    app_display_name, device_display_name,
@@ -233,14 +233,18 @@ class PusherStore(PusherWorkerStore):
             )
 
             if newly_inserted:
-                # get_if_user_has_pusher only cares if the user has
-                # at least *one* pusher.
-                self.get_if_user_has_pusher.invalidate(user_id,)
+                self.runInteraction(
+                    "add_pusher",
+                    self._invalidate_cache_and_stream,
+                    self.get_if_user_has_pusher, (user_id,)
+                )
 
     @defer.inlineCallbacks
     def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
         def delete_pusher_txn(txn, stream_id):
-            txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+            self._invalidate_cache_and_stream(
+                txn, self.get_if_user_has_pusher, (user_id,)
+            )
 
             self._simple_delete_one_txn(
                 txn,