From 7e2c89a37f3a5261f43b4d472b36219ac41dfb16 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Apr 2016 15:42:15 +0100 Subject: Make pushers use the event_push_actions table instead of listening on an event stream & running the rules again. Sytest passes, but remaining to do: * Make badges work again * Remove old, unused code --- synapse/storage/event_push_actions.py | 48 ++++++++++++++++++ synapse/storage/events.py | 12 +++++ synapse/storage/pusher.py | 81 +++++++++++++++++++----------- synapse/storage/registration.py | 20 -------- synapse/storage/roommember.py | 1 + synapse/storage/schema/delta/31/pushers.py | 75 +++++++++++++++++++++++++++ 6 files changed, 188 insertions(+), 49 deletions(-) create mode 100644 synapse/storage/schema/delta/31/pushers.py (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 3933b6e2c5..5f61743e34 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -100,6 +100,54 @@ class EventPushActionsStore(SQLBaseStore): ) defer.returnValue(ret) + @defer.inlineCallbacks + def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering): + def f(txn): + sql = ( + "SELECT DISTINCT(user_id) FROM event_push_actions WHERE" + " stream_ordering >= ? AND stream_ordering >= ?" + ) + txn.execute(sql, (min_stream_ordering, max_stream_ordering)) + return [r[0] for r in txn.fetchall()] + ret = yield self.runInteraction("get_push_action_users_in_range", f) + defer.returnValue(ret) + + @defer.inlineCallbacks + def get_unread_push_actions_for_user_in_range(self, user_id, + min_stream_ordering, + max_stream_ordering=None): + def f(txn): + sql = ( + "SELECT event_id, stream_ordering, actions" + " FROM event_push_actions" + " WHERE user_id = ? AND stream_ordering > ?" + ) + args = [user_id, min_stream_ordering] + if max_stream_ordering is not None: + sql += " AND stream_ordering <= ?" + args.append(max_stream_ordering) + sql += " ORDER BY stream_ordering ASC" + txn.execute(sql, args) + return txn.fetchall() + ret = yield self.runInteraction("get_unread_push_actions_for_user_in_range", f) + defer.returnValue([ + { + "event_id": row[0], + "stream_ordering": row[1], + "actions": json.loads(row[2]), + } for row in ret + ]) + + @defer.inlineCallbacks + def get_latest_push_action_stream_ordering(self): + def f(txn): + txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") + return txn.fetchone() + result = yield self.runInteraction( + "get_latest_push_action_stream_ordering", f + ) + defer.returnValue(result[0] or 0) + def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id): # Sad that we have to blow away the cache for the whole room here txn.call_after( diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5d299a1132..ceae8715ce 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -61,6 +61,17 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def persist_events(self, events_and_contexts, backfilled=False): + """ + Write events to the database + Args: + events_and_contexts: list of tuples of (event, context) + backfilled: ? + + Returns: Tuple of stream_orderings where the first is the minimum and + last is the maximum stream ordering assigned to the events when + persisting. + + """ if not events_and_contexts: return @@ -191,6 +202,7 @@ class EventsStore(SQLBaseStore): txn.call_after(self._get_current_state_for_key.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) + txn.call_after(self.get_users_with_pushers_in_room.invalidate, (event.room_id,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,)) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index d1669c778a..f7886dd1bb 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -18,6 +18,8 @@ from twisted.internet import defer from canonicaljson import encode_canonical_json +from synapse.util.caches.descriptors import cachedInlineCallbacks + import logging import simplejson as json import types @@ -107,31 +109,46 @@ class PusherStore(SQLBaseStore): "get_all_updated_pushers", get_all_updated_pushers_txn ) + @cachedInlineCallbacks(num_args=1) + def get_users_with_pushers_in_room(self, room_id): + users = yield self.get_users_in_room(room_id) + + result = yield self._simple_select_many_batch( + 'pushers', 'user_name', users, ['user_name'] + ) + + defer.returnValue([r['user_name'] for r in result]) + @defer.inlineCallbacks def add_pusher(self, user_id, access_token, kind, app_id, app_display_name, device_display_name, - pushkey, pushkey_ts, lang, data, profile_tag=""): - with self._pushers_id_gen.get_next() as stream_id: - yield self._simple_upsert( - "pushers", - dict( - app_id=app_id, - pushkey=pushkey, - user_name=user_id, - ), - dict( - access_token=access_token, - kind=kind, - app_display_name=app_display_name, - device_display_name=device_display_name, - ts=pushkey_ts, - lang=lang, - data=encode_canonical_json(data), - profile_tag=profile_tag, - id=stream_id, - ), - desc="add_pusher", - ) + pushkey, pushkey_ts, lang, data, last_stream_ordering, + profile_tag=""): + def f(txn): + txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) + with self._pushers_id_gen.get_next() as stream_id: + return self._simple_upsert_txn( + txn, + "pushers", + dict( + app_id=app_id, + pushkey=pushkey, + user_name=user_id, + ), + dict( + access_token=access_token, + kind=kind, + app_display_name=app_display_name, + device_display_name=device_display_name, + ts=pushkey_ts, + lang=lang, + data=encode_canonical_json(data), + last_stream_ordering=last_stream_ordering, + profile_tag=profile_tag, + id=stream_id, + ), + ) + defer.returnValue((yield self.runInteraction("add_pusher", f))) @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): @@ -153,22 +170,28 @@ class PusherStore(SQLBaseStore): ) @defer.inlineCallbacks - def update_pusher_last_token(self, app_id, pushkey, user_id, last_token): + def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id, + last_stream_ordering): yield self._simple_update_one( "pushers", {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, - {'last_token': last_token}, - desc="update_pusher_last_token", + {'last_stream_ordering': last_stream_ordering}, + desc="update_pusher_last_stream_ordering", ) @defer.inlineCallbacks - def update_pusher_last_token_and_success(self, app_id, pushkey, user_id, - last_token, last_success): + def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey, + user_id, + last_stream_ordering, + last_success): yield self._simple_update_one( "pushers", {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, - {'last_token': last_token, 'last_success': last_success}, - desc="update_pusher_last_token_and_success", + { + 'last_stream_ordering': last_stream_ordering, + 'last_success': last_success + }, + desc="update_pusher_last_stream_ordering_and_success", ) @defer.inlineCallbacks diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index d46a963bb8..701dd2f656 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -319,26 +319,6 @@ class RegistrationStore(SQLBaseStore): defer.returnValue(res if res else False) - @cachedList(cache=is_guest.cache, list_name="user_ids", num_args=1, - inlineCallbacks=True) - def are_guests(self, user_ids): - sql = "SELECT name, is_guest FROM users WHERE name IN (%s)" % ( - ",".join("?" for _ in user_ids), - ) - - rows = yield self._execute( - "are_guests", self.cursor_to_dict, sql, *user_ids - ) - - result = {user_id: False for user_id in user_ids} - - result.update({ - row["name"]: bool(row["is_guest"]) - for row in rows - }) - - defer.returnValue(result) - def _query_for_auth(self, txn, token): sql = ( "SELECT users.name, users.is_guest, access_tokens.id as token_id" diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 66e7a40e3c..22a690aa8d 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -58,6 +58,7 @@ class RoomMemberStore(SQLBaseStore): txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) + txn.call_after(self.get_users_with_pushers_in_room.invalidate, (event.room_id,)) txn.call_after( self._membership_stream_cache.entity_has_changed, event.state_key, event.internal_metadata.stream_ordering diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py new file mode 100644 index 0000000000..7e0e385fb5 --- /dev/null +++ b/synapse/storage/schema/delta/31/pushers.py @@ -0,0 +1,75 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Change the last_token to last_stream_ordering now that pushers no longer +# listen on an event stream but instead select out of the event_push_actions +# table. + + +import logging + +logger = logging.getLogger(__name__) + + +def token_to_stream_ordering(token): + return int(token[1:].split('_')[0]) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + logger.info("Porting pushers table, delta 31...") + cur.execute(""" + CREATE TABLE IF NOT EXISTS pushers2 ( + id BIGINT PRIMARY KEY, + user_name TEXT NOT NULL, + access_token BIGINT DEFAULT NULL, + profile_tag VARCHAR(32) NOT NULL, + kind VARCHAR(8) NOT NULL, + app_id VARCHAR(64) NOT NULL, + app_display_name VARCHAR(64) NOT NULL, + device_display_name VARCHAR(128) NOT NULL, + pushkey TEXT NOT NULL, + ts BIGINT NOT NULL, + lang VARCHAR(8), + data TEXT, + last_stream_ordering INTEGER, + last_success BIGINT, + failing_since BIGINT, + UNIQUE (app_id, pushkey, user_name) + ) + """) + cur.execute("""SELECT + id, user_name, access_token, profile_tag, kind, + app_id, app_display_name, device_display_name, + pushkey, ts, lang, data, last_token, last_success, + failing_since + FROM pushers + """) + count = 0 + for row in cur.fetchall(): + row = list(row) + row[12] = token_to_stream_ordering(row[12]) + cur.execute(database_engine.convert_param_style(""" + INSERT into pushers2 ( + id, user_name, access_token, profile_tag, kind, + app_id, app_display_name, device_display_name, + pushkey, ts, lang, data, last_stream_ordering, last_success, + failing_since + ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))), + row + ) + count += 1 + cur.execute("DROP TABLE pushers") + cur.execute("ALTER TABLE pushers2 RENAME TO pushers") + logger.info("Moved %d pushers to new table", count) -- cgit 1.5.1 From 0fd1cd24003b54e475985cf90db4223c3098375d Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Apr 2016 16:50:47 +0100 Subject: pep8 --- synapse/storage/event_push_actions.py | 2 +- synapse/storage/events.py | 4 +++- synapse/storage/registration.py | 2 +- synapse/storage/roommember.py | 4 +++- 4 files changed, 8 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 5f61743e34..4d72e4a85e 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -144,7 +144,7 @@ class EventPushActionsStore(SQLBaseStore): txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") return txn.fetchone() result = yield self.runInteraction( - "get_latest_push_action_stream_ordering", f + "get_latest_push_action_stream_ordering", f ) defer.returnValue(result[0] or 0) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ceae8715ce..5be5bc01b1 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -202,7 +202,9 @@ class EventsStore(SQLBaseStore): txn.call_after(self._get_current_state_for_key.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) - txn.call_after(self.get_users_with_pushers_in_room.invalidate, (event.room_id,)) + txn.call_after( + self.get_users_with_pushers_in_room.invalidate, (event.room_id,) + ) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,)) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 701dd2f656..7af0cae6a5 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -20,7 +20,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError, Codes from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks class RegistrationStore(SQLBaseStore): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 22a690aa8d..088ad0f914 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -58,7 +58,9 @@ class RoomMemberStore(SQLBaseStore): txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) - txn.call_after(self.get_users_with_pushers_in_room.invalidate, (event.room_id,)) + txn.call_after( + self.get_users_with_pushers_in_room.invalidate, (event.room_id,) + ) txn.call_after( self._membership_stream_cache.entity_has_changed, event.state_key, event.internal_metadata.stream_ordering -- cgit 1.5.1 From 92e3071623c34350bf072bb77e089d5d6d5f41c2 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 7 Apr 2016 15:39:53 +0100 Subject: Send badge count pushes. Also fix bugs with retrying. --- synapse/handlers/receipts.py | 21 +++++++++++++++++---- synapse/push/httppusher.py | 45 ++++++++++++++++++++++++++++---------------- synapse/push/pusherpool.py | 20 +++++++++++++++++++- synapse/storage/receipts.py | 9 ++++++--- 4 files changed, 71 insertions(+), 24 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 935c339707..26b0368080 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -80,6 +80,9 @@ class ReceiptsHandler(BaseHandler): def _handle_new_receipts(self, receipts): """Takes a list of receipts, stores them and informs the notifier. """ + min_batch_id = None + max_batch_id = None + for receipt in receipts: room_id = receipt["room_id"] receipt_type = receipt["receipt_type"] @@ -97,10 +100,20 @@ class ReceiptsHandler(BaseHandler): stream_id, max_persisted_id = res - with PreserveLoggingContext(): - self.notifier.on_new_event( - "receipt_key", max_persisted_id, rooms=[room_id] - ) + if min_batch_id is None or stream_id < min_batch_id: + min_batch_id = stream_id + if max_batch_id is None or max_persisted_id > max_batch_id: + max_batch_id = max_persisted_id + + affected_room_ids = list(set([r["room_id"] for r in receipts])) + + with PreserveLoggingContext(): + self.notifier.on_new_event( + "receipt_key", max_batch_id, rooms=affected_room_ids + ) + self.hs.get_pusherpool().on_new_receipts( + min_batch_id, max_batch_id, affected_room_ids + ) defer.returnValue(True) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index d695885649..0d5450bc01 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -76,15 +76,25 @@ class HttpPusher(object): self.data_minus_url.update(self.data) del self.data_minus_url['url'] + @defer.inlineCallbacks def on_started(self): - self._process() + yield self._process() + @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): self.max_stream_ordering = max_stream_ordering - self._process() + yield self._process() + + @defer.inlineCallbacks + def on_new_receipts(self, min_stream_id, max_stream_id): + # We could check the receipts are actually m.read receipts here, + # but currently that's the only type of receipt anyway... + badge = yield push_tools.get_badge_count(self.hs, self.user_id) + yield self.send_badge(badge) + @defer.inlineCallbacks def on_timer(self): - self._process() + yield self._process() def on_stop(self): if self.timed_call: @@ -106,22 +116,24 @@ class HttpPusher(object): self.last_stream_ordering, self.clock.time_msec() ) - self.failing_since = None - yield self.store.update_pusher_failing_since( - self.app_id, self.pushkey, self.user_id, - self.failing_since - ) + if self.failing_since: + self.failing_since = None + yield self.store.update_pusher_failing_since( + self.app_id, self.pushkey, self.user_id, + self.failing_since + ) else: - self.failing_since = self.clock.time_msec() - yield self.store.update_pusher_failing_since( - self.app_id, self.pushkey, self.user_id, - self.failing_since - ) + if not self.failing_since: + self.failing_since = self.clock.time_msec() + yield self.store.update_pusher_failing_since( + self.app_id, self.pushkey, self.user_id, + self.failing_since + ) if ( self.failing_since and self.failing_since < - self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER + self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS ): # we really only give up so that if the URL gets # fixed, we don't suddenly deliver a load @@ -148,7 +160,7 @@ class HttpPusher(object): else: logger.info("Push failed: delaying for %ds", self.backoff_delay) self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer) - self.backoff_delay = min(self.backoff_delay, self.MAX_BACKOFF_SEC) + self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC) break @defer.inlineCallbacks @@ -191,7 +203,8 @@ class HttpPusher(object): d = { 'notification': { - 'id': event.event_id, + 'id': event.event_id, # deprecated: remove soon + 'event_id': event.event_id, 'room_id': event.room_id, 'type': event.type, 'sender': event.user_id, diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index b67ad455ea..7b1ce81e9a 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -126,10 +126,28 @@ class PusherPool: for u in users_affected: if u in self.pushers: for p in self.pushers[u].values(): - p.on_new_notifications(min_stream_id, max_stream_id) + yield p.on_new_notifications(min_stream_id, max_stream_id) except: logger.exception("Exception in pusher on_new_notifications") + @defer.inlineCallbacks + def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): + yield run_on_reactor() + try: + # Need to subtract 1 from the minimum because the lower bound here + # is not inclusive + updated_receipts = yield self.store.get_all_updated_receipts( + min_stream_id - 1, max_stream_id + ) + # This returns a tuple, user_id is at index 3 + users_affected = set([r[3] for r in updated_receipts]) + for u in users_affected: + if u in self.pushers: + for p in self.pushers[u].values(): + yield p.on_new_receipts(min_stream_id, max_stream_id) + except: + logger.exception("Exception in pusher on_new_receipts") + @defer.inlineCallbacks def _refresh_pusher(self, app_id, pushkey, user_id): resultlist = yield self.store.get_pushers_by_app_id_and_pushkey( diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 4befebc8e2..59d1ac0314 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -390,16 +390,19 @@ class ReceiptsStore(SQLBaseStore): } ) - def get_all_updated_receipts(self, last_id, current_id, limit): + def get_all_updated_receipts(self, last_id, current_id, limit=None): def get_all_updated_receipts_txn(txn): sql = ( "SELECT stream_id, room_id, receipt_type, user_id, event_id, data" " FROM receipts_linearized" " WHERE ? < stream_id AND stream_id <= ?" " ORDER BY stream_id ASC" - " LIMIT ?" ) - txn.execute(sql, (last_id, current_id, limit)) + args = [last_id, current_id] + if limit is not None: + sql += " LIMIT ?" + args.append(limit) + txn.execute(sql, args) return txn.fetchall() return self.runInteraction( -- cgit 1.5.1 From 2d5c693fd3c72800980f906b8255e3619ac524e2 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 7 Apr 2016 16:43:54 +0100 Subject: Fix port script for changes merged from develop --- synapse/storage/schema/delta/31/pushers.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py index 7e0e385fb5..d07bab012f 100644 --- a/synapse/storage/schema/delta/31/pushers.py +++ b/synapse/storage/schema/delta/31/pushers.py @@ -27,7 +27,7 @@ def token_to_stream_ordering(token): return int(token[1:].split('_')[0]) -def run_upgrade(cur, database_engine, *args, **kwargs): +def run_create(cur, database_engine, *args, **kwargs): logger.info("Porting pushers table, delta 31...") cur.execute(""" CREATE TABLE IF NOT EXISTS pushers2 ( @@ -73,3 +73,6 @@ def run_upgrade(cur, database_engine, *args, **kwargs): cur.execute("DROP TABLE pushers") cur.execute("ALTER TABLE pushers2 RENAME TO pushers") logger.info("Moved %d pushers to new table", count) + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass -- cgit 1.5.1 From 05d044aac396de9dff64ffb47e8b9a3f43ad0919 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 7 Apr 2016 16:45:38 +0100 Subject: pep8 --- synapse/storage/schema/delta/31/pushers.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py index d07bab012f..93367fa09e 100644 --- a/synapse/storage/schema/delta/31/pushers.py +++ b/synapse/storage/schema/delta/31/pushers.py @@ -74,5 +74,6 @@ def run_create(cur, database_engine, *args, **kwargs): cur.execute("ALTER TABLE pushers2 RENAME TO pushers") logger.info("Moved %d pushers to new table", count) + def run_upgrade(cur, database_engine, *args, **kwargs): pass -- cgit 1.5.1 From 3fb35cbd6fc52905c88344fd3ea55a4ee1d1c478 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 7 Apr 2016 17:33:37 +0100 Subject: Oops, inequality fail --- synapse/storage/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 4d72e4a85e..355478957d 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -105,7 +105,7 @@ class EventPushActionsStore(SQLBaseStore): def f(txn): sql = ( "SELECT DISTINCT(user_id) FROM event_push_actions WHERE" - " stream_ordering >= ? AND stream_ordering >= ?" + " stream_ordering >= ? AND stream_ordering <= ?" ) txn.execute(sql, (min_stream_ordering, max_stream_ordering)) return [r[0] for r in txn.fetchall()] -- cgit 1.5.1 From 4836864f5681fbcca34a4c40384a7c4c8309b4e2 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 7 Apr 2016 17:38:48 +0100 Subject: generate id in the main thread --- synapse/storage/pusher.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index f7886dd1bb..b314e3ab4f 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -124,9 +124,9 @@ class PusherStore(SQLBaseStore): app_display_name, device_display_name, pushkey, pushkey_ts, lang, data, last_stream_ordering, profile_tag=""): - def f(txn): - txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) - with self._pushers_id_gen.get_next() as stream_id: + with self._pushers_id_gen.get_next() as stream_id: + def f(txn): + txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) return self._simple_upsert_txn( txn, "pushers", -- cgit 1.5.1 From d9f38561c8855fa6893868069f0ec00d802618df Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 7 Apr 2016 17:45:01 +0100 Subject: Literally a dictionary --- synapse/storage/pusher.py | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index b314e3ab4f..b34a30a8fb 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -130,23 +130,23 @@ class PusherStore(SQLBaseStore): return self._simple_upsert_txn( txn, "pushers", - dict( - app_id=app_id, - pushkey=pushkey, - user_name=user_id, - ), - dict( - access_token=access_token, - kind=kind, - app_display_name=app_display_name, - device_display_name=device_display_name, - ts=pushkey_ts, - lang=lang, - data=encode_canonical_json(data), - last_stream_ordering=last_stream_ordering, - profile_tag=profile_tag, - id=stream_id, - ), + { + "app_id": app_id, + "pushkey": pushkey, + "user_name": user_id, + }, + { + "access_token": access_token, + "kind": kind, + "app_display_name": app_display_name, + "device_display_name": device_display_name, + "ts": pushkey_ts, + "lang": lang, + "data": encode_canonical_json(data), + "last_stream_ordering": last_stream_ordering, + "profile_tag": profile_tag, + "id": stream_id, + }, ) defer.returnValue((yield self.runInteraction("add_pusher", f))) -- cgit 1.5.1 From ed3979df5faac6d63990f4230662ff8cdcf59584 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 8 Apr 2016 15:29:59 +0100 Subject: Fix invite pushes * If the event is an invite event, add the invitee to list of user we run push rules for (if they have a pusher etc) * Move invite_for_me to be higher prio than member events otherwise member events matches them * Spell override right --- synapse/push/action_generator.py | 6 +-- synapse/push/baserules.py | 72 ++++++++++++++++---------------- synapse/push/bulk_push_rule_evaluator.py | 12 +++++- synapse/storage/pusher.py | 7 ++++ 4 files changed, 58 insertions(+), 39 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 84efcdd184..59e512f507 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -15,7 +15,7 @@ from twisted.internet import defer -from .bulk_push_rule_evaluator import evaluator_for_room_id +from .bulk_push_rule_evaluator import evaluator_for_event import logging @@ -35,8 +35,8 @@ class ActionGenerator: @defer.inlineCallbacks def handle_push_actions_for_event(self, event, context, handler): - bulk_evaluator = yield evaluator_for_room_id( - event.room_id, self.hs, self.store + bulk_evaluator = yield evaluator_for_event( + event, self.hs, self.store ) actions_by_user = yield bulk_evaluator.action_for_event_by_user( diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py index 6add94beeb..8a174feeaf 100644 --- a/synapse/push/baserules.py +++ b/synapse/push/baserules.py @@ -79,7 +79,7 @@ def make_base_append_rules(kind, modified_base_rules): rules = [] if kind == 'override': - rules = BASE_APPEND_OVRRIDE_RULES + rules = BASE_APPEND_OVERRIDE_RULES elif kind == 'underride': rules = BASE_APPEND_UNDERRIDE_RULES elif kind == 'content': @@ -148,7 +148,7 @@ BASE_PREPEND_OVERRIDE_RULES = [ ] -BASE_APPEND_OVRRIDE_RULES = [ +BASE_APPEND_OVERRIDE_RULES = [ { 'rule_id': 'global/override/.m.rule.suppress_notices', 'conditions': [ @@ -163,6 +163,40 @@ BASE_APPEND_OVRRIDE_RULES = [ 'dont_notify', ] }, + # NB. .m.rule.invite_for_me must be higher prio than .m.rule.member_event + # otherwise invites will be matched by .m.rule.member_event + { + 'rule_id': 'global/underride/.m.rule.invite_for_me', + 'conditions': [ + { + 'kind': 'event_match', + 'key': 'type', + 'pattern': 'm.room.member', + '_id': '_member', + }, + { + 'kind': 'event_match', + 'key': 'content.membership', + 'pattern': 'invite', + '_id': '_invite_member', + }, + { + 'kind': 'event_match', + 'key': 'state_key', + 'pattern_type': 'user_id' + }, + ], + 'actions': [ + 'notify', + { + 'set_tweak': 'sound', + 'value': 'default' + }, { + 'set_tweak': 'highlight', + 'value': False + } + ] + }, # Will we sometimes want to know about people joining and leaving? # Perhaps: if so, this could be expanded upon. Seems the most usual case # is that we don't though. We add this override rule so that even if @@ -251,38 +285,6 @@ BASE_APPEND_UNDERRIDE_RULES = [ } ] }, - { - 'rule_id': 'global/underride/.m.rule.invite_for_me', - 'conditions': [ - { - 'kind': 'event_match', - 'key': 'type', - 'pattern': 'm.room.member', - '_id': '_member', - }, - { - 'kind': 'event_match', - 'key': 'content.membership', - 'pattern': 'invite', - '_id': '_invite_member', - }, - { - 'kind': 'event_match', - 'key': 'state_key', - 'pattern_type': 'user_id' - }, - ], - 'actions': [ - 'notify', - { - 'set_tweak': 'sound', - 'value': 'default' - }, { - 'set_tweak': 'highlight', - 'value': False - } - ] - }, { 'rule_id': 'global/underride/.m.rule.message', 'conditions': [ @@ -315,7 +317,7 @@ for r in BASE_PREPEND_OVERRIDE_RULES: r['default'] = True BASE_RULE_IDS.add(r['rule_id']) -for r in BASE_APPEND_OVRRIDE_RULES: +for r in BASE_APPEND_OVERRIDE_RULES: r['priority_class'] = PRIORITY_CLASS_MAP['override'] r['default'] = True BASE_RULE_IDS.add(r['rule_id']) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 7f94591dcb..49216f0c15 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -69,7 +69,8 @@ def _get_rules(room_id, user_ids, store): @defer.inlineCallbacks -def evaluator_for_room_id(room_id, hs, store): +def evaluator_for_event(event, hs, store): + room_id = event.room_id users_with_pushers = yield store.get_users_with_pushers_in_room(room_id) receipts = yield store.get_receipts_for_room(room_id, "m.read") @@ -79,6 +80,15 @@ def evaluator_for_room_id(room_id, hs, store): if hs.is_mine_id(r['user_id']): user_ids.add(r['user_id']) + # if this event is an invite event, we may need to run rules for the user + # who's been invited, otherwise they won't get told they've been invited + if event.type == 'm.room.member' and event.content['membership'] == 'invite': + invited_user = event.state_key + if invited_user and hs.is_mine_id(invited_user): + has_pusher = yield store.user_has_pusher(invited_user) + if has_pusher: + user_ids.add(invited_user) + user_ids = list(user_ids) rules_by_user = yield _get_rules(room_id, user_ids, store) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index b34a30a8fb..19888a8e76 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -49,6 +49,13 @@ class PusherStore(SQLBaseStore): return rows + @defer.inlineCallbacks + def user_has_pusher(self, user_id): + ret = yield self._simple_select_one_onecol( + "pushers", {"user_name": user_id}, "id", allow_none=True + ) + defer.returnValue(ret is not None) + @defer.inlineCallbacks def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey): def r(txn): -- cgit 1.5.1