diff options
author | Erik Johnston <erik@matrix.org> | 2018-04-05 15:32:39 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2018-04-05 17:28:27 +0100 |
commit | 277d2c506d51ce76ebc56d1420d9991d1ad8a0d4 (patch) | |
tree | a93678de90d2df531351c6b296c802bad7ed86ae | |
parent | Merge pull request #3063 from matrix-org/jcgruenhage/cache_settings_stats (diff) | |
download | synapse-277d2c506d51ce76ebc56d1420d9991d1ad8a0d4.tar.xz |
Add cache for if ASes have users in a room
-rw-r--r-- | synapse/appservice/__init__.py | 20 | ||||
-rw-r--r-- | synapse/replication/slave/storage/appservice.py | 4 | ||||
-rw-r--r-- | synapse/storage/appservice.py | 151 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 1 | ||||
-rw-r--r-- | tests/appservice/test_appservice.py | 29 |
5 files changed, 168 insertions, 37 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index d5a7a5ce2f..2f531f9c52 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. from synapse.api.constants import EventTypes -from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.types import GroupID, get_domain_from_id from twisted.internet import defer @@ -173,6 +172,7 @@ class ApplicationService(object): if self.is_interested_in_user(event.sender): defer.returnValue(True) + # also check m.room.member state key if (event.type == EventTypes.Member and self.is_interested_in_user(event.state_key)): @@ -181,20 +181,18 @@ class ApplicationService(object): if not store: defer.returnValue(False) - does_match = yield self._matches_user_in_member_list(event.room_id, store) + does_match = yield self._matches_user_in_member_list( + event, store, + ) defer.returnValue(does_match) - @cachedInlineCallbacks(num_args=1, cache_context=True) - def _matches_user_in_member_list(self, room_id, store, cache_context): - member_list = yield store.get_users_in_room( - room_id, on_invalidate=cache_context.invalidate + @defer.inlineCallbacks + def _matches_user_in_member_list(self, event, store): + ases = yield store.get_appservices_with_user_in_room( + event, ) - # check joined member events - for user_id in member_list: - if self.is_interested_in_user(user_id): - defer.returnValue(True) - defer.returnValue(False) + defer.returnValue(self.id in ases) def _matches_room_id(self, event): if hasattr(event, "room_id"): diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py index 8cae3076f4..5482563958 100644 --- a/synapse/replication/slave/storage/appservice.py +++ b/synapse/replication/slave/storage/appservice.py @@ -17,8 +17,10 @@ from synapse.storage.appservice import ( ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore, ) +from synapse.replication.slave.storage.events import SlavedEventStore class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore, - ApplicationServiceWorkerStore): + ApplicationServiceWorkerStore, + SlavedEventStore): pass diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 12ea8a158c..55b843e2cd 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -18,9 +18,14 @@ import re import simplejson as json from twisted.internet import defer +from synapse.api.constants import EventTypes, Membership from synapse.appservice import AppServiceTransaction from synapse.config.appservice import load_appservices from synapse.storage.events import EventsWorkerStore +from synapse.storage.roommember import RoomMemberWorkerStore +from synapse.storage.state import StateGroupWorkerStore +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from synapse.util.async import Linearizer from ._base import SQLBaseStore @@ -46,7 +51,8 @@ def _make_exclusive_regex(services_cache): return exclusive_user_regex -class ApplicationServiceWorkerStore(SQLBaseStore): +class ApplicationServiceWorkerStore(RoomMemberWorkerStore, StateGroupWorkerStore, + SQLBaseStore): def __init__(self, db_conn, hs): self.services_cache = load_appservices( hs.hostname, @@ -111,6 +117,35 @@ class ApplicationServiceWorkerStore(SQLBaseStore): return service return None + @defer.inlineCallbacks + def get_appservices_with_user_in_room(self, event): + """Get the list of appservices in the room at the given event + + Args: + event (Event) + + Returns: + Deferred[set(str)]: The IDs of all ASes in the room + """ + state_group = yield self._get_state_group_for_event(event.event_id) + + ases_in_room = yield self._get_appservices_with_user_in_room( + event.room_id, state_group, + ) + + defer.returnValue(ases_in_room) + + @cachedInlineCallbacks(num_args=2, max_entries=10000) + def _get_appservices_with_user_in_room(self, room_id, state_group): + cache = self._get_appservices_with_user_in_room_cache(room_id) + ases_in_room = yield cache.get_appservices_in_room_by_user(state_group) + + defer.returnValue(ases_in_room) + + @cached(max_entries=10000) + def _get_appservices_with_user_in_room_cache(self, room_id): + return _AppserviceUsersCache(self, room_id) + class ApplicationServiceStore(ApplicationServiceWorkerStore): # This is currently empty due to there not being any AS storage functions @@ -374,3 +409,117 @@ class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStor # to keep consistency with the other stores, we keep this empty class for # now. pass + + +class _AppserviceUsersCache(object): + """Attempts to calculate which appservices have users in a given room by + looking at state groups and their delta_ids + """ + + def __init__(self, store, room_id): + self.store = store + self.room_id = room_id + + self.linearizer = Linearizer("_AppserviceUsersCache") + + # The last state group we calculated the ASes in the room for. + self.state_group = object() + + # A dict of all appservices in the room at the above state group, + # along with a user_id of an AS user in the room. + # Dict of as_id -> user_id. + self.appservices_in_room = {} + + @defer.inlineCallbacks + def get_appservices_in_room_by_user(self, state_group): + """ + Args: + state_group(str) + + Returns: + Deferred[set(str)]: The IDs of all ASes in the room + """ + if state_group == self.state_group: + defer.returnValue(frozenset(self.appservices_in_room)) + + with (yield self.linearizer.queue(())): + # Set of ASes that we need to recalculate their membership of + # the room + uhandled_ases = set() + + # If the state groups match then there is nothing to do + if state_group == self.state_group: + defer.returnValue(frozenset(self.appservices_in_room)) + + prev_group, delta_ids = yield self.store.get_state_group_delta(state_group) + + # If the prev_group matches the last state group we can calculate + # the new value by looking at the deltas + if prev_group == self.state_group: + for (typ, state_key), event_id in delta_ids.iteritems(): + if typ != EventTypes.Member: + continue + + user_id = state_key + + event = yield self.store.get_event(event_id) + + is_join = event.membership == Membership.JOIN + for appservice in self.store.get_app_services(): + as_id = appservice.id + + # If this is a join and the appservice is already in + # the room then its a noop + if is_join: + if as_id in self.appservices_in_room: + continue + # If this is not a join, then we only need to recalculate + # if the AS is in the room and the cached joined AS user + # matches this event. + elif self.appservices_in_room.get(as_id, None) != user_id: + continue + + # If the AS is not interested in the user then its a + # noop. + if not appservice.is_interested_in_user(user_id): + continue + + if is_join: + # If an AS user is joining then the AS is now + # interested in the room + self.appservices_in_room[as_id] = user_id + else: + # If an AS user has left then we need to + # recalcualte if they're in the room. + uhandled_ases.add(appservice) + self.appservices_in_room.pop(as_id, None) + else: + uhandled_ases = set(self.store.get_app_services()) + + if uhandled_ases: + # We need to recalculate which ASes are in the room, so lets + # get the current state and try and find a join event + # that the AS is interested in. + + current_state_ids = yield self.store.get_state_ids_for_group(state_group) + + for appservice in uhandled_ases: + as_id = appservice.id + + self.appservices_in_room.pop(as_id, None) + + for (etype, state_key), event_id in current_state_ids.iteritems(): + if etype != EventTypes.Member: + continue + + if not appservice.is_interested_in_user(state_key): + continue + + event = yield self.store.get_event(event_id) + if event.membership == Membership.JOIN: + self.appservices_in_room[as_id] = state_key + break + + self.state_group = state_group + + defer.returnValue(frozenset(self.appservices_in_room)) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index d662d1cfc0..cf90716588 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -440,7 +440,6 @@ class RoomMemberWorkerStore(EventsWorkerStore): ) @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True) - # @defer.inlineCallbacks def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry): # We don't use `state_group`, its there so that we can cache based # on it. However, its important that its never None, since two current_state's diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py index 5b2b95860a..80263d0a9a 100644 --- a/tests/appservice/test_appservice.py +++ b/tests/appservice/test_appservice.py @@ -55,7 +55,7 @@ class ApplicationServiceTestCase(unittest.TestCase): _regex("@irc_.*") ) self.event.sender = "@irc_foobar:matrix.org" - self.assertTrue((yield self.service.is_interested(self.event))) + self.assertTrue((yield self.service.is_interested(self.event, None))) @defer.inlineCallbacks def test_regex_user_id_prefix_no_match(self): @@ -63,7 +63,7 @@ class ApplicationServiceTestCase(unittest.TestCase): _regex("@irc_.*") ) self.event.sender = "@someone_else:matrix.org" - self.assertFalse((yield self.service.is_interested(self.event))) + self.assertFalse((yield self.service.is_interested(self.event, None))) @defer.inlineCallbacks def test_regex_room_member_is_checked(self): @@ -73,7 +73,7 @@ class ApplicationServiceTestCase(unittest.TestCase): self.event.sender = "@someone_else:matrix.org" self.event.type = "m.room.member" self.event.state_key = "@irc_foobar:matrix.org" - self.assertTrue((yield self.service.is_interested(self.event))) + self.assertTrue((yield self.service.is_interested(self.event, None))) @defer.inlineCallbacks def test_regex_room_id_match(self): @@ -81,7 +81,7 @@ class ApplicationServiceTestCase(unittest.TestCase): _regex("!some_prefix.*some_suffix:matrix.org") ) self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org" - self.assertTrue((yield self.service.is_interested(self.event))) + self.assertTrue((yield self.service.is_interested(self.event, None))) @defer.inlineCallbacks def test_regex_room_id_no_match(self): @@ -89,7 +89,7 @@ class ApplicationServiceTestCase(unittest.TestCase): _regex("!some_prefix.*some_suffix:matrix.org") ) self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org" - self.assertFalse((yield self.service.is_interested(self.event))) + self.assertFalse((yield self.service.is_interested(self.event, None))) @defer.inlineCallbacks def test_regex_alias_match(self): @@ -160,7 +160,7 @@ class ApplicationServiceTestCase(unittest.TestCase): self.store.get_aliases_for_room.return_value = [ "#xmpp_foobar:matrix.org", "#athing:matrix.org" ] - self.store.get_users_in_room.return_value = [] + self.store.get_appservices_with_user_in_room.return_value = [] self.assertFalse((yield self.service.is_interested( self.event, self.store ))) @@ -193,20 +193,3 @@ class ApplicationServiceTestCase(unittest.TestCase): } self.event.state_key = self.service.sender self.assertTrue((yield self.service.is_interested(self.event))) - - @defer.inlineCallbacks - def test_member_list_match(self): - self.service.namespaces[ApplicationService.NS_USERS].append( - _regex("@irc_.*") - ) - self.store.get_users_in_room.return_value = [ - "@alice:here", - "@irc_fo:here", # AS user - "@bob:here", - ] - self.store.get_aliases_for_room.return_value = [] - - self.event.sender = "@xmpp_foobar:matrix.org" - self.assertTrue((yield self.service.is_interested( - event=self.event, store=self.store - ))) |