diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index d1669c778a..f7886dd1bb 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -18,6 +18,8 @@ from twisted.internet import defer
from canonicaljson import encode_canonical_json
+from synapse.util.caches.descriptors import cachedInlineCallbacks
+
import logging
import simplejson as json
import types
@@ -107,31 +109,46 @@ class PusherStore(SQLBaseStore):
"get_all_updated_pushers", get_all_updated_pushers_txn
)
+ @cachedInlineCallbacks(num_args=1)
+ def get_users_with_pushers_in_room(self, room_id):
+ users = yield self.get_users_in_room(room_id)
+
+ result = yield self._simple_select_many_batch(
+ 'pushers', 'user_name', users, ['user_name']
+ )
+
+ defer.returnValue([r['user_name'] for r in result])
+
@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
app_display_name, device_display_name,
- pushkey, pushkey_ts, lang, data, profile_tag=""):
- with self._pushers_id_gen.get_next() as stream_id:
- yield self._simple_upsert(
- "pushers",
- dict(
- app_id=app_id,
- pushkey=pushkey,
- user_name=user_id,
- ),
- dict(
- access_token=access_token,
- kind=kind,
- app_display_name=app_display_name,
- device_display_name=device_display_name,
- ts=pushkey_ts,
- lang=lang,
- data=encode_canonical_json(data),
- profile_tag=profile_tag,
- id=stream_id,
- ),
- desc="add_pusher",
- )
+ pushkey, pushkey_ts, lang, data, last_stream_ordering,
+ profile_tag=""):
+ def f(txn):
+ txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+ with self._pushers_id_gen.get_next() as stream_id:
+ return self._simple_upsert_txn(
+ txn,
+ "pushers",
+ dict(
+ app_id=app_id,
+ pushkey=pushkey,
+ user_name=user_id,
+ ),
+ dict(
+ access_token=access_token,
+ kind=kind,
+ app_display_name=app_display_name,
+ device_display_name=device_display_name,
+ ts=pushkey_ts,
+ lang=lang,
+ data=encode_canonical_json(data),
+ last_stream_ordering=last_stream_ordering,
+ profile_tag=profile_tag,
+ id=stream_id,
+ ),
+ )
+ defer.returnValue((yield self.runInteraction("add_pusher", f)))
@defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
@@ -153,22 +170,28 @@ class PusherStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def update_pusher_last_token(self, app_id, pushkey, user_id, last_token):
+ def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id,
+ last_stream_ordering):
yield self._simple_update_one(
"pushers",
{'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
- {'last_token': last_token},
- desc="update_pusher_last_token",
+ {'last_stream_ordering': last_stream_ordering},
+ desc="update_pusher_last_stream_ordering",
)
@defer.inlineCallbacks
- def update_pusher_last_token_and_success(self, app_id, pushkey, user_id,
- last_token, last_success):
+ def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey,
+ user_id,
+ last_stream_ordering,
+ last_success):
yield self._simple_update_one(
"pushers",
{'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
- {'last_token': last_token, 'last_success': last_success},
- desc="update_pusher_last_token_and_success",
+ {
+ 'last_stream_ordering': last_stream_ordering,
+ 'last_success': last_success
+ },
+ desc="update_pusher_last_stream_ordering_and_success",
)
@defer.inlineCallbacks
|