diff options
author | Erik Johnston <erik@matrix.org> | 2019-10-21 12:56:42 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2019-10-21 16:05:06 +0100 |
commit | c66a06ac6b69b0a03f5c6284ded980399e9df94e (patch) | |
tree | 01dfd3b9098a9ace759403744d122c18efbd97ff /synapse/storage/pusher.py | |
parent | Merge branch 'master' into develop (diff) | |
download | synapse-c66a06ac6b69b0a03f5c6284ded980399e9df94e.tar.xz |
Move storage classes into a main "data store".
This is in preparation for having multiple data stores that offer different functionality, e.g. splitting out state or event storage.
Diffstat (limited to 'synapse/storage/pusher.py')
-rw-r--r-- | synapse/storage/pusher.py | 372 |
1 files changed, 0 insertions, 372 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py deleted file mode 100644 index b12e80440a..0000000000 --- a/synapse/storage/pusher.py +++ /dev/null @@ -1,372 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014-2016 OpenMarket Ltd -# Copyright 2018 New Vector Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging - -import six - -from canonicaljson import encode_canonical_json, json - -from twisted.internet import defer - -from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList - -from ._base import SQLBaseStore - -logger = logging.getLogger(__name__) - -if six.PY2: - db_binary_type = six.moves.builtins.buffer -else: - db_binary_type = memoryview - - -class PusherWorkerStore(SQLBaseStore): - def _decode_pushers_rows(self, rows): - for r in rows: - dataJson = r["data"] - r["data"] = None - try: - if isinstance(dataJson, db_binary_type): - dataJson = str(dataJson).decode("UTF8") - - r["data"] = json.loads(dataJson) - except Exception as e: - logger.warn( - "Invalid JSON in data for pusher %d: %s, %s", - r["id"], - dataJson, - e.args[0], - ) - pass - - if isinstance(r["pushkey"], db_binary_type): - r["pushkey"] = str(r["pushkey"]).decode("UTF8") - - return rows - - @defer.inlineCallbacks - def user_has_pusher(self, user_id): - ret = yield self._simple_select_one_onecol( - "pushers", {"user_name": user_id}, "id", allow_none=True - ) - return ret is not None - - def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey): - return self.get_pushers_by({"app_id": app_id, "pushkey": pushkey}) - - def get_pushers_by_user_id(self, user_id): - return self.get_pushers_by({"user_name": user_id}) - - @defer.inlineCallbacks - def get_pushers_by(self, keyvalues): - ret = yield self._simple_select_list( - "pushers", - keyvalues, - [ - "id", - "user_name", - "access_token", - "profile_tag", - "kind", - "app_id", - "app_display_name", - "device_display_name", - "pushkey", - "ts", - "lang", - "data", - "last_stream_ordering", - "last_success", - "failing_since", - ], - desc="get_pushers_by", - ) - return self._decode_pushers_rows(ret) - - @defer.inlineCallbacks - def get_all_pushers(self): - def get_pushers(txn): - txn.execute("SELECT * FROM pushers") - rows = self.cursor_to_dict(txn) - - return self._decode_pushers_rows(rows) - - rows = yield self.runInteraction("get_all_pushers", get_pushers) - return rows - - def get_all_updated_pushers(self, last_id, current_id, limit): - if last_id == current_id: - return defer.succeed(([], [])) - - def get_all_updated_pushers_txn(txn): - sql = ( - "SELECT id, user_name, access_token, profile_tag, kind," - " app_id, app_display_name, device_display_name, pushkey, ts," - " lang, data" - " FROM pushers" - " WHERE ? < id AND id <= ?" - " ORDER BY id ASC LIMIT ?" - ) - txn.execute(sql, (last_id, current_id, limit)) - updated = txn.fetchall() - - sql = ( - "SELECT stream_id, user_id, app_id, pushkey" - " FROM deleted_pushers" - " WHERE ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC LIMIT ?" - ) - txn.execute(sql, (last_id, current_id, limit)) - deleted = txn.fetchall() - - return updated, deleted - - return self.runInteraction( - "get_all_updated_pushers", get_all_updated_pushers_txn - ) - - def get_all_updated_pushers_rows(self, last_id, current_id, limit): - """Get all the pushers that have changed between the given tokens. - - Returns: - Deferred(list(tuple)): each tuple consists of: - stream_id (str) - user_id (str) - app_id (str) - pushkey (str) - was_deleted (bool): whether the pusher was added/updated (False) - or deleted (True) - """ - - if last_id == current_id: - return defer.succeed([]) - - def get_all_updated_pushers_rows_txn(txn): - sql = ( - "SELECT id, user_name, app_id, pushkey" - " FROM pushers" - " WHERE ? < id AND id <= ?" - " ORDER BY id ASC LIMIT ?" - ) - txn.execute(sql, (last_id, current_id, limit)) - results = [list(row) + [False] for row in txn] - - sql = ( - "SELECT stream_id, user_id, app_id, pushkey" - " FROM deleted_pushers" - " WHERE ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC LIMIT ?" - ) - txn.execute(sql, (last_id, current_id, limit)) - - results.extend(list(row) + [True] for row in txn) - results.sort() # Sort so that they're ordered by stream id - - return results - - return self.runInteraction( - "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn - ) - - @cachedInlineCallbacks(num_args=1, max_entries=15000) - def get_if_user_has_pusher(self, user_id): - # This only exists for the cachedList decorator - raise NotImplementedError() - - @cachedList( - cached_method_name="get_if_user_has_pusher", - list_name="user_ids", - num_args=1, - inlineCallbacks=True, - ) - def get_if_users_have_pushers(self, user_ids): - rows = yield self._simple_select_many_batch( - table="pushers", - column="user_name", - iterable=user_ids, - retcols=["user_name"], - desc="get_if_users_have_pushers", - ) - - result = {user_id: False for user_id in user_ids} - result.update({r["user_name"]: True for r in rows}) - - return result - - -class PusherStore(PusherWorkerStore): - def get_pushers_stream_token(self): - return self._pushers_id_gen.get_current_token() - - @defer.inlineCallbacks - def add_pusher( - self, - user_id, - access_token, - kind, - app_id, - app_display_name, - device_display_name, - pushkey, - pushkey_ts, - lang, - data, - last_stream_ordering, - profile_tag="", - ): - with self._pushers_id_gen.get_next() as stream_id: - # no need to lock because `pushers` has a unique key on - # (app_id, pushkey, user_name) so _simple_upsert will retry - yield self._simple_upsert( - table="pushers", - keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, - values={ - "access_token": access_token, - "kind": kind, - "app_display_name": app_display_name, - "device_display_name": device_display_name, - "ts": pushkey_ts, - "lang": lang, - "data": bytearray(encode_canonical_json(data)), - "last_stream_ordering": last_stream_ordering, - "profile_tag": profile_tag, - "id": stream_id, - }, - desc="add_pusher", - lock=False, - ) - - user_has_pusher = self.get_if_user_has_pusher.cache.get( - (user_id,), None, update_metrics=False - ) - - if user_has_pusher is not True: - # invalidate, since we the user might not have had a pusher before - yield self.runInteraction( - "add_pusher", - self._invalidate_cache_and_stream, - self.get_if_user_has_pusher, - (user_id,), - ) - - @defer.inlineCallbacks - def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): - def delete_pusher_txn(txn, stream_id): - self._invalidate_cache_and_stream( - txn, self.get_if_user_has_pusher, (user_id,) - ) - - self._simple_delete_one_txn( - txn, - "pushers", - {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, - ) - - # it's possible for us to end up with duplicate rows for - # (app_id, pushkey, user_id) at different stream_ids, but that - # doesn't really matter. - self._simple_insert_txn( - txn, - table="deleted_pushers", - values={ - "stream_id": stream_id, - "app_id": app_id, - "pushkey": pushkey, - "user_id": user_id, - }, - ) - - with self._pushers_id_gen.get_next() as stream_id: - yield self.runInteraction("delete_pusher", delete_pusher_txn, stream_id) - - @defer.inlineCallbacks - def update_pusher_last_stream_ordering( - self, app_id, pushkey, user_id, last_stream_ordering - ): - yield self._simple_update_one( - "pushers", - {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, - {"last_stream_ordering": last_stream_ordering}, - desc="update_pusher_last_stream_ordering", - ) - - @defer.inlineCallbacks - def update_pusher_last_stream_ordering_and_success( - self, app_id, pushkey, user_id, last_stream_ordering, last_success - ): - """Update the last stream ordering position we've processed up to for - the given pusher. - - Args: - app_id (str) - pushkey (str) - last_stream_ordering (int) - last_success (int) - - Returns: - Deferred[bool]: True if the pusher still exists; False if it has been deleted. - """ - updated = yield self._simple_update( - table="pushers", - keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, - updatevalues={ - "last_stream_ordering": last_stream_ordering, - "last_success": last_success, - }, - desc="update_pusher_last_stream_ordering_and_success", - ) - - return bool(updated) - - @defer.inlineCallbacks - def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since): - yield self._simple_update( - table="pushers", - keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, - updatevalues={"failing_since": failing_since}, - desc="update_pusher_failing_since", - ) - - @defer.inlineCallbacks - def get_throttle_params_by_room(self, pusher_id): - res = yield self._simple_select_list( - "pusher_throttle", - {"pusher": pusher_id}, - ["room_id", "last_sent_ts", "throttle_ms"], - desc="get_throttle_params_by_room", - ) - - params_by_room = {} - for row in res: - params_by_room[row["room_id"]] = { - "last_sent_ts": row["last_sent_ts"], - "throttle_ms": row["throttle_ms"], - } - - return params_by_room - - @defer.inlineCallbacks - def set_throttle_params(self, pusher_id, room_id, params): - # no need to lock because `pusher_throttle` has a primary key on - # (pusher, room_id) so _simple_upsert will retry - yield self._simple_upsert( - "pusher_throttle", - {"pusher": pusher_id, "room_id": room_id}, - params, - desc="set_throttle_params", - lock=False, - ) |