summary refs log tree commit diff
path: root/synapse/storage/pusher.py
diff options
context:
space:
mode:
authorDavid Baker <dave@matrix.org>2016-04-06 15:42:15 +0100
committerDavid Baker <dave@matrix.org>2016-04-06 15:42:15 +0100
commit7e2c89a37f3a5261f43b4d472b36219ac41dfb16 (patch)
treee52d8d4683cc8229004f17b401a4c5b6e633391b /synapse/storage/pusher.py
parentMerge pull request #691 from matrix-org/erikj/member (diff)
downloadsynapse-7e2c89a37f3a5261f43b4d472b36219ac41dfb16.tar.xz
Make pushers use the event_push_actions table instead of listening on an event stream & running the rules again. Sytest passes, but remaining to do:
 * Make badges work again
 * Remove old, unused code
Diffstat (limited to 'synapse/storage/pusher.py')
-rw-r--r--synapse/storage/pusher.py81
1 files changed, 52 insertions, 29 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index d1669c778a..f7886dd1bb 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -18,6 +18,8 @@ from twisted.internet import defer
 
 from canonicaljson import encode_canonical_json
 
+from synapse.util.caches.descriptors import cachedInlineCallbacks
+
 import logging
 import simplejson as json
 import types
@@ -107,31 +109,46 @@ class PusherStore(SQLBaseStore):
             "get_all_updated_pushers", get_all_updated_pushers_txn
         )
 
+    @cachedInlineCallbacks(num_args=1)
+    def get_users_with_pushers_in_room(self, room_id):
+        users = yield self.get_users_in_room(room_id)
+
+        result = yield self._simple_select_many_batch(
+            'pushers', 'user_name', users, ['user_name']
+        )
+
+        defer.returnValue([r['user_name'] for r in result])
+
     @defer.inlineCallbacks
     def add_pusher(self, user_id, access_token, kind, app_id,
                    app_display_name, device_display_name,
-                   pushkey, pushkey_ts, lang, data, profile_tag=""):
-        with self._pushers_id_gen.get_next() as stream_id:
-            yield self._simple_upsert(
-                "pushers",
-                dict(
-                    app_id=app_id,
-                    pushkey=pushkey,
-                    user_name=user_id,
-                ),
-                dict(
-                    access_token=access_token,
-                    kind=kind,
-                    app_display_name=app_display_name,
-                    device_display_name=device_display_name,
-                    ts=pushkey_ts,
-                    lang=lang,
-                    data=encode_canonical_json(data),
-                    profile_tag=profile_tag,
-                    id=stream_id,
-                ),
-                desc="add_pusher",
-            )
+                   pushkey, pushkey_ts, lang, data, last_stream_ordering,
+                   profile_tag=""):
+        def f(txn):
+            txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+            with self._pushers_id_gen.get_next() as stream_id:
+                return self._simple_upsert_txn(
+                    txn,
+                    "pushers",
+                    dict(
+                        app_id=app_id,
+                        pushkey=pushkey,
+                        user_name=user_id,
+                    ),
+                    dict(
+                        access_token=access_token,
+                        kind=kind,
+                        app_display_name=app_display_name,
+                        device_display_name=device_display_name,
+                        ts=pushkey_ts,
+                        lang=lang,
+                        data=encode_canonical_json(data),
+                        last_stream_ordering=last_stream_ordering,
+                        profile_tag=profile_tag,
+                        id=stream_id,
+                    ),
+                )
+        defer.returnValue((yield self.runInteraction("add_pusher", f)))
 
     @defer.inlineCallbacks
     def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
@@ -153,22 +170,28 @@ class PusherStore(SQLBaseStore):
             )
 
     @defer.inlineCallbacks
-    def update_pusher_last_token(self, app_id, pushkey, user_id, last_token):
+    def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id,
+                                           last_stream_ordering):
         yield self._simple_update_one(
             "pushers",
             {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
-            {'last_token': last_token},
-            desc="update_pusher_last_token",
+            {'last_stream_ordering': last_stream_ordering},
+            desc="update_pusher_last_stream_ordering",
         )
 
     @defer.inlineCallbacks
-    def update_pusher_last_token_and_success(self, app_id, pushkey, user_id,
-                                             last_token, last_success):
+    def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey,
+                                                       user_id,
+                                                       last_stream_ordering,
+                                                       last_success):
         yield self._simple_update_one(
             "pushers",
             {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
-            {'last_token': last_token, 'last_success': last_success},
-            desc="update_pusher_last_token_and_success",
+            {
+                'last_stream_ordering': last_stream_ordering,
+                'last_success': last_success
+            },
+            desc="update_pusher_last_stream_ordering_and_success",
         )
 
     @defer.inlineCallbacks