diff options
-rw-r--r-- | problems_with_this_branch | 5 | ||||
-rw-r--r-- | synapse/handlers/_base.py | 8 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 10 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 52 | ||||
-rw-r--r-- | synapse/push/__init__.py | 18 | ||||
-rw-r--r-- | synapse/push/action_generator.py | 55 | ||||
-rw-r--r-- | synapse/push/push_rule_evaluator.py | 9 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/sync.py | 1 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 2 | ||||
-rw-r--r-- | synapse/storage/event_actions.py | 93 | ||||
-rw-r--r-- | synapse/storage/registration.py | 12 | ||||
-rw-r--r-- | synapse/storage/schema/delta/27/event_actions.sql | 26 |
12 files changed, 273 insertions, 18 deletions
diff --git a/problems_with_this_branch b/problems_with_this_branch new file mode 100644 index 0000000000..9c5df55212 --- /dev/null +++ b/problems_with_this_branch @@ -0,0 +1,5 @@ + * processing push by bluntly running the rules for each user + * processing push twice because the pushers still run the rules themselves rather than pulling out of the event_actions table + * consequently converting events back & forth between objects & dicts when processing + * fetching all the current ephemeral events for each room on an incremental sync to get the user's read receipt + * doing 2 more db queries per room to get the unread notif count diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 5fd20285d2..24c4c62698 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -19,9 +19,12 @@ from synapse.api.errors import LimitExceededError, SynapseError, AuthError from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.api.constants import Membership, EventTypes from synapse.types import UserID, RoomAlias +from synapse.push.action_generator import ActionGenerator from synapse.util.logcontext import PreserveLoggingContext +from synapse.events.utils import serialize_event + import logging @@ -264,6 +267,11 @@ class BaseHandler(object): event, context=context ) + action_generator = ActionGenerator(self.hs, self.store) + yield action_generator.handle_event(serialize_event( + event, self.clock.time_msec() + )) + destinations = set(extra_destinations) for k, s in context.current_state.items(): try: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 28f2ff68d6..0b1221deb5 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -32,10 +32,12 @@ from synapse.crypto.event_signing import ( ) from synapse.types import UserID -from synapse.events.utils import prune_event +from synapse.events.utils import prune_event, serialize_event from synapse.util.retryutils import NotRetryingDestination +from synapse.push.action_generator import ActionGenerator + from twisted.internet import defer import itertools @@ -242,6 +244,12 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) + if not backfilled and not event.internal_metadata.is_outlier(): + action_generator = ActionGenerator(self.hs, self.store) + yield action_generator.handle_event(serialize_event( + event, self.clock.time_msec()) + ) + @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): event_to_state = yield self.store.get_state_for_events( diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 7088c20cb4..4cbb43a31b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -53,6 +53,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ "state", # dict[(str, str), FrozenEvent] "ephemeral", "account_data", + "unread_notification_count", ])): __slots__ = [] @@ -65,6 +66,8 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ or self.state or self.ephemeral or self.account_data + # nb the notification count does not, er, count: if there's nothing + # else in the result, we don't need to send it. ) @@ -162,6 +165,18 @@ class SyncHandler(BaseHandler): else: return self.incremental_sync_with_gap(sync_config, since_token) + def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room): + if room_id not in ephemeral_by_room: + return None + for e in ephemeral_by_room[room_id]: + if e['type'] != 'm.receipt': + continue + for receipt_event_id, val in e['content'].items(): + if 'm.read' in val: + if user_id in val['m.read']: + return receipt_event_id + return None + @defer.inlineCallbacks def full_state_sync(self, sync_config, timeline_since_token): """Get a sync for a client which is starting without any state. @@ -273,6 +288,13 @@ class SyncHandler(BaseHandler): room_id, sync_config, now_token, since_token=timeline_since_token ) + notifs = yield self.unread_notifs_for_room_id( + room_id, sync_config, ephemeral_by_room + ) + notif_count = None + if notifs is not None: + notif_count = len(notifs) + current_state = yield self.get_state_at(room_id, now_token) defer.returnValue(JoinedSyncResult( @@ -283,6 +305,7 @@ class SyncHandler(BaseHandler): account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), + unread_notification_count=notif_count )) def account_data_for_user(self, account_data): @@ -424,6 +447,10 @@ class SyncHandler(BaseHandler): ) now_token = now_token.copy_and_replace("presence_key", presence_key) + _, all_ephemeral_by_room = yield self.ephemeral_by_room( + sync_config, now_token + ) + now_token, ephemeral_by_room = yield self.ephemeral_by_room( sync_config, now_token, since_token ) @@ -497,6 +524,13 @@ class SyncHandler(BaseHandler): else: prev_batch = now_token + notifs = yield self.unread_notifs_for_room_id( + room_id, sync_config, all_ephemeral_by_room + ) + notif_count = None + if notifs is not None: + notif_count = len(notifs) + just_joined = yield self.check_joined_room(sync_config, state) if just_joined: logger.debug("User has just joined %s: needs full state", @@ -517,6 +551,7 @@ class SyncHandler(BaseHandler): account_data=self.account_data_for_room( room_id, tags_by_room, account_data_by_room ), + unread_notification_count=notif_count ) logger.debug("Result for room %s: %r", room_id, room_sync) @@ -787,3 +822,20 @@ class SyncHandler(BaseHandler): if join_event.content["membership"] == Membership.JOIN: return True return False + + @defer.inlineCallbacks + def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room): + last_unread_event_id = self.last_read_event_id_for_room_and_user( + room_id, sync_config.user.to_string(), ephemeral_by_room + ) + + notifs = [] + if last_unread_event_id: + notifs = yield self.store.get_unread_event_actions_by_room_for_user( + room_id, sync_config.user.to_string(), last_unread_event_id + ) + else: + # There is no new information in this period, so your notification + # count is whatever it was last time. + defer.returnValue(None) + defer.returnValue(notifs) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index e7c964bcd2..d5928c1d29 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -157,21 +157,7 @@ class Pusher(object): actions = yield rule_evaluator.actions_for_event(single_event) tweaks = rule_evaluator.tweaks_for_actions(actions) - if len(actions) == 0: - logger.warn("Empty actions! Using default action.") - actions = Pusher.DEFAULT_ACTIONS - - if 'notify' not in actions and 'dont_notify' not in actions: - logger.warn("Neither notify nor dont_notify in actions: adding default") - actions.extend(Pusher.DEFAULT_ACTIONS) - - if 'dont_notify' in actions: - logger.debug( - "%s for %s: dont_notify", - single_event['event_id'], self.user_name - ) - processed = True - else: + if 'notify' in actions: rejected = yield self.dispatch_push(single_event, tweaks) self.has_unread = True if isinstance(rejected, list) or isinstance(rejected, tuple): @@ -192,6 +178,8 @@ class Pusher(object): yield self.hs.get_pusherpool().remove_pusher( self.app_id, pk, self.user_name ) + else: + processed = True if not self.alive: return diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py new file mode 100644 index 0000000000..6e107ca792 --- /dev/null +++ b/synapse/push/action_generator.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.types import UserID + +import push_rule_evaluator + +import logging + +logger = logging.getLogger(__name__) + + +class ActionGenerator: + def __init__(self, hs, store): + self.hs = hs + self.store = store + # really we want to get all user ids and all profile tags too, + # since we want the actions for each profile tag for every user and + # also actions for a client with no profile tag for each user. + # Currently the event stream doesn't support profile tags on an + # event stream, so we just run the rules for a client with no profile + # tag (ie. we just need all the users). + + @defer.inlineCallbacks + def handle_event(self, event): + users = yield self.store.get_users_in_room(event['room_id']) + + for uid in users: + if not self.hs.is_mine(UserID.from_string(uid)): + continue + + evaluator = yield push_rule_evaluator.\ + evaluator_for_user_name_and_profile_tag( + uid, None, event['room_id'], self.store + ) + actions = yield evaluator.actions_for_event(event) + logger.info("actions for user %s: %s", uid, actions) + if len(actions): + self.store.set_actions_for_event( + event, uid, None, actions + ) diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 92c7fd048f..420476fd0b 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -43,7 +43,7 @@ def evaluator_for_user_name_and_profile_tag(user_name, profile_tag, room_id, sto class PushRuleEvaluator: - DEFAULT_ACTIONS = ['dont_notify'] + DEFAULT_ACTIONS = [] INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$") def __init__(self, user_name, profile_tag, raw_rules, enabled_map, room_id, @@ -85,7 +85,7 @@ class PushRuleEvaluator: """ if ev['user_id'] == self.user_name: # let's assume you probably know about messages you sent yourself - defer.returnValue(['dont_notify']) + defer.returnValue([]) room_id = ev['room_id'] @@ -131,6 +131,11 @@ class PushRuleEvaluator: "%s matches for user %s, event %s", r['rule_id'], self.user_name, ev['event_id'] ) + + # filter out dont_notify as we treat an empty actions list + # as dont_notify, and this doesn't take up a row in our database + actions = [x for x in actions if x != 'dont_notify'] + defer.returnValue(actions) logger.info( diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 697df03dda..93e607f9ec 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -303,6 +303,7 @@ class SyncRestServlet(RestServlet): }, "state": {"events": serialized_state}, "account_data": {"events": account_data}, + "unread_notification_count": room.unread_notification_count } if joined: diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index c46b653f11..a112dd237f 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -33,6 +33,7 @@ from .pusher import PusherStore from .push_rule import PushRuleStore from .media_repository import MediaRepositoryStore from .rejections import RejectionsStore +from .event_actions import EventActionsStore from .state import StateStore from .signatures import SignatureStore @@ -75,6 +76,7 @@ class DataStore(RoomMemberStore, RoomStore, SearchStore, TagsStore, AccountDataStore, + EventActionsStore ): def __init__(self, hs): diff --git a/synapse/storage/event_actions.py b/synapse/storage/event_actions.py new file mode 100644 index 0000000000..fbd0a42279 --- /dev/null +++ b/synapse/storage/event_actions.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore +from twisted.internet import defer + +import logging +import simplejson as json + +logger = logging.getLogger(__name__) + + +class EventActionsStore(SQLBaseStore): + @defer.inlineCallbacks + def set_actions_for_event(self, event, user_id, profile_tag, actions): + actionsJson = json.dumps(actions) + + ret = yield self.runInteraction( + "_set_actions_for_event", + self._simple_upsert_txn, + EventActionsTable.table_name, + { + 'room_id': event['room_id'], + 'event_id': event['event_id'], + 'user_id': user_id, + 'profile_tag': profile_tag + }, + {'actions': actionsJson} + ) + defer.returnValue(ret) + + @defer.inlineCallbacks + def get_unread_event_actions_by_room_for_user( + self, room_id, user_id, last_read_event_id + ): + def _get_unread_event_actions_by_room(txn): + sql = ( + "SELECT stream_ordering, topological_ordering" + " FROM events" + " WHERE room_id = ? AND event_id = ?" + ) + txn.execute( + sql, (room_id, last_read_event_id) + ) + results = txn.fetchall() + if len(results) == 0: + return [] + + stream_ordering = results[0][0] + topological_ordering = results[0][1] + + sql = ( + "SELECT ea.event_id, ea.actions" + " FROM event_actions ea, events e" + " WHERE ea.room_id = e.room_id" + " AND ea.event_id = e.event_id" + " AND ea.user_id = ?" + " AND ea.room_id = ?" + " AND (" + " e.topological_ordering > ?" + " OR (e.topological_ordering == ? AND e.stream_ordering > ?)" + ")" + ) + txn.execute(sql, ( + user_id, room_id, + topological_ordering, topological_ordering, stream_ordering + ) + ) + return [ + {"event_id": row[0], "actions": row[1]} for row in txn.fetchall() + ] + + ret = yield self.runInteraction( + "get_unread_event_actions_by_room", + _get_unread_event_actions_by_room + ) + defer.returnValue(ret) + + +class EventActionsTable(object): + table_name = "event_actions" diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 09a05b08ef..4676f225b9 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -292,6 +292,18 @@ class RegistrationStore(SQLBaseStore): defer.returnValue(None) @defer.inlineCallbacks + def get_all_user_ids(self): + """Returns all user ids registered on this homeserver""" + return self.runInteraction( + "get_all_user_ids", + self._get_all_user_ids_txn + ) + + def _get_all_user_ids_txn(self, txn): + txn.execute("SELECT name from users") + return [r[0] for r in txn.fetchall()] + + @defer.inlineCallbacks def count_all_users(self): """Counts all users registered on the homeserver.""" def _count_users(txn): diff --git a/synapse/storage/schema/delta/27/event_actions.sql b/synapse/storage/schema/delta/27/event_actions.sql new file mode 100644 index 0000000000..bbdaee990e --- /dev/null +++ b/synapse/storage/schema/delta/27/event_actions.sql @@ -0,0 +1,26 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_actions( + room_id TEXT NOT NULL, + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + profile_tag VARCHAR(32), + actions TEXT NOT NULL, + CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag) +); + + +CREATE INDEX event_actions_room_id_event_id_user_id_profile_tag on event_actions(room_id, event_id, user_id, profile_tag); |