diff options
-rw-r--r-- | docs/workers.rst | 11 | ||||
-rwxr-xr-x | scripts/synapse_port_db | 81 | ||||
-rwxr-xr-x | synapse/app/synctl.py | 1 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 19 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 20 | ||||
-rw-r--r-- | synapse/groups/groups_server.py | 78 | ||||
-rw-r--r-- | synapse/handlers/groups_local.py | 40 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 8 | ||||
-rw-r--r-- | synapse/http/matrixfederationclient.py | 6 | ||||
-rw-r--r-- | synapse/http/server.py | 7 | ||||
-rw-r--r-- | synapse/rest/client/v1/room.py | 7 | ||||
-rw-r--r-- | synapse/storage/group_server.py | 5 | ||||
-rw-r--r-- | synapse/util/caches/descriptors.py | 64 | ||||
-rw-r--r-- | tests/util/caches/test_descriptors.py | 46 |
14 files changed, 303 insertions, 90 deletions
diff --git a/docs/workers.rst b/docs/workers.rst index 80f8d2181a..1d521b9ec5 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -55,7 +55,12 @@ synapse process.) You then create a set of configs for the various worker processes. These should be worker configuration files, and should be stored in a dedicated -subdirectory, to allow synctl to manipulate them. +subdirectory, to allow synctl to manipulate them. An additional configuration +for the master synapse process will need to be created because the process will +not be started automatically. That configuration should look like this:: + + worker_app: synapse.app.homeserver + daemonize: true Each worker configuration file inherits the configuration of the main homeserver configuration file. You can then override configuration specific to that worker, @@ -230,9 +235,11 @@ file. For example:: ``synapse.app.event_creator`` ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Handles non-state event creation. It can handle REST endpoints matching:: +Handles some event creation. It can handle REST endpoints matching:: ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send + ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$ + ^/_matrix/client/(api/v1|r0|unstable)/join/ It will create events locally and then send them on to the main synapse instance to be persisted and handled. diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index d46581e4e1..7b23a44854 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -1,6 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -250,6 +251,12 @@ class Porter(object): @defer.inlineCallbacks def handle_table(self, table, postgres_size, table_size, forward_chunk, backward_chunk): + logger.info( + "Table %s: %i/%i (rows %i-%i) already ported", + table, postgres_size, table_size, + backward_chunk+1, forward_chunk-1, + ) + if not table_size: return @@ -467,31 +474,10 @@ class Porter(object): self.progress.set_state("Preparing PostgreSQL") self.setup_db(postgres_config, postgres_engine) - # Step 2. Get tables. - self.progress.set_state("Fetching tables") - sqlite_tables = yield self.sqlite_store._simple_select_onecol( - table="sqlite_master", - keyvalues={ - "type": "table", - }, - retcol="name", - ) - - postgres_tables = yield self.postgres_store._simple_select_onecol( - table="information_schema.tables", - keyvalues={}, - retcol="distinct table_name", - ) - - tables = set(sqlite_tables) & set(postgres_tables) - - self.progress.set_state("Creating tables") - - logger.info("Found %d tables", len(tables)) - + self.progress.set_state("Creating port tables") def create_port_table(txn): txn.execute( - "CREATE TABLE port_from_sqlite3 (" + "CREATE TABLE IF NOT EXISTS port_from_sqlite3 (" " table_name varchar(100) NOT NULL UNIQUE," " forward_rowid bigint NOT NULL," " backward_rowid bigint NOT NULL" @@ -517,18 +503,33 @@ class Porter(object): "alter_table", alter_table ) except Exception as e: - logger.info("Failed to create port table: %s", e) + pass - try: - yield self.postgres_store.runInteraction( - "create_port_table", create_port_table - ) - except Exception as e: - logger.info("Failed to create port table: %s", e) + yield self.postgres_store.runInteraction( + "create_port_table", create_port_table + ) + + # Step 2. Get tables. + self.progress.set_state("Fetching tables") + sqlite_tables = yield self.sqlite_store._simple_select_onecol( + table="sqlite_master", + keyvalues={ + "type": "table", + }, + retcol="name", + ) - self.progress.set_state("Setting up") + postgres_tables = yield self.postgres_store._simple_select_onecol( + table="information_schema.tables", + keyvalues={}, + retcol="distinct table_name", + ) - # Set up tables. + tables = set(sqlite_tables) & set(postgres_tables) + logger.info("Found %d tables", len(tables)) + + # Step 3. Figure out what still needs copying + self.progress.set_state("Checking on port progress") setup_res = yield defer.gatherResults( [ self.setup_table(table) @@ -539,7 +540,8 @@ class Porter(object): consumeErrors=True, ) - # Process tables. + # Step 4. Do the copying. + self.progress.set_state("Copying to postgres") yield defer.gatherResults( [ self.handle_table(*res) @@ -548,6 +550,9 @@ class Porter(object): consumeErrors=True, ) + # Step 5. Do final post-processing + yield self._setup_state_group_id_seq() + self.progress.done() except: global end_error_exec_info @@ -707,6 +712,16 @@ class Porter(object): defer.returnValue((done, remaining + done)) + def _setup_state_group_id_seq(self): + def r(txn): + txn.execute("SELECT MAX(id) FROM state_groups") + next_id = txn.fetchone()[0]+1 + txn.execute( + "ALTER SEQUENCE state_group_id_seq RESTART WITH %s", + (next_id,), + ) + return self.postgres_store.runInteraction("setup_state_group_id_seq", r) + ############################################## ###### The following is simply UI stuff ###### diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index b0e1b5e66a..712dfa870e 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -252,6 +252,7 @@ def main(): for running_pid in running_pids: while pid_running(running_pid): time.sleep(0.2) + write("All processes exited; now restarting...") if action == "start" or action == "restart": if start_stop_synapse: diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 1fe162d55b..50a967a7ec 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -615,6 +615,19 @@ class TransportLayerClient(object): ) @log_function + def join_group(self, destination, group_id, user_id, content): + """Attempts to join a group + """ + path = PREFIX + "/groups/%s/users/%s/join" % (group_id, user_id) + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) + + @log_function def invite_to_group(self, destination, group_id, user_id, requester_user_id, content): """Invite a user to a group """ @@ -858,13 +871,13 @@ class TransportLayerClient(object): ) @log_function - def set_group_joinable(self, destination, group_id, requester_user_id, - content): + def set_group_join_policy(self, destination, group_id, requester_user_id, + content): """Sets the join policy for a group """ path = PREFIX + "/groups/%s/settings/m.join_policy" % (group_id,) - return self.client.post_json( + return self.client.put_json( destination=destination, path=path, args={"requester_user_id": requester_user_id}, diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 3658ca75f3..4c94d5a36c 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -803,6 +803,23 @@ class FederationGroupsAcceptInviteServlet(BaseFederationServlet): defer.returnValue((200, new_content)) +class FederationGroupsJoinServlet(BaseFederationServlet): + """Attempt to join a group + """ + PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, user_id): + if get_domain_from_id(user_id) != origin: + raise SynapseError(403, "user_id doesn't match origin") + + new_content = yield self.handler.join_group( + group_id, user_id, content, + ) + + defer.returnValue((200, new_content)) + + class FederationGroupsRemoveUserServlet(BaseFederationServlet): """Leave or kick a user from the group """ @@ -1131,7 +1148,7 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet): PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy$" @defer.inlineCallbacks - def on_POST(self, origin, content, query, group_id): + def on_PUT(self, origin, content, query, group_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") @@ -1182,6 +1199,7 @@ GROUP_SERVER_SERVLET_CLASSES = ( FederationGroupsInvitedUsersServlet, FederationGroupsInviteServlet, FederationGroupsAcceptInviteServlet, + FederationGroupsJoinServlet, FederationGroupsRemoveUserServlet, FederationGroupsSummaryRoomsServlet, FederationGroupsCategoriesServlet, diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 70781e1854..2d95b04e0c 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -404,9 +404,16 @@ class GroupsServerHandler(object): yield self.check_group_is_ours(group_id, requester_user_id) - group_description = yield self.store.get_group(group_id) + group = yield self.store.get_group(group_id) + + if group: + cols = [ + "name", "short_description", "long_description", + "avatar_url", "is_public", + ] + group_description = {key: group[key] for key in cols} + group_description["is_openly_joinable"] = group["join_policy"] == "open" - if group_description: defer.returnValue(group_description) else: raise SynapseError(404, "Unknown group") @@ -678,30 +685,21 @@ class GroupsServerHandler(object): raise SynapseError(502, "Unknown state returned by HS") @defer.inlineCallbacks - def accept_invite(self, group_id, requester_user_id, content): - """User tries to accept an invite to the group. + def _add_user(self, group_id, user_id, content): + """Add a user to a group based on a content dict. - This is different from them asking to join, and so should error if no - invite exists (and they're not a member of the group) + See accept_invite, join_group. """ - - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - - is_invited = yield self.store.is_user_invited_to_local_group( - group_id, requester_user_id, - ) - if not is_invited: - raise SynapseError(403, "User not invited to group") - - if not self.hs.is_mine_id(requester_user_id): + if not self.hs.is_mine_id(user_id): local_attestation = self.attestations.create_attestation( - group_id, requester_user_id, + group_id, user_id, ) + remote_attestation = content["attestation"] yield self.attestations.verify_attestation( remote_attestation, - user_id=requester_user_id, + user_id=user_id, group_id=group_id, ) else: @@ -711,13 +709,53 @@ class GroupsServerHandler(object): is_public = _parse_visibility_from_contents(content) yield self.store.add_user_to_group( - group_id, requester_user_id, + group_id, user_id, is_admin=False, is_public=is_public, local_attestation=local_attestation, remote_attestation=remote_attestation, ) + defer.returnValue(local_attestation) + + @defer.inlineCallbacks + def accept_invite(self, group_id, requester_user_id, content): + """User tries to accept an invite to the group. + + This is different from them asking to join, and so should error if no + invite exists (and they're not a member of the group) + """ + + yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + + is_invited = yield self.store.is_user_invited_to_local_group( + group_id, requester_user_id, + ) + if not is_invited: + raise SynapseError(403, "User not invited to group") + + local_attestation = yield self._add_user(group_id, requester_user_id, content) + + defer.returnValue({ + "state": "join", + "attestation": local_attestation, + }) + + @defer.inlineCallbacks + def join_group(self, group_id, requester_user_id, content): + """User tries to join the group. + + This will error if the group requires an invite/knock to join + """ + + group_info = yield self.check_group_is_ours( + group_id, requester_user_id, and_exists=True + ) + if group_info['join_policy'] != "open": + raise SynapseError(403, "Group is not publicly joinable") + + local_attestation = yield self._add_user(group_id, requester_user_id, content) + defer.returnValue({ "state": "join", "attestation": local_attestation, @@ -874,7 +912,7 @@ def _parse_join_policy_dict(join_policy_dict): """ join_policy_type = join_policy_dict.get("type") if not join_policy_type: - return True + return "invite" if join_policy_type not in ("invite", "open"): raise SynapseError( diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 5f7b0ff305..977993e7d4 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -229,7 +229,45 @@ class GroupsLocalHandler(object): def join_group(self, group_id, user_id, content): """Request to join a group """ - raise NotImplementedError() # TODO + if self.is_mine_id(group_id): + yield self.groups_server_handler.join_group( + group_id, user_id, content + ) + local_attestation = None + remote_attestation = None + else: + local_attestation = self.attestations.create_attestation(group_id, user_id) + content["attestation"] = local_attestation + + res = yield self.transport_client.join_group( + get_domain_from_id(group_id), group_id, user_id, content, + ) + + remote_attestation = res["attestation"] + + yield self.attestations.verify_attestation( + remote_attestation, + group_id=group_id, + user_id=user_id, + server_name=get_domain_from_id(group_id), + ) + + # TODO: Check that the group is public and we're being added publically + is_publicised = content.get("publicise", False) + + token = yield self.store.register_user_group_membership( + group_id, user_id, + membership="join", + is_admin=False, + local_attestation=local_attestation, + remote_attestation=remote_attestation, + is_publicised=is_publicised, + ) + self.notifier.on_new_event( + "groups_key", token, users=[user_id], + ) + + defer.returnValue({}) @defer.inlineCallbacks def accept_invite(self, group_id, user_id, content): diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 9977be8831..c45142d38d 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -852,6 +852,14 @@ class RoomMemberMasterHandler(RoomMemberHandler): def _remote_join(self, requester, remote_room_hosts, room_id, user, content): """Implements RoomMemberHandler._remote_join """ + # filter ourselves out of remote_room_hosts: do_invite_join ignores it + # and if it is the only entry we'd like to return a 404 rather than a + # 500. + + remote_room_hosts = [ + host for host in remote_room_hosts if host != self.hs.hostname + ] + if len(remote_room_hosts) == 0: raise SynapseError(404, "No known servers") diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 9145405cb0..60a29081e8 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -286,7 +286,8 @@ class MatrixFederationHttpClient(object): headers_dict[b"Authorization"] = auth_headers @defer.inlineCallbacks - def put_json(self, destination, path, data={}, json_data_callback=None, + def put_json(self, destination, path, args={}, data={}, + json_data_callback=None, long_retries=False, timeout=None, ignore_backoff=False, backoff_on_404=False): @@ -296,6 +297,7 @@ class MatrixFederationHttpClient(object): destination (str): The remote server to send the HTTP request to. path (str): The HTTP path. + args (dict): query params data (dict): A dict containing the data that will be used as the request body. This will be encoded as JSON. json_data_callback (callable): A callable returning the dict to @@ -342,6 +344,7 @@ class MatrixFederationHttpClient(object): path, body_callback=body_callback, headers_dict={"Content-Type": ["application/json"]}, + query_bytes=encode_query_args(args), long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, @@ -373,6 +376,7 @@ class MatrixFederationHttpClient(object): giving up. None indicates no timeout. ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. + args (dict): query params Returns: Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. diff --git a/synapse/http/server.py b/synapse/http/server.py index f19c068ef6..64e083ebfc 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -113,6 +113,11 @@ response_db_sched_duration = metrics.register_counter( "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"] ) +# size in bytes of the response written +response_size = metrics.register_counter( + "response_size", labels=["method", "servlet", "tag"] +) + _next_request_id = 0 @@ -426,6 +431,8 @@ class RequestMetrics(object): context.db_sched_duration_ms / 1000., request.method, self.name, tag ) + response_size.inc_by(request.sentLength, request.method, self.name, tag) + class RootRedirect(resource.Resource): """Redirects the root '/' path to another path.""" diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 70d788deea..d06cbdc35e 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -655,7 +655,12 @@ class RoomMembershipRestServlet(ClientV1RestServlet): content=event_content, ) - defer.returnValue((200, {})) + return_value = {} + + if membership_action == "join": + return_value["room_id"] = room_id + + defer.returnValue((200, return_value)) def _has_3pid_invite_keys(self, content): for key in {"id_server", "medium", "address"}: diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index db316a27ec..da05ccb027 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -55,10 +55,11 @@ class GroupServerStore(SQLBaseStore): "group_id": group_id, }, retcols=( - "name", "short_description", "long_description", "avatar_url", "is_public" + "name", "short_description", "long_description", + "avatar_url", "is_public", "join_policy", ), allow_none=True, - desc="is_user_in_group", + desc="get_group", ) def get_users_in_group(self, group_id, include_private=False): diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index bf3a66eae4..68285a7594 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -39,12 +40,11 @@ _CacheSentinel = object() class CacheEntry(object): __slots__ = [ - "deferred", "sequence", "callbacks", "invalidated" + "deferred", "callbacks", "invalidated" ] - def __init__(self, deferred, sequence, callbacks): + def __init__(self, deferred, callbacks): self.deferred = deferred - self.sequence = sequence self.callbacks = set(callbacks) self.invalidated = False @@ -62,7 +62,6 @@ class Cache(object): "max_entries", "name", "keylen", - "sequence", "thread", "metrics", "_pending_deferred_cache", @@ -80,7 +79,6 @@ class Cache(object): self.name = name self.keylen = keylen - self.sequence = 0 self.thread = None self.metrics = register_cache(name, self.cache) @@ -113,11 +111,10 @@ class Cache(object): callbacks = [callback] if callback else [] val = self._pending_deferred_cache.get(key, _CacheSentinel) if val is not _CacheSentinel: - if val.sequence == self.sequence: - val.callbacks.update(callbacks) - if update_metrics: - self.metrics.inc_hits() - return val.deferred + val.callbacks.update(callbacks) + if update_metrics: + self.metrics.inc_hits() + return val.deferred val = self.cache.get(key, _CacheSentinel, callbacks=callbacks) if val is not _CacheSentinel: @@ -137,12 +134,9 @@ class Cache(object): self.check_thread() entry = CacheEntry( deferred=value, - sequence=self.sequence, callbacks=callbacks, ) - entry.callbacks.update(callbacks) - existing_entry = self._pending_deferred_cache.pop(key, None) if existing_entry: existing_entry.invalidate() @@ -150,13 +144,25 @@ class Cache(object): self._pending_deferred_cache[key] = entry def shuffle(result): - if self.sequence == entry.sequence: - existing_entry = self._pending_deferred_cache.pop(key, None) - if existing_entry is entry: - self.cache.set(key, result, entry.callbacks) - else: - entry.invalidate() + existing_entry = self._pending_deferred_cache.pop(key, None) + if existing_entry is entry: + self.cache.set(key, result, entry.callbacks) else: + # oops, the _pending_deferred_cache has been updated since + # we started our query, so we are out of date. + # + # Better put back whatever we took out. (We do it this way + # round, rather than peeking into the _pending_deferred_cache + # and then removing on a match, to make the common case faster) + if existing_entry is not None: + self._pending_deferred_cache[key] = existing_entry + + # we're not going to put this entry into the cache, so need + # to make sure that the invalidation callbacks are called. + # That was probably done when _pending_deferred_cache was + # updated, but it's possible that `set` was called without + # `invalidate` being previously called, in which case it may + # not have been. Either way, let's double-check now. entry.invalidate() return result @@ -168,25 +174,29 @@ class Cache(object): def invalidate(self, key): self.check_thread() + self.cache.pop(key, None) - # Increment the sequence number so that any SELECT statements that - # raced with the INSERT don't update the cache (SYN-369) - self.sequence += 1 + # if we have a pending lookup for this key, remove it from the + # _pending_deferred_cache, which will (a) stop it being returned + # for future queries and (b) stop it being persisted as a proper entry + # in self.cache. entry = self._pending_deferred_cache.pop(key, None) + + # run the invalidation callbacks now, rather than waiting for the + # deferred to resolve. if entry: entry.invalidate() - self.cache.pop(key, None) - def invalidate_many(self, key): self.check_thread() if not isinstance(key, tuple): raise TypeError( "The cache key must be a tuple not %r" % (type(key),) ) - self.sequence += 1 self.cache.del_multi(key) + # if we have a pending lookup for this key, remove it from the + # _pending_deferred_cache, as above entry_dict = self._pending_deferred_cache.pop(key, None) if entry_dict is not None: for entry in iterate_tree_cache_entry(entry_dict): @@ -194,8 +204,10 @@ class Cache(object): def invalidate_all(self): self.check_thread() - self.sequence += 1 self.cache.clear() + for entry in self._pending_deferred_cache.itervalues(): + entry.invalidate() + self._pending_deferred_cache.clear() class _CacheDescriptorBase(object): diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index 3f14ab503f..2516fe40f4 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from functools import partial import logging import mock @@ -25,6 +27,50 @@ from tests import unittest logger = logging.getLogger(__name__) +class CacheTestCase(unittest.TestCase): + def test_invalidate_all(self): + cache = descriptors.Cache("testcache") + + callback_record = [False, False] + + def record_callback(idx): + callback_record[idx] = True + + # add a couple of pending entries + d1 = defer.Deferred() + cache.set("key1", d1, partial(record_callback, 0)) + + d2 = defer.Deferred() + cache.set("key2", d2, partial(record_callback, 1)) + + # lookup should return the deferreds + self.assertIs(cache.get("key1"), d1) + self.assertIs(cache.get("key2"), d2) + + # let one of the lookups complete + d2.callback("result2") + self.assertEqual(cache.get("key2"), "result2") + + # now do the invalidation + cache.invalidate_all() + + # lookup should return none + self.assertIsNone(cache.get("key1", None)) + self.assertIsNone(cache.get("key2", None)) + + # both callbacks should have been callbacked + self.assertTrue( + callback_record[0], "Invalidation callback for key1 not called", + ) + self.assertTrue( + callback_record[1], "Invalidation callback for key2 not called", + ) + + # letting the other lookup complete should do nothing + d1.callback("result1") + self.assertIsNone(cache.get("key1", None)) + + class DescriptorTestCase(unittest.TestCase): @defer.inlineCallbacks def test_cache(self): |