diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index d5a7a5ce2f..2f531f9c52 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.constants import EventTypes
-from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.types import GroupID, get_domain_from_id
from twisted.internet import defer
@@ -173,6 +172,7 @@ class ApplicationService(object):
if self.is_interested_in_user(event.sender):
defer.returnValue(True)
+
# also check m.room.member state key
if (event.type == EventTypes.Member and
self.is_interested_in_user(event.state_key)):
@@ -181,20 +181,18 @@ class ApplicationService(object):
if not store:
defer.returnValue(False)
- does_match = yield self._matches_user_in_member_list(event.room_id, store)
+ does_match = yield self._matches_user_in_member_list(
+ event, store,
+ )
defer.returnValue(does_match)
- @cachedInlineCallbacks(num_args=1, cache_context=True)
- def _matches_user_in_member_list(self, room_id, store, cache_context):
- member_list = yield store.get_users_in_room(
- room_id, on_invalidate=cache_context.invalidate
+ @defer.inlineCallbacks
+ def _matches_user_in_member_list(self, event, store):
+ ases = yield store.get_appservices_with_user_in_room(
+ event,
)
- # check joined member events
- for user_id in member_list:
- if self.is_interested_in_user(user_id):
- defer.returnValue(True)
- defer.returnValue(False)
+ defer.returnValue(self.id in ases)
def _matches_room_id(self, event):
if hasattr(event, "room_id"):
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
index 8cae3076f4..5482563958 100644
--- a/synapse/replication/slave/storage/appservice.py
+++ b/synapse/replication/slave/storage/appservice.py
@@ -17,8 +17,10 @@
from synapse.storage.appservice import (
ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore,
)
+from synapse.replication.slave.storage.events import SlavedEventStore
class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
- ApplicationServiceWorkerStore):
+ ApplicationServiceWorkerStore,
+ SlavedEventStore):
pass
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 12ea8a158c..55b843e2cd 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -18,9 +18,14 @@ import re
import simplejson as json
from twisted.internet import defer
+from synapse.api.constants import EventTypes, Membership
from synapse.appservice import AppServiceTransaction
from synapse.config.appservice import load_appservices
from synapse.storage.events import EventsWorkerStore
+from synapse.storage.roommember import RoomMemberWorkerStore
+from synapse.storage.state import StateGroupWorkerStore
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from synapse.util.async import Linearizer
from ._base import SQLBaseStore
@@ -46,7 +51,8 @@ def _make_exclusive_regex(services_cache):
return exclusive_user_regex
-class ApplicationServiceWorkerStore(SQLBaseStore):
+class ApplicationServiceWorkerStore(RoomMemberWorkerStore, StateGroupWorkerStore,
+ SQLBaseStore):
def __init__(self, db_conn, hs):
self.services_cache = load_appservices(
hs.hostname,
@@ -111,6 +117,35 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
return service
return None
+ @defer.inlineCallbacks
+ def get_appservices_with_user_in_room(self, event):
+ """Get the list of appservices in the room at the given event
+
+ Args:
+ event (Event)
+
+ Returns:
+ Deferred[set(str)]: The IDs of all ASes in the room
+ """
+ state_group = yield self._get_state_group_for_event(event.event_id)
+
+ ases_in_room = yield self._get_appservices_with_user_in_room(
+ event.room_id, state_group,
+ )
+
+ defer.returnValue(ases_in_room)
+
+ @cachedInlineCallbacks(num_args=2, max_entries=10000)
+ def _get_appservices_with_user_in_room(self, room_id, state_group):
+ cache = self._get_appservices_with_user_in_room_cache(room_id)
+ ases_in_room = yield cache.get_appservices_in_room_by_user(state_group)
+
+ defer.returnValue(ases_in_room)
+
+ @cached(max_entries=10000)
+ def _get_appservices_with_user_in_room_cache(self, room_id):
+ return _AppserviceUsersCache(self, room_id)
+
class ApplicationServiceStore(ApplicationServiceWorkerStore):
# This is currently empty due to there not being any AS storage functions
@@ -374,3 +409,117 @@ class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStor
# to keep consistency with the other stores, we keep this empty class for
# now.
pass
+
+
+class _AppserviceUsersCache(object):
+ """Attempts to calculate which appservices have users in a given room by
+ looking at state groups and their delta_ids
+ """
+
+ def __init__(self, store, room_id):
+ self.store = store
+ self.room_id = room_id
+
+ self.linearizer = Linearizer("_AppserviceUsersCache")
+
+ # The last state group we calculated the ASes in the room for.
+ self.state_group = object()
+
+ # A dict of all appservices in the room at the above state group,
+ # along with a user_id of an AS user in the room.
+ # Dict of as_id -> user_id.
+ self.appservices_in_room = {}
+
+ @defer.inlineCallbacks
+ def get_appservices_in_room_by_user(self, state_group):
+ """
+ Args:
+ state_group(str)
+
+ Returns:
+ Deferred[set(str)]: The IDs of all ASes in the room
+ """
+ if state_group == self.state_group:
+ defer.returnValue(frozenset(self.appservices_in_room))
+
+ with (yield self.linearizer.queue(())):
+ # Set of ASes that we need to recalculate their membership of
+ # the room
+ uhandled_ases = set()
+
+ # If the state groups match then there is nothing to do
+ if state_group == self.state_group:
+ defer.returnValue(frozenset(self.appservices_in_room))
+
+ prev_group, delta_ids = yield self.store.get_state_group_delta(state_group)
+
+ # If the prev_group matches the last state group we can calculate
+ # the new value by looking at the deltas
+ if prev_group == self.state_group:
+ for (typ, state_key), event_id in delta_ids.iteritems():
+ if typ != EventTypes.Member:
+ continue
+
+ user_id = state_key
+
+ event = yield self.store.get_event(event_id)
+
+ is_join = event.membership == Membership.JOIN
+ for appservice in self.store.get_app_services():
+ as_id = appservice.id
+
+ # If this is a join and the appservice is already in
+ # the room then its a noop
+ if is_join:
+ if as_id in self.appservices_in_room:
+ continue
+ # If this is not a join, then we only need to recalculate
+ # if the AS is in the room and the cached joined AS user
+ # matches this event.
+ elif self.appservices_in_room.get(as_id, None) != user_id:
+ continue
+
+ # If the AS is not interested in the user then its a
+ # noop.
+ if not appservice.is_interested_in_user(user_id):
+ continue
+
+ if is_join:
+ # If an AS user is joining then the AS is now
+ # interested in the room
+ self.appservices_in_room[as_id] = user_id
+ else:
+ # If an AS user has left then we need to
+ # recalcualte if they're in the room.
+ uhandled_ases.add(appservice)
+ self.appservices_in_room.pop(as_id, None)
+ else:
+ uhandled_ases = set(self.store.get_app_services())
+
+ if uhandled_ases:
+ # We need to recalculate which ASes are in the room, so lets
+ # get the current state and try and find a join event
+ # that the AS is interested in.
+
+ current_state_ids = yield self.store.get_state_ids_for_group(state_group)
+
+ for appservice in uhandled_ases:
+ as_id = appservice.id
+
+ self.appservices_in_room.pop(as_id, None)
+
+ for (etype, state_key), event_id in current_state_ids.iteritems():
+ if etype != EventTypes.Member:
+ continue
+
+ if not appservice.is_interested_in_user(state_key):
+ continue
+
+ event = yield self.store.get_event(event_id)
+ if event.membership == Membership.JOIN:
+ self.appservices_in_room[as_id] = state_key
+ break
+
+ self.state_group = state_group
+
+ defer.returnValue(frozenset(self.appservices_in_room))
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index d662d1cfc0..cf90716588 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -440,7 +440,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
)
@cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
- # @defer.inlineCallbacks
def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry):
# We don't use `state_group`, its there so that we can cache based
# on it. However, its important that its never None, since two current_state's
diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py
index 5b2b95860a..80263d0a9a 100644
--- a/tests/appservice/test_appservice.py
+++ b/tests/appservice/test_appservice.py
@@ -55,7 +55,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
_regex("@irc_.*")
)
self.event.sender = "@irc_foobar:matrix.org"
- self.assertTrue((yield self.service.is_interested(self.event)))
+ self.assertTrue((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks
def test_regex_user_id_prefix_no_match(self):
@@ -63,7 +63,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
_regex("@irc_.*")
)
self.event.sender = "@someone_else:matrix.org"
- self.assertFalse((yield self.service.is_interested(self.event)))
+ self.assertFalse((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks
def test_regex_room_member_is_checked(self):
@@ -73,7 +73,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
self.event.sender = "@someone_else:matrix.org"
self.event.type = "m.room.member"
self.event.state_key = "@irc_foobar:matrix.org"
- self.assertTrue((yield self.service.is_interested(self.event)))
+ self.assertTrue((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks
def test_regex_room_id_match(self):
@@ -81,7 +81,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
_regex("!some_prefix.*some_suffix:matrix.org")
)
self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org"
- self.assertTrue((yield self.service.is_interested(self.event)))
+ self.assertTrue((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks
def test_regex_room_id_no_match(self):
@@ -89,7 +89,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
_regex("!some_prefix.*some_suffix:matrix.org")
)
self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org"
- self.assertFalse((yield self.service.is_interested(self.event)))
+ self.assertFalse((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks
def test_regex_alias_match(self):
@@ -160,7 +160,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
self.store.get_aliases_for_room.return_value = [
"#xmpp_foobar:matrix.org", "#athing:matrix.org"
]
- self.store.get_users_in_room.return_value = []
+ self.store.get_appservices_with_user_in_room.return_value = []
self.assertFalse((yield self.service.is_interested(
self.event, self.store
)))
@@ -193,20 +193,3 @@ class ApplicationServiceTestCase(unittest.TestCase):
}
self.event.state_key = self.service.sender
self.assertTrue((yield self.service.is_interested(self.event)))
-
- @defer.inlineCallbacks
- def test_member_list_match(self):
- self.service.namespaces[ApplicationService.NS_USERS].append(
- _regex("@irc_.*")
- )
- self.store.get_users_in_room.return_value = [
- "@alice:here",
- "@irc_fo:here", # AS user
- "@bob:here",
- ]
- self.store.get_aliases_for_room.return_value = []
-
- self.event.sender = "@xmpp_foobar:matrix.org"
- self.assertTrue((yield self.service.is_interested(
- event=self.event, store=self.store
- )))
|