diff options
author | David Baker <dave@matrix.org> | 2016-04-06 15:42:15 +0100 |
---|---|---|
committer | David Baker <dave@matrix.org> | 2016-04-06 15:42:15 +0100 |
commit | 7e2c89a37f3a5261f43b4d472b36219ac41dfb16 (patch) | |
tree | e52d8d4683cc8229004f17b401a4c5b6e633391b /synapse/storage | |
parent | Merge pull request #691 from matrix-org/erikj/member (diff) | |
download | synapse-7e2c89a37f3a5261f43b4d472b36219ac41dfb16.tar.xz |
Make pushers use the event_push_actions table instead of listening on an event stream & running the rules again. Sytest passes, but remaining to do:
* Make badges work again * Remove old, unused code
Diffstat (limited to '')
-rw-r--r-- | synapse/storage/event_push_actions.py | 48 | ||||
-rw-r--r-- | synapse/storage/events.py | 12 | ||||
-rw-r--r-- | synapse/storage/pusher.py | 81 | ||||
-rw-r--r-- | synapse/storage/registration.py | 20 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 1 | ||||
-rw-r--r-- | synapse/storage/schema/delta/31/pushers.py | 75 |
6 files changed, 188 insertions, 49 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 3933b6e2c5..5f61743e34 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -100,6 +100,54 @@ class EventPushActionsStore(SQLBaseStore): ) defer.returnValue(ret) + @defer.inlineCallbacks + def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering): + def f(txn): + sql = ( + "SELECT DISTINCT(user_id) FROM event_push_actions WHERE" + " stream_ordering >= ? AND stream_ordering >= ?" + ) + txn.execute(sql, (min_stream_ordering, max_stream_ordering)) + return [r[0] for r in txn.fetchall()] + ret = yield self.runInteraction("get_push_action_users_in_range", f) + defer.returnValue(ret) + + @defer.inlineCallbacks + def get_unread_push_actions_for_user_in_range(self, user_id, + min_stream_ordering, + max_stream_ordering=None): + def f(txn): + sql = ( + "SELECT event_id, stream_ordering, actions" + " FROM event_push_actions" + " WHERE user_id = ? AND stream_ordering > ?" + ) + args = [user_id, min_stream_ordering] + if max_stream_ordering is not None: + sql += " AND stream_ordering <= ?" + args.append(max_stream_ordering) + sql += " ORDER BY stream_ordering ASC" + txn.execute(sql, args) + return txn.fetchall() + ret = yield self.runInteraction("get_unread_push_actions_for_user_in_range", f) + defer.returnValue([ + { + "event_id": row[0], + "stream_ordering": row[1], + "actions": json.loads(row[2]), + } for row in ret + ]) + + @defer.inlineCallbacks + def get_latest_push_action_stream_ordering(self): + def f(txn): + txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") + return txn.fetchone() + result = yield self.runInteraction( + "get_latest_push_action_stream_ordering", f + ) + defer.returnValue(result[0] or 0) + def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id): # Sad that we have to blow away the cache for the whole room here txn.call_after( diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5d299a1132..ceae8715ce 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -61,6 +61,17 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def persist_events(self, events_and_contexts, backfilled=False): + """ + Write events to the database + Args: + events_and_contexts: list of tuples of (event, context) + backfilled: ? + + Returns: Tuple of stream_orderings where the first is the minimum and + last is the maximum stream ordering assigned to the events when + persisting. + + """ if not events_and_contexts: return @@ -191,6 +202,7 @@ class EventsStore(SQLBaseStore): txn.call_after(self._get_current_state_for_key.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) + txn.call_after(self.get_users_with_pushers_in_room.invalidate, (event.room_id,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,)) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index d1669c778a..f7886dd1bb 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -18,6 +18,8 @@ from twisted.internet import defer from canonicaljson import encode_canonical_json +from synapse.util.caches.descriptors import cachedInlineCallbacks + import logging import simplejson as json import types @@ -107,31 +109,46 @@ class PusherStore(SQLBaseStore): "get_all_updated_pushers", get_all_updated_pushers_txn ) + @cachedInlineCallbacks(num_args=1) + def get_users_with_pushers_in_room(self, room_id): + users = yield self.get_users_in_room(room_id) + + result = yield self._simple_select_many_batch( + 'pushers', 'user_name', users, ['user_name'] + ) + + defer.returnValue([r['user_name'] for r in result]) + @defer.inlineCallbacks def add_pusher(self, user_id, access_token, kind, app_id, app_display_name, device_display_name, - pushkey, pushkey_ts, lang, data, profile_tag=""): - with self._pushers_id_gen.get_next() as stream_id: - yield self._simple_upsert( - "pushers", - dict( - app_id=app_id, - pushkey=pushkey, - user_name=user_id, - ), - dict( - access_token=access_token, - kind=kind, - app_display_name=app_display_name, - device_display_name=device_display_name, - ts=pushkey_ts, - lang=lang, - data=encode_canonical_json(data), - profile_tag=profile_tag, - id=stream_id, - ), - desc="add_pusher", - ) + pushkey, pushkey_ts, lang, data, last_stream_ordering, + profile_tag=""): + def f(txn): + txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) + with self._pushers_id_gen.get_next() as stream_id: + return self._simple_upsert_txn( + txn, + "pushers", + dict( + app_id=app_id, + pushkey=pushkey, + user_name=user_id, + ), + dict( + access_token=access_token, + kind=kind, + app_display_name=app_display_name, + device_display_name=device_display_name, + ts=pushkey_ts, + lang=lang, + data=encode_canonical_json(data), + last_stream_ordering=last_stream_ordering, + profile_tag=profile_tag, + id=stream_id, + ), + ) + defer.returnValue((yield self.runInteraction("add_pusher", f))) @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): @@ -153,22 +170,28 @@ class PusherStore(SQLBaseStore): ) @defer.inlineCallbacks - def update_pusher_last_token(self, app_id, pushkey, user_id, last_token): + def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id, + last_stream_ordering): yield self._simple_update_one( "pushers", {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, - {'last_token': last_token}, - desc="update_pusher_last_token", + {'last_stream_ordering': last_stream_ordering}, + desc="update_pusher_last_stream_ordering", ) @defer.inlineCallbacks - def update_pusher_last_token_and_success(self, app_id, pushkey, user_id, - last_token, last_success): + def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey, + user_id, + last_stream_ordering, + last_success): yield self._simple_update_one( "pushers", {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, - {'last_token': last_token, 'last_success': last_success}, - desc="update_pusher_last_token_and_success", + { + 'last_stream_ordering': last_stream_ordering, + 'last_success': last_success + }, + desc="update_pusher_last_stream_ordering_and_success", ) @defer.inlineCallbacks diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index d46a963bb8..701dd2f656 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -319,26 +319,6 @@ class RegistrationStore(SQLBaseStore): defer.returnValue(res if res else False) - @cachedList(cache=is_guest.cache, list_name="user_ids", num_args=1, - inlineCallbacks=True) - def are_guests(self, user_ids): - sql = "SELECT name, is_guest FROM users WHERE name IN (%s)" % ( - ",".join("?" for _ in user_ids), - ) - - rows = yield self._execute( - "are_guests", self.cursor_to_dict, sql, *user_ids - ) - - result = {user_id: False for user_id in user_ids} - - result.update({ - row["name"]: bool(row["is_guest"]) - for row in rows - }) - - defer.returnValue(result) - def _query_for_auth(self, txn, token): sql = ( "SELECT users.name, users.is_guest, access_tokens.id as token_id" diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 66e7a40e3c..22a690aa8d 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -58,6 +58,7 @@ class RoomMemberStore(SQLBaseStore): txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) + txn.call_after(self.get_users_with_pushers_in_room.invalidate, (event.room_id,)) txn.call_after( self._membership_stream_cache.entity_has_changed, event.state_key, event.internal_metadata.stream_ordering diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py new file mode 100644 index 0000000000..7e0e385fb5 --- /dev/null +++ b/synapse/storage/schema/delta/31/pushers.py @@ -0,0 +1,75 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Change the last_token to last_stream_ordering now that pushers no longer +# listen on an event stream but instead select out of the event_push_actions +# table. + + +import logging + +logger = logging.getLogger(__name__) + + +def token_to_stream_ordering(token): + return int(token[1:].split('_')[0]) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + logger.info("Porting pushers table, delta 31...") + cur.execute(""" + CREATE TABLE IF NOT EXISTS pushers2 ( + id BIGINT PRIMARY KEY, + user_name TEXT NOT NULL, + access_token BIGINT DEFAULT NULL, + profile_tag VARCHAR(32) NOT NULL, + kind VARCHAR(8) NOT NULL, + app_id VARCHAR(64) NOT NULL, + app_display_name VARCHAR(64) NOT NULL, + device_display_name VARCHAR(128) NOT NULL, + pushkey TEXT NOT NULL, + ts BIGINT NOT NULL, + lang VARCHAR(8), + data TEXT, + last_stream_ordering INTEGER, + last_success BIGINT, + failing_since BIGINT, + UNIQUE (app_id, pushkey, user_name) + ) + """) + cur.execute("""SELECT + id, user_name, access_token, profile_tag, kind, + app_id, app_display_name, device_display_name, + pushkey, ts, lang, data, last_token, last_success, + failing_since + FROM pushers + """) + count = 0 + for row in cur.fetchall(): + row = list(row) + row[12] = token_to_stream_ordering(row[12]) + cur.execute(database_engine.convert_param_style(""" + INSERT into pushers2 ( + id, user_name, access_token, profile_tag, kind, + app_id, app_display_name, device_display_name, + pushkey, ts, lang, data, last_stream_ordering, last_success, + failing_since + ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))), + row + ) + count += 1 + cur.execute("DROP TABLE pushers") + cur.execute("ALTER TABLE pushers2 RENAME TO pushers") + logger.info("Moved %d pushers to new table", count) |