diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 296c4447ec..edf45dc599 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -13,333 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
-
-from synapse.streams.config import PaginationConfig
-from synapse.types import StreamToken
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
-
-import synapse.util.async
-from .push_rule_evaluator import evaluator_for_user_id
-
-import logging
-import random
-
-logger = logging.getLogger(__name__)
-
-
-_NEXT_ID = 1
-
-
-def _get_next_id():
- global _NEXT_ID
- _id = _NEXT_ID
- _NEXT_ID += 1
- return _id
-
-
-# Pushers could now be moved to pull out of the event_push_actions table instead
-# of listening on the event stream: this would avoid them having to run the
-# rules again.
-class Pusher(object):
- INITIAL_BACKOFF = 1000
- MAX_BACKOFF = 60 * 60 * 1000
- GIVE_UP_AFTER = 24 * 60 * 60 * 1000
-
- def __init__(self, _hs, user_id, app_id,
- app_display_name, device_display_name, pushkey, pushkey_ts,
- data, last_token, last_success, failing_since):
- self.hs = _hs
- self.evStreamHandler = self.hs.get_handlers().event_stream_handler
- self.store = self.hs.get_datastore()
- self.clock = self.hs.get_clock()
- self.user_id = user_id
- self.app_id = app_id
- self.app_display_name = app_display_name
- self.device_display_name = device_display_name
- self.pushkey = pushkey
- self.pushkey_ts = pushkey_ts
- self.data = data
- self.last_token = last_token
- self.last_success = last_success # not actually used
- self.backoff_delay = Pusher.INITIAL_BACKOFF
- self.failing_since = failing_since
- self.alive = True
- self.badge = None
-
- self.name = "Pusher-%d" % (_get_next_id(),)
-
- # The last value of last_active_time that we saw
- self.last_last_active_time = 0
- self.has_unread = True
-
- @defer.inlineCallbacks
- def get_context_for_event(self, ev):
- name_aliases = yield self.store.get_room_name_and_aliases(
- ev['room_id']
- )
-
- ctx = {'aliases': name_aliases[1]}
- if name_aliases[0] is not None:
- ctx['name'] = name_aliases[0]
-
- their_member_events_for_room = yield self.store.get_current_state(
- room_id=ev['room_id'],
- event_type='m.room.member',
- state_key=ev['user_id']
- )
- for mev in their_member_events_for_room:
- if mev.content['membership'] == 'join' and 'displayname' in mev.content:
- dn = mev.content['displayname']
- if dn is not None:
- ctx['sender_display_name'] = dn
-
- defer.returnValue(ctx)
-
- @defer.inlineCallbacks
- def start(self):
- with LoggingContext(self.name):
- if not self.last_token:
- # First-time setup: get a token to start from (we can't
- # just start from no token, ie. 'now'
- # because we need the result to be reproduceable in case
- # we fail to dispatch the push)
- config = PaginationConfig(from_token=None, limit='1')
- chunk = yield self.evStreamHandler.get_stream(
- self.user_id, config, timeout=0, affect_presence=False
- )
- self.last_token = chunk['end']
- yield self.store.update_pusher_last_token(
- self.app_id, self.pushkey, self.user_id, self.last_token
- )
- logger.info("New pusher %s for user %s starting from token %s",
- self.pushkey, self.user_id, self.last_token)
-
- else:
- logger.info(
- "Old pusher %s for user %s starting",
- self.pushkey, self.user_id,
- )
-
- wait = 0
- while self.alive:
- try:
- if wait > 0:
- yield synapse.util.async.sleep(wait)
- with Measure(self.clock, "push"):
- yield self.get_and_dispatch()
- wait = 0
- except:
- if wait == 0:
- wait = 1
- else:
- wait = min(wait * 2, 1800)
- logger.exception(
- "Exception in pusher loop for pushkey %s. Pausing for %ds",
- self.pushkey, wait
- )
-
- @defer.inlineCallbacks
- def get_and_dispatch(self):
- from_tok = StreamToken.from_string(self.last_token)
- config = PaginationConfig(from_token=from_tok, limit='1')
- timeout = (300 + random.randint(-60, 60)) * 1000
- chunk = yield self.evStreamHandler.get_stream(
- self.user_id, config, timeout=timeout, affect_presence=False,
- only_keys=("room", "receipt",),
- )
-
- # limiting to 1 may get 1 event plus 1 presence event, so
- # pick out the actual event
- single_event = None
- read_receipt = None
- for c in chunk['chunk']:
- if 'event_id' in c: # Hmmm...
- single_event = c
- elif c['type'] == 'm.receipt':
- read_receipt = c
-
- have_updated_badge = False
- if read_receipt:
- for receipt_part in read_receipt['content'].values():
- if 'm.read' in receipt_part:
- if self.user_id in receipt_part['m.read'].keys():
- have_updated_badge = True
-
- if not single_event:
- if have_updated_badge:
- yield self.update_badge()
- self.last_token = chunk['end']
- yield self.store.update_pusher_last_token(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.last_token
- )
- return
-
- if not self.alive:
- return
-
- processed = False
-
- rule_evaluator = yield \
- evaluator_for_user_id(
- self.user_id, single_event['room_id'], self.store
- )
-
- actions = yield rule_evaluator.actions_for_event(single_event)
- tweaks = rule_evaluator.tweaks_for_actions(actions)
-
- if 'notify' in actions:
- self.badge = yield self._get_badge_count()
- rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
- self.has_unread = True
- if isinstance(rejected, list) or isinstance(rejected, tuple):
- processed = True
- for pk in rejected:
- if pk != self.pushkey:
- # for sanity, we only remove the pushkey if it
- # was the one we actually sent...
- logger.warn(
- ("Ignoring rejected pushkey %s because we"
- " didn't send it"), pk
- )
- else:
- logger.info(
- "Pushkey %s was rejected: removing",
- pk
- )
- yield self.hs.get_pusherpool().remove_pusher(
- self.app_id, pk, self.user_id
- )
- else:
- if have_updated_badge:
- yield self.update_badge()
- processed = True
-
- if not self.alive:
- return
-
- if processed:
- self.backoff_delay = Pusher.INITIAL_BACKOFF
- self.last_token = chunk['end']
- yield self.store.update_pusher_last_token_and_success(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.last_token,
- self.clock.time_msec()
- )
- if self.failing_since:
- self.failing_since = None
- yield self.store.update_pusher_failing_since(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.failing_since)
- else:
- if not self.failing_since:
- self.failing_since = self.clock.time_msec()
- yield self.store.update_pusher_failing_since(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.failing_since
- )
-
- if (self.failing_since and
- self.failing_since <
- self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
- # we really only give up so that if the URL gets
- # fixed, we don't suddenly deliver a load
- # of old notifications.
- logger.warn("Giving up on a notification to user %s, "
- "pushkey %s",
- self.user_id, self.pushkey)
- self.backoff_delay = Pusher.INITIAL_BACKOFF
- self.last_token = chunk['end']
- yield self.store.update_pusher_last_token(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.last_token
- )
-
- self.failing_since = None
- yield self.store.update_pusher_failing_since(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.failing_since
- )
- else:
- logger.warn("Failed to dispatch push for user %s "
- "(failing for %dms)."
- "Trying again in %dms",
- self.user_id,
- self.clock.time_msec() - self.failing_since,
- self.backoff_delay)
- yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
- self.backoff_delay *= 2
- if self.backoff_delay > Pusher.MAX_BACKOFF:
- self.backoff_delay = Pusher.MAX_BACKOFF
-
- def stop(self):
- self.alive = False
-
- def dispatch_push(self, p, tweaks, badge):
- """
- Overridden by implementing classes to actually deliver the notification
- Args:
- p: The event to notify for as a single event from the event stream
- Returns: If the notification was delivered, an array containing any
- pushkeys that were rejected by the push gateway.
- False if the notification could not be delivered (ie.
- should be retried).
- """
- pass
-
- @defer.inlineCallbacks
- def update_badge(self):
- new_badge = yield self._get_badge_count()
- if self.badge != new_badge:
- self.badge = new_badge
- yield self.send_badge(self.badge)
-
- def send_badge(self, badge):
- """
- Overridden by implementing classes to send an updated badge count
- """
- pass
-
- @defer.inlineCallbacks
- def _get_badge_count(self):
- invites, joins = yield defer.gatherResults([
- self.store.get_invited_rooms_for_user(self.user_id),
- self.store.get_rooms_for_user(self.user_id),
- ], consumeErrors=True)
-
- my_receipts_by_room = yield self.store.get_receipts_for_user(
- self.user_id,
- "m.read",
- )
-
- badge = len(invites)
-
- for r in joins:
- if r.room_id in my_receipts_by_room:
- last_unread_event_id = my_receipts_by_room[r.room_id]
-
- notifs = yield (
- self.store.get_unread_event_push_actions_by_room_for_user(
- r.room_id, self.user_id, last_unread_event_id
- )
- )
- badge += notifs["notify_count"]
- defer.returnValue(badge)
-
class PusherConfigException(Exception):
def __init__(self, msg):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 0d5450bc01..cc030a57a0 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -168,7 +168,7 @@ class HttpPusher(object):
if 'notify' not in push_action['actions']:
defer.returnValue(True)
- tweaks = push_rule_evaluator.PushRuleEvaluator.tweaks_for_actions(push_action['actions'])
+ tweaks = push_rule_evaluator.tweaks_for_actions(push_action['actions'])
badge = yield push_tools.get_badge_count(self.hs, self.user_id)
event = yield self.store.get_event(push_action['event_id'], allow_none=True)
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index c3c2877629..4db76f18bd 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -13,12 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
-
-from .baserules import list_with_base_rules
-
import logging
-import simplejson as json
import re
from synapse.types import UserID
@@ -32,22 +27,6 @@ IS_GLOB = re.compile(r'[\?\*\[\]]')
INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
-@defer.inlineCallbacks
-def evaluator_for_user_id(user_id, room_id, store):
- rawrules = yield store.get_push_rules_for_user(user_id)
- enabled_map = yield store.get_push_rules_enabled_for_user(user_id)
- our_member_event = yield store.get_current_state(
- room_id=room_id,
- event_type='m.room.member',
- state_key=user_id,
- )
-
- defer.returnValue(PushRuleEvaluator(
- user_id, rawrules, enabled_map,
- room_id, our_member_event, store
- ))
-
-
def _room_member_count(ev, condition, room_member_count):
if 'is' not in condition:
return False
@@ -74,111 +53,14 @@ def _room_member_count(ev, condition, room_member_count):
return False
-class PushRuleEvaluator:
- DEFAULT_ACTIONS = []
-
- def __init__(self, user_id, raw_rules, enabled_map, room_id,
- our_member_event, store):
- self.user_id = user_id
- self.room_id = room_id
- self.our_member_event = our_member_event
- self.store = store
-
- rules = []
- for raw_rule in raw_rules:
- rule = dict(raw_rule)
- rule['conditions'] = json.loads(raw_rule['conditions'])
- rule['actions'] = json.loads(raw_rule['actions'])
- rules.append(rule)
-
- self.rules = list_with_base_rules(rules)
-
- self.enabled_map = enabled_map
-
- @staticmethod
- def tweaks_for_actions(actions):
- tweaks = {}
- for a in actions:
- if not isinstance(a, dict):
- continue
- if 'set_tweak' in a and 'value' in a:
- tweaks[a['set_tweak']] = a['value']
- return tweaks
-
- @defer.inlineCallbacks
- def actions_for_event(self, ev):
- """
- This should take into account notification settings that the user
- has configured both globally and per-room when we have the ability
- to do such things.
- """
- if ev['user_id'] == self.user_id:
- # let's assume you probably know about messages you sent yourself
- defer.returnValue([])
-
- room_id = ev['room_id']
-
- # get *our* member event for display name matching
- my_display_name = None
-
- if self.our_member_event:
- my_display_name = self.our_member_event[0].content.get("displayname")
-
- room_members = yield self.store.get_users_in_room(room_id)
- room_member_count = len(room_members)
-
- evaluator = PushRuleEvaluatorForEvent(ev, room_member_count)
-
- for r in self.rules:
- enabled = self.enabled_map.get(r['rule_id'], None)
- if enabled is not None and not enabled:
- continue
- elif enabled is None and not r.get("enabled", True):
- # if no override, check enabled on the rule itself
- # (may have come from a base rule)
- continue
-
- conditions = r['conditions']
- actions = r['actions']
-
- # ignore rules with no actions (we have an explict 'dont_notify')
- if len(actions) == 0:
- logger.warn(
- "Ignoring rule id %s with no actions for user %s",
- r['rule_id'], self.user_id
- )
- continue
-
- matches = True
- for c in conditions:
- matches = evaluator.matches(
- c, self.user_id, my_display_name
- )
- if not matches:
- break
-
- logger.debug(
- "Rule %s %s",
- r['rule_id'], "matches" if matches else "doesn't match"
- )
-
- if matches:
- logger.debug(
- "%s matches for user %s, event %s",
- r['rule_id'], self.user_id, ev['event_id']
- )
-
- # filter out dont_notify as we treat an empty actions list
- # as dont_notify, and this doesn't take up a row in our database
- actions = [x for x in actions if x != 'dont_notify']
-
- defer.returnValue(actions)
-
- logger.debug(
- "No rules match for user %s, event %s",
- self.user_id, ev['event_id']
- )
- defer.returnValue(PushRuleEvaluator.DEFAULT_ACTIONS)
+def tweaks_for_actions(actions):
+ tweaks = {}
+ for a in actions:
+ if not isinstance(a, dict):
+ continue
+ if 'set_tweak' in a and 'value' in a:
+ tweaks[a['set_tweak']] = a['value']
+ return tweaks
class PushRuleEvaluatorForEvent(object):
|