summary refs log tree commit diff
diff options
context:
space:
mode:
author Erik Johnston <erik@matrix.org> 2018-04-05 15:32:39 +0100
committer Erik Johnston <erik@matrix.org> 2018-04-05 17:28:27 +0100
commit 277d2c506d51ce76ebc56d1420d9991d1ad8a0d4 (patch)
tree a93678de90d2df531351c6b296c802bad7ed86ae
parent Merge pull request #3063 from matrix-org/jcgruenhage/cache_settings_stats (diff)
download synapse-277d2c506d51ce76ebc56d1420d9991d1ad8a0d4.tar.xz
Add cache for if ASes have users in a room
-rw-r--r-- synapse/appservice/__init__.py 20
-rw-r--r-- synapse/replication/slave/storage/appservice.py 4
-rw-r--r-- synapse/storage/appservice.py 151
-rw-r--r-- synapse/storage/roommember.py 1
-rw-r--r-- tests/appservice/test_appservice.py 29
5 files changed, 168 insertions, 37 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index d5a7a5ce2f..2f531f9c52 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from synapse.api.constants import EventTypes
-from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.types import GroupID, get_domain_from_id
 
 from twisted.internet import defer
@@ -173,6 +172,7 @@ class ApplicationService(object):
 
         if self.is_interested_in_user(event.sender):
             defer.returnValue(True)
+
         # also check m.room.member state key
         if (event.type == EventTypes.Member and
                 self.is_interested_in_user(event.state_key)):
@@ -181,20 +181,18 @@ class ApplicationService(object):
         if not store:
             defer.returnValue(False)
 
-        does_match = yield self._matches_user_in_member_list(event.room_id, store)
+        does_match = yield self._matches_user_in_member_list(
+            event, store,
+        )
         defer.returnValue(does_match)
 
-    @cachedInlineCallbacks(num_args=1, cache_context=True)
-    def _matches_user_in_member_list(self, room_id, store, cache_context):
-        member_list = yield store.get_users_in_room(
-            room_id, on_invalidate=cache_context.invalidate
+    @defer.inlineCallbacks
+    def _matches_user_in_member_list(self, event, store):
+        ases = yield store.get_appservices_with_user_in_room(
+            event,
         )
 
-        # check joined member events
-        for user_id in member_list:
-            if self.is_interested_in_user(user_id):
-                defer.returnValue(True)
-        defer.returnValue(False)
+        defer.returnValue(self.id in ases)
 
     def _matches_room_id(self, event):
         if hasattr(event, "room_id"):
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
index 8cae3076f4..5482563958 100644
--- a/synapse/replication/slave/storage/appservice.py
+++ b/synapse/replication/slave/storage/appservice.py
@@ -17,8 +17,10 @@
 from synapse.storage.appservice import (
     ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore,
 )
+from synapse.replication.slave.storage.events import SlavedEventStore
 
 
 class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
-                                    ApplicationServiceWorkerStore):
+                                    ApplicationServiceWorkerStore,
+                                    SlavedEventStore):
     pass
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 12ea8a158c..55b843e2cd 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -18,9 +18,14 @@ import re
 import simplejson as json
 from twisted.internet import defer
 
+from synapse.api.constants import EventTypes, Membership
 from synapse.appservice import AppServiceTransaction
 from synapse.config.appservice import load_appservices
 from synapse.storage.events import EventsWorkerStore
+from synapse.storage.roommember import RoomMemberWorkerStore
+from synapse.storage.state import StateGroupWorkerStore
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from synapse.util.async import Linearizer
 from ._base import SQLBaseStore
 
 
@@ -46,7 +51,8 @@ def _make_exclusive_regex(services_cache):
     return exclusive_user_regex
 
 
-class ApplicationServiceWorkerStore(SQLBaseStore):
+class ApplicationServiceWorkerStore(RoomMemberWorkerStore, StateGroupWorkerStore,
+                                    SQLBaseStore):
     def __init__(self, db_conn, hs):
         self.services_cache = load_appservices(
             hs.hostname,
@@ -111,6 +117,35 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
                 return service
         return None
 
+    @defer.inlineCallbacks
+    def get_appservices_with_user_in_room(self, event):
+        """Get the IDs of the appservices with a user in the room at the given event
+
+        Args:
+            event (Event): The event whose state group and room are looked up
+
+        Returns:
+            Deferred[set(str)]: The IDs of all ASes in the room
+        """
+        state_group = yield self._get_state_group_for_event(event.event_id)
+
+        ases_in_room = yield self._get_appservices_with_user_in_room(
+            event.room_id, state_group,
+        )
+
+        defer.returnValue(ases_in_room)
+
+    @cachedInlineCallbacks(num_args=2, max_entries=10000)
+    def _get_appservices_with_user_in_room(self, room_id, state_group):
+        cache = self._get_appservices_with_user_in_room_cache(room_id)
+        ases_in_room = yield cache.get_appservices_in_room_by_user(state_group)
+
+        defer.returnValue(ases_in_room)
+
+    @cached(max_entries=10000)
+    def _get_appservices_with_user_in_room_cache(self, room_id):
+        return _AppserviceUsersCache(self, room_id)
+
 
 class ApplicationServiceStore(ApplicationServiceWorkerStore):
     # This is currently empty due to there not being any AS storage functions
@@ -374,3 +409,117 @@ class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStor
     # to keep consistency with the other stores, we keep this empty class for
     # now.
     pass
+
+
+class _AppserviceUsersCache(object):
+    """Attempts to calculate which appservices have users in a given room by
+    looking at state groups and their delta_ids
+    """
+
+    def __init__(self, store, room_id):
+        self.store = store
+        self.room_id = room_id
+
+        self.linearizer = Linearizer("_AppserviceUsersCache")
+
+        # The last state group we calculated the ASes in the room for. Starts
+        # as a unique sentinel so that it never equals a real state group.
+        self.state_group = object()
+
+        # A dict of all appservices in the room at the above state group,
+        # along with a user_id of an AS user in the room.
+        # Dict of as_id -> user_id.
+        self.appservices_in_room = {}
+
+    @defer.inlineCallbacks
+    def get_appservices_in_room_by_user(self, state_group):
+        """Get the IDs of the ASes with a joined user at the given state group
+
+        Args:
+            state_group(str)
+
+        Returns:
+            Deferred[frozenset(str)]: The IDs of all ASes in the room
+        """
+        if state_group == self.state_group:
+            defer.returnValue(frozenset(self.appservices_in_room))
+
+        with (yield self.linearizer.queue(())):
+            # ASes whose membership of the room needs recalculating in full
+            unhandled_ases = set()
+
+            # Check again, as the cache may have been updated while we were queued
+            if state_group == self.state_group:
+                defer.returnValue(frozenset(self.appservices_in_room))
+
+            prev_group, delta_ids = yield self.store.get_state_group_delta(state_group)
+
+            # If the prev_group matches the last state group we can calculate
+            # the new value by looking at the deltas
+            if prev_group == self.state_group:
+                for (typ, state_key), event_id in delta_ids.iteritems():
+                    if typ != EventTypes.Member:
+                        continue
+
+                    user_id = state_key
+
+                    event = yield self.store.get_event(event_id)
+
+                    is_join = event.membership == Membership.JOIN
+                    for appservice in self.store.get_app_services():
+                        as_id = appservice.id
+
+                        # If this is a join and the appservice is already in
+                        # the room then it's a no-op
+                        if is_join:
+                            if as_id in self.appservices_in_room:
+                                continue
+                        # If this is not a join, then we only need to recalculate
+                        # if the AS is in the room and the cached joined AS user
+                        # matches this event.
+                        elif self.appservices_in_room.get(as_id, None) != user_id:
+                            continue
+
+                        # If the AS is not interested in the user it's a no-op.
+                        if not appservice.is_interested_in_user(user_id):
+                            continue
+
+                        if is_join:
+                            # If an AS user is joining then the AS is now
+                            # interested in the room
+                            self.appservices_in_room[as_id] = user_id
+                        else:
+                            # If an AS user has left then we need to
+                            # recalculate if they're in the room.
+                            unhandled_ases.add(appservice)
+                            self.appservices_in_room.pop(as_id, None)
+            else:
+                # The deltas are not relative to our cached state group, so
+                # every AS must be recalculated from the full state.
+                unhandled_ases = set(self.store.get_app_services())
+
+            if unhandled_ases:
+                # Recalculate from the state at this state group, looking for
+                # a join event for a user that each AS is interested in.
+                current_state_ids = yield self.store.get_state_ids_for_group(state_group)
+
+                for appservice in unhandled_ases:
+                    as_id = appservice.id
+
+                    self.appservices_in_room.pop(as_id, None)
+
+                    for (etype, state_key), event_id in current_state_ids.iteritems():
+                        if etype != EventTypes.Member:
+                            continue
+
+                        if not appservice.is_interested_in_user(state_key):
+                            continue
+
+                        event = yield self.store.get_event(event_id)
+                        if event.membership == Membership.JOIN:
+                            self.appservices_in_room[as_id] = state_key
+                            break
+
+            self.state_group = state_group
+
+        defer.returnValue(frozenset(self.appservices_in_room))
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index d662d1cfc0..cf90716588 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -440,7 +440,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         )
 
     @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
-    # @defer.inlineCallbacks
     def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry):
         # We don't use `state_group`, its there so that we can cache based
         # on it. However, its important that its never None, since two current_state's
diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py
index 5b2b95860a..80263d0a9a 100644
--- a/tests/appservice/test_appservice.py
+++ b/tests/appservice/test_appservice.py
@@ -55,7 +55,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
             _regex("@irc_.*")
         )
         self.event.sender = "@irc_foobar:matrix.org"
-        self.assertTrue((yield self.service.is_interested(self.event)))
+        self.assertTrue((yield self.service.is_interested(self.event, None)))
 
     @defer.inlineCallbacks
     def test_regex_user_id_prefix_no_match(self):
@@ -63,7 +63,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
             _regex("@irc_.*")
         )
         self.event.sender = "@someone_else:matrix.org"
-        self.assertFalse((yield self.service.is_interested(self.event)))
+        self.assertFalse((yield self.service.is_interested(self.event, None)))
 
     @defer.inlineCallbacks
     def test_regex_room_member_is_checked(self):
@@ -73,7 +73,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
         self.event.sender = "@someone_else:matrix.org"
         self.event.type = "m.room.member"
         self.event.state_key = "@irc_foobar:matrix.org"
-        self.assertTrue((yield self.service.is_interested(self.event)))
+        self.assertTrue((yield self.service.is_interested(self.event, None)))
 
     @defer.inlineCallbacks
     def test_regex_room_id_match(self):
@@ -81,7 +81,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
             _regex("!some_prefix.*some_suffix:matrix.org")
         )
         self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org"
-        self.assertTrue((yield self.service.is_interested(self.event)))
+        self.assertTrue((yield self.service.is_interested(self.event, None)))
 
     @defer.inlineCallbacks
     def test_regex_room_id_no_match(self):
@@ -89,7 +89,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
             _regex("!some_prefix.*some_suffix:matrix.org")
         )
         self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org"
-        self.assertFalse((yield self.service.is_interested(self.event)))
+        self.assertFalse((yield self.service.is_interested(self.event, None)))
 
     @defer.inlineCallbacks
     def test_regex_alias_match(self):
@@ -160,7 +160,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
         self.store.get_aliases_for_room.return_value = [
             "#xmpp_foobar:matrix.org", "#athing:matrix.org"
         ]
-        self.store.get_users_in_room.return_value = []
+        self.store.get_appservices_with_user_in_room.return_value = []
         self.assertFalse((yield self.service.is_interested(
             self.event, self.store
         )))
@@ -193,20 +193,3 @@ class ApplicationServiceTestCase(unittest.TestCase):
         }
         self.event.state_key = self.service.sender
         self.assertTrue((yield self.service.is_interested(self.event)))
-
-    @defer.inlineCallbacks
-    def test_member_list_match(self):
-        self.service.namespaces[ApplicationService.NS_USERS].append(
-            _regex("@irc_.*")
-        )
-        self.store.get_users_in_room.return_value = [
-            "@alice:here",
-            "@irc_fo:here",  # AS user
-            "@bob:here",
-        ]
-        self.store.get_aliases_for_room.return_value = []
-
-        self.event.sender = "@xmpp_foobar:matrix.org"
-        self.assertTrue((yield self.service.is_interested(
-            event=self.event, store=self.store
-        )))