diff options
Diffstat (limited to 'synapse/storage/pusher.py')
-rw-r--r-- | synapse/storage/pusher.py | 81 |
1 files changed, 52 insertions, 29 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index d1669c778a..f7886dd1bb 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -18,6 +18,8 @@ from twisted.internet import defer from canonicaljson import encode_canonical_json +from synapse.util.caches.descriptors import cachedInlineCallbacks + import logging import simplejson as json import types @@ -107,31 +109,46 @@ class PusherStore(SQLBaseStore): "get_all_updated_pushers", get_all_updated_pushers_txn ) + @cachedInlineCallbacks(num_args=1) + def get_users_with_pushers_in_room(self, room_id): + users = yield self.get_users_in_room(room_id) + + result = yield self._simple_select_many_batch( + 'pushers', 'user_name', users, ['user_name'] + ) + + defer.returnValue([r['user_name'] for r in result]) + @defer.inlineCallbacks def add_pusher(self, user_id, access_token, kind, app_id, app_display_name, device_display_name, - pushkey, pushkey_ts, lang, data, profile_tag=""): - with self._pushers_id_gen.get_next() as stream_id: - yield self._simple_upsert( - "pushers", - dict( - app_id=app_id, - pushkey=pushkey, - user_name=user_id, - ), - dict( - access_token=access_token, - kind=kind, - app_display_name=app_display_name, - device_display_name=device_display_name, - ts=pushkey_ts, - lang=lang, - data=encode_canonical_json(data), - profile_tag=profile_tag, - id=stream_id, - ), - desc="add_pusher", - ) + pushkey, pushkey_ts, lang, data, last_stream_ordering, + profile_tag=""): + def f(txn): + txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) + with self._pushers_id_gen.get_next() as stream_id: + return self._simple_upsert_txn( + txn, + "pushers", + dict( + app_id=app_id, + pushkey=pushkey, + user_name=user_id, + ), + dict( + access_token=access_token, + kind=kind, + app_display_name=app_display_name, + device_display_name=device_display_name, + ts=pushkey_ts, + lang=lang, + data=encode_canonical_json(data), + last_stream_ordering=last_stream_ordering, + profile_tag=profile_tag, + id=stream_id, + ), + ) + defer.returnValue((yield self.runInteraction("add_pusher", f))) @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): @@ -153,22 +170,28 @@ class PusherStore(SQLBaseStore): ) @defer.inlineCallbacks - def update_pusher_last_token(self, app_id, pushkey, user_id, last_token): + def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id, + last_stream_ordering): yield self._simple_update_one( "pushers", {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, - {'last_token': last_token}, - desc="update_pusher_last_token", + {'last_stream_ordering': last_stream_ordering}, + desc="update_pusher_last_stream_ordering", ) @defer.inlineCallbacks - def update_pusher_last_token_and_success(self, app_id, pushkey, user_id, - last_token, last_success): + def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey, + user_id, + last_stream_ordering, + last_success): yield self._simple_update_one( "pushers", {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, - {'last_token': last_token, 'last_success': last_success}, - desc="update_pusher_last_token_and_success", + { + 'last_stream_ordering': last_stream_ordering, + 'last_success': last_success + }, + desc="update_pusher_last_stream_ordering_and_success", ) @defer.inlineCallbacks |