From 3e19beb941f3f797262b051d47227018898bb36f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Aug 2018 14:33:49 +0100 Subject: Fix tests --- tests/replication/slave/storage/test_events.py | 3 ++- tests/storage/test_redaction.py | 4 +++- tests/storage/test_roommember.py | 4 +++- tests/test_state.py | 6 +++++- tests/test_visibility.py | 4 +++- tests/utils.py | 30 ++++++++++++++++++++++++++ 6 files changed, 46 insertions(+), 5 deletions(-) (limited to 'tests') diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index f5b47f5ec0..8c08985fa9 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -122,6 +122,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): @defer.inlineCallbacks def test_invites(self): + yield self.persist(type="m.room.create", key="", creator=USER_ID) yield self.check("get_invited_rooms_for_user", [USER_ID_2], []) event = yield self.persist( type="m.room.member", key=USER_ID_2, membership="invite" @@ -134,7 +135,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): @defer.inlineCallbacks def test_push_actions_for_user(self): - yield self.persist(type="m.room.create", creator=USER_ID) + yield self.persist(type="m.room.create", key="", creator=USER_ID) yield self.persist(type="m.room.join", key=USER_ID, membership="join") yield self.persist( type="m.room.join", sender=USER_ID, key=USER_ID_2, membership="join" diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py index 475ec900c4..05f40eaa94 100644 --- a/tests/storage/test_redaction.py +++ b/tests/storage/test_redaction.py @@ -22,7 +22,7 @@ from synapse.api.constants import EventTypes, Membership from synapse.types import RoomID, UserID from tests import unittest -from tests.utils import setup_test_homeserver +from tests.utils import create_room, setup_test_homeserver class RedactionTestCase(unittest.TestCase): @@ -43,6 +43,8 @@ class RedactionTestCase(unittest.TestCase): self.room1 = RoomID.from_string("!abc123:test") + yield create_room(hs, self.room1.to_string(), self.u_alice.to_string()) + self.depth = 1 @defer.inlineCallbacks diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py index c5fd54f67e..034ea3fbbf 100644 --- a/tests/storage/test_roommember.py +++ b/tests/storage/test_roommember.py @@ -22,7 +22,7 @@ from synapse.api.constants import EventTypes, Membership from synapse.types import RoomID, UserID from tests import unittest -from tests.utils import setup_test_homeserver +from tests.utils import create_room, setup_test_homeserver class RoomMemberStoreTestCase(unittest.TestCase): @@ -47,6 +47,8 @@ class RoomMemberStoreTestCase(unittest.TestCase): self.room = RoomID.from_string("!abc123:test") + yield create_room(hs, self.room.to_string(), self.u_alice.to_string()) + @defer.inlineCallbacks def inject_room_member(self, room, user, membership, replaces_state=None): builder = self.event_builder_factory.new({ diff --git a/tests/test_state.py b/tests/test_state.py index 429a18cbf7..770e94437a 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -18,7 +18,7 @@ from mock import Mock from twisted.internet import defer from synapse.api.auth import Auth -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventTypes, Membership, RoomVersions from synapse.events import FrozenEvent from synapse.state import StateHandler, StateResolutionHandler @@ -108,6 +108,9 @@ class StateGroupStore(object): def register_event_id_state_group(self, event_id, state_group): self._event_to_state_group[event_id] = state_group + def get_room_version(self, room_id): + return RoomVersions.V1 + class DictObj(dict): def __init__(self, **kwargs): @@ -167,6 +170,7 @@ class StateTestCase(unittest.TestCase): "START": DictObj( type=EventTypes.Create, state_key="", + content={}, depth=1, ), "A": DictObj( diff --git a/tests/test_visibility.py b/tests/test_visibility.py index 0dc1a924d3..15ebb0aa08 100644 --- a/tests/test_visibility.py +++ b/tests/test_visibility.py @@ -21,7 +21,7 @@ from synapse.events import FrozenEvent from synapse.visibility import filter_events_for_server import tests.unittest -from tests.utils import setup_test_homeserver +from tests.utils import create_room, setup_test_homeserver logger = logging.getLogger(__name__) @@ -36,6 +36,8 @@ class FilterEventsForServerTestCase(tests.unittest.TestCase): self.event_builder_factory = self.hs.get_event_builder_factory() self.store = self.hs.get_datastore() + yield create_room(self.hs, TEST_ROOM_ID, "@someone:ROOM") + @defer.inlineCallbacks def test_filtering(self): # diff --git a/tests/utils.py b/tests/utils.py index 3f17304934..8930a3a230 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -21,6 +21,7 @@ from six.moves.urllib import parse as urlparse from twisted.internet import defer, reactor +from synapse.api.constants import EventTypes from synapse.api.errors import CodeMessageException, cs_error from synapse.federation.transport import server from synapse.http.server import HttpServer @@ -445,3 +446,32 @@ class DeferredMockCallable(object): "call(%s)" % _format_call(c[0], c[1]) for c in calls ]) ) + + +@defer.inlineCallbacks +def create_room(hs, room_id, creator_id): + """Creates and persist a creation event for the given room + + Args: + hs + room_id (str) + creator_id (str) + """ + + store = hs.get_datastore() + event_builder_factory = hs.get_event_builder_factory() + event_creation_handler = hs.get_event_creation_handler() + + builder = event_builder_factory.new({ + "type": EventTypes.Create, + "state_key": "", + "sender": creator_id, + "room_id": room_id, + "content": {}, + }) + + event, context = yield event_creation_handler.create_new_client_event( + builder + ) + + yield store.persist_event(event, context) -- cgit 1.5.1 From a87af25fbb9d8938b1557ef42e31edecf1ee731c Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 15 Aug 2018 23:43:41 +1000 Subject: Fix the tests --- tests/rest/client/v1/test_register.py | 7 ++----- tests/rest/client/v1/utils.py | 8 +++----- tests/rest/client/v2_alpha/test_filter.py | 23 ++++++++--------------- tests/rest/client/v2_alpha/test_register.py | 26 +++++++++----------------- tests/rest/client/v2_alpha/test_sync.py | 5 ++--- tests/server.py | 23 +++++++++++++++++++---- tests/test_server.py | 17 ++++++----------- 7 files changed, 49 insertions(+), 60 deletions(-) (limited to 'tests') diff --git a/tests/rest/client/v1/test_register.py b/tests/rest/client/v1/test_register.py index 4be88b8a39..6b7ff813d5 100644 --- a/tests/rest/client/v1/test_register.py +++ b/tests/rest/client/v1/test_register.py @@ -25,7 +25,7 @@ from synapse.rest.client.v1_only.register import register_servlets from synapse.util import Clock from tests import unittest -from tests.server import make_request, setup_test_homeserver +from tests.server import make_request, render, setup_test_homeserver class CreateUserServletTestCase(unittest.TestCase): @@ -77,10 +77,7 @@ class CreateUserServletTestCase(unittest.TestCase): ) request, channel = make_request(b"POST", url, request_data) - request.render(res) - - # Advance the clock because it waits - self.clock.advance(1) + render(request, res, self.clock) self.assertEquals(channel.result["code"], b"200") diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py index 9f862f9dfa..40dc4ea256 100644 --- a/tests/rest/client/v1/utils.py +++ b/tests/rest/client/v1/utils.py @@ -23,7 +23,7 @@ from twisted.internet import defer from synapse.api.constants import Membership from tests import unittest -from tests.server import make_request, wait_until_result +from tests.server import make_request, render class RestTestCase(unittest.TestCase): @@ -171,8 +171,7 @@ class RestHelper(object): request, channel = make_request( "POST", path, json.dumps(content).encode('utf8') ) - request.render(self.resource) - wait_until_result(self.hs.get_reactor(), channel) + render(request, self.resource, self.hs.get_reactor()) assert channel.result["code"] == b"200", channel.result self.auth_user_id = temp_id @@ -220,8 +219,7 @@ class RestHelper(object): request, channel = make_request("PUT", path, json.dumps(data).encode('utf8')) - request.render(self.resource) - wait_until_result(self.hs.get_reactor(), channel) + render(request, self.resource, self.hs.get_reactor()) assert int(channel.result["code"]) == expect_code, ( "Expected: %d, got: %d, resp: %r" diff --git a/tests/rest/client/v2_alpha/test_filter.py b/tests/rest/client/v2_alpha/test_filter.py index 8260c130f8..6a886ee3b8 100644 --- a/tests/rest/client/v2_alpha/test_filter.py +++ b/tests/rest/client/v2_alpha/test_filter.py @@ -24,8 +24,8 @@ from tests import unittest from tests.server import ( ThreadedMemoryReactorClock as MemoryReactorClock, make_request, + render, setup_test_homeserver, - wait_until_result, ) PATH_PREFIX = "/_matrix/client/v2_alpha" @@ -76,8 +76,7 @@ class FilterTestCase(unittest.TestCase): "/_matrix/client/r0/user/%s/filter" % (self.USER_ID), self.EXAMPLE_FILTER_JSON, ) - request.render(self.resource) - wait_until_result(self.clock, channel) + render(request, self.resource, self.clock) self.assertEqual(channel.result["code"], b"200") self.assertEqual(channel.json_body, {"filter_id": "0"}) @@ -91,8 +90,7 @@ class FilterTestCase(unittest.TestCase): "/_matrix/client/r0/user/%s/filter" % ("@watermelon:test"), self.EXAMPLE_FILTER_JSON, ) - request.render(self.resource) - wait_until_result(self.clock, channel) + render(request, self.resource, self.clock) self.assertEqual(channel.result["code"], b"403") self.assertEquals(channel.json_body["errcode"], Codes.FORBIDDEN) @@ -105,8 +103,7 @@ class FilterTestCase(unittest.TestCase): "/_matrix/client/r0/user/%s/filter" % (self.USER_ID), self.EXAMPLE_FILTER_JSON, ) - request.render(self.resource) - wait_until_result(self.clock, channel) + render(request, self.resource, self.clock) self.hs.is_mine = _is_mine self.assertEqual(channel.result["code"], b"403") @@ -121,8 +118,7 @@ class FilterTestCase(unittest.TestCase): request, channel = make_request( "GET", "/_matrix/client/r0/user/%s/filter/%s" % (self.USER_ID, filter_id) ) - request.render(self.resource) - wait_until_result(self.clock, channel) + render(request, self.resource, self.clock) self.assertEqual(channel.result["code"], b"200") self.assertEquals(channel.json_body, self.EXAMPLE_FILTER) @@ -131,8 +127,7 @@ class FilterTestCase(unittest.TestCase): request, channel = make_request( "GET", "/_matrix/client/r0/user/%s/filter/12382148321" % (self.USER_ID) ) - request.render(self.resource) - wait_until_result(self.clock, channel) + render(request, self.resource, self.clock) self.assertEqual(channel.result["code"], b"400") self.assertEquals(channel.json_body["errcode"], Codes.NOT_FOUND) @@ -143,8 +138,7 @@ class FilterTestCase(unittest.TestCase): request, channel = make_request( "GET", "/_matrix/client/r0/user/%s/filter/foobar" % (self.USER_ID) ) - request.render(self.resource) - wait_until_result(self.clock, channel) + render(request, self.resource, self.clock) self.assertEqual(channel.result["code"], b"400") @@ -153,7 +147,6 @@ class FilterTestCase(unittest.TestCase): request, channel = make_request( "GET", "/_matrix/client/r0/user/%s/filter/" % (self.USER_ID) ) - request.render(self.resource) - wait_until_result(self.clock, channel) + render(request, self.resource, self.clock) self.assertEqual(channel.result["code"], b"400") diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index b72bd0fb7f..1c128e81f5 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -11,7 +11,7 @@ from synapse.rest.client.v2_alpha.register import register_servlets from synapse.util import Clock from tests import unittest -from tests.server import make_request, setup_test_homeserver, wait_until_result +from tests.server import make_request, render, setup_test_homeserver class RegisterRestServletTestCase(unittest.TestCase): @@ -72,8 +72,7 @@ class RegisterRestServletTestCase(unittest.TestCase): request, channel = make_request( b"POST", self.url + b"?access_token=i_am_an_app_service", request_data ) - request.render(self.resource) - wait_until_result(self.clock, channel) + render(request, self.resource, self.clock) self.assertEquals(channel.result["code"], b"200", channel.result) det_data = { @@ -89,16 +88,14 @@ class RegisterRestServletTestCase(unittest.TestCase): request, channel = make_request( b"POST", self.url + b"?access_token=i_am_an_app_service", request_data ) - request.render(self.resource) - wait_until_result(self.clock, channel) + render(request, self.resource, self.clock) self.assertEquals(channel.result["code"], b"401", channel.result) def test_POST_bad_password(self): request_data = json.dumps({"username": "kermit", "password": 666}) request, channel = make_request(b"POST", self.url, request_data) - request.render(self.resource) - wait_until_result(self.clock, channel) + render(request, self.resource, self.clock) self.assertEquals(channel.result["code"], b"400", channel.result) self.assertEquals(channel.json_body["error"], "Invalid password") @@ -106,8 +103,7 @@ class RegisterRestServletTestCase(unittest.TestCase): def test_POST_bad_username(self): request_data = json.dumps({"username": 777, "password": "monkey"}) request, channel = make_request(b"POST", self.url, request_data) - request.render(self.resource) - wait_until_result(self.clock, channel) + render(request, self.resource, self.clock) self.assertEquals(channel.result["code"], b"400", channel.result) self.assertEquals(channel.json_body["error"], "Invalid username") @@ -126,8 +122,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.device_handler.check_device_registered = Mock(return_value=device_id) request, channel = make_request(b"POST", self.url, request_data) - request.render(self.resource) - wait_until_result(self.clock, channel) + render(request, self.resource, self.clock) det_data = { "user_id": user_id, @@ -149,8 +144,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.registration_handler.register = Mock(return_value=("@user:id", "t")) request, channel = make_request(b"POST", self.url, request_data) - request.render(self.resource) - wait_until_result(self.clock, channel) + render(request, self.resource, self.clock) self.assertEquals(channel.result["code"], b"403", channel.result) self.assertEquals(channel.json_body["error"], "Registration has been disabled") @@ -162,8 +156,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.registration_handler.register = Mock(return_value=(user_id, None)) request, channel = make_request(b"POST", self.url + b"?kind=guest", b"{}") - request.render(self.resource) - wait_until_result(self.clock, channel) + render(request, self.resource, self.clock) det_data = { "user_id": user_id, @@ -177,8 +170,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.hs.config.allow_guest_access = False request, channel = make_request(b"POST", self.url + b"?kind=guest", b"{}") - request.render(self.resource) - wait_until_result(self.clock, channel) + render(request, self.resource, self.clock) self.assertEquals(channel.result["code"], b"403", channel.result) self.assertEquals(channel.json_body["error"], "Guest access is disabled") diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index 2e1d06c509..9f3d8bd1db 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -23,8 +23,8 @@ from tests import unittest from tests.server import ( ThreadedMemoryReactorClock as MemoryReactorClock, make_request, + render, setup_test_homeserver, - wait_until_result, ) PATH_PREFIX = "/_matrix/client/v2_alpha" @@ -69,8 +69,7 @@ class FilterTestCase(unittest.TestCase): def test_sync_argless(self): request, channel = make_request("GET", "/_matrix/client/r0/sync") - request.render(self.resource) - wait_until_result(self.clock, channel) + render(request, self.resource, self.clock) self.assertEqual(channel.result["code"], b"200") self.assertTrue( diff --git a/tests/server.py b/tests/server.py index beb24cf032..c63b2c3100 100644 --- a/tests/server.py +++ b/tests/server.py @@ -24,6 +24,7 @@ class FakeChannel(object): """ result = attr.ib(default=attr.Factory(dict)) + _producer = None @property def json_body(self): @@ -49,6 +50,15 @@ class FakeChannel(object): self.result["body"] += content + def registerProducer(self, producer, streaming): + self._producer = producer + + def unregisterProducer(self): + if self._producer is None: + return + + self._producer = None + def requestDone(self, _self): self.result["done"] = True @@ -111,14 +121,19 @@ def make_request(method, path, content=b""): return req, channel -def wait_until_result(clock, channel, timeout=100): +def wait_until_result(clock, request, timeout=100): """ - Wait until the channel has a result. + Wait until the request is finished. """ clock.run() x = 0 - while not channel.result: + while not request.finished: + + # If there's a producer, tell it to resume producing so we get content + if request._channel._producer: + request._channel._producer.resumeProducing() + x += 1 if x > timeout: @@ -129,7 +144,7 @@ def wait_until_result(clock, channel, timeout=100): def render(request, resource, clock): request.render(resource) - wait_until_result(clock, request._channel) + wait_until_result(clock, request) class ThreadedMemoryReactorClock(MemoryReactorClock): diff --git a/tests/test_server.py b/tests/test_server.py index 895e490406..ef74544e93 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -8,7 +8,7 @@ from synapse.http.server import JsonResource from synapse.util import Clock from tests import unittest -from tests.server import make_request, setup_test_homeserver +from tests.server import make_request, render, setup_test_homeserver class JsonResourceTests(unittest.TestCase): @@ -37,7 +37,7 @@ class JsonResourceTests(unittest.TestCase): ) request, channel = make_request(b"GET", b"/_matrix/foo/%E2%98%83?a=%E2%98%83") - request.render(res) + render(request, res, self.reactor) self.assertEqual(request.args, {b'a': [u"\N{SNOWMAN}".encode('utf8')]}) self.assertEqual(got_kwargs, {u"room_id": u"\N{SNOWMAN}"}) @@ -55,7 +55,7 @@ class JsonResourceTests(unittest.TestCase): res.register_paths("GET", [re.compile("^/_matrix/foo$")], _callback) request, channel = make_request(b"GET", b"/_matrix/foo") - request.render(res) + render(request, res, self.reactor) self.assertEqual(channel.result["code"], b'500') @@ -78,13 +78,8 @@ class JsonResourceTests(unittest.TestCase): res.register_paths("GET", [re.compile("^/_matrix/foo$")], _callback) request, channel = make_request(b"GET", b"/_matrix/foo") - request.render(res) + render(request, res, self.reactor) - # No error has been raised yet - self.assertTrue("code" not in channel.result) - - # Advance time, now there's an error - self.reactor.advance(1) self.assertEqual(channel.result["code"], b'500') def test_callback_synapseerror(self): @@ -100,7 +95,7 @@ class JsonResourceTests(unittest.TestCase): res.register_paths("GET", [re.compile("^/_matrix/foo$")], _callback) request, channel = make_request(b"GET", b"/_matrix/foo") - request.render(res) + render(request, res, self.reactor) self.assertEqual(channel.result["code"], b'403') self.assertEqual(channel.json_body["error"], "Forbidden!!one!") @@ -121,7 +116,7 @@ class JsonResourceTests(unittest.TestCase): res.register_paths("GET", [re.compile("^/_matrix/foo$")], _callback) request, channel = make_request(b"GET", b"/_matrix/foobar") - request.render(res) + render(request, res, self.reactor) self.assertEqual(channel.result["code"], b'400') self.assertEqual(channel.json_body["error"], "Unrecognized request") -- cgit 1.5.1 From ca87ad1defac1082462367854cb4a656b7a96e90 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Aug 2018 11:43:16 +0100 Subject: Split ProfileHandler into master and worker --- synapse/handlers/profile.py | 21 ++++++++++++++------- synapse/server.py | 7 +++++-- tests/handlers/test_profile.py | 4 ++-- 3 files changed, 21 insertions(+), 11 deletions(-) (limited to 'tests') diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 6d1fbb1a5c..8b349f6ad6 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -33,12 +33,12 @@ from ._base import BaseHandler logger = logging.getLogger(__name__) -class ProfileHandler(BaseHandler): +class WorkerProfileHandler(BaseHandler): PROFILE_UPDATE_MS = 60 * 1000 PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000 def __init__(self, hs): - super(ProfileHandler, self).__init__(hs) + super(WorkerProfileHandler, self).__init__(hs) self.federation = hs.get_federation_client() hs.get_federation_registry().register_query_handler( @@ -47,11 +47,6 @@ class ProfileHandler(BaseHandler): self.user_directory_handler = hs.get_user_directory_handler() - if hs.config.worker_app is None: - self.clock.looping_call( - self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS, - ) - self._notify_master_profile_change = ( ReplicationHandleProfileChangeRestServlet.make_client(hs) ) @@ -298,6 +293,18 @@ class ProfileHandler(BaseHandler): room_id, str(e.message) ) + +class MasterProfileHandler(WorkerProfileHandler): + PROFILE_UPDATE_MS = 60 * 1000 + PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000 + + def __init__(self, hs): + super(MasterProfileHandler, self).__init__(hs) + + self.clock.looping_call( + self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS, + ) + def _start_update_remote_profile_cache(self): return run_as_background_process( "Update remote profile", self._update_remote_profile_cache, diff --git a/synapse/server.py b/synapse/server.py index 140be9ebe8..be85aad8cf 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -55,7 +55,7 @@ from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.message import EventCreationHandler, MessageHandler from synapse.handlers.pagination import PaginationHandler from synapse.handlers.presence import PresenceHandler -from synapse.handlers.profile import ProfileHandler +from synapse.handlers.profile import MasterProfileHandler, WorkerProfileHandler from synapse.handlers.read_marker import ReadMarkerHandler from synapse.handlers.receipts import ReceiptsHandler from synapse.handlers.room import RoomContextHandler, RoomCreationHandler @@ -307,7 +307,10 @@ class HomeServer(object): return InitialSyncHandler(self) def build_profile_handler(self): - return ProfileHandler(self) + if self.config.worker_app: + return WorkerProfileHandler(self) + else: + return MasterProfileHandler(self) def build_event_creation_handler(self): return EventCreationHandler(self) diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index dc17918a3d..07cf5f4c8e 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -20,7 +20,7 @@ from twisted.internet import defer import synapse.types from synapse.api.errors import AuthError -from synapse.handlers.profile import ProfileHandler +from synapse.handlers.profile import MasterProfileHandler from synapse.types import UserID from tests import unittest @@ -29,7 +29,7 @@ from tests.utils import setup_test_homeserver class ProfileHandlers(object): def __init__(self, hs): - self.profile_handler = ProfileHandler(hs) + self.profile_handler = MasterProfileHandler(hs) class ProfileTestCase(unittest.TestCase): -- cgit 1.5.1 From c334ca67bb89039b3a00b7c9a1ce610e99859653 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Sat, 18 Aug 2018 01:08:45 +1000 Subject: Integrate presence from hotfixes (#3694) --- changelog.d/3694.feature | 1 + docs/workers.rst | 8 +++ synapse/app/_base.py | 6 ++- synapse/app/frontend_proxy.py | 39 ++++++++++++++- synapse/app/synchrotron.py | 16 ++++-- synapse/config/server.py | 6 +++ synapse/federation/transaction_queue.py | 4 ++ synapse/handlers/initial_sync.py | 4 ++ synapse/handlers/presence.py | 26 +++++++--- synapse/handlers/sync.py | 3 +- synapse/rest/client/v1/presence.py | 3 +- tests/app/__init__.py | 0 tests/app/test_frontend_proxy.py | 88 +++++++++++++++++++++++++++++++++ tests/rest/client/v1/test_presence.py | 72 +++++++++++++++++++++++++++ tests/rest/client/v2_alpha/test_sync.py | 79 +++++++++++++---------------- tests/unittest.py | 8 ++- tests/utils.py | 7 +-- 17 files changed, 303 insertions(+), 67 deletions(-) create mode 100644 changelog.d/3694.feature create mode 100644 tests/app/__init__.py create mode 100644 tests/app/test_frontend_proxy.py create mode 100644 tests/rest/client/v1/test_presence.py (limited to 'tests') diff --git a/changelog.d/3694.feature b/changelog.d/3694.feature new file mode 100644 index 0000000000..916a342ff4 --- /dev/null +++ b/changelog.d/3694.feature @@ -0,0 +1 @@ +Synapse's presence functionality can now be disabled with the "use_presence" configuration option. diff --git a/docs/workers.rst b/docs/workers.rst index ac9efb621f..aec319dd84 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -241,6 +241,14 @@ regular expressions:: ^/_matrix/client/(api/v1|r0|unstable)/keys/upload +If ``use_presence`` is False in the homeserver config, it can also handle REST +endpoints matching the following regular expressions:: + + ^/_matrix/client/(api/v1|r0|unstable)/presence/[^/]+/status + +This "stub" presence handler will pass through ``GET`` request but make the +``PUT`` effectively a no-op. + It will proxy any requests it cannot handle to the main synapse instance. It must therefore be configured with the location of the main instance, via the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 391bd14c5c..7c866e246a 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -140,7 +140,7 @@ def listen_metrics(bind_addresses, port): logger.info("Metrics now reporting on %s:%d", host, port) -def listen_tcp(bind_addresses, port, factory, backlog=50): +def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50): """ Create a TCP socket for a port and several addresses """ @@ -156,7 +156,9 @@ def listen_tcp(bind_addresses, port, factory, backlog=50): check_bind_error(e, address, bind_addresses) -def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50): +def listen_ssl( + bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50 +): """ Create an SSL socket for a port and several addresses """ diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index 671fbbcb2a..8d484c1cd4 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -38,6 +38,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns from synapse.rest.client.v2_alpha._base import client_v2_patterns from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -49,6 +50,35 @@ from synapse.util.versionstring import get_version_string logger = logging.getLogger("synapse.app.frontend_proxy") +class PresenceStatusStubServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns("/presence/(?P[^/]*)/status") + + def __init__(self, hs): + super(PresenceStatusStubServlet, self).__init__(hs) + self.http_client = hs.get_simple_http_client() + self.auth = hs.get_auth() + self.main_uri = hs.config.worker_main_http_uri + + @defer.inlineCallbacks + def on_GET(self, request, user_id): + # Pass through the auth headers, if any, in case the access token + # is there. + auth_headers = request.requestHeaders.getRawHeaders("Authorization", []) + headers = { + "Authorization": auth_headers, + } + result = yield self.http_client.get_json( + self.main_uri + request.uri, + headers=headers, + ) + defer.returnValue((200, result)) + + @defer.inlineCallbacks + def on_PUT(self, request, user_id): + yield self.auth.get_user_by_req(request) + defer.returnValue((200, {})) + + class KeyUploadServlet(RestServlet): PATTERNS = client_v2_patterns("/keys/upload(/(?P[^/]+))?$") @@ -135,6 +165,12 @@ class FrontendProxyServer(HomeServer): elif name == "client": resource = JsonResource(self, canonical_json=False) KeyUploadServlet(self).register(resource) + + # If presence is disabled, use the stub servlet that does + # not allow sending presence + if not self.config.use_presence: + PresenceStatusStubServlet(self).register(resource) + resources.update({ "/_matrix/client/r0": resource, "/_matrix/client/unstable": resource, @@ -153,7 +189,8 @@ class FrontendProxyServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), + reactor=self.get_reactor() ) logger.info("Synapse client reader now listening on port %d", port) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index e201f18efd..cade09d60e 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -114,7 +114,10 @@ class SynchrotronPresence(object): logger.info("Presence process_id is %r", self.process_id) def send_user_sync(self, user_id, is_syncing, last_sync_ms): - self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms) + if self.hs.config.use_presence: + self.hs.get_tcp_replication().send_user_sync( + user_id, is_syncing, last_sync_ms + ) def mark_as_coming_online(self, user_id): """A user has started syncing. Send a UserSync to the master, unless they @@ -211,10 +214,13 @@ class SynchrotronPresence(object): yield self.notify_from_replication(states, stream_id) def get_currently_syncing_users(self): - return [ - user_id for user_id, count in iteritems(self.user_to_num_current_syncs) - if count > 0 - ] + if self.hs.config.use_presence: + return [ + user_id for user_id, count in iteritems(self.user_to_num_current_syncs) + if count > 0 + ] + else: + return set() class SynchrotronTyping(object): diff --git a/synapse/config/server.py b/synapse/config/server.py index a41c48e69c..68a612e594 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -49,6 +49,9 @@ class ServerConfig(Config): # "disable" federation self.send_federation = config.get("send_federation", True) + # Whether to enable user presence. + self.use_presence = config.get("use_presence", True) + # Whether to update the user directory or not. This should be set to # false only if we are updating the user directory in a worker self.update_user_directory = config.get("update_user_directory", True) @@ -250,6 +253,9 @@ class ServerConfig(Config): # hard limit. soft_file_limit: 0 + # Set to false to disable presence tracking on this homeserver. + use_presence: true + # The GC threshold parameters to pass to `gc.set_threshold`, if defined # gc_thresholds: [700, 10, 10] diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index f603c8a368..94d7423d01 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -58,6 +58,7 @@ class TransactionQueue(object): """ def __init__(self, hs): + self.hs = hs self.server_name = hs.hostname self.store = hs.get_datastore() @@ -308,6 +309,9 @@ class TransactionQueue(object): Args: states (list(UserPresenceState)) """ + if not self.hs.config.use_presence: + # No-op if presence is disabled. + return # First we queue up the new presence by user ID, so multiple presence # updates in quick successtion are correctly handled diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 1fb17fd9a5..e009395207 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -372,6 +372,10 @@ class InitialSyncHandler(BaseHandler): @defer.inlineCallbacks def get_presence(): + # If presence is disabled, return an empty list + if not self.hs.config.use_presence: + defer.returnValue([]) + states = yield presence_handler.get_states( [m.user_id for m in room_members], as_event=True, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 3671d24f60..ba3856674d 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -395,6 +395,10 @@ class PresenceHandler(object): """We've seen the user do something that indicates they're interacting with the app. """ + # If presence is disabled, no-op + if not self.hs.config.use_presence: + return + user_id = user.to_string() bump_active_time_counter.inc() @@ -424,6 +428,11 @@ class PresenceHandler(object): Useful for streams that are not associated with an actual client that is being used by a user. """ + # Override if it should affect the user's presence, if presence is + # disabled. + if not self.hs.config.use_presence: + affect_presence = False + if affect_presence: curr_sync = self.user_to_num_current_syncs.get(user_id, 0) self.user_to_num_current_syncs[user_id] = curr_sync + 1 @@ -469,13 +478,16 @@ class PresenceHandler(object): Returns: set(str): A set of user_id strings. """ - syncing_user_ids = { - user_id for user_id, count in self.user_to_num_current_syncs.items() - if count - } - for user_ids in self.external_process_to_current_syncs.values(): - syncing_user_ids.update(user_ids) - return syncing_user_ids + if self.hs.config.use_presence: + syncing_user_ids = { + user_id for user_id, count in self.user_to_num_current_syncs.items() + if count + } + for user_ids in self.external_process_to_current_syncs.values(): + syncing_user_ids.update(user_ids) + return syncing_user_ids + else: + return set() @defer.inlineCallbacks def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ac3edf0cc9..648debc8aa 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -185,6 +185,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ class SyncHandler(object): def __init__(self, hs): + self.hs_config = hs.config self.store = hs.get_datastore() self.notifier = hs.get_notifier() self.presence_handler = hs.get_presence_handler() @@ -860,7 +861,7 @@ class SyncHandler(object): since_token is None and sync_config.filter_collection.blocks_all_presence() ) - if not block_all_presence_data: + if self.hs_config.use_presence and not block_all_presence_data: yield self._generate_sync_entry_for_presence( sync_result_builder, newly_joined_rooms, newly_joined_users ) diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index a14f0c807e..b5a6d6aebf 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -84,7 +84,8 @@ class PresenceStatusRestServlet(ClientV1RestServlet): except Exception: raise SynapseError(400, "Unable to parse state") - yield self.presence_handler.set_state(user, state) + if self.hs.config.use_presence: + yield self.presence_handler.set_state(user, state) defer.returnValue((200, {})) diff --git a/tests/app/__init__.py b/tests/app/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/app/test_frontend_proxy.py b/tests/app/test_frontend_proxy.py new file mode 100644 index 0000000000..76b5090fff --- /dev/null +++ b/tests/app/test_frontend_proxy.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.app.frontend_proxy import FrontendProxyServer + +from tests.unittest import HomeserverTestCase + + +class FrontendProxyTests(HomeserverTestCase): + def make_homeserver(self, reactor, clock): + + hs = self.setup_test_homeserver( + http_client=None, homeserverToUse=FrontendProxyServer + ) + + return hs + + def test_listen_http_with_presence_enabled(self): + """ + When presence is on, the stub servlet will not register. + """ + # Presence is on + self.hs.config.use_presence = True + + config = { + "port": 8080, + "bind_addresses": ["0.0.0.0"], + "resources": [{"names": ["client"]}], + } + + # Listen with the config + self.hs._listen_http(config) + + # Grab the resource from the site that was told to listen + self.assertEqual(len(self.reactor.tcpServers), 1) + site = self.reactor.tcpServers[0][1] + self.resource = ( + site.resource.children["_matrix"].children["client"].children["r0"] + ) + + request, channel = self.make_request("PUT", "presence/a/status") + self.render(request) + + # 400 + unrecognised, because nothing is registered + self.assertEqual(channel.code, 400) + self.assertEqual(channel.json_body["errcode"], "M_UNRECOGNIZED") + + def test_listen_http_with_presence_disabled(self): + """ + When presence is on, the stub servlet will register. + """ + # Presence is off + self.hs.config.use_presence = False + + config = { + "port": 8080, + "bind_addresses": ["0.0.0.0"], + "resources": [{"names": ["client"]}], + } + + # Listen with the config + self.hs._listen_http(config) + + # Grab the resource from the site that was told to listen + self.assertEqual(len(self.reactor.tcpServers), 1) + site = self.reactor.tcpServers[0][1] + self.resource = ( + site.resource.children["_matrix"].children["client"].children["r0"] + ) + + request, channel = self.make_request("PUT", "presence/a/status") + self.render(request) + + # 401, because the stub servlet still checks authentication + self.assertEqual(channel.code, 401) + self.assertEqual(channel.json_body["errcode"], "M_MISSING_TOKEN") diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py new file mode 100644 index 0000000000..66c2b68707 --- /dev/null +++ b/tests/rest/client/v1/test_presence.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mock import Mock + +from synapse.rest.client.v1 import presence +from synapse.types import UserID + +from tests import unittest + + +class PresenceTestCase(unittest.HomeserverTestCase): + """ Tests presence REST API. """ + + user_id = "@sid:red" + + user = UserID.from_string(user_id) + servlets = [presence.register_servlets] + + def make_homeserver(self, reactor, clock): + + hs = self.setup_test_homeserver( + "red", http_client=None, federation_client=Mock() + ) + + hs.presence_handler = Mock() + + return hs + + def test_put_presence(self): + """ + PUT to the status endpoint with use_presence enabled will call + set_state on the presence handler. + """ + self.hs.config.use_presence = True + + body = {"presence": "here", "status_msg": "beep boop"} + request, channel = self.make_request( + "PUT", "/presence/%s/status" % (self.user_id,), body + ) + self.render(request) + + self.assertEqual(channel.code, 200) + self.assertEqual(self.hs.presence_handler.set_state.call_count, 1) + + def test_put_presence_disabled(self): + """ + PUT to the status endpoint with use_presence disbled will NOT call + set_state on the presence handler. + """ + self.hs.config.use_presence = False + + body = {"presence": "here", "status_msg": "beep boop"} + request, channel = self.make_request( + "PUT", "/presence/%s/status" % (self.user_id,), body + ) + self.render(request) + + self.assertEqual(channel.code, 200) + self.assertEqual(self.hs.presence_handler.set_state.call_count, 0) diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index 9f3d8bd1db..560b1fba96 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -13,71 +13,58 @@ # See the License for the specific language governing permissions and # limitations under the License. -import synapse.types -from synapse.http.server import JsonResource +from mock import Mock + from synapse.rest.client.v2_alpha import sync -from synapse.types import UserID -from synapse.util import Clock from tests import unittest -from tests.server import ( - ThreadedMemoryReactorClock as MemoryReactorClock, - make_request, - render, - setup_test_homeserver, -) - -PATH_PREFIX = "/_matrix/client/v2_alpha" -class FilterTestCase(unittest.TestCase): +class FilterTestCase(unittest.HomeserverTestCase): - USER_ID = "@apple:test" - TO_REGISTER = [sync] + user_id = "@apple:test" + servlets = [sync.register_servlets] - def setUp(self): - self.clock = MemoryReactorClock() - self.hs_clock = Clock(self.clock) + def make_homeserver(self, reactor, clock): - self.hs = setup_test_homeserver( - self.addCleanup, http_client=None, clock=self.hs_clock, reactor=self.clock + hs = self.setup_test_homeserver( + "red", http_client=None, federation_client=Mock() ) + return hs - self.auth = self.hs.get_auth() - - def get_user_by_access_token(token=None, allow_guest=False): - return { - "user": UserID.from_string(self.USER_ID), - "token_id": 1, - "is_guest": False, - } - - def get_user_by_req(request, allow_guest=False, rights="access"): - return synapse.types.create_requester( - UserID.from_string(self.USER_ID), 1, False, None - ) - - self.auth.get_user_by_access_token = get_user_by_access_token - self.auth.get_user_by_req = get_user_by_req + def test_sync_argless(self): + request, channel = self.make_request("GET", "/sync") + self.render(request) - self.store = self.hs.get_datastore() - self.filtering = self.hs.get_filtering() - self.resource = JsonResource(self.hs) + self.assertEqual(channel.code, 200) + self.assertTrue( + set( + [ + "next_batch", + "rooms", + "presence", + "account_data", + "to_device", + "device_lists", + ] + ).issubset(set(channel.json_body.keys())) + ) - for r in self.TO_REGISTER: - r.register_servlets(self.hs, self.resource) + def test_sync_presence_disabled(self): + """ + When presence is disabled, the key does not appear in /sync. + """ + self.hs.config.use_presence = False - def test_sync_argless(self): - request, channel = make_request("GET", "/_matrix/client/r0/sync") - render(request, self.resource, self.clock) + request, channel = self.make_request("GET", "/sync") + self.render(request) - self.assertEqual(channel.result["code"], b"200") + self.assertEqual(channel.code, 200) self.assertTrue( set( [ "next_batch", "rooms", - "presence", "account_data", "to_device", "device_lists", diff --git a/tests/unittest.py b/tests/unittest.py index e6afe3b96d..d852e2465a 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -18,6 +18,8 @@ import logging from mock import Mock +from canonicaljson import json + import twisted import twisted.logger from twisted.trial import unittest @@ -241,11 +243,15 @@ class HomeserverTestCase(TestCase): method (bytes/unicode): The HTTP request method ("verb"). path (bytes/unicode): The HTTP path, suitably URL encoded (e.g. escaped UTF-8 & spaces and such). - content (bytes): The body of the request. + content (bytes or dict): The body of the request. JSON-encoded, if + a dict. Returns: A synapse.http.site.SynapseRequest. """ + if isinstance(content, dict): + content = json.dumps(content).encode('utf8') + return make_request(method, path, content) def render(self, request): diff --git a/tests/utils.py b/tests/utils.py index 6f8b1de3e7..bb0fc74054 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -93,7 +93,8 @@ def setupdb(): @defer.inlineCallbacks def setup_test_homeserver( - cleanup_func, name="test", datastore=None, config=None, reactor=None, **kargs + cleanup_func, name="test", datastore=None, config=None, reactor=None, + homeserverToUse=HomeServer, **kargs ): """ Setup a homeserver suitable for running tests against. Keyword arguments @@ -192,7 +193,7 @@ def setup_test_homeserver( config.database_config["args"]["cp_openfun"] = db_engine.on_new_connection if datastore is None: - hs = HomeServer( + hs = homeserverToUse( name, config=config, db_config=config.database_config, @@ -235,7 +236,7 @@ def setup_test_homeserver( hs.setup() else: - hs = HomeServer( + hs = homeserverToUse( name, db_pool=None, datastore=datastore, -- cgit 1.5.1 From bb81e78ec6c05edc95b25a954bdf1cf688d4d652 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 22 Aug 2018 00:56:37 +0200 Subject: Split the state_group_cache in two (#3726) Splits the state_group_cache in two. One half contains normal state events; the other contains member events. The idea is that the lazyloading common case of: "I want a subset of member events plus all of the other state" can be accomplished efficiently by splitting the cache into two, and asking for "all events" from the non-members cache, and "just these keys" from the members cache. This means we can avoid having to make DictionaryCache aware of these sort of complicated queries, whilst letting LL requests benefit from the caching. Previously we were unable to sensibly use the caching and had to pull all state from the DB irrespective of the filtering, which made things slow. Hopefully fixes https://github.com/matrix-org/synapse/issues/3720. --- changelog.d/3726.misc | 1 + synapse/storage/state.py | 158 +++++++++++++++++++++++++++++++++++++++----- tests/storage/test_state.py | 105 ++++++++++++++++++++++++++--- 3 files changed, 236 insertions(+), 28 deletions(-) create mode 100644 changelog.d/3726.misc (limited to 'tests') diff --git a/changelog.d/3726.misc b/changelog.d/3726.misc new file mode 100644 index 0000000000..c4f66ec998 --- /dev/null +++ b/changelog.d/3726.misc @@ -0,0 +1 @@ +Split the state_group_cache into member and non-member state events (and so speed up LL /sync) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index dd03c4168b..4b971efdba 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -60,8 +60,43 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): def __init__(self, db_conn, hs): super(StateGroupWorkerStore, self).__init__(db_conn, hs) + # Originally the state store used a single DictionaryCache to cache the + # event IDs for the state types in a given state group to avoid hammering + # on the state_group* tables. + # + # The point of using a DictionaryCache is that it can cache a subset + # of the state events for a given state group (i.e. a subset of the keys for a + # given dict which is an entry in the cache for a given state group ID). + # + # However, this poses problems when performing complicated queries + # on the store - for instance: "give me all the state for this group, but + # limit members to this subset of users", as DictionaryCache's API isn't + # rich enough to say "please cache any of these fields, apart from this subset". + # This is problematic when lazy loading members, which requires this behaviour, + # as without it the cache has no choice but to speculatively load all + # state events for the group, which negates the efficiency being sought. + # + # Rather than overcomplicating DictionaryCache's API, we instead split the + # state_group_cache into two halves - one for tracking non-member events, + # and the other for tracking member_events. This means that lazy loading + # queries can be made in a cache-friendly manner by querying both caches + # separately and then merging the result. So for the example above, you + # would query the members cache for a specific subset of state keys + # (which DictionaryCache will handle efficiently and fine) and the non-members + # cache for all state (which DictionaryCache will similarly handle fine) + # and then just merge the results together. + # + # We size the non-members cache to be smaller than the members cache as the + # vast majority of state in Matrix (today) is member events. + self._state_group_cache = DictionaryCache( - "*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache") + "*stateGroupCache*", + # TODO: this hasn't been tuned yet + 50000 * get_cache_factor_for("stateGroupCache") + ) + self._state_group_members_cache = DictionaryCache( + "*stateGroupMembersCache*", + 500000 * get_cache_factor_for("stateGroupMembersCache") ) @defer.inlineCallbacks @@ -275,7 +310,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): }) @defer.inlineCallbacks - def _get_state_groups_from_groups(self, groups, types): + def _get_state_groups_from_groups(self, groups, types, members=None): """Returns the state groups for a given set of groups, filtering on types of state events. @@ -284,6 +319,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): types (Iterable[str, str|None]|None): list of 2-tuples of the form (`type`, `state_key`), where a `state_key` of `None` matches all state_keys for the `type`. If None, all types are returned. + members (bool|None): If not None, then, in addition to any filtering + implied by types, the results are also filtered to only include + member events (if True), or to exclude member events (if False) Returns: dictionary state_group -> (dict of (type, state_key) -> event id) @@ -294,14 +332,14 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): for chunk in chunks: res = yield self.runInteraction( "_get_state_groups_from_groups", - self._get_state_groups_from_groups_txn, chunk, types, + self._get_state_groups_from_groups_txn, chunk, types, members, ) results.update(res) defer.returnValue(results) def _get_state_groups_from_groups_txn( - self, txn, groups, types=None, + self, txn, groups, types=None, members=None, ): results = {group: {} for group in groups} @@ -339,6 +377,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): %s """) + if members is True: + sql += " AND type = '%s'" % (EventTypes.Member,) + elif members is False: + sql += " AND type <> '%s'" % (EventTypes.Member,) + # Turns out that postgres doesn't like doing a list of OR's and # is about 1000x slower, so we just issue a query for each specific # type seperately. @@ -386,6 +429,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): else: where_clause = "" + if members is True: + where_clause += " AND type = '%s'" % EventTypes.Member + elif members is False: + where_clause += " AND type <> '%s'" % EventTypes.Member + # We don't use WITH RECURSIVE on sqlite3 as there are distributions # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) for group in groups: @@ -580,10 +628,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): defer.returnValue({row["event_id"]: row["state_group"] for row in rows}) - def _get_some_state_from_cache(self, group, types, filtered_types=None): + def _get_some_state_from_cache(self, cache, group, types, filtered_types=None): """Checks if group is in cache. See `_get_state_for_groups` Args: + cache(DictionaryCache): the state group cache to use group(int): The state group to lookup types(list[str, str|None]): List of 2-tuples of the form (`type`, `state_key`), where a `state_key` of `None` matches all @@ -597,11 +646,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): requests state from the cache, if False we need to query the DB for the missing state. """ - is_all, known_absent, state_dict_ids = self._state_group_cache.get(group) + is_all, known_absent, state_dict_ids = cache.get(group) type_to_key = {} - # tracks whether any of ourrequested types are missing from the cache + # tracks whether any of our requested types are missing from the cache missing_types = False for typ, state_key in types: @@ -648,7 +697,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): if include(k[0], k[1]) }, got_all - def _get_all_state_from_cache(self, group): + def _get_all_state_from_cache(self, cache, group): """Checks if group is in cache. See `_get_state_for_groups` Returns 2-tuple (`state_dict`, `got_all`). `got_all` is a bool @@ -656,9 +705,10 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): cache, if False we need to query the DB for the missing state. Args: + cache(DictionaryCache): the state group cache to use group: The state group to lookup """ - is_all, _, state_dict_ids = self._state_group_cache.get(group) + is_all, _, state_dict_ids = cache.get(group) return state_dict_ids, is_all @@ -681,6 +731,62 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): list of event types. Other types of events are returned unfiltered. If None, `types` filtering is applied to all events. + Returns: + Deferred[dict[int, dict[(type, state_key), EventBase]]] + a dictionary mapping from state group to state dictionary. + """ + if types is not None: + non_member_types = [t for t in types if t[0] != EventTypes.Member] + + if filtered_types is not None and EventTypes.Member not in filtered_types: + # we want all of the membership events + member_types = None + else: + member_types = [t for t in types if t[0] == EventTypes.Member] + + else: + non_member_types = None + member_types = None + + non_member_state = yield self._get_state_for_groups_using_cache( + groups, self._state_group_cache, non_member_types, filtered_types, + ) + # XXX: we could skip this entirely if member_types is [] + member_state = yield self._get_state_for_groups_using_cache( + # we set filtered_types=None as member_state only ever contain members. + groups, self._state_group_members_cache, member_types, None, + ) + + state = non_member_state + for group in groups: + state[group].update(member_state[group]) + + defer.returnValue(state) + + @defer.inlineCallbacks + def _get_state_for_groups_using_cache( + self, groups, cache, types=None, filtered_types=None + ): + """Gets the state at each of a list of state groups, optionally + filtering by type/state_key, querying from a specific cache. + + Args: + groups (iterable[int]): list of state groups for which we want + to get the state. + cache (DictionaryCache): the cache of group ids to state dicts which + we will pass through - either the normal state cache or the specific + members state cache. + types (None|iterable[(str, None|str)]): + indicates the state type/keys required. If None, the whole + state is fetched and returned. + + Otherwise, each entry should be a `(type, state_key)` tuple to + include in the response. A `state_key` of None is a wildcard + meaning that we require all state with that type. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. + Returns: Deferred[dict[int, dict[(type, state_key), EventBase]]] a dictionary mapping from state group to state dictionary. @@ -692,7 +798,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): if types is not None: for group in set(groups): state_dict_ids, got_all = self._get_some_state_from_cache( - group, types, filtered_types + cache, group, types, filtered_types ) results[group] = state_dict_ids @@ -701,7 +807,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): else: for group in set(groups): state_dict_ids, got_all = self._get_all_state_from_cache( - group + cache, group ) results[group] = state_dict_ids @@ -710,8 +816,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): missing_groups.append(group) if missing_groups: - # Okay, so we have some missing_types, lets fetch them. - cache_seq_num = self._state_group_cache.sequence + # Okay, so we have some missing_types, let's fetch them. + cache_seq_num = cache.sequence # the DictionaryCache knows if it has *all* the state, but # does not know if it has all of the keys of a particular type, @@ -725,7 +831,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): types_to_fetch = types group_to_state_dict = yield self._get_state_groups_from_groups( - missing_groups, types_to_fetch + missing_groups, types_to_fetch, cache == self._state_group_members_cache, ) for group, group_state_dict in iteritems(group_to_state_dict): @@ -745,7 +851,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): # update the cache with all the things we fetched from the # database. - self._state_group_cache.update( + cache.update( cache_seq_num, key=group, value=group_state_dict, @@ -847,15 +953,33 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): ], ) - # Prefill the state group cache with this group. + # Prefill the state group caches with this group. # It's fine to use the sequence like this as the state group map # is immutable. (If the map wasn't immutable then this prefill could # race with another update) + + current_member_state_ids = { + s: ev + for (s, ev) in iteritems(current_state_ids) + if s[0] == EventTypes.Member + } + txn.call_after( + self._state_group_members_cache.update, + self._state_group_members_cache.sequence, + key=state_group, + value=dict(current_member_state_ids), + ) + + current_non_member_state_ids = { + s: ev + for (s, ev) in iteritems(current_state_ids) + if s[0] != EventTypes.Member + } txn.call_after( self._state_group_cache.update, self._state_group_cache.sequence, key=state_group, - value=dict(current_state_ids), + value=dict(current_non_member_state_ids), ) return state_group diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py index ebfd969b36..d717b9f94e 100644 --- a/tests/storage/test_state.py +++ b/tests/storage/test_state.py @@ -185,6 +185,7 @@ class StateStoreTestCase(tests.unittest.TestCase): # test _get_some_state_from_cache correctly filters out members with types=[] (state_dict, is_all) = yield self.store._get_some_state_from_cache( + self.store._state_group_cache, group, [], filtered_types=[EventTypes.Member] ) @@ -197,8 +198,20 @@ class StateStoreTestCase(tests.unittest.TestCase): state_dict, ) + (state_dict, is_all) = yield self.store._get_some_state_from_cache( + self.store._state_group_members_cache, + group, [], filtered_types=[EventTypes.Member] + ) + + self.assertEqual(is_all, True) + self.assertDictEqual( + {}, + state_dict, + ) + # test _get_some_state_from_cache correctly filters in members with wildcard types (state_dict, is_all) = yield self.store._get_some_state_from_cache( + self.store._state_group_cache, group, [(EventTypes.Member, None)], filtered_types=[EventTypes.Member] ) @@ -207,6 +220,18 @@ class StateStoreTestCase(tests.unittest.TestCase): { (e1.type, e1.state_key): e1.event_id, (e2.type, e2.state_key): e2.event_id, + }, + state_dict, + ) + + (state_dict, is_all) = yield self.store._get_some_state_from_cache( + self.store._state_group_members_cache, + group, [(EventTypes.Member, None)], filtered_types=[EventTypes.Member] + ) + + self.assertEqual(is_all, True) + self.assertDictEqual( + { (e3.type, e3.state_key): e3.event_id, # e4 is overwritten by e5 (e5.type, e5.state_key): e5.event_id, @@ -216,6 +241,7 @@ class StateStoreTestCase(tests.unittest.TestCase): # test _get_some_state_from_cache correctly filters in members with specific types (state_dict, is_all) = yield self.store._get_some_state_from_cache( + self.store._state_group_cache, group, [(EventTypes.Member, e5.state_key)], filtered_types=[EventTypes.Member], @@ -226,6 +252,20 @@ class StateStoreTestCase(tests.unittest.TestCase): { (e1.type, e1.state_key): e1.event_id, (e2.type, e2.state_key): e2.event_id, + }, + state_dict, + ) + + (state_dict, is_all) = yield self.store._get_some_state_from_cache( + self.store._state_group_members_cache, + group, + [(EventTypes.Member, e5.state_key)], + filtered_types=[EventTypes.Member], + ) + + self.assertEqual(is_all, True) + self.assertDictEqual( + { (e5.type, e5.state_key): e5.event_id, }, state_dict, @@ -234,6 +274,7 @@ class StateStoreTestCase(tests.unittest.TestCase): # test _get_some_state_from_cache correctly filters in members with specific types # and no filtered_types (state_dict, is_all) = yield self.store._get_some_state_from_cache( + self.store._state_group_members_cache, group, [(EventTypes.Member, e5.state_key)], filtered_types=None ) @@ -254,9 +295,6 @@ class StateStoreTestCase(tests.unittest.TestCase): { (e1.type, e1.state_key): e1.event_id, (e2.type, e2.state_key): e2.event_id, - (e3.type, e3.state_key): e3.event_id, - # e4 is overwritten by e5 - (e5.type, e5.state_key): e5.event_id, }, ) @@ -269,8 +307,6 @@ class StateStoreTestCase(tests.unittest.TestCase): # list fetched keys so it knows it's partial fetched_keys=( (e1.type, e1.state_key), - (e3.type, e3.state_key), - (e5.type, e5.state_key), ), ) @@ -284,8 +320,6 @@ class StateStoreTestCase(tests.unittest.TestCase): set( [ (e1.type, e1.state_key), - (e3.type, e3.state_key), - (e5.type, e5.state_key), ] ), ) @@ -293,8 +327,6 @@ class StateStoreTestCase(tests.unittest.TestCase): state_dict_ids, { (e1.type, e1.state_key): e1.event_id, - (e3.type, e3.state_key): e3.event_id, - (e5.type, e5.state_key): e5.event_id, }, ) @@ -304,14 +336,25 @@ class StateStoreTestCase(tests.unittest.TestCase): # test _get_some_state_from_cache correctly filters out members with types=[] room_id = self.room.to_string() (state_dict, is_all) = yield self.store._get_some_state_from_cache( + self.store._state_group_cache, group, [], filtered_types=[EventTypes.Member] ) self.assertEqual(is_all, False) self.assertDictEqual({(e1.type, e1.state_key): e1.event_id}, state_dict) + room_id = self.room.to_string() + (state_dict, is_all) = yield self.store._get_some_state_from_cache( + self.store._state_group_members_cache, + group, [], filtered_types=[EventTypes.Member] + ) + + self.assertEqual(is_all, True) + self.assertDictEqual({}, state_dict) + # test _get_some_state_from_cache correctly filters in members wildcard types (state_dict, is_all) = yield self.store._get_some_state_from_cache( + self.store._state_group_cache, group, [(EventTypes.Member, None)], filtered_types=[EventTypes.Member] ) @@ -319,8 +362,19 @@ class StateStoreTestCase(tests.unittest.TestCase): self.assertDictEqual( { (e1.type, e1.state_key): e1.event_id, + }, + state_dict, + ) + + (state_dict, is_all) = yield self.store._get_some_state_from_cache( + self.store._state_group_members_cache, + group, [(EventTypes.Member, None)], filtered_types=[EventTypes.Member] + ) + + self.assertEqual(is_all, True) + self.assertDictEqual( + { (e3.type, e3.state_key): e3.event_id, - # e4 is overwritten by e5 (e5.type, e5.state_key): e5.event_id, }, state_dict, @@ -328,6 +382,7 @@ class StateStoreTestCase(tests.unittest.TestCase): # test _get_some_state_from_cache correctly filters in members with specific types (state_dict, is_all) = yield self.store._get_some_state_from_cache( + self.store._state_group_cache, group, [(EventTypes.Member, e5.state_key)], filtered_types=[EventTypes.Member], @@ -337,6 +392,20 @@ class StateStoreTestCase(tests.unittest.TestCase): self.assertDictEqual( { (e1.type, e1.state_key): e1.event_id, + }, + state_dict, + ) + + (state_dict, is_all) = yield self.store._get_some_state_from_cache( + self.store._state_group_members_cache, + group, + [(EventTypes.Member, e5.state_key)], + filtered_types=[EventTypes.Member], + ) + + self.assertEqual(is_all, True) + self.assertDictEqual( + { (e5.type, e5.state_key): e5.event_id, }, state_dict, @@ -345,8 +414,22 @@ class StateStoreTestCase(tests.unittest.TestCase): # test _get_some_state_from_cache correctly filters in members with specific types # and no filtered_types (state_dict, is_all) = yield self.store._get_some_state_from_cache( + self.store._state_group_cache, + group, [(EventTypes.Member, e5.state_key)], filtered_types=None + ) + + self.assertEqual(is_all, False) + self.assertDictEqual({}, state_dict) + + (state_dict, is_all) = yield self.store._get_some_state_from_cache( + self.store._state_group_members_cache, group, [(EventTypes.Member, e5.state_key)], filtered_types=None ) self.assertEqual(is_all, True) - self.assertDictEqual({(e5.type, e5.state_key): e5.event_id}, state_dict) + self.assertDictEqual( + { + (e5.type, e5.state_key): e5.event_id, + }, + state_dict, + ) -- cgit 1.5.1