-rw-r--r--  synapse/handlers/federation.py           | 19
-rw-r--r--  synapse/handlers/message.py              | 44
-rw-r--r--  synapse/handlers/presence.py             |  8
-rw-r--r--  synapse/push/bulk_push_rule_evaluator.py |  2
-rw-r--r--  synapse/storage/push_rule.py             | 22
5 files changed, 35 insertions, 60 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index dc90a5dde4..8a1038c44a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -832,11 +832,13 @@ class FederationHandler(BaseHandler):
 
         new_pdu = event
 
-        message_handler = self.hs.get_handlers().message_handler
-        destinations = yield message_handler.get_joined_hosts_for_room_from_state(
-            context
+        users_in_room = yield self.store.get_joined_users_from_context(event, context)
+
+        destinations = set(
+            get_domain_from_id(user_id) for user_id in users_in_room
+            if not self.hs.is_mine_id(user_id)
         )
-        destinations = set(destinations)
+
         destinations.discard(origin)
 
         logger.debug(
@@ -1055,11 +1057,12 @@ class FederationHandler(BaseHandler):
 
         new_pdu = event
 
-        message_handler = self.hs.get_handlers().message_handler
-        destinations = yield message_handler.get_joined_hosts_for_room_from_state(
-            context
+        users_in_room = yield self.store.get_joined_users_from_context(event, context)
+
+        destinations = set(
+            get_domain_from_id(user_id) for user_id in users_in_room
+            if not self.hs.is_mine_id(user_id)
         )
-        destinations = set(destinations)
         destinations.discard(origin)
 
         logger.debug(
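
Both hunks above replace the old state-walking helper with the same pattern: take the joined-user list from get_joined_users_from_context, map each remote user ID to its server name, and drop the origin. A minimal, self-contained sketch of that calculation follows; get_domain_from_id here is a simplified stand-in for Synapse's helper of the same name, and calculate_destinations is a hypothetical wrapper, not code from this commit.

def get_domain_from_id(user_id):
    # Matrix user IDs have the form "@localpart:domain"; the server name
    # is everything after the first colon.
    return user_id.split(":", 1)[1]

def calculate_destinations(users_in_room, is_mine_id, origin=None):
    destinations = set(
        get_domain_from_id(user_id)
        for user_id in users_in_room
        if not is_mine_id(user_id)  # skip users on our own homeserver
    )
    destinations.discard(origin)  # the sending server already has the event
    return destinations

# Example:
dests = calculate_destinations(
    ["@alice:local.org", "@bob:remote.net"],
    is_mine_id=lambda u: u.endswith(":local.org"),
    origin="origin.net",
)
assert dests == {"remote.net"}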
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3577db0595..178209a209 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -30,7 +30,6 @@ from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLo
 from synapse.util.caches.snapshot_cache import SnapshotCache
 from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
 from synapse.util.metrics import measure_func
-from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
@@ -945,7 +944,12 @@ class MessageHandler(BaseHandler):
             event_stream_id, max_stream_id
         )
 
-        destinations = yield self.get_joined_hosts_for_room_from_state(context)
+        users_in_room = yield self.store.get_joined_users_from_context(event, context)
+
+        destinations = [
+            get_domain_from_id(user_id) for user_id in users_in_room
+            if not self.hs.is_mine_id(user_id)
+        ]
 
         @defer.inlineCallbacks
         def _notify():
@@ -963,39 +967,3 @@ class MessageHandler(BaseHandler):
         preserve_fn(federation_handler.handle_new_event)(
             event, destinations=destinations,
         )
-
-    def get_joined_hosts_for_room_from_state(self, context):
-        state_group = context.state_group
-        if not state_group:
-            # If state_group is None it means it has yet to be assigned a
-            # state group, i.e. we need to make sure that calls with a state_group
-            # of None don't hit previous cached calls with a None state_group.
-            # To do this we set the state_group to a new object as object() != object()
-            state_group = object()
-
-        return self._get_joined_hosts_for_room_from_state(
-            state_group, context.current_state_ids
-        )
-
-    @cachedInlineCallbacks(num_args=1, cache_context=True)
-    def _get_joined_hosts_for_room_from_state(self, state_group, current_state_ids,
-                                              cache_context):
-
-        # Don't bother getting state for people on the same HS
-        current_state = yield self.store.get_events([
-            e_id for key, e_id in current_state_ids.items()
-            if key[0] == EventTypes.Member and not self.hs.is_mine_id(key[1])
-        ])
-
-        destinations = set()
-        for e in current_state.itervalues():
-            try:
-                if e.type == EventTypes.Member:
-                    if e.content["membership"] == Membership.JOIN:
-                        destinations.add(get_domain_from_id(e.state_key))
-            except SynapseError:
-                logger.warn(
-                    "Failed to get destination from event %s", e.event_id
-                )
-
-        defer.returnValue(destinations)
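
The deleted helper's comment documents a caching subtlety that survives in push_rule.py below: when a context has not yet been assigned a state group, a fresh object() is used as the cache key so two unassigned contexts can never collide. A toy illustration of why that sentinel works; nothing here is Synapse code.

def make_cache_key(state_group):
    if state_group is None:
        # A brand-new object() is equal only to itself, so an unassigned
        # state group can never alias another caller's cache entry.
        return object()
    return state_group

assert make_cache_key(42) == make_cache_key(42)      # real groups hit the cache
assert make_cache_key(None) != make_cache_key(None)  # None never does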
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index cf82a2336e..7ae05603f5 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -52,6 +52,8 @@ bump_active_time_counter = metrics.register_counter("bump_active_time")
 
 get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
 
+notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"])
+
 
 # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
 # "currently_active"
@@ -940,26 +942,32 @@ def should_notify(old_state, new_state):
     """Decides if a presence state change should be sent to interested parties.
     """
     if old_state.status_msg != new_state.status_msg:
+        notify_reason_counter.inc("status_msg_change")
         return True
 
     if old_state.state == PresenceState.ONLINE:
         if new_state.state != PresenceState.ONLINE:
             # Always notify for online -> anything
+            notify_reason_counter.inc("online_to_not")
             return True
 
         if new_state.currently_active != old_state.currently_active:
+            notify_reason_counter.inc("current_active_change")
             return True
 
         if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
             # Only notify about last active bumps if we're not currently active
             if not (old_state.currently_active and new_state.currently_active):
+                notify_reason_counter.inc("last_active_change")
                 return True
 
     elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
         # Always notify for a transition where last active gets bumped.
+        notify_reason_counter.inc("last_active_change")
         return True
 
     if old_state.state != new_state.state:
+        notify_reason_counter.inc("state_change")
         return True
 
     return False
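
The new notify_reason_counter gives each early return in should_notify its own label, so the relative frequency of each notification cause shows up in the metrics. A rough stand-in for how such a labelled counter behaves; the LabelledCounter class below is hypothetical, while the real object comes from metrics.register_counter as shown above.

from collections import Counter

class LabelledCounter:
    # Hypothetical stand-in for the object returned by
    # metrics.register_counter(name, labels=["reason"]).
    def __init__(self, name):
        self.name = name
        self.counts = Counter()

    def inc(self, label):
        self.counts[label] += 1

notify_reason_counter = LabelledCounter("notify_reason")
notify_reason_counter.inc("status_msg_change")
notify_reason_counter.inc("state_change")
notify_reason_counter.inc("state_change")
print(notify_reason_counter.counts)
# Counter({'state_change': 2, 'status_msg_change': 1})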
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 6ff9a06de1..f1bbe57dcb 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -38,7 +38,7 @@ def _get_rules(room_id, user_ids, store):
 @defer.inlineCallbacks
 def evaluator_for_event(event, hs, store, context):
     rules_by_user = yield store.bulk_get_push_rules_for_room(
-        event.room_id, context
+        event, context
     )
 
     # if this event is an invite event, we may need to run rules for the user
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 7e6ec411cd..49721656b6 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -16,7 +16,6 @@
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
 from synapse.push.baserules import list_with_base_rules
-from synapse.api.constants import EventTypes, Membership
 from twisted.internet import defer
 
 import logging
@@ -124,7 +123,7 @@ class PushRuleStore(SQLBaseStore):
 
         defer.returnValue(results)
 
-    def bulk_get_push_rules_for_room(self, room_id, context):
+    def bulk_get_push_rules_for_room(self, event, context):
         state_group = context.state_group
         if not state_group:
             # If state_group is None it means it has yet to be assigned a
@@ -134,12 +133,12 @@ class PushRuleStore(SQLBaseStore):
             state_group = object()
 
         return self._bulk_get_push_rules_for_room(
-            room_id, state_group, context.current_state_ids
+            event.room_id, state_group, context.current_state_ids, event=event
         )
 
     @cachedInlineCallbacks(num_args=2, cache_context=True)
     def _bulk_get_push_rules_for_room(self, room_id, state_group, current_state_ids,
-                                      cache_context):
+                                      cache_context, event=None):
         # We don't use `state_group`; it's there so that we can cache based
         # on it. However, it's important that it's never None, since two
         # current_state_ids with a state_group of None are likely to differ.
@@ -150,18 +149,15 @@ class PushRuleStore(SQLBaseStore):
         # their unread counts are correct in the event stream, but to avoid
         # generating them for bot / AS users etc, we only do so for people who've
         # sent a read receipt into the room.
-        local_user_member_ids = [
-            e_id for (etype, state_key), e_id in current_state_ids.iteritems()
-            if etype == EventTypes.Member and self.hs.is_mine_id(state_key)
-        ]
 
-        local_member_events = yield self._get_events(local_user_member_ids)
-
-        local_users_in_room = set(
-            member_event.state_key for member_event in local_member_events
-            if member_event.membership == Membership.JOIN
+        users_in_room = yield self._get_joined_users_from_context(
+            room_id, state_group, current_state_ids,
+            on_invalidate=cache_context.invalidate,
+            event=event,
         )
 
+        local_users_in_room = set(u for u in users_in_room if self.hs.is_mine_id(u))
+
         # users in the room who have pushers need to get push rules run because
         # that's how their pushers work
         if_users_with_pushers = yield self.get_if_users_have_pushers(
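
With the full event now threaded through bulk_get_push_rules_for_room, the store derives local users from the shared joined-users cache instead of fetching member events itself. A toy version of the local-user filter; is_mine_id here is a simplified stand-in for hs.is_mine_id, and the server name is made up for illustration.

def is_mine_id(user_id, server_name="example.org"):
    # A user is local when the domain part of "@localpart:domain" matches
    # our own server name.
    return user_id.split(":", 1)[1] == server_name

users_in_room = ["@alice:example.org", "@bob:remote.net", "@carol:example.org"]
local_users_in_room = set(u for u in users_in_room if is_mine_id(u))
assert local_users_in_room == {"@alice:example.org", "@carol:example.org"}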