From 21f135ba763a583ecf9ba2714b5151f6b14b61fd Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 10 Dec 2015 16:26:08 +0000 Subject: Very first cut of calculating actions for events as they come in. Doesn't store them yet. Not very efficient. --- synapse/push/action_generator.py | 47 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 synapse/push/action_generator.py (limited to 'synapse/push') diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py new file mode 100644 index 0000000000..508eeaed95 --- /dev/null +++ b/synapse/push/action_generator.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +import push_rule_evaluator + +import logging + + +logger = logging.getLogger(__name__) + + +class ActionGenerator: + def __init__(self, store): + self.store = store + # really we want to get all user ids and all profile tags too, + # since we want the actions for each profile tag for every user and + # also actions for a client with no profile tag for each user. + # Currently the event stream doesn't support profile tags on an + # event stream, so we just run the rules for a client with no profile + # tag (ie. we just need all the users). + + @defer.inlineCallbacks + def handle_event(self, event): + users = yield self.store.get_users_in_room(event['room_id']) + logger.error("users in room: %r", users) + + for uid in users: + evaluator = yield push_rule_evaluator.\ + evaluator_for_user_name_and_profile_tag( + uid, None, event['room_id'], self.store + ) + actions = yield evaluator.actions_for_event(event) + logger.info("actions for user %s: %s", uid, actions) -- cgit 1.4.1 From aa667ee396c473f497b084655d47b2a9520a538a Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 10 Dec 2015 17:51:15 +0000 Subject: Save event actions to the db --- synapse/push/action_generator.py | 6 ++-- synapse/storage/__init__.py | 2 ++ synapse/storage/event_actions.py | 41 +++++++++++++++++++++++ synapse/storage/schema/delta/27/event_actions.sql | 25 ++++++++++++++ 4 files changed, 72 insertions(+), 2 deletions(-) create mode 100644 synapse/storage/event_actions.py create mode 100644 synapse/storage/schema/delta/27/event_actions.sql (limited to 'synapse/push') diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 508eeaed95..870c68a0ca 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -19,7 +19,6 @@ import push_rule_evaluator import logging - logger = logging.getLogger(__name__) @@ -42,6 +41,9 @@ class ActionGenerator: evaluator = yield push_rule_evaluator.\ evaluator_for_user_name_and_profile_tag( uid, None, event['room_id'], self.store - ) + ) actions = yield evaluator.actions_for_event(event) logger.info("actions for user %s: %s", uid, actions) + self.store.set_actions_for_event( + event['event_id'], uid, None, actions + ) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index c46b653f11..a112dd237f 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -33,6 +33,7 @@ from .pusher import PusherStore from .push_rule import PushRuleStore from .media_repository import MediaRepositoryStore from .rejections import RejectionsStore +from .event_actions import EventActionsStore from .state import StateStore from .signatures import SignatureStore @@ -75,6 +76,7 @@ class DataStore(RoomMemberStore, RoomStore, SearchStore, TagsStore, AccountDataStore, + EventActionsStore ): def __init__(self, hs): diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py new file mode 100644 index 0000000000..593b1714c7 --- /dev/null +++ b/synapse/storage/event_actions.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore +from twisted.internet import defer + +import logging +import simplejson as json + +logger = logging.getLogger(__name__) + + +class EventActionsStore(SQLBaseStore): + @defer.inlineCallbacks + def set_actions_for_event(self, event_id, user_id, profile_tag, actions): + actionsJson = json.dumps(actions) + + ret = yield self.runInteraction( + "_set_actions_for_event", + self._simple_upsert_txn, + EventActionsTable.table_name, + {'event_id': event_id, 'user_id': user_id, 'profile_tag': profile_tag}, + {'actions': actionsJson} + ) + defer.returnValue(ret) + + +class EventActionsTable(object): + table_name = "event_actions" diff --git a/synapse/storage/schema/delta/27/event_actions.sql b/synapse/storage/schema/delta/27/event_actions.sql new file mode 100644 index 0000000000..1246823a00 --- /dev/null +++ b/synapse/storage/schema/delta/27/event_actions.sql @@ -0,0 +1,25 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_actions( + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + profile_tag VARCHAR(32), + actions TEXT NOT NULL, + CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (event_id, user_id, profile_tag) +); + + +CREATE INDEX event_actions_event_id_user_id_profile_tag on event_actions(event_id, user_id, profile_tag); -- cgit 1.4.1 From 5e909c73d76cd56e3eac91635a99405182dcac3c Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 10 Dec 2015 18:40:28 +0000 Subject: Store nothing instead of ['dont_notify'] for events with no notification required: much as it would be nice to be able to tell between the event not having been processed and there being no notification for it, this isn't worth filling up the table with ['dont_notify'] I think. Consequently treat the empty actions array as dont_notify and filter dont_notify out of the result. --- synapse/push/__init__.py | 18 +++--------------- synapse/push/action_generator.py | 8 ++++---- synapse/push/push_rule_evaluator.py | 9 +++++++-- 3 files changed, 14 insertions(+), 21 deletions(-) (limited to 'synapse/push') diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index e7c964bcd2..d5928c1d29 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -157,21 +157,7 @@ class Pusher(object): actions = yield rule_evaluator.actions_for_event(single_event) tweaks = rule_evaluator.tweaks_for_actions(actions) - if len(actions) == 0: - logger.warn("Empty actions! Using default action.") - actions = Pusher.DEFAULT_ACTIONS - - if 'notify' not in actions and 'dont_notify' not in actions: - logger.warn("Neither notify nor dont_notify in actions: adding default") - actions.extend(Pusher.DEFAULT_ACTIONS) - - if 'dont_notify' in actions: - logger.debug( - "%s for %s: dont_notify", - single_event['event_id'], self.user_name - ) - processed = True - else: + if 'notify' in actions: rejected = yield self.dispatch_push(single_event, tweaks) self.has_unread = True if isinstance(rejected, list) or isinstance(rejected, tuple): @@ -192,6 +178,8 @@ class Pusher(object): yield self.hs.get_pusherpool().remove_pusher( self.app_id, pk, self.user_name ) + else: + processed = True if not self.alive: return diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 870c68a0ca..a72a7d703c 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -35,7 +35,6 @@ class ActionGenerator: @defer.inlineCallbacks def handle_event(self, event): users = yield self.store.get_users_in_room(event['room_id']) - logger.error("users in room: %r", users) for uid in users: evaluator = yield push_rule_evaluator.\ @@ -44,6 +43,7 @@ class ActionGenerator: ) actions = yield evaluator.actions_for_event(event) logger.info("actions for user %s: %s", uid, actions) - self.store.set_actions_for_event( - event['event_id'], uid, None, actions - ) + if len(actions): + self.store.set_actions_for_event( + event['event_id'], uid, None, actions + ) diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 92c7fd048f..420476fd0b 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -43,7 +43,7 @@ def evaluator_for_user_name_and_profile_tag(user_name, profile_tag, room_id, sto class PushRuleEvaluator: - DEFAULT_ACTIONS = ['dont_notify'] + DEFAULT_ACTIONS = [] INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$") def __init__(self, user_name, profile_tag, raw_rules, enabled_map, room_id, @@ -85,7 +85,7 @@ class PushRuleEvaluator: """ if ev['user_id'] == self.user_name: # let's assume you probably know about messages you sent yourself - defer.returnValue(['dont_notify']) + defer.returnValue([]) room_id = ev['room_id'] @@ -131,6 +131,11 @@ class PushRuleEvaluator: "%s matches for user %s, event %s", r['rule_id'], self.user_name, ev['event_id'] ) + + # filter out dont_notify as we treat an empty actions list + # as dont_notify, and this doesn't take up a row in our database + actions = [x for x in actions if x != 'dont_notify'] + defer.returnValue(actions) logger.info( -- cgit 1.4.1 From 42ad49f5b75c2c645c4060026c21c5572f5b1063 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Dec 2015 18:42:09 +0000 Subject: still very WIP, but now sends unread_notifications_count in the room object on sync (only actually corrrect in a full sync: hardcoded to 0 in incremental syncs). --- synapse/handlers/sync.py | 26 +++++++++++ synapse/push/action_generator.py | 2 +- synapse/rest/client/v2_alpha/sync.py | 1 + synapse/storage/event_actions.py | 53 ++++++++++++++++++++++- synapse/storage/schema/delta/27/event_actions.sql | 5 ++- 5 files changed, 82 insertions(+), 5 deletions(-) (limited to 'synapse/push') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 24c2b2fad6..6d193a10c4 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -52,6 +52,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ "state", # dict[(str, str), FrozenEvent] "ephemeral", "account_data", + "unread_notification_count", ])): __slots__ = [] @@ -64,6 +65,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ or self.state or self.ephemeral or self.account_data + or self.unread_notification_count > 0 ) @@ -161,6 +163,18 @@ class SyncHandler(BaseHandler): else: return self.incremental_sync_with_gap(sync_config, since_token) + def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room): + if room_id not in ephemeral_by_room: + return None + for e in ephemeral_by_room[room_id]: + if e['type'] != 'm.receipt': + continue + for receipt_event_id,val in e['content'].items(): + if 'm.read' in val: + if user_id in val['m.read']: + return receipt_event_id + return None + @defer.inlineCallbacks def full_state_sync(self, sync_config, timeline_since_token): """Get a sync for a client which is starting without any state. @@ -265,6 +279,16 @@ class SyncHandler(BaseHandler): room_id, sync_config, now_token, since_token=timeline_since_token ) + last_unread_event_id = self.last_read_event_id_for_room_and_user( + room_id, sync_config.user.to_string(), ephemeral_by_room + ) + + notifs = [] + if last_unread_event_id: + notifs = yield self.store.get_unread_event_actions_by_room( + room_id, last_unread_event_id + ) + current_state = yield self.get_state_at(room_id, now_token) defer.returnValue(JoinedSyncResult( @@ -275,6 +299,7 @@ class SyncHandler(BaseHandler): account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), + unread_notification_count=len(notifs) )) def account_data_for_user(self, account_data): @@ -509,6 +534,7 @@ class SyncHandler(BaseHandler): account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), + unread_notification_count=0 ) logger.debug("Result for room %s: %r", room_id, room_sync) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index a72a7d703c..1c7cd31666 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -45,5 +45,5 @@ class ActionGenerator: logger.info("actions for user %s: %s", uid, actions) if len(actions): self.store.set_actions_for_event( - event['event_id'], uid, None, actions + event, uid, None, actions ) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index f0a637a6da..4ca10732c1 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -304,6 +304,7 @@ class SyncRestServlet(RestServlet): }, "state": {"events": serialized_state}, "account_data": {"events": account_data}, + "unread_notification_count": room.unread_notification_count } if joined: diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py index 593b1714c7..40ac8e2d27 100644 --- a/synapse/storage/event_actions.py +++ b/synapse/storage/event_actions.py @@ -24,18 +24,67 @@ logger = logging.getLogger(__name__) class EventActionsStore(SQLBaseStore): @defer.inlineCallbacks - def set_actions_for_event(self, event_id, user_id, profile_tag, actions): + def set_actions_for_event(self, event, user_id, profile_tag, actions): actionsJson = json.dumps(actions) ret = yield self.runInteraction( "_set_actions_for_event", self._simple_upsert_txn, EventActionsTable.table_name, - {'event_id': event_id, 'user_id': user_id, 'profile_tag': profile_tag}, + { + 'room_id': event['room_id'], + 'event_id': event['event_id'], + 'user_id': user_id, + 'profile_tag': profile_tag + }, {'actions': actionsJson} ) defer.returnValue(ret) + @defer.inlineCallbacks + def get_unread_event_actions_by_room(self, room_id, last_read_event_id): + #events = yield self._get_events( + # [last_read_event_id], + # check_redacted=False + #) + + def _get_unread_event_actions_by_room(txn): + sql = ( + "SELECT stream_ordering, topological_ordering" + " FROM events" + " WHERE room_id = ? AND event_id = ?" + ) + txn.execute( + sql, (room_id, last_read_event_id) + ) + results = txn.fetchall() + if len(results) == 0: + return [] + + stream_ordering = results[0][0] + topological_ordering = results[0][1] + + sql = ( + "SELECT ea.actions" + " FROM event_actions ea, events e" + " WHERE ea.room_id = e.room_id" + " AND ea.event_id = e.event_id" + " AND ea.room_id = ?" + " AND (" + " e.topological_ordering > ?" + " OR (e.topological_ordering == ? AND e.stream_ordering > ?)" + ")" + ) + txn.execute(sql, + (room_id, topological_ordering, topological_ordering, stream_ordering) + ) + return txn.fetchall() + + ret = yield self.runInteraction( + "get_unread_event_actions_by_room", + _get_unread_event_actions_by_room + ) + defer.returnValue(ret) class EventActionsTable(object): table_name = "event_actions" diff --git a/synapse/storage/schema/delta/27/event_actions.sql b/synapse/storage/schema/delta/27/event_actions.sql index 1246823a00..bbdaee990e 100644 --- a/synapse/storage/schema/delta/27/event_actions.sql +++ b/synapse/storage/schema/delta/27/event_actions.sql @@ -14,12 +14,13 @@ */ CREATE TABLE IF NOT EXISTS event_actions( + room_id TEXT NOT NULL, event_id TEXT NOT NULL, user_id TEXT NOT NULL, profile_tag VARCHAR(32), actions TEXT NOT NULL, - CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (event_id, user_id, profile_tag) + CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag) ); -CREATE INDEX event_actions_event_id_user_id_profile_tag on event_actions(event_id, user_id, profile_tag); +CREATE INDEX event_actions_room_id_event_id_user_id_profile_tag on event_actions(room_id, event_id, user_id, profile_tag); -- cgit 1.4.1 From f73f154ec2c8ffdc49270d3ccaf3053f915800f3 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 21 Dec 2015 15:28:54 +0000 Subject: Only run pushers for users on this hs! --- synapse/handlers/_base.py | 2 +- synapse/handlers/federation.py | 2 +- synapse/push/action_generator.py | 8 +++++++- 3 files changed, 9 insertions(+), 3 deletions(-) (limited to 'synapse/push') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index a8e8c4f5af..24c4c62698 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -267,7 +267,7 @@ class BaseHandler(object): event, context=context ) - action_generator = ActionGenerator(self.store) + action_generator = ActionGenerator(self.hs, self.store) yield action_generator.handle_event(serialize_event( event, self.clock.time_msec() )) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6a4269b500..6525bde430 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -245,7 +245,7 @@ class FederationHandler(BaseHandler): yield user_joined_room(self.distributor, user, event.room_id) if not backfilled and not event.internal_metadata.is_outlier(): - action_generator = ActionGenerator(self.store) + action_generator = ActionGenerator(self.hs, self.store) yield action_generator.handle_event(serialize_event( event, self.clock.time_msec()) ) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 1c7cd31666..6e107ca792 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -15,6 +15,8 @@ from twisted.internet import defer +from synapse.types import UserID + import push_rule_evaluator import logging @@ -23,7 +25,8 @@ logger = logging.getLogger(__name__) class ActionGenerator: - def __init__(self, store): + def __init__(self, hs, store): + self.hs = hs self.store = store # really we want to get all user ids and all profile tags too, # since we want the actions for each profile tag for every user and @@ -37,6 +40,9 @@ class ActionGenerator: users = yield self.store.get_users_in_room(event['room_id']) for uid in users: + if not self.hs.is_mine(UserID.from_string(uid)): + continue + evaluator = yield push_rule_evaluator.\ evaluator_for_user_name_and_profile_tag( uid, None, event['room_id'], self.store -- cgit 1.4.1 From 65c451cb3878fb41f28a2adecd638894e18f5343 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 15:19:34 +0000 Subject: Add bulk push rule evaluator which actually still evaluates rules one by one, but does far fewer db queries to fetch the rules --- synapse/push/action_generator.py | 28 ++++----- synapse/push/bulk_push_rule_evaluator.py | 99 ++++++++++++++++++++++++++++++++ synapse/push/push_rule_evaluator.py | 13 +++-- synapse/storage/push_rule.py | 41 +++++++++++++ 4 files changed, 159 insertions(+), 22 deletions(-) create mode 100644 synapse/push/bulk_push_rule_evaluator.py (limited to 'synapse/push') diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 6e107ca792..2ad5f82da2 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -15,9 +15,7 @@ from twisted.internet import defer -from synapse.types import UserID - -import push_rule_evaluator +import bulk_push_rule_evaluator import logging @@ -39,17 +37,13 @@ class ActionGenerator: def handle_event(self, event): users = yield self.store.get_users_in_room(event['room_id']) - for uid in users: - if not self.hs.is_mine(UserID.from_string(uid)): - continue - - evaluator = yield push_rule_evaluator.\ - evaluator_for_user_name_and_profile_tag( - uid, None, event['room_id'], self.store - ) - actions = yield evaluator.actions_for_event(event) - logger.info("actions for user %s: %s", uid, actions) - if len(actions): - self.store.set_actions_for_event( - event, uid, None, actions - ) + bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( + event['room_id'], self.hs, self.store + ) + + actions_by_user = bulk_evaluator.action_for_event_by_user(event) + + for uid,actions in actions_by_user.items(): + self.store.set_actions_for_event( + event, uid, None, actions + ) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py new file mode 100644 index 0000000000..f531d2edc4 --- /dev/null +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import simplejson as json + +from twisted.internet import defer + +from synapse.types import UserID + +import baserules +from push_rule_evaluator import PushRuleEvaluator + +logger = logging.getLogger(__name__) + + +def decode_rule_json(rule): + rule['conditions'] = json.loads(rule['conditions']) + rule['actions'] = json.loads(rule['actions']) + return rule + + +@defer.inlineCallbacks +def evaluator_for_room_id(room_id, hs, store): + users = yield store.get_users_in_room(room_id) + rules_by_user = yield store.bulk_get_push_rules(users) + rules_by_user = { + uid: baserules.list_with_base_rules( + [decode_rule_json(rule_list) for rule_list in rules_by_user[uid]] + if uid in rules_by_user else [], + UserID.from_string(uid) + ) + for uid in users + } + member_events = yield store.get_current_state( + room_id=room_id, + event_type='m.room.member', + ) + display_names = {} + for ev in member_events: + if ev.content.get("displayname"): + display_names[ev.state_key] = ev.content.get("displayname") + + defer.returnValue(BulkPushRuleEvaluator( + room_id, rules_by_user, display_names, users + )) + + +class BulkPushRuleEvaluator: + def __init__(self, room_id, rules_by_user, display_names, users_in_room): + self.room_id = room_id + self.rules_by_user = rules_by_user + self.display_names = display_names + self.users_in_room = users_in_room + + def action_for_event_by_user(self, event): + actions_by_user = {} + + for uid, rules in self.rules_by_user.items(): + display_name = None + if uid in self.display_names: + display_name = self.display_names[uid] + + for rule in rules: + if 'enabled' in rule and not rule['enabled']: + continue + + # XXX: profile tags + if BulkPushRuleEvaluator.event_matches_rule( + event, rule, + display_name, len(self.users_in_room), None + ): + actions = [x for x in rule['actions'] if x != 'dont_notify'] + if len(actions) > 0: + actions_by_user[uid] = actions + break + return actions_by_user + + @staticmethod + def event_matches_rule(event, rule, + display_name, room_member_count, profile_tag): + matches = True + for cond in rule['conditions']: + matches &= PushRuleEvaluator._event_fulfills_condition( + event, cond, display_name, room_member_count, profile_tag + ) + return matches \ No newline at end of file diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 420476fd0b..40c7622ec4 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -113,7 +113,8 @@ class PushRuleEvaluator: for c in conditions: matches &= self._event_fulfills_condition( ev, c, display_name=my_display_name, - room_member_count=room_member_count + room_member_count=room_member_count, + profile_tag=self.profile_tag ) logger.debug( "Rule %s %s", @@ -156,16 +157,18 @@ class PushRuleEvaluator: re.sub(r'\\\-', '-', x.group(2)))), r) return r - def _event_fulfills_condition(self, ev, condition, display_name, room_member_count): + @staticmethod + def _event_fulfills_condition(ev, condition, + display_name, room_member_count, profile_tag): if condition['kind'] == 'event_match': if 'pattern' not in condition: logger.warn("event_match condition with no pattern") return False # XXX: optimisation: cache our pattern regexps if condition['key'] == 'content.body': - r = r'\b%s\b' % self._glob_to_regexp(condition['pattern']) + r = r'\b%s\b' % PushRuleEvaluator._glob_to_regexp(condition['pattern']) else: - r = r'^%s$' % self._glob_to_regexp(condition['pattern']) + r = r'^%s$' % PushRuleEvaluator._glob_to_regexp(condition['pattern']) val = _value_for_dotted_key(condition['key'], ev) if val is None: return False @@ -174,7 +177,7 @@ class PushRuleEvaluator: elif condition['kind'] == 'device': if 'profile_tag' not in condition: return True - return condition['profile_tag'] == self.profile_tag + return condition['profile_tag'] == profile_tag elif condition['kind'] == 'contains_display_name': # This is special because display names can be different diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 5305b7e122..9dec4aa685 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -55,6 +55,47 @@ class PushRuleStore(SQLBaseStore): r['rule_id']: False if r['enabled'] == 0 else True for r in results }) + @defer.inlineCallbacks + def bulk_get_push_rules(self, user_ids): + batch_size = 100 + + def f(txn, user_ids_to_fetch): + sql = ( + "SELECT " + + ",".join(map(lambda x: "pr."+x, PushRuleTable.fields)) + + " FROM " + PushRuleTable.table_name + " pr " + + " LEFT JOIN " + PushRuleEnableTable.table_name + " pre " + + " ON pr.user_name = pre.user_name and pr.rule_id = pre.rule_id " + + " WHERE pr.user_name " + + " IN (" + ",".join(["?" for _ in user_ids_to_fetch]) + ")" + " AND (pre.enabled is null or pre.enabled = 1)" + " ORDER BY pr.user_name, pr.priority_class DESC, pr.priority DESC" + ) + txn.execute(sql, user_ids_to_fetch) + return txn.fetchall() + + results = {} + + batch_start = 0 + while batch_start < len(user_ids): + batch_end = max(len(user_ids), batch_size) + batch_user_ids = user_ids[batch_start:batch_end] + batch_start = batch_end + + rows = yield self.runInteraction( + "bulk_get_push_rules", f, batch_user_ids + ) + + for r in rows: + rawdict = { + PushRuleTable.fields[i]: r[i] for i in range(len(r)) + } + + if rawdict['user_name'] not in results: + results[rawdict['user_name']] = [] + results[rawdict['user_name']].append(rawdict) + defer.returnValue(results) + @defer.inlineCallbacks def add_push_rule(self, before, after, **kwargs): vals = kwargs -- cgit 1.4.1 From 4c8f6a7e427cc0e22ff1a19c3f1d9da0f9438f18 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 17:04:31 +0000 Subject: Insert push actions in a single db query rather than one per user/profile_tag --- synapse/push/action_generator.py | 10 ++++++---- synapse/storage/event_actions.py | 31 ++++++++++++++++++------------- 2 files changed, 24 insertions(+), 17 deletions(-) (limited to 'synapse/push') diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 2ad5f82da2..148b1bda8e 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -43,7 +43,9 @@ class ActionGenerator: actions_by_user = bulk_evaluator.action_for_event_by_user(event) - for uid,actions in actions_by_user.items(): - self.store.set_actions_for_event( - event, uid, None, actions - ) + yield self.store.set_actions_for_event_and_users( + event, + [ + (uid, None, actions) for uid, actions in actions_by_user.items() + ] + ) diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py index fbd0a42279..3efa445c18 100644 --- a/synapse/storage/event_actions.py +++ b/synapse/storage/event_actions.py @@ -24,22 +24,27 @@ logger = logging.getLogger(__name__) class EventActionsStore(SQLBaseStore): @defer.inlineCallbacks - def set_actions_for_event(self, event, user_id, profile_tag, actions): - actionsJson = json.dumps(actions) - - ret = yield self.runInteraction( - "_set_actions_for_event", - self._simple_upsert_txn, - EventActionsTable.table_name, - { + def set_actions_for_event_and_users(self, event, tuples): + """ + :param event: the event set actions for + :param tuples: list of tuples of (user_id, profile_tag, actions) + """ + values = [] + for uid, profile_tag, actions in tuples: + values.append({ 'room_id': event['room_id'], 'event_id': event['event_id'], - 'user_id': user_id, - 'profile_tag': profile_tag - }, - {'actions': actionsJson} + 'user_id': uid, + 'profile_tag': profile_tag, + 'actions': json.dumps(actions) + }) + + yield self.runInteraction( + "set_actions_for_event_and_users", + self._simple_insert_many_txn, + EventActionsTable.table_name, + values ) - defer.returnValue(ret) @defer.inlineCallbacks def get_unread_event_actions_by_room_for_user( -- cgit 1.4.1 From 5645d9747b17e9d119cc7badd7c2abe3c157a1a6 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 17:19:22 +0000 Subject: Add some comments to areas that could be optimised. --- synapse/handlers/sync.py | 3 +++ synapse/push/__init__.py | 4 +++- synapse/push/bulk_push_rule_evaluator.py | 8 ++++++++ 3 files changed, 14 insertions(+), 1 deletion(-) (limited to 'synapse/push') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4cbb43a31b..fa5e954e01 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -447,6 +447,9 @@ class SyncHandler(BaseHandler): ) now_token = now_token.copy_and_replace("presence_key", presence_key) + # We now fetch all ephemeral events for this room in order to get + # this users current read receipt. This could almost certainly be + # optimised. _, all_ephemeral_by_room = yield self.ephemeral_by_room( sync_config, now_token ) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index d5928c1d29..635dedd523 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -26,7 +26,9 @@ import random logger = logging.getLogger(__name__) - +# Pushers could now be moved to pull out of the event_actions table instead +# of listening on the event stream: this would avoid them having to run the +# rules again. class Pusher(object): INITIAL_BACKOFF = 1000 MAX_BACKOFF = 60 * 60 * 1000 diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index f531d2edc4..1c0fa72b25 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -59,6 +59,14 @@ def evaluator_for_room_id(room_id, hs, store): class BulkPushRuleEvaluator: + """ + Runs push rules for all users in a room. + This is faster than running PushRuleEvaluator for each user because it + fetches all the rules for all the users in one (batched) db query + rarher than doing multiple queries per-user. It currently uses + the same logic to run the actual rules, but could be optimised further + (see https://matrix.org/jira/browse/SYN-562) + """ def __init__(self, room_id, rules_by_user, display_names, users_in_room): self.room_id = room_id self.rules_by_user = rules_by_user -- cgit 1.4.1 From 9b4cd0cd0f31803657018bf0ac9178787d796912 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 22 Dec 2015 17:25:09 +0000 Subject: pep8 & unused variable --- synapse/push/__init__.py | 1 + synapse/push/action_generator.py | 2 -- synapse/push/bulk_push_rule_evaluator.py | 4 ++-- 3 files changed, 3 insertions(+), 4 deletions(-) (limited to 'synapse/push') diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 635dedd523..250f22a168 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -26,6 +26,7 @@ import random logger = logging.getLogger(__name__) + # Pushers could now be moved to pull out of the event_actions table instead # of listening on the event stream: this would avoid them having to run the # rules again. diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 148b1bda8e..00f518f609 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -35,8 +35,6 @@ class ActionGenerator: @defer.inlineCallbacks def handle_event(self, event): - users = yield self.store.get_users_in_room(event['room_id']) - bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( event['room_id'], self.hs, self.store ) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 1c0fa72b25..c489bfc8d4 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -54,7 +54,7 @@ def evaluator_for_room_id(room_id, hs, store): display_names[ev.state_key] = ev.content.get("displayname") defer.returnValue(BulkPushRuleEvaluator( - room_id, rules_by_user, display_names, users + room_id, rules_by_user, display_names, users )) @@ -104,4 +104,4 @@ class BulkPushRuleEvaluator: matches &= PushRuleEvaluator._event_fulfills_condition( event, cond, display_name, room_member_count, profile_tag ) - return matches \ No newline at end of file + return matches -- cgit 1.4.1 From 3051c9d002a467643d1ab32bc36974d2e3f84c12 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jan 2016 13:39:29 +0000 Subject: Address minor PR issues --- synapse/handlers/_base.py | 4 ++-- synapse/handlers/federation.py | 4 ++-- synapse/push/action_generator.py | 7 +++---- synapse/push/bulk_push_rule_evaluator.py | 2 +- synapse/storage/event_actions.py | 2 +- synapse/storage/push_rule.py | 6 +++--- synapse/storage/registration.py | 12 ------------ 7 files changed, 12 insertions(+), 25 deletions(-) (limited to 'synapse/push') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 24c4c62698..938eb29de7 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -267,8 +267,8 @@ class BaseHandler(object): event, context=context ) - action_generator = ActionGenerator(self.hs, self.store) - yield action_generator.handle_event(serialize_event( + action_generator = ActionGenerator(self.store) + yield action_generator.handle_push_actions_for_event(serialize_event( event, self.clock.time_msec() )) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0b1221deb5..764709b424 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -245,8 +245,8 @@ class FederationHandler(BaseHandler): yield user_joined_room(self.distributor, user, event.room_id) if not backfilled and not event.internal_metadata.is_outlier(): - action_generator = ActionGenerator(self.hs, self.store) - yield action_generator.handle_event(serialize_event( + action_generator = ActionGenerator(self.store) + yield action_generator.handle_push_actions_for_event(serialize_event( event, self.clock.time_msec()) ) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 00f518f609..4ab5d9e1b8 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -23,8 +23,7 @@ logger = logging.getLogger(__name__) class ActionGenerator: - def __init__(self, hs, store): - self.hs = hs + def __init__(self, store): self.store = store # really we want to get all user ids and all profile tags too, # since we want the actions for each profile tag for every user and @@ -34,9 +33,9 @@ class ActionGenerator: # tag (ie. we just need all the users). @defer.inlineCallbacks - def handle_event(self, event): + def handle_push_actions_for_event(self, event): bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( - event['room_id'], self.hs, self.store + event['room_id'], self.store ) actions_by_user = bulk_evaluator.action_for_event_by_user(event) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index c489bfc8d4..1c4e54ba44 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -33,7 +33,7 @@ def decode_rule_json(rule): @defer.inlineCallbacks -def evaluator_for_room_id(room_id, hs, store): +def evaluator_for_room_id(room_id, store): users = yield store.get_users_in_room(room_id) rules_by_user = yield store.bulk_get_push_rules(users) rules_by_user = { diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py index 3efa445c18..fa9cbe71ee 100644 --- a/synapse/storage/event_actions.py +++ b/synapse/storage/event_actions.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 OpenMarket Ltd +# Copyright 2015 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 9dec4aa685..7c5123d644 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -62,12 +62,12 @@ class PushRuleStore(SQLBaseStore): def f(txn, user_ids_to_fetch): sql = ( "SELECT " + - ",".join(map(lambda x: "pr."+x, PushRuleTable.fields)) + + ",".join("pr."+x for x in PushRuleTable.fields) + " FROM " + PushRuleTable.table_name + " pr " + " LEFT JOIN " + PushRuleEnableTable.table_name + " pre " + " ON pr.user_name = pre.user_name and pr.rule_id = pre.rule_id " + " WHERE pr.user_name " + - " IN (" + ",".join(["?" for _ in user_ids_to_fetch]) + ")" + " IN (" + ",".join("?" for _ in user_ids_to_fetch) + ")" " AND (pre.enabled is null or pre.enabled = 1)" " ORDER BY pr.user_name, pr.priority_class DESC, pr.priority DESC" ) @@ -78,7 +78,7 @@ class PushRuleStore(SQLBaseStore): batch_start = 0 while batch_start < len(user_ids): - batch_end = max(len(user_ids), batch_size) + batch_end = min(len(user_ids), batch_size) batch_user_ids = user_ids[batch_start:batch_end] batch_start = batch_end diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 4676f225b9..09a05b08ef 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -291,18 +291,6 @@ class RegistrationStore(SQLBaseStore): defer.returnValue(ret['user_id']) defer.returnValue(None) - @defer.inlineCallbacks - def get_all_user_ids(self): - """Returns all user ids registered on this homeserver""" - return self.runInteraction( - "get_all_user_ids", - self._get_all_user_ids_txn - ) - - def _get_all_user_ids_txn(self, txn): - txn.execute("SELECT name from users") - return [r[0] for r in txn.fetchall()] - @defer.inlineCallbacks def count_all_users(self): """Counts all users registered on the homeserver.""" -- cgit 1.4.1 From c914d67cda9682331639b78190db367974e4fb8b Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 4 Jan 2016 14:05:37 +0000 Subject: Rename event-actions to event_push_actions as per PR request --- synapse/handlers/sync.py | 2 +- synapse/push/__init__.py | 2 +- synapse/push/action_generator.py | 2 +- synapse/storage/__init__.py | 4 +- synapse/storage/event_actions.py | 98 ---------------------- synapse/storage/event_push_actions.py | 98 ++++++++++++++++++++++ synapse/storage/schema/delta/27/event_actions.sql | 26 ------ .../storage/schema/delta/27/event_push_actions.sql | 26 ++++++ 8 files changed, 129 insertions(+), 129 deletions(-) delete mode 100644 synapse/storage/event_actions.py create mode 100644 synapse/storage/event_push_actions.py delete mode 100644 synapse/storage/schema/delta/27/event_actions.sql create mode 100644 synapse/storage/schema/delta/27/event_push_actions.sql (limited to 'synapse/push') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f63c073a20..64556c5eb8 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -896,7 +896,7 @@ class SyncHandler(BaseHandler): notifs = [] if last_unread_event_id: - notifs = yield self.store.get_unread_event_actions_by_room_for_user( + notifs = yield self.store.get_unread_event_push_actions_by_room_for_user( room_id, sync_config.user.to_string(), last_unread_event_id ) else: diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 250f22a168..3ab6da0625 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -27,7 +27,7 @@ import random logger = logging.getLogger(__name__) -# Pushers could now be moved to pull out of the event_actions table instead +# Pushers could now be moved to pull out of the event_push_actions table instead # of listening on the event stream: this would avoid them having to run the # rules again. class Pusher(object): diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 4ab5d9e1b8..5526324a6d 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -40,7 +40,7 @@ class ActionGenerator: actions_by_user = bulk_evaluator.action_for_event_by_user(event) - yield self.store.set_actions_for_event_and_users( + yield self.store.set_push_actions_for_event_and_users( event, [ (uid, None, actions) for uid, actions in actions_by_user.items() diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a112dd237f..43e05f144a 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -33,7 +33,7 @@ from .pusher import PusherStore from .push_rule import PushRuleStore from .media_repository import MediaRepositoryStore from .rejections import RejectionsStore -from .event_actions import EventActionsStore +from .event_push_actions import EventPushActionsStore from .state import StateStore from .signatures import SignatureStore @@ -76,7 +76,7 @@ class DataStore(RoomMemberStore, RoomStore, SearchStore, TagsStore, AccountDataStore, - EventActionsStore + EventPushActionsStore ): def __init__(self, hs): diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py deleted file mode 100644 index fa9cbe71ee..0000000000 --- a/synapse/storage/event_actions.py +++ /dev/null @@ -1,98 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ._base import SQLBaseStore -from twisted.internet import defer - -import logging -import simplejson as json - -logger = logging.getLogger(__name__) - - -class EventActionsStore(SQLBaseStore): - @defer.inlineCallbacks - def set_actions_for_event_and_users(self, event, tuples): - """ - :param event: the event set actions for - :param tuples: list of tuples of (user_id, profile_tag, actions) - """ - values = [] - for uid, profile_tag, actions in tuples: - values.append({ - 'room_id': event['room_id'], - 'event_id': event['event_id'], - 'user_id': uid, - 'profile_tag': profile_tag, - 'actions': json.dumps(actions) - }) - - yield self.runInteraction( - "set_actions_for_event_and_users", - self._simple_insert_many_txn, - EventActionsTable.table_name, - values - ) - - @defer.inlineCallbacks - def get_unread_event_actions_by_room_for_user( - self, room_id, user_id, last_read_event_id - ): - def _get_unread_event_actions_by_room(txn): - sql = ( - "SELECT stream_ordering, topological_ordering" - " FROM events" - " WHERE room_id = ? AND event_id = ?" - ) - txn.execute( - sql, (room_id, last_read_event_id) - ) - results = txn.fetchall() - if len(results) == 0: - return [] - - stream_ordering = results[0][0] - topological_ordering = results[0][1] - - sql = ( - "SELECT ea.event_id, ea.actions" - " FROM event_actions ea, events e" - " WHERE ea.room_id = e.room_id" - " AND ea.event_id = e.event_id" - " AND ea.user_id = ?" - " AND ea.room_id = ?" - " AND (" - " e.topological_ordering > ?" - " OR (e.topological_ordering == ? AND e.stream_ordering > ?)" - ")" - ) - txn.execute(sql, ( - user_id, room_id, - topological_ordering, topological_ordering, stream_ordering - ) - ) - return [ - {"event_id": row[0], "actions": row[1]} for row in txn.fetchall() - ] - - ret = yield self.runInteraction( - "get_unread_event_actions_by_room", - _get_unread_event_actions_by_room - ) - defer.returnValue(ret) - - -class EventActionsTable(object): - table_name = "event_actions" diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py new file mode 100644 index 0000000000..016c0adf8a --- /dev/null +++ b/synapse/storage/event_push_actions.py @@ -0,0 +1,98 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore +from twisted.internet import defer + +import logging +import simplejson as json + +logger = logging.getLogger(__name__) + + +class EventPushActionsStore(SQLBaseStore): + @defer.inlineCallbacks + def set_push_actions_for_event_and_users(self, event, tuples): + """ + :param event: the event set actions for + :param tuples: list of tuples of (user_id, profile_tag, actions) + """ + values = [] + for uid, profile_tag, actions in tuples: + values.append({ + 'room_id': event['room_id'], + 'event_id': event['event_id'], + 'user_id': uid, + 'profile_tag': profile_tag, + 'actions': json.dumps(actions) + }) + + yield self.runInteraction( + "set_actions_for_event_and_users", + self._simple_insert_many_txn, + EventPushActionsTable.table_name, + values + ) + + @defer.inlineCallbacks + def get_unread_event_push_actions_by_room_for_user( + self, room_id, user_id, last_read_event_id + ): + def _get_unread_event_push_actions_by_room(txn): + sql = ( + "SELECT stream_ordering, topological_ordering" + " FROM events" + " WHERE room_id = ? AND event_id = ?" + ) + txn.execute( + sql, (room_id, last_read_event_id) + ) + results = txn.fetchall() + if len(results) == 0: + return [] + + stream_ordering = results[0][0] + topological_ordering = results[0][1] + + sql = ( + "SELECT ea.event_id, ea.actions" + " FROM event_push_actions ea, events e" + " WHERE ea.room_id = e.room_id" + " AND ea.event_id = e.event_id" + " AND ea.user_id = ?" + " AND ea.room_id = ?" + " AND (" + " e.topological_ordering > ?" + " OR (e.topological_ordering == ? AND e.stream_ordering > ?)" + ")" + ) + txn.execute(sql, ( + user_id, room_id, + topological_ordering, topological_ordering, stream_ordering + ) + ) + return [ + {"event_id": row[0], "actions": row[1]} for row in txn.fetchall() + ] + + ret = yield self.runInteraction( + "get_unread_event_push_actions_by_room", + _get_unread_event_push_actions_by_room + ) + defer.returnValue(ret) + + +class EventPushActionsTable(object): + table_name = "event_push_actions" diff --git a/synapse/storage/schema/delta/27/event_actions.sql b/synapse/storage/schema/delta/27/event_actions.sql deleted file mode 100644 index bbdaee990e..0000000000 --- a/synapse/storage/schema/delta/27/event_actions.sql +++ /dev/null @@ -1,26 +0,0 @@ -/* Copyright 2015 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -CREATE TABLE IF NOT EXISTS event_actions( - room_id TEXT NOT NULL, - event_id TEXT NOT NULL, - user_id TEXT NOT NULL, - profile_tag VARCHAR(32), - actions TEXT NOT NULL, - CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag) -); - - -CREATE INDEX event_actions_room_id_event_id_user_id_profile_tag on event_actions(room_id, event_id, user_id, profile_tag); diff --git a/synapse/storage/schema/delta/27/event_push_actions.sql b/synapse/storage/schema/delta/27/event_push_actions.sql new file mode 100644 index 0000000000..bdf6ae3f24 --- /dev/null +++ b/synapse/storage/schema/delta/27/event_push_actions.sql @@ -0,0 +1,26 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_push_actions( + room_id TEXT NOT NULL, + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + profile_tag VARCHAR(32), + actions TEXT NOT NULL, + CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag) +); + + +CREATE INDEX event_push_actions_room_id_event_id_user_id_profile_tag on event_push_actions(room_id, event_id, user_id, profile_tag); -- cgit 1.4.1 From 85ca8cb90c706ad40034a05282d751887d9bf05f Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 5 Jan 2016 13:32:39 +0000 Subject: comment typo --- synapse/push/bulk_push_rule_evaluator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/push') diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 1c4e54ba44..c00acfd87e 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -63,7 +63,7 @@ class BulkPushRuleEvaluator: Runs push rules for all users in a room. This is faster than running PushRuleEvaluator for each user because it fetches all the rules for all the users in one (batched) db query - rarher than doing multiple queries per-user. It currently uses + rather than doing multiple queries per-user. It currently uses the same logic to run the actual rules, but could be optimised further (see https://matrix.org/jira/browse/SYN-562) """ -- cgit 1.4.1 From c79f221192044203b9d32cfbd416a7fefeb34cd5 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Jan 2016 11:38:09 +0000 Subject: Add is_guest flag to users db to track whether a user is a guest user or not. Use this so we can run _filter_events_for_client when calculating event_push_actions. --- synapse/handlers/_base.py | 8 ++--- synapse/handlers/federation.py | 6 ++-- synapse/handlers/register.py | 4 ++- synapse/push/action_generator.py | 6 ++-- synapse/push/bulk_push_rule_evaluator.py | 27 ++++++++++++--- synapse/rest/client/v2_alpha/register.py | 5 ++- synapse/storage/event_push_actions.py | 4 +-- synapse/storage/registration.py | 40 ++++++++++++++++------ .../storage/schema/delta/27/event_push_actions.sql | 26 -------------- .../storage/schema/delta/28/event_push_actions.sql | 26 ++++++++++++++ 10 files changed, 95 insertions(+), 57 deletions(-) delete mode 100644 synapse/storage/schema/delta/27/event_push_actions.sql create mode 100644 synapse/storage/schema/delta/28/event_push_actions.sql (limited to 'synapse/push') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 3115a5065d..66e35de6e4 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -23,8 +23,6 @@ from synapse.push.action_generator import ActionGenerator from synapse.util.logcontext import PreserveLoggingContext -from synapse.events.utils import serialize_event - import logging @@ -256,9 +254,9 @@ class BaseHandler(object): ) action_generator = ActionGenerator(self.store) - yield action_generator.handle_push_actions_for_event(serialize_event( - event, self.clock.time_msec() - )) + yield action_generator.handle_push_actions_for_event( + event, self + ) destinations = set(extra_destinations) for k, s in context.current_state.items(): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 764709b424..075b9e21c3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -32,7 +32,7 @@ from synapse.crypto.event_signing import ( ) from synapse.types import UserID -from synapse.events.utils import prune_event, serialize_event +from synapse.events.utils import prune_event from synapse.util.retryutils import NotRetryingDestination @@ -246,8 +246,8 @@ class FederationHandler(BaseHandler): if not backfilled and not event.internal_metadata.is_outlier(): action_generator = ActionGenerator(self.store) - yield action_generator.handle_push_actions_for_event(serialize_event( - event, self.clock.time_msec()) + yield action_generator.handle_push_actions_for_event( + event, self ) @defer.inlineCallbacks diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 6f111ff63e..1799a668c6 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -84,7 +84,8 @@ class RegistrationHandler(BaseHandler): localpart=None, password=None, generate_token=True, - guest_access_token=None + guest_access_token=None, + make_guest=False ): """Registers a new client on the server. @@ -118,6 +119,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash=password_hash, was_guest=guest_access_token is not None, + make_guest=make_guest ) yield registered_user(self.distributor, user) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 5526324a6d..bcd40798f9 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -33,12 +33,12 @@ class ActionGenerator: # tag (ie. we just need all the users). @defer.inlineCallbacks - def handle_push_actions_for_event(self, event): + def handle_push_actions_for_event(self, event, handler): bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( - event['room_id'], self.store + event.room_id, self.store ) - actions_by_user = bulk_evaluator.action_for_event_by_user(event) + actions_by_user = yield bulk_evaluator.action_for_event_by_user(event, handler) yield self.store.set_push_actions_for_event_and_users( event, diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index c00acfd87e..63d65b4465 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -23,6 +23,8 @@ from synapse.types import UserID import baserules from push_rule_evaluator import PushRuleEvaluator +from synapse.events.utils import serialize_event + logger = logging.getLogger(__name__) @@ -54,7 +56,7 @@ def evaluator_for_room_id(room_id, store): display_names[ev.state_key] = ev.content.get("displayname") defer.returnValue(BulkPushRuleEvaluator( - room_id, rules_by_user, display_names, users + room_id, rules_by_user, display_names, users, store )) @@ -67,13 +69,15 @@ class BulkPushRuleEvaluator: the same logic to run the actual rules, but could be optimised further (see https://matrix.org/jira/browse/SYN-562) """ - def __init__(self, room_id, rules_by_user, display_names, users_in_room): + def __init__(self, room_id, rules_by_user, display_names, users_in_room, store): self.room_id = room_id self.rules_by_user = rules_by_user self.display_names = display_names self.users_in_room = users_in_room + self.store = store - def action_for_event_by_user(self, event): + @defer.inlineCallbacks + def action_for_event_by_user(self, event, handler): actions_by_user = {} for uid, rules in self.rules_by_user.items(): @@ -81,6 +85,13 @@ class BulkPushRuleEvaluator: if uid in self.display_names: display_name = self.display_names[uid] + is_guest = yield self.store.is_guest(UserID.from_string(uid)) + filtered = yield handler._filter_events_for_client( + uid, [event], is_guest=is_guest + ) + if len(filtered) == 0: + continue + for rule in rules: if 'enabled' in rule and not rule['enabled']: continue @@ -94,14 +105,20 @@ class BulkPushRuleEvaluator: if len(actions) > 0: actions_by_user[uid] = actions break - return actions_by_user + defer.returnValue(actions_by_user) @staticmethod def event_matches_rule(event, rule, display_name, room_member_count, profile_tag): matches = True + + # passing the clock all the way into here is extremely awkward and push + # rules do not care about any of the relative timestamps, so we just + # pass 0 for the current time. + client_event = serialize_event(event, 0) + for cond in rule['conditions']: matches &= PushRuleEvaluator._event_fulfills_condition( - event, cond, display_name, room_member_count, profile_tag + client_event, cond, display_name, room_member_count, profile_tag ) return matches diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 25389ceded..c4d025b465 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -259,7 +259,10 @@ class RegisterRestServlet(RestServlet): def _do_guest_registration(self): if not self.hs.config.allow_guest_access: defer.returnValue((403, "Guest access is disabled")) - user_id, _ = yield self.registration_handler.register(generate_token=False) + user_id, _ = yield self.registration_handler.register( + generate_token=False, + make_guest=True + ) access_token = self.auth_handler.generate_access_token(user_id, ["guest = true"]) defer.returnValue((200, { "user_id": user_id, diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 3075d02257..0634af6b62 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -32,8 +32,8 @@ class EventPushActionsStore(SQLBaseStore): values = [] for uid, profile_tag, actions in tuples: values.append({ - 'room_id': event['room_id'], - 'event_id': event['event_id'], + 'room_id': event.room_id, + 'event_id': event.event_id, 'user_id': uid, 'profile_tag': profile_tag, 'actions': json.dumps(actions) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index f0fa0bd33c..c79066f774 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError, Codes from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks class RegistrationStore(SQLBaseStore): @@ -73,7 +73,8 @@ class RegistrationStore(SQLBaseStore): ) @defer.inlineCallbacks - def register(self, user_id, token, password_hash, was_guest=False): + def register(self, user_id, token, password_hash, + was_guest=False, make_guest=False): """Attempts to register an account. Args: @@ -82,15 +83,18 @@ class RegistrationStore(SQLBaseStore): password_hash (str): Optional. The password hash for this user. was_guest (bool): Optional. Whether this is a guest account being upgraded to a non-guest account. + make_guest (boolean): True if the the new user should be guest, + false to add a regular user account. Raises: StoreError if the user_id could not be registered. """ yield self.runInteraction( "register", - self._register, user_id, token, password_hash, was_guest + self._register, user_id, token, password_hash, was_guest, make_guest ) + self.is_guest.invalidate((user_id,)) - def _register(self, txn, user_id, token, password_hash, was_guest): + def _register(self, txn, user_id, token, password_hash, was_guest, make_guest): now = int(self.clock.time()) next_id = self._access_tokens_id_gen.get_next_txn(txn) @@ -100,12 +104,14 @@ class RegistrationStore(SQLBaseStore): txn.execute("UPDATE users SET" " password_hash = ?," " upgrade_ts = ?" + " is_guest = ?" " WHERE name = ?", - [password_hash, now, user_id]) + [password_hash, now, make_guest, user_id]) else: - txn.execute("INSERT INTO users(name, password_hash, creation_ts) " - "VALUES (?,?,?)", - [user_id, password_hash, now]) + txn.execute("INSERT INTO users " + "(name, password_hash, creation_ts, is_guest) " + "VALUES (?,?,?,?)", + [user_id, password_hash, now, make_guest]) except self.database_engine.module.IntegrityError: raise StoreError( 400, "User ID already taken.", errcode=Codes.USER_IN_USE @@ -126,7 +132,7 @@ class RegistrationStore(SQLBaseStore): keyvalues={ "name": user_id, }, - retcols=["name", "password_hash"], + retcols=["name", "password_hash", "is_guest"], allow_none=True, ) @@ -136,7 +142,7 @@ class RegistrationStore(SQLBaseStore): """ def f(txn): sql = ( - "SELECT name, password_hash FROM users" + "SELECT name, password_hash, is_guest FROM users" " WHERE lower(name) = lower(?)" ) txn.execute(sql, (user_id,)) @@ -249,9 +255,21 @@ class RegistrationStore(SQLBaseStore): defer.returnValue(res if res else False) + @cachedInlineCallbacks() + def is_guest(self, user): + res = yield self._simple_select_one_onecol( + table="users", + keyvalues={"name": user.to_string()}, + retcol="is_guest", + allow_none=True, + desc="is_guest", + ) + + defer.returnValue(res if res else False) + def _query_for_auth(self, txn, token): sql = ( - "SELECT users.name, access_tokens.id as token_id" + "SELECT users.name, users.is_guest, access_tokens.id as token_id" " FROM users" " INNER JOIN access_tokens on users.name = access_tokens.user_id" " WHERE token = ?" diff --git a/synapse/storage/schema/delta/27/event_push_actions.sql b/synapse/storage/schema/delta/27/event_push_actions.sql deleted file mode 100644 index bdf6ae3f24..0000000000 --- a/synapse/storage/schema/delta/27/event_push_actions.sql +++ /dev/null @@ -1,26 +0,0 @@ -/* Copyright 2015 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -CREATE TABLE IF NOT EXISTS event_push_actions( - room_id TEXT NOT NULL, - event_id TEXT NOT NULL, - user_id TEXT NOT NULL, - profile_tag VARCHAR(32), - actions TEXT NOT NULL, - CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag) -); - - -CREATE INDEX event_push_actions_room_id_event_id_user_id_profile_tag on event_push_actions(room_id, event_id, user_id, profile_tag); diff --git a/synapse/storage/schema/delta/28/event_push_actions.sql b/synapse/storage/schema/delta/28/event_push_actions.sql new file mode 100644 index 0000000000..bdf6ae3f24 --- /dev/null +++ b/synapse/storage/schema/delta/28/event_push_actions.sql @@ -0,0 +1,26 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_push_actions( + room_id TEXT NOT NULL, + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + profile_tag VARCHAR(32), + actions TEXT NOT NULL, + CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag) +); + + +CREATE INDEX event_push_actions_room_id_event_id_user_id_profile_tag on event_push_actions(room_id, event_id, user_id, profile_tag); -- cgit 1.4.1 From 992928304f3c95f87a3297799965159d295432ea Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Jan 2016 11:58:46 +0000 Subject: Delete notifications for redacted events --- synapse/push/action_generator.py | 7 +++++++ synapse/storage/event_push_actions.py | 12 ++++++++++++ 2 files changed, 19 insertions(+) (limited to 'synapse/push') diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index bcd40798f9..4cf94f6c61 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -19,6 +19,8 @@ import bulk_push_rule_evaluator import logging +from synapse.api.constants import EventTypes + logger = logging.getLogger(__name__) @@ -34,6 +36,11 @@ class ActionGenerator: @defer.inlineCallbacks def handle_push_actions_for_event(self, event, handler): + if event.type == EventTypes.Redaction and event.redacts is not None: + yield self.store.remove_push_actions_for_event_id( + event.room_id, event.redacts + ) + bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( event.room_id, self.store ) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 0634af6b62..5b44431ab9 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -93,6 +93,18 @@ class EventPushActionsStore(SQLBaseStore): ) defer.returnValue(ret) + @defer.inlineCallbacks + def remove_push_actions_for_event_id(self, room_id, event_id): + def f(txn): + txn.execute( + "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", + (room_id, event_id) + ) + yield self.runInteraction( + "remove_push_actions_for_event_id", + f + ) + class EventPushActionsTable(object): table_name = "event_push_actions" -- cgit 1.4.1 From 823b679232ed030ba9a2f609d11074c6b3111be2 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 7 Jan 2016 10:02:47 +0000 Subject: more commas --- synapse/push/bulk_push_rule_evaluator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/push') diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 63d65b4465..ce244fa959 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -42,7 +42,7 @@ def evaluator_for_room_id(room_id, store): uid: baserules.list_with_base_rules( [decode_rule_json(rule_list) for rule_list in rules_by_user[uid]] if uid in rules_by_user else [], - UserID.from_string(uid) + UserID.from_string(uid), ) for uid in users } -- cgit 1.4.1