summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
authorDavid Baker <dave@matrix.org>2016-04-07 16:31:38 +0100
committerDavid Baker <dave@matrix.org>2016-04-07 16:31:38 +0100
commitd549fdfa22f6927479d2a185f7420cadbfbf5607 (patch)
tree00f92bd8475680b131c886986788a26b24919aa8 /synapse/push
parentSend badge count pushes. (diff)
downloadsynapse-d549fdfa22f6927479d2a185f7420cadbfbf5607.tar.xz
Remove code that's now been obsoleted or moved elsewhere
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/__init__.py327
-rw-r--r--synapse/push/httppusher.py2
-rw-r--r--synapse/push/push_rule_evaluator.py134
3 files changed, 9 insertions, 454 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 296c4447ec..edf45dc599 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -13,333 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
-
-from synapse.streams.config import PaginationConfig
-from synapse.types import StreamToken
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
-
-import synapse.util.async
-from .push_rule_evaluator import evaluator_for_user_id
-
-import logging
-import random
-
-logger = logging.getLogger(__name__)
-
-
-_NEXT_ID = 1
-
-
-def _get_next_id():
-    global _NEXT_ID
-    _id = _NEXT_ID
-    _NEXT_ID += 1
-    return _id
-
-
-# Pushers could now be moved to pull out of the event_push_actions table instead
-# of listening on the event stream: this would avoid them having to run the
-# rules again.
-class Pusher(object):
-    INITIAL_BACKOFF = 1000
-    MAX_BACKOFF = 60 * 60 * 1000
-    GIVE_UP_AFTER = 24 * 60 * 60 * 1000
-
-    def __init__(self, _hs, user_id, app_id,
-                 app_display_name, device_display_name, pushkey, pushkey_ts,
-                 data, last_token, last_success, failing_since):
-        self.hs = _hs
-        self.evStreamHandler = self.hs.get_handlers().event_stream_handler
-        self.store = self.hs.get_datastore()
-        self.clock = self.hs.get_clock()
-        self.user_id = user_id
-        self.app_id = app_id
-        self.app_display_name = app_display_name
-        self.device_display_name = device_display_name
-        self.pushkey = pushkey
-        self.pushkey_ts = pushkey_ts
-        self.data = data
-        self.last_token = last_token
-        self.last_success = last_success  # not actually used
-        self.backoff_delay = Pusher.INITIAL_BACKOFF
-        self.failing_since = failing_since
-        self.alive = True
-        self.badge = None
-
-        self.name = "Pusher-%d" % (_get_next_id(),)
-
-        # The last value of last_active_time that we saw
-        self.last_last_active_time = 0
-        self.has_unread = True
-
-    @defer.inlineCallbacks
-    def get_context_for_event(self, ev):
-        name_aliases = yield self.store.get_room_name_and_aliases(
-            ev['room_id']
-        )
-
-        ctx = {'aliases': name_aliases[1]}
-        if name_aliases[0] is not None:
-            ctx['name'] = name_aliases[0]
-
-        their_member_events_for_room = yield self.store.get_current_state(
-            room_id=ev['room_id'],
-            event_type='m.room.member',
-            state_key=ev['user_id']
-        )
-        for mev in their_member_events_for_room:
-            if mev.content['membership'] == 'join' and 'displayname' in mev.content:
-                dn = mev.content['displayname']
-                if dn is not None:
-                    ctx['sender_display_name'] = dn
-
-        defer.returnValue(ctx)
-
-    @defer.inlineCallbacks
-    def start(self):
-        with LoggingContext(self.name):
-            if not self.last_token:
-                # First-time setup: get a token to start from (we can't
-                # just start from no token, ie. 'now'
-                # because we need the result to be reproduceable in case
-                # we fail to dispatch the push)
-                config = PaginationConfig(from_token=None, limit='1')
-                chunk = yield self.evStreamHandler.get_stream(
-                    self.user_id, config, timeout=0, affect_presence=False
-                )
-                self.last_token = chunk['end']
-                yield self.store.update_pusher_last_token(
-                    self.app_id, self.pushkey, self.user_id, self.last_token
-                )
-                logger.info("New pusher %s for user %s starting from token %s",
-                            self.pushkey, self.user_id, self.last_token)
-
-            else:
-                logger.info(
-                    "Old pusher %s for user %s starting",
-                    self.pushkey, self.user_id,
-                )
-
-            wait = 0
-            while self.alive:
-                try:
-                    if wait > 0:
-                        yield synapse.util.async.sleep(wait)
-                    with Measure(self.clock, "push"):
-                        yield self.get_and_dispatch()
-                    wait = 0
-                except:
-                    if wait == 0:
-                        wait = 1
-                    else:
-                        wait = min(wait * 2, 1800)
-                    logger.exception(
-                        "Exception in pusher loop for pushkey %s. Pausing for %ds",
-                        self.pushkey, wait
-                    )
-
-    @defer.inlineCallbacks
-    def get_and_dispatch(self):
-        from_tok = StreamToken.from_string(self.last_token)
-        config = PaginationConfig(from_token=from_tok, limit='1')
-        timeout = (300 + random.randint(-60, 60)) * 1000
-        chunk = yield self.evStreamHandler.get_stream(
-            self.user_id, config, timeout=timeout, affect_presence=False,
-            only_keys=("room", "receipt",),
-        )
-
-        # limiting to 1 may get 1 event plus 1 presence event, so
-        # pick out the actual event
-        single_event = None
-        read_receipt = None
-        for c in chunk['chunk']:
-            if 'event_id' in c:  # Hmmm...
-                single_event = c
-            elif c['type'] == 'm.receipt':
-                read_receipt = c
-
-        have_updated_badge = False
-        if read_receipt:
-            for receipt_part in read_receipt['content'].values():
-                if 'm.read' in receipt_part:
-                    if self.user_id in receipt_part['m.read'].keys():
-                        have_updated_badge = True
-
-        if not single_event:
-            if have_updated_badge:
-                yield self.update_badge()
-            self.last_token = chunk['end']
-            yield self.store.update_pusher_last_token(
-                self.app_id,
-                self.pushkey,
-                self.user_id,
-                self.last_token
-            )
-            return
-
-        if not self.alive:
-            return
-
-        processed = False
-
-        rule_evaluator = yield \
-            evaluator_for_user_id(
-                self.user_id, single_event['room_id'], self.store
-            )
-
-        actions = yield rule_evaluator.actions_for_event(single_event)
-        tweaks = rule_evaluator.tweaks_for_actions(actions)
-
-        if 'notify' in actions:
-            self.badge = yield self._get_badge_count()
-            rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
-            self.has_unread = True
-            if isinstance(rejected, list) or isinstance(rejected, tuple):
-                processed = True
-                for pk in rejected:
-                    if pk != self.pushkey:
-                        # for sanity, we only remove the pushkey if it
-                        # was the one we actually sent...
-                        logger.warn(
-                            ("Ignoring rejected pushkey %s because we"
-                             " didn't send it"), pk
-                        )
-                    else:
-                        logger.info(
-                            "Pushkey %s was rejected: removing",
-                            pk
-                        )
-                        yield self.hs.get_pusherpool().remove_pusher(
-                            self.app_id, pk, self.user_id
-                        )
-        else:
-            if have_updated_badge:
-                yield self.update_badge()
-            processed = True
-
-        if not self.alive:
-            return
-
-        if processed:
-            self.backoff_delay = Pusher.INITIAL_BACKOFF
-            self.last_token = chunk['end']
-            yield self.store.update_pusher_last_token_and_success(
-                self.app_id,
-                self.pushkey,
-                self.user_id,
-                self.last_token,
-                self.clock.time_msec()
-            )
-            if self.failing_since:
-                self.failing_since = None
-                yield self.store.update_pusher_failing_since(
-                    self.app_id,
-                    self.pushkey,
-                    self.user_id,
-                    self.failing_since)
-        else:
-            if not self.failing_since:
-                self.failing_since = self.clock.time_msec()
-                yield self.store.update_pusher_failing_since(
-                    self.app_id,
-                    self.pushkey,
-                    self.user_id,
-                    self.failing_since
-                )
-
-            if (self.failing_since and
-               self.failing_since <
-               self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
-                # we really only give up so that if the URL gets
-                # fixed, we don't suddenly deliver a load
-                # of old notifications.
-                logger.warn("Giving up on a notification to user %s, "
-                            "pushkey %s",
-                            self.user_id, self.pushkey)
-                self.backoff_delay = Pusher.INITIAL_BACKOFF
-                self.last_token = chunk['end']
-                yield self.store.update_pusher_last_token(
-                    self.app_id,
-                    self.pushkey,
-                    self.user_id,
-                    self.last_token
-                )
-
-                self.failing_since = None
-                yield self.store.update_pusher_failing_since(
-                    self.app_id,
-                    self.pushkey,
-                    self.user_id,
-                    self.failing_since
-                )
-            else:
-                logger.warn("Failed to dispatch push for user %s "
-                            "(failing for %dms)."
-                            "Trying again in %dms",
-                            self.user_id,
-                            self.clock.time_msec() - self.failing_since,
-                            self.backoff_delay)
-                yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
-                self.backoff_delay *= 2
-                if self.backoff_delay > Pusher.MAX_BACKOFF:
-                    self.backoff_delay = Pusher.MAX_BACKOFF
-
-    def stop(self):
-        self.alive = False
-
-    def dispatch_push(self, p, tweaks, badge):
-        """
-        Overridden by implementing classes to actually deliver the notification
-        Args:
-            p: The event to notify for as a single event from the event stream
-        Returns: If the notification was delivered, an array containing any
-                 pushkeys that were rejected by the push gateway.
-                 False if the notification could not be delivered (ie.
-                 should be retried).
-        """
-        pass
-
-    @defer.inlineCallbacks
-    def update_badge(self):
-        new_badge = yield self._get_badge_count()
-        if self.badge != new_badge:
-            self.badge = new_badge
-            yield self.send_badge(self.badge)
-
-    def send_badge(self, badge):
-        """
-        Overridden by implementing classes to send an updated badge count
-        """
-        pass
-
-    @defer.inlineCallbacks
-    def _get_badge_count(self):
-        invites, joins = yield defer.gatherResults([
-            self.store.get_invited_rooms_for_user(self.user_id),
-            self.store.get_rooms_for_user(self.user_id),
-        ], consumeErrors=True)
-
-        my_receipts_by_room = yield self.store.get_receipts_for_user(
-            self.user_id,
-            "m.read",
-        )
-
-        badge = len(invites)
-
-        for r in joins:
-            if r.room_id in my_receipts_by_room:
-                last_unread_event_id = my_receipts_by_room[r.room_id]
-
-                notifs = yield (
-                    self.store.get_unread_event_push_actions_by_room_for_user(
-                        r.room_id, self.user_id, last_unread_event_id
-                    )
-                )
-                badge += notifs["notify_count"]
-        defer.returnValue(badge)
-
 
 class PusherConfigException(Exception):
     def __init__(self, msg):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 0d5450bc01..cc030a57a0 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -168,7 +168,7 @@ class HttpPusher(object):
         if 'notify' not in push_action['actions']:
             defer.returnValue(True)
 
-        tweaks = push_rule_evaluator.PushRuleEvaluator.tweaks_for_actions(push_action['actions'])
+        tweaks = push_rule_evaluator.tweaks_for_actions(push_action['actions'])
         badge = yield push_tools.get_badge_count(self.hs, self.user_id)
 
         event = yield self.store.get_event(push_action['event_id'], allow_none=True)
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index c3c2877629..4db76f18bd 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -13,12 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
-
-from .baserules import list_with_base_rules
-
 import logging
-import simplejson as json
 import re
 
 from synapse.types import UserID
@@ -32,22 +27,6 @@ IS_GLOB = re.compile(r'[\?\*\[\]]')
 INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
 
 
-@defer.inlineCallbacks
-def evaluator_for_user_id(user_id, room_id, store):
-    rawrules = yield store.get_push_rules_for_user(user_id)
-    enabled_map = yield store.get_push_rules_enabled_for_user(user_id)
-    our_member_event = yield store.get_current_state(
-        room_id=room_id,
-        event_type='m.room.member',
-        state_key=user_id,
-    )
-
-    defer.returnValue(PushRuleEvaluator(
-        user_id, rawrules, enabled_map,
-        room_id, our_member_event, store
-    ))
-
-
 def _room_member_count(ev, condition, room_member_count):
     if 'is' not in condition:
         return False
@@ -74,111 +53,14 @@ def _room_member_count(ev, condition, room_member_count):
         return False
 
 
-class PushRuleEvaluator:
-    DEFAULT_ACTIONS = []
-
-    def __init__(self, user_id, raw_rules, enabled_map, room_id,
-                 our_member_event, store):
-        self.user_id = user_id
-        self.room_id = room_id
-        self.our_member_event = our_member_event
-        self.store = store
-
-        rules = []
-        for raw_rule in raw_rules:
-            rule = dict(raw_rule)
-            rule['conditions'] = json.loads(raw_rule['conditions'])
-            rule['actions'] = json.loads(raw_rule['actions'])
-            rules.append(rule)
-
-        self.rules = list_with_base_rules(rules)
-
-        self.enabled_map = enabled_map
-
-    @staticmethod
-    def tweaks_for_actions(actions):
-        tweaks = {}
-        for a in actions:
-            if not isinstance(a, dict):
-                continue
-            if 'set_tweak' in a and 'value' in a:
-                tweaks[a['set_tweak']] = a['value']
-        return tweaks
-
-    @defer.inlineCallbacks
-    def actions_for_event(self, ev):
-        """
-        This should take into account notification settings that the user
-        has configured both globally and per-room when we have the ability
-        to do such things.
-        """
-        if ev['user_id'] == self.user_id:
-            # let's assume you probably know about messages you sent yourself
-            defer.returnValue([])
-
-        room_id = ev['room_id']
-
-        # get *our* member event for display name matching
-        my_display_name = None
-
-        if self.our_member_event:
-            my_display_name = self.our_member_event[0].content.get("displayname")
-
-        room_members = yield self.store.get_users_in_room(room_id)
-        room_member_count = len(room_members)
-
-        evaluator = PushRuleEvaluatorForEvent(ev, room_member_count)
-
-        for r in self.rules:
-            enabled = self.enabled_map.get(r['rule_id'], None)
-            if enabled is not None and not enabled:
-                continue
-            elif enabled is None and not r.get("enabled", True):
-                # if no override, check enabled on the rule itself
-                # (may have come from a base rule)
-                continue
-
-            conditions = r['conditions']
-            actions = r['actions']
-
-            # ignore rules with no actions (we have an explict 'dont_notify')
-            if len(actions) == 0:
-                logger.warn(
-                    "Ignoring rule id %s with no actions for user %s",
-                    r['rule_id'], self.user_id
-                )
-                continue
-
-            matches = True
-            for c in conditions:
-                matches = evaluator.matches(
-                    c, self.user_id, my_display_name
-                )
-                if not matches:
-                    break
-
-            logger.debug(
-                "Rule %s %s",
-                r['rule_id'], "matches" if matches else "doesn't match"
-            )
-
-            if matches:
-                logger.debug(
-                    "%s matches for user %s, event %s",
-                    r['rule_id'], self.user_id, ev['event_id']
-                )
-
-                # filter out dont_notify as we treat an empty actions list
-                # as dont_notify, and this doesn't take up a row in our database
-                actions = [x for x in actions if x != 'dont_notify']
-
-                defer.returnValue(actions)
-
-        logger.debug(
-            "No rules match for user %s, event %s",
-            self.user_id, ev['event_id']
-        )
-        defer.returnValue(PushRuleEvaluator.DEFAULT_ACTIONS)
+def tweaks_for_actions(actions):
+    tweaks = {}
+    for a in actions:
+        if not isinstance(a, dict):
+            continue
+        if 'set_tweak' in a and 'value' in a:
+            tweaks[a['set_tweak']] = a['value']
+    return tweaks
 
 
 class PushRuleEvaluatorForEvent(object):