diff options
author | Matthew Hodgson <matthew@matrix.org> | 2016-03-27 22:54:42 +0100 |
---|---|---|
committer | Matthew Hodgson <matthew@matrix.org> | 2016-03-27 22:54:42 +0100 |
commit | d9d48aad2d58deb5db422a5373a4dac9334a0618 (patch) | |
tree | 63e51372ca9ace4971403928bd46440ff9e455e2 /synapse/storage/pusher.py | |
parent | initial WIP of a tentative preview_url endpoint - incomplete, untested, exper... (diff) | |
parent | typo (diff) | |
download | synapse-d9d48aad2d58deb5db422a5373a4dac9334a0618.tar.xz |
Merge branch 'develop' into matthew/preview_urls
Diffstat (limited to 'synapse/storage/pusher.py')
-rw-r--r-- | synapse/storage/pusher.py | 69 |
1 files changed, 51 insertions, 18 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 8ec706178a..87b2ac5773 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -16,8 +16,6 @@ from ._base import SQLBaseStore from twisted.internet import defer -from synapse.api.errors import StoreError - from canonicaljson import encode_canonical_json import logging @@ -79,12 +77,41 @@ class PusherStore(SQLBaseStore): rows = yield self.runInteraction("get_all_pushers", get_pushers) defer.returnValue(rows) + def get_pushers_stream_token(self): + return self._pushers_id_gen.get_max_token() + + def get_all_updated_pushers(self, last_id, current_id, limit): + def get_all_updated_pushers_txn(txn): + sql = ( + "SELECT id, user_name, access_token, profile_tag, kind," + " app_id, app_display_name, device_display_name, pushkey, ts," + " lang, data" + " FROM pushers" + " WHERE ? < id AND id <= ?" + " ORDER BY id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + updated = txn.fetchall() + + sql = ( + "SELECT stream_id, user_id, app_id, pushkey" + " FROM deleted_pushers" + " WHERE ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + deleted = txn.fetchall() + + return (updated, deleted) + return self.runInteraction( + "get_all_updated_pushers", get_all_updated_pushers_txn + ) + @defer.inlineCallbacks - def add_pusher(self, user_id, access_token, profile_tag, kind, app_id, + def add_pusher(self, user_id, access_token, kind, app_id, app_display_name, device_display_name, - pushkey, pushkey_ts, lang, data): - try: - next_id = yield self._pushers_id_gen.get_next() + pushkey, pushkey_ts, lang, data, profile_tag=""): + with self._pushers_id_gen.get_next() as stream_id: yield self._simple_upsert( "pushers", dict( @@ -95,29 +122,35 @@ class PusherStore(SQLBaseStore): dict( access_token=access_token, kind=kind, - profile_tag=profile_tag, app_display_name=app_display_name, device_display_name=device_display_name, ts=pushkey_ts, lang=lang, data=encode_canonical_json(data), - ), - insertion_values=dict( - id=next_id, + profile_tag=profile_tag, + id=stream_id, ), desc="add_pusher", ) - except Exception as e: - logger.error("create_pusher with failed: %s", e) - raise StoreError(500, "Problem creating pusher.") @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): - yield self._simple_delete_one( - "pushers", - {"app_id": app_id, "pushkey": pushkey, 'user_name': user_id}, - desc="delete_pusher_by_app_id_pushkey_user_id", - ) + def delete_pusher_txn(txn, stream_id): + self._simple_delete_one_txn( + txn, + "pushers", + {"app_id": app_id, "pushkey": pushkey, "user_name": user_id} + ) + self._simple_upsert_txn( + txn, + "deleted_pushers", + {"app_id": app_id, "pushkey": pushkey, "user_id": user_id}, + {"stream_id": stream_id}, + ) + with self._pushers_id_gen.get_next() as stream_id: + yield self.runInteraction( + "delete_pusher", delete_pusher_txn, stream_id + ) @defer.inlineCallbacks def update_pusher_last_token(self, app_id, pushkey, user_id, last_token): |