From 31a9eceda5cf00b0482baf1c8bf1e138c823f621 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 30 Mar 2016 15:58:20 +0100 Subject: Add a replication stream for state groups --- tests/replication/test_resource.py | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) (limited to 'tests') diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py index f4b5fb3328..b1dd7b4a74 100644 --- a/tests/replication/test_resource.py +++ b/tests/replication/test_resource.py @@ -58,15 +58,21 @@ class ReplicationResourceCase(unittest.TestCase): self.assertEquals(body, {}) @defer.inlineCallbacks - def test_events(self): - get = self.get(events="-1", timeout="0") + def test_events_and_state(self): + get = self.get(events="-1", state="-1", timeout="0") yield self.hs.get_handlers().room_creation_handler.create_room( Requester(self.user, "", False), {} ) code, body = yield get self.assertEquals(code, 200) self.assertEquals(body["events"]["field_names"], [ - "position", "internal", "json" + "position", "internal", "json", "state_group" + ]) + self.assertEquals(body["state_groups"]["field_names"], [ + "position", "room_id", "event_id" + ]) + self.assertEquals(body["state_group_state"]["field_names"], [ + "position", "type", "state_key", "event_id" ]) @defer.inlineCallbacks @@ -132,6 +138,7 @@ class ReplicationResourceCase(unittest.TestCase): test_timeout_backfill = _test_timeout("backfill") test_timeout_push_rules = _test_timeout("push_rules") test_timeout_pushers = _test_timeout("pushers") + test_timeout_state = _test_timeout("state") @defer.inlineCallbacks def send_text_message(self, room_id, message): @@ -182,4 +189,21 @@ class ReplicationResourceCase(unittest.TestCase): ) response_body = json.loads(response_json) + if response_code == 200: + self.check_response(response_body) + defer.returnValue((response_code, response_body)) + + def check_response(self, response_body): + for name, stream in response_body.items(): + self.assertIn("field_names", stream) + field_names = stream["field_names"] + self.assertIn("rows", stream) + self.assertTrue(stream["rows"]) + for row in stream["rows"]: + self.assertEquals( + len(row), len(field_names), + "%s: len(row = %r) == len(field_names = %r)" % ( + name, row, field_names + ) + ) -- cgit 1.4.1 From f699b8f997ed743af0cfa7046428915a7f42610b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 31 Mar 2016 10:04:28 +0100 Subject: Read from DNS cache if within TTL --- synapse/http/endpoint.py | 39 +++++++++++++++++++++++---------------- tests/test_dns.py | 5 ++++- 2 files changed, 27 insertions(+), 17 deletions(-) (limited to 'tests') diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 4775f6707d..e80d00e2af 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -22,6 +22,7 @@ from twisted.names.error import DNSNameError, DomainError import collections import logging import random +import time logger = logging.getLogger(__name__) @@ -31,7 +32,7 @@ SERVER_CACHE = {} _Server = collections.namedtuple( - "_Server", "priority weight host port" + "_Server", "priority weight host port expires" ) @@ -92,7 +93,8 @@ class SRVClientEndpoint(object): host=domain, port=default_port, priority=0, - weight=0 + weight=0, + expires=0, ) else: self.default_server = None @@ -154,6 +156,12 @@ class SRVClientEndpoint(object): @defer.inlineCallbacks def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE): + cache_entry = cache.get(service_name, None) + if cache_entry: + if all(s.expires > int(time.time()) for s in cache_entry): + servers = list(cache_entry) + defer.returnValue(servers) + servers = [] try: @@ -173,27 +181,26 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE): continue payload = answer.payload - host = str(payload.target) + srv_ttl = answer.ttl try: answers, _, _ = yield dns_client.lookupAddress(host) except DNSNameError: continue - ips = [ - answer.payload.dottedQuad() - for answer in answers - if answer.type == dns.A and answer.payload - ] - - for ip in ips: - servers.append(_Server( - host=ip, - port=int(payload.port), - priority=int(payload.priority), - weight=int(payload.weight) - )) + for answer in answers: + if answer.type == dns.A and answer.payload: + ip = answer.payload.dottedQuad() + host_ttl = min(srv_ttl, answer.ttl) + + servers.append(_Server( + host=ip, + port=int(payload.port), + priority=int(payload.priority), + weight=int(payload.weight), + expires=int(time.time()) + host_ttl, + )) servers.sort() cache[service_name] = list(servers) diff --git a/tests/test_dns.py b/tests/test_dns.py index 637b1606f8..e006ed1a59 100644 --- a/tests/test_dns.py +++ b/tests/test_dns.py @@ -69,8 +69,11 @@ class DnsTestCase(unittest.TestCase): service_name = "test_service.examle.com" + entry = Mock(spec_set=["expires"]) + entry.expires = 999999999 + cache = { - service_name: [object()] + service_name: [entry] } servers = yield resolve_service( -- cgit 1.4.1 From c906f3066152ba7a65c0765a5812b71bb8a4016c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 Apr 2016 16:17:32 +0100 Subject: Do checks for memberships before creating events --- synapse/handlers/room_member.py | 237 ++++++++++++++++++++++--------------- synapse/state.py | 8 +- tests/rest/client/v1/test_rooms.py | 4 +- 3 files changed, 151 insertions(+), 98 deletions(-) (limited to 'tests') diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 5fdbd3adcc..09b8f6217a 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -95,6 +95,80 @@ class RoomMemberHandler(BaseHandler): if remotedomains is not None: remotedomains.add(member.domain) + @defer.inlineCallbacks + def _local_membership_update( + self, requester, target, room_id, membership, + txn_id=None, + ratelimit=True, + ): + msg_handler = self.hs.get_handlers().message_handler + + content = {"membership": membership} + if requester.is_guest: + content["kind"] = "guest" + + event, context = yield msg_handler.create_event( + { + "type": EventTypes.Member, + "content": content, + "room_id": room_id, + "sender": requester.user.to_string(), + "state_key": target.to_string(), + + # For backwards compatibility: + "membership": membership, + }, + token_id=requester.access_token_id, + txn_id=txn_id, + ) + + yield self.handle_new_client_event( + requester, + event, + context, + extra_users=[target], + ratelimit=ratelimit, + ) + + prev_member_event = context.current_state.get( + (EventTypes.Member, target.to_string()), + None + ) + + if event.membership == Membership.JOIN: + if not prev_member_event or prev_member_event.membership != Membership.JOIN: + # Only fire user_joined_room if the user has acutally joined the + # room. Don't bother if the user is just changing their profile + # info. + yield user_joined_room(self.distributor, target, room_id) + elif event.membership == Membership.LEAVE: + if prev_member_event and prev_member_event.membership == Membership.JOIN: + user_left_room(self.distributor, target, room_id) + + @defer.inlineCallbacks + def remote_join(self, remote_room_hosts, room_id, user, content): + if len(remote_room_hosts) == 0: + raise SynapseError(404, "No known servers") + + # We don't do an auth check if we are doing an invite + # join dance for now, since we're kinda implicitly checking + # that we are allowed to join when we decide whether or not we + # need to do the invite/join dance. + yield self.hs.get_handlers().federation_handler.do_invite_join( + remote_room_hosts, + room_id, + user.to_string(), + content, + ) + yield user_joined_room(self.distributor, user, room_id) + + def reject_remote_invite(self, user_id, room_id, remote_room_hosts): + return self.hs.get_handlers().federation_handler.do_remotely_reject_invite( + remote_room_hosts, + room_id, + user_id + ) + @defer.inlineCallbacks def update_membership( self, @@ -120,28 +194,15 @@ class RoomMemberHandler(BaseHandler): third_party_signed, ) - msg_handler = self.hs.get_handlers().message_handler - - content = {"membership": effective_membership_state} - if requester.is_guest: - content["kind"] = "guest" + if not remote_room_hosts: + remote_room_hosts = [] - event, context = yield msg_handler.create_event( - { - "type": EventTypes.Member, - "content": content, - "room_id": room_id, - "sender": requester.user.to_string(), - "state_key": target.to_string(), - - # For backwards compatibility: - "membership": effective_membership_state, - }, - token_id=requester.access_token_id, - txn_id=txn_id, + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + current_state = yield self.state_handler.get_current_state( + room_id, latest_event_ids=latest_event_ids, ) - old_state = context.current_state.get((EventTypes.Member, event.state_key)) + old_state = current_state.get((EventTypes.Member, target.to_string())) old_membership = old_state.content.get("membership") if old_state else None if action == "unban" and old_membership != "ban": raise SynapseError( @@ -156,13 +217,57 @@ class RoomMemberHandler(BaseHandler): errcode=Codes.BAD_STATE ) - member_handler = self.hs.get_handlers().room_member_handler - yield member_handler.send_membership_event( - requester, - event, - context, + is_host_in_room = self.is_host_in_room(current_state) + + if effective_membership_state == Membership.JOIN: + if requester.is_guest and not self._can_guest_join(current_state): + # This should be an auth check, but guests are a local concept, + # so don't really fit into the general auth process. + raise AuthError(403, "Guest access not allowed") + + if not is_host_in_room: + inviter = yield self.get_inviter(target.to_string(), room_id) + if inviter and not self.hs.is_mine(inviter): + remote_room_hosts.append(inviter.domain) + + content = {"membership": Membership.JOIN} + if requester.is_guest: + content["kind"] = "guest" + + ret = yield self.remote_join( + remote_room_hosts, room_id, target, content + ) + defer.returnValue(ret) + + elif effective_membership_state == Membership.LEAVE: + if not is_host_in_room: + # perhaps we've been invited + inviter = yield self.get_inviter(target.to_string(), room_id) + if not inviter: + raise SynapseError(404, "Not a known room") + + if self.hs.is_mine(inviter): + # the inviter was on our server, but has now left. Carry on + # with the normal rejection codepath. + # + # This is a bit of a hack, because the room might still be + # active on other servers. + pass + else: + # send the rejection to the inviter's HS. + remote_room_hosts = remote_room_hosts + [inviter.domain] + ret = yield self.reject_remote_invite( + target.to_string(), room_id, remote_room_hosts + ) + defer.returnValue(ret) + + yield self._local_membership_update( + requester=requester, + target=target, + room_id=room_id, + membership=effective_membership_state, + txn_id=txn_id, ratelimit=ratelimit, - remote_room_hosts=remote_room_hosts, ) @defer.inlineCallbacks @@ -211,73 +316,19 @@ class RoomMemberHandler(BaseHandler): if prev_event is not None: return - action = "send" - if event.membership == Membership.JOIN: if requester.is_guest and not self._can_guest_join(context.current_state): # This should be an auth check, but guests are a local concept, # so don't really fit into the general auth process. raise AuthError(403, "Guest access not allowed") - do_remote_join_dance, remote_room_hosts = self._should_do_dance( - context, - (self.get_inviter(event.state_key, context.current_state)), - remote_room_hosts, - ) - if do_remote_join_dance: - action = "remote_join" - elif event.membership == Membership.LEAVE: - is_host_in_room = self.is_host_in_room(context.current_state) - - if not is_host_in_room: - # perhaps we've been invited - inviter = self.get_inviter( - target_user.to_string(), context.current_state - ) - if not inviter: - raise SynapseError(404, "Not a known room") - if self.hs.is_mine(inviter): - # the inviter was on our server, but has now left. Carry on - # with the normal rejection codepath. - # - # This is a bit of a hack, because the room might still be - # active on other servers. - pass - else: - # send the rejection to the inviter's HS. - remote_room_hosts = remote_room_hosts + [inviter.domain] - action = "remote_reject" - - federation_handler = self.hs.get_handlers().federation_handler - - if action == "remote_join": - if len(remote_room_hosts) == 0: - raise SynapseError(404, "No known servers") - - # We don't do an auth check if we are doing an invite - # join dance for now, since we're kinda implicitly checking - # that we are allowed to join when we decide whether or not we - # need to do the invite/join dance. - yield federation_handler.do_invite_join( - remote_room_hosts, - event.room_id, - event.user_id, - event.content, - ) - elif action == "remote_reject": - yield federation_handler.do_remotely_reject_invite( - remote_room_hosts, - room_id, - event.user_id - ) - else: - yield self.handle_new_client_event( - requester, - event, - context, - extra_users=[target_user], - ratelimit=ratelimit, - ) + yield self.handle_new_client_event( + requester, + event, + context, + extra_users=[target_user], + ratelimit=ratelimit, + ) prev_member_event = context.current_state.get( (EventTypes.Member, target_user.to_string()), @@ -306,11 +357,11 @@ class RoomMemberHandler(BaseHandler): and guest_access.content["guest_access"] == "can_join" ) - def _should_do_dance(self, context, inviter, room_hosts=None): + def _should_do_dance(self, current_state, inviter, room_hosts=None): # TODO: Shouldn't this be remote_room_host? room_hosts = room_hosts or [] - is_host_in_room = self.is_host_in_room(context.current_state) + is_host_in_room = self.is_host_in_room(current_state) if is_host_in_room: return False, room_hosts @@ -344,11 +395,11 @@ class RoomMemberHandler(BaseHandler): defer.returnValue((RoomID.from_string(room_id), servers)) - def get_inviter(self, user_id, current_state): - prev_state = current_state.get((EventTypes.Member, user_id)) - if prev_state and prev_state.membership == Membership.INVITE: - return UserID.from_string(prev_state.user_id) - return None + @defer.inlineCallbacks + def get_inviter(self, user_id, room_id): + invite = yield self.store.get_room_member(user_id=user_id, room_id=room_id) + if invite: + defer.returnValue(UserID.from_string(invite.sender)) @defer.inlineCallbacks def get_joined_rooms_for_user(self, user): diff --git a/synapse/state.py b/synapse/state.py index 4672ada1b3..5a5fd8ff12 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -75,7 +75,8 @@ class StateHandler(object): self._state_cache.start() @defer.inlineCallbacks - def get_current_state(self, room_id, event_type=None, state_key=""): + def get_current_state(self, room_id, event_type=None, state_key="", + latest_event_ids=None): """ Retrieves the current state for the room. This is done by calling `get_latest_events_in_room` to get the leading edges of the event graph and then resolving any of the state conflicts. @@ -88,9 +89,10 @@ class StateHandler(object): :returns map from (type, state_key) to event """ - event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + if not latest_event_ids: + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) - res = yield self.resolve_state_groups(room_id, event_ids) + res = yield self.resolve_state_groups(room_id, latest_event_ids) state = res[1] if event_type: diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 4ab8b35e6b..8853cbb5fc 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -259,8 +259,8 @@ class RoomPermissionsTestCase(RestTestCase): # set [invite/join/left] of self, set [invite/join/left] of other, # expect all 404s because room doesn't exist on any server for usr in [self.user_id, self.rmcreator_id]: - yield self.join(room=room, user=usr, expect_code=404) - yield self.leave(room=room, user=usr, expect_code=404) + yield self.join(room=room, user=usr, expect_code=403) + yield self.leave(room=room, user=usr, expect_code=403) @defer.inlineCallbacks def test_membership_private_room_perms(self): -- cgit 1.4.1 From 11860637e116717efa14149f17d8b941d1e5db5e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Apr 2016 10:12:30 +0100 Subject: Tests --- tests/test_dns.py | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) (limited to 'tests') diff --git a/tests/test_dns.py b/tests/test_dns.py index e006ed1a59..c394c57ee7 100644 --- a/tests/test_dns.py +++ b/tests/test_dns.py @@ -21,6 +21,8 @@ from mock import Mock from synapse.http.endpoint import resolve_service +from tests.utils import MockClock + class DnsTestCase(unittest.TestCase): @@ -63,14 +65,14 @@ class DnsTestCase(unittest.TestCase): self.assertEquals(servers[0].host, ip_address) @defer.inlineCallbacks - def test_from_cache(self): + def test_from_cache_expired_and_dns_fail(self): dns_client_mock = Mock() dns_client_mock.lookupService.return_value = defer.fail(error.DNSServerError()) service_name = "test_service.examle.com" entry = Mock(spec_set=["expires"]) - entry.expires = 999999999 + entry.expires = 0 cache = { service_name: [entry] @@ -85,6 +87,31 @@ class DnsTestCase(unittest.TestCase): self.assertEquals(len(servers), 1) self.assertEquals(servers, cache[service_name]) + @defer.inlineCallbacks + def test_from_cache(self): + clock = MockClock() + + dns_client_mock = Mock(spec_set=['lookupService']) + dns_client_mock.lookupService = Mock(spec_set=[]) + + service_name = "test_service.examle.com" + + entry = Mock(spec_set=["expires"]) + entry.expires = 999999999 + + cache = { + service_name: [entry] + } + + servers = yield resolve_service( + service_name, dns_client=dns_client_mock, cache=cache, clock=clock, + ) + + self.assertFalse(dns_client_mock.lookupService.called) + + self.assertEquals(len(servers), 1) + self.assertEquals(servers, cache[service_name]) + @defer.inlineCallbacks def test_empty_cache(self): dns_client_mock = Mock() -- cgit 1.4.1 From 8aab9d87fa6739345810f0edf3982fe7f898ee30 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Apr 2016 14:08:18 +0100 Subject: Don't require config to create database --- scripts/synapse_port_db | 13 ++--- synapse/app/homeserver.py | 15 +++-- synapse/storage/engines/__init__.py | 6 +- synapse/storage/engines/postgres.py | 8 +-- synapse/storage/engines/sqlite3.py | 13 +---- synapse/storage/prepare_database.py | 64 +++++++--------------- .../schema/delta/14/upgrade_appservice_db.py | 6 +- synapse/storage/schema/delta/20/pushers.py | 6 +- synapse/storage/schema/delta/25/fts.py | 6 +- synapse/storage/schema/delta/27/ts.py | 6 +- synapse/storage/schema/delta/30/as_users.py | 4 +- tests/storage/test_base.py | 2 +- tests/utils.py | 6 +- 13 files changed, 69 insertions(+), 86 deletions(-) (limited to 'tests') diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index a2a0f364cf..253a6ef6c7 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -19,6 +19,7 @@ from twisted.enterprise import adbapi from synapse.storage._base import LoggingTransaction, SQLBaseStore from synapse.storage.engines import create_engine +from synapse.storage.prepare_database import prepare_database import argparse import curses @@ -37,6 +38,7 @@ BOOLEAN_COLUMNS = { "rooms": ["is_public"], "event_edges": ["is_state"], "presence_list": ["accepted"], + "presence_stream": ["currently_active"], } @@ -292,7 +294,7 @@ class Porter(object): } ) - database_engine.prepare_database(db_conn) + prepare_database(db_conn, database_engine, config=None) db_conn.commit() @@ -309,8 +311,8 @@ class Porter(object): **self.postgres_config["args"] ) - sqlite_engine = create_engine(FakeConfig(sqlite_config)) - postgres_engine = create_engine(FakeConfig(postgres_config)) + sqlite_engine = create_engine(sqlite_config) + postgres_engine = create_engine(postgres_config) self.sqlite_store = Store(sqlite_db_pool, sqlite_engine) self.postgres_store = Store(postgres_db_pool, postgres_engine) @@ -792,8 +794,3 @@ if __name__ == "__main__": if end_error_exec_info: exc_type, exc_value, exc_traceback = end_error_exec_info traceback.print_exception(exc_type, exc_value, exc_traceback) - - -class FakeConfig: - def __init__(self, database_config): - self.database_config = database_config diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index fcdc8e6e10..2b4473b9ac 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -33,7 +33,7 @@ from synapse.python_dependencies import ( from synapse.rest import ClientRestResource from synapse.storage.engines import create_engine, IncorrectDatabaseSetup from synapse.storage import are_all_users_on_domain -from synapse.storage.prepare_database import UpgradeDatabaseException +from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database from synapse.server import HomeServer @@ -245,7 +245,7 @@ class SynapseHomeServer(HomeServer): except IncorrectDatabaseSetup as e: quit_with_error(e.message) - def get_db_conn(self): + def get_db_conn(self, run_new_connection=True): # Any param beginning with cp_ is a parameter for adbapi, and should # not be passed to the database engine. db_params = { @@ -254,7 +254,8 @@ class SynapseHomeServer(HomeServer): } db_conn = self.database_engine.module.connect(**db_params) - self.database_engine.on_new_connection(db_conn) + if run_new_connection: + self.database_engine.on_new_connection(db_conn) return db_conn @@ -386,7 +387,7 @@ def setup(config_options): tls_server_context_factory = context_factory.ServerContextFactory(config) - database_engine = create_engine(config) + database_engine = create_engine(config.database_config) config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection hs = SynapseHomeServer( @@ -402,8 +403,10 @@ def setup(config_options): logger.info("Preparing database: %s...", config.database_config['name']) try: - db_conn = hs.get_db_conn() - database_engine.prepare_database(db_conn) + db_conn = hs.get_db_conn(run_new_connection=False) + prepare_database(db_conn, database_engine, config=config) + database_engine.on_new_connection(db_conn) + hs.run_startup_checks(db_conn, database_engine) db_conn.commit() diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index a48230b93f..7bb5de1fe7 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -26,13 +26,13 @@ SUPPORTED_MODULE = { } -def create_engine(config): - name = config.database_config["name"] +def create_engine(database_config): + name = database_config["name"] engine_class = SUPPORTED_MODULE.get(name, None) if engine_class: module = importlib.import_module(name) - return engine_class(module, config=config) + return engine_class(module) raise RuntimeError( "Unsupported database engine '%s'" % (name,) diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index a09685b4df..c2290943b4 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -13,18 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage.prepare_database import prepare_database - from ._base import IncorrectDatabaseSetup class PostgresEngine(object): single_threaded = False - def __init__(self, database_module, config): + def __init__(self, database_module): self.module = database_module self.module.extensions.register_type(self.module.extensions.UNICODE) - self.config = config def check_database(self, txn): txn.execute("SHOW SERVER_ENCODING") @@ -44,9 +41,6 @@ class PostgresEngine(object): self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ ) - def prepare_database(self, db_conn): - prepare_database(db_conn, self, config=self.config) - def is_deadlock(self, error): if isinstance(error, self.module.DatabaseError): return error.pgcode in ["40001", "40P01"] diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py index 522b905949..14203aa500 100644 --- a/synapse/storage/engines/sqlite3.py +++ b/synapse/storage/engines/sqlite3.py @@ -13,9 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage.prepare_database import ( - prepare_database, prepare_sqlite3_database -) +from synapse.storage.prepare_database import prepare_database import struct @@ -23,9 +21,8 @@ import struct class Sqlite3Engine(object): single_threaded = True - def __init__(self, database_module, config): + def __init__(self, database_module): self.module = database_module - self.config = config def check_database(self, txn): pass @@ -34,13 +31,9 @@ class Sqlite3Engine(object): return sql def on_new_connection(self, db_conn): - self.prepare_database(db_conn) + prepare_database(db_conn, self, config=None) db_conn.create_function("rank", 1, _rank) - def prepare_database(self, db_conn): - prepare_sqlite3_database(db_conn) - prepare_database(db_conn, self, config=self.config) - def is_deadlock(self, error): return False diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 4099387ba7..00833422af 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -53,6 +53,9 @@ class UpgradeDatabaseException(PrepareDatabaseException): def prepare_database(db_conn, database_engine, config): """Prepares a database for usage. Will either create all necessary tables or upgrade from an older schema version. + + If `config` is None then prepare_database will assert that no upgrade is + necessary, *or* will create a fresh database if the database is empty. """ try: cur = db_conn.cursor() @@ -60,13 +63,18 @@ def prepare_database(db_conn, database_engine, config): if version_info: user_version, delta_files, upgraded = version_info - _upgrade_existing_database( - cur, user_version, delta_files, upgraded, database_engine, config - ) - else: - _setup_new_database(cur, database_engine, config) - # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,)) + if config is None: + if user_version != SCHEMA_VERSION: + # If we don't pass in a config file then we are expecting to + # have already upgraded the DB. + raise UpgradeDatabaseException("Database needs to be upgraded") + else: + _upgrade_existing_database( + cur, user_version, delta_files, upgraded, database_engine, config + ) + else: + _setup_new_database(cur, database_engine) cur.close() db_conn.commit() @@ -75,7 +83,7 @@ def prepare_database(db_conn, database_engine, config): raise -def _setup_new_database(cur, database_engine, config): +def _setup_new_database(cur, database_engine): """Sets up the database by finding a base set of "full schemas" and then applying any necessary deltas. @@ -148,12 +156,13 @@ def _setup_new_database(cur, database_engine, config): applied_delta_files=[], upgraded=False, database_engine=database_engine, - config=config, + config=None, + is_empty=True, ) def _upgrade_existing_database(cur, current_version, applied_delta_files, - upgraded, database_engine, config): + upgraded, database_engine, config, is_empty=False): """Upgrades an existing database. Delta files can either be SQL stored in *.sql files, or python modules @@ -246,7 +255,9 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, module_name, absolute_path, python_file ) logger.debug("Running script %s", relative_path) - module.run_upgrade(cur, database_engine, config=config) + module.run_create(cur, database_engine) + if not is_empty: + module.run_upgrade(cur, database_engine, config=config) elif ext == ".pyc": # Sometimes .pyc files turn up anyway even though we've # disabled their generation; e.g. from distribution package @@ -361,36 +372,3 @@ def _get_or_create_schema_state(txn, database_engine): return current_version, applied_deltas, upgraded return None - - -def prepare_sqlite3_database(db_conn): - """This function should be called before `prepare_database` on sqlite3 - databases. - - Since we changed the way we store the current schema version and handle - updates to schemas, we need a way to upgrade from the old method to the - new. This only affects sqlite databases since they were the only ones - supported at the time. - """ - with db_conn: - schema_path = os.path.join( - dir_path, "schema", "schema_version.sql", - ) - create_schema = read_schema(schema_path) - db_conn.executescript(create_schema) - - c = db_conn.execute("SELECT * FROM schema_version") - rows = c.fetchall() - c.close() - - if not rows: - c = db_conn.execute("PRAGMA user_version") - row = c.fetchone() - c.close() - - if row and row[0]: - db_conn.execute( - "REPLACE INTO schema_version (version, upgraded)" - " VALUES (?,?)", - (row[0], False) - ) diff --git a/synapse/storage/schema/delta/14/upgrade_appservice_db.py b/synapse/storage/schema/delta/14/upgrade_appservice_db.py index 5c40a77757..8755bb2e49 100644 --- a/synapse/storage/schema/delta/14/upgrade_appservice_db.py +++ b/synapse/storage/schema/delta/14/upgrade_appservice_db.py @@ -18,7 +18,7 @@ import logging logger = logging.getLogger(__name__) -def run_upgrade(cur, *args, **kwargs): +def run_create(cur, *args, **kwargs): cur.execute("SELECT id, regex FROM application_services_regex") for row in cur.fetchall(): try: @@ -35,3 +35,7 @@ def run_upgrade(cur, *args, **kwargs): "UPDATE application_services_regex SET regex=? WHERE id=?", (new_regex, row[0]) ) + + +def run_upgrade(*args, **kwargs): + pass diff --git a/synapse/storage/schema/delta/20/pushers.py b/synapse/storage/schema/delta/20/pushers.py index 29164732af..147496a38b 100644 --- a/synapse/storage/schema/delta/20/pushers.py +++ b/synapse/storage/schema/delta/20/pushers.py @@ -27,7 +27,7 @@ import logging logger = logging.getLogger(__name__) -def run_upgrade(cur, database_engine, *args, **kwargs): +def run_create(cur, database_engine, *args, **kwargs): logger.info("Porting pushers table...") cur.execute(""" CREATE TABLE IF NOT EXISTS pushers2 ( @@ -74,3 +74,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs): cur.execute("DROP TABLE pushers") cur.execute("ALTER TABLE pushers2 RENAME TO pushers") logger.info("Moved %d pushers to new table", count) + + +def run_upgrade(*args, **kwargs): + pass diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py index d3ff2b1779..4269ac69ad 100644 --- a/synapse/storage/schema/delta/25/fts.py +++ b/synapse/storage/schema/delta/25/fts.py @@ -43,7 +43,7 @@ SQLITE_TABLE = ( ) -def run_upgrade(cur, database_engine, *args, **kwargs): +def run_create(cur, database_engine, *args, **kwargs): if isinstance(database_engine, PostgresEngine): for statement in get_statements(POSTGRES_TABLE.splitlines()): cur.execute(statement) @@ -76,3 +76,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs): sql = database_engine.convert_param_style(sql) cur.execute(sql, ("event_search", progress_json)) + + +def run_upgrade(*args, **kwargs): + pass diff --git a/synapse/storage/schema/delta/27/ts.py b/synapse/storage/schema/delta/27/ts.py index f8c16391a2..71b12a2731 100644 --- a/synapse/storage/schema/delta/27/ts.py +++ b/synapse/storage/schema/delta/27/ts.py @@ -27,7 +27,7 @@ ALTER_TABLE = ( ) -def run_upgrade(cur, database_engine, *args, **kwargs): +def run_create(cur, database_engine, *args, **kwargs): for statement in get_statements(ALTER_TABLE.splitlines()): cur.execute(statement) @@ -55,3 +55,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs): sql = database_engine.convert_param_style(sql) cur.execute(sql, ("event_origin_server_ts", progress_json)) + + +def run_upgrade(*args, **kwargs): + pass diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py index 4f6e9dd540..b417e3ac08 100644 --- a/synapse/storage/schema/delta/30/as_users.py +++ b/synapse/storage/schema/delta/30/as_users.py @@ -18,7 +18,7 @@ from synapse.storage.appservice import ApplicationServiceStore logger = logging.getLogger(__name__) -def run_upgrade(cur, database_engine, config, *args, **kwargs): +def run_create(cur, database_engine, *args, **kwargs): # NULL indicates user was not registered by an appservice. try: cur.execute("ALTER TABLE users ADD COLUMN appservice_id TEXT") @@ -26,6 +26,8 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs): # Maybe we already added the column? Hope so... pass + +def run_upgrade(cur, database_engine, config, *args, **kwargs): cur.execute("SELECT name FROM users") rows = cur.fetchall() diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index 2e33beb07c..afbefb2e2d 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -53,7 +53,7 @@ class SQLBaseStoreTestCase(unittest.TestCase): "test", db_pool=self.db_pool, config=config, - database_engine=create_engine(config), + database_engine=create_engine(config.database_config), ) self.datastore = SQLBaseStore(hs) diff --git a/tests/utils.py b/tests/utils.py index 52405502e9..c179df31ee 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -64,7 +64,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): hs = HomeServer( name, db_pool=db_pool, config=config, version_string="Synapse/tests", - database_engine=create_engine(config), + database_engine=create_engine(config.database_config), get_db_conn=db_pool.get_db_conn, **kargs ) @@ -73,7 +73,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): hs = HomeServer( name, db_pool=None, datastore=datastore, config=config, version_string="Synapse/tests", - database_engine=create_engine(config), + database_engine=create_engine(config.database_config), **kargs ) @@ -298,7 +298,7 @@ class SQLiteMemoryDbPool(ConnectionPool, object): return conn def create_engine(self): - return create_engine(self.config) + return create_engine(self.config.database_config) class MemoryDataStore(object): -- cgit 1.4.1 From 75fb9ac1be0fada60cdde38153ac0e3fe2b19a0a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 6 Apr 2016 14:12:51 +0100 Subject: Add a slaved events store class Add a test to check that get_room_names_and_aliases does the same thing on both the master and on the slave data store. --- synapse/replication/slave/__init__.py | 14 ++ synapse/replication/slave/storage/__init__.py | 14 ++ synapse/replication/slave/storage/_base.py | 28 +++ .../slave/storage/_slaved_id_tracker.py | 30 ++++ synapse/replication/slave/storage/events.py | 198 +++++++++++++++++++++ synapse/storage/events.py | 4 +- tests/replication/slave/__init__.py | 14 ++ tests/replication/slave/storage/__init__.py | 14 ++ tests/replication/slave/storage/_base.py | 57 ++++++ tests/replication/slave/storage/test_events.py | 114 ++++++++++++ 10 files changed, 485 insertions(+), 2 deletions(-) create mode 100644 synapse/replication/slave/__init__.py create mode 100644 synapse/replication/slave/storage/__init__.py create mode 100644 synapse/replication/slave/storage/_base.py create mode 100644 synapse/replication/slave/storage/_slaved_id_tracker.py create mode 100644 synapse/replication/slave/storage/events.py create mode 100644 tests/replication/slave/__init__.py create mode 100644 tests/replication/slave/storage/__init__.py create mode 100644 tests/replication/slave/storage/_base.py create mode 100644 tests/replication/slave/storage/test_events.py (limited to 'tests') diff --git a/synapse/replication/slave/__init__.py b/synapse/replication/slave/__init__.py new file mode 100644 index 0000000000..b7df13c9ee --- /dev/null +++ b/synapse/replication/slave/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/replication/slave/storage/__init__.py b/synapse/replication/slave/storage/__init__.py new file mode 100644 index 0000000000..b7df13c9ee --- /dev/null +++ b/synapse/replication/slave/storage/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py new file mode 100644 index 0000000000..46e43ce1c7 --- /dev/null +++ b/synapse/replication/slave/storage/_base.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage._base import SQLBaseStore +from twisted.internet import defer + + +class BaseSlavedStore(SQLBaseStore): + def __init__(self, db_conn, hs): + super(BaseSlavedStore, self).__init__(hs) + + def stream_positions(self): + return {} + + def process_replication(self, result): + return defer.succeed(None) diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py new file mode 100644 index 0000000000..24b5c79d4a --- /dev/null +++ b/synapse/replication/slave/storage/_slaved_id_tracker.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage.util.id_generators import _load_current_id + + +class SlavedIdTracker(object): + def __init__(self, db_conn, table, column, extra_tables=[], step=1): + self.step = step + self._current = _load_current_id(db_conn, table, column, step) + for table, column in extra_tables: + self.advance(_load_current_id(db_conn, table, column)) + + def advance(self, new_id): + self._current = (max if self.step > 0 else min)(self._current, new_id) + + def get_current_token(self): + return self._current diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py new file mode 100644 index 0000000000..68b924e37b --- /dev/null +++ b/synapse/replication/slave/storage/events.py @@ -0,0 +1,198 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker + +from synapse.api.constants import EventTypes +from synapse.events import FrozenEvent +from synapse.storage import DataStore +from synapse.storage.room import RoomStore +from synapse.storage.roommember import RoomMemberStore +from synapse.storage.event_federation import EventFederationStore +from synapse.storage.state import StateStore +from synapse.util.caches.stream_change_cache import StreamChangeCache + +import ujson as json + +# So, um, we want to borrow a load of functions intended for reading from +# a DataStore, but we don't want to take functions that either write to the +# DataStore or are cached and don't have cache invalidation logic. +# +# Rather than write duplicate versions of those functions, or lift them to +# a common base class, we going to grab the underlying __func__ object from +# the method descriptor on the DataStore and chuck them into our class. + + +class SlavedEventStore(BaseSlavedStore): + + def __init__(self, db_conn, hs): + super(SlavedEventStore, self).__init__(db_conn, hs) + self._stream_id_gen = SlavedIdTracker( + db_conn, "events", "stream_ordering", + ) + self._backfill_id_gen = SlavedIdTracker( + db_conn, "events", "stream_ordering", step=-1 + ) + events_max = self._stream_id_gen.get_current_token() + event_cache_prefill, min_event_val = self._get_cache_dict( + db_conn, "events", + entity_column="room_id", + stream_column="stream_ordering", + max_value=events_max, + ) + self._events_stream_cache = StreamChangeCache( + "EventsRoomStreamChangeCache", min_event_val, + prefilled_cache=event_cache_prefill, + ) + + # Cached functions can't be accessed through a class instance so we need + # to reach inside the __dict__ to extract them. + get_room_name_and_aliases = RoomStore.__dict__["get_room_name_and_aliases"] + get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"] + get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"] + get_latest_event_ids_in_room = EventFederationStore.__dict__[ + "get_latest_event_ids_in_room" + ] + _get_current_state_for_key = StateStore.__dict__[ + "_get_current_state_for_key" + ] + + get_current_state = DataStore.get_current_state.__func__ + get_current_state_for_key = DataStore.get_current_state_for_key.__func__ + _get_rooms_for_user_where_membership_is_txn = ( + DataStore._get_rooms_for_user_where_membership_is_txn.__func__ + ) + get_rooms_for_user_where_membership_is = ( + DataStore.get_rooms_for_user_where_membership_is.__func__ + ) + get_membership_changes_for_user = ( + DataStore.get_membership_changes_for_user.__func__ + ) + get_room_events_max_id = DataStore.get_room_events_max_id.__func__ + get_room_events_stream_for_room = ( + DataStore.get_room_events_stream_for_room.__func__ + ) + _set_before_and_after = DataStore._set_before_and_after + + _get_events = DataStore._get_events.__func__ + _get_events_from_cache = DataStore._get_events_from_cache.__func__ + + _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__ + _parse_events_txn = DataStore._parse_events_txn.__func__ + _get_events_txn = DataStore._get_events_txn.__func__ + _fetch_events_txn = DataStore._fetch_events_txn.__func__ + _fetch_event_rows = DataStore._fetch_event_rows.__func__ + _get_event_from_row_txn = DataStore._get_event_from_row_txn.__func__ + + def stream_positions(self): + result = super(SlavedEventStore, self).stream_positions() + result["events"] = self._stream_id_gen.get_current_token() + result["backfilled"] = self._backfill_id_gen.get_current_token() + return result + + def process_replication(self, result): + state_resets = set( + r[0] for r in result.get("state_resets", {"rows": []})["rows"] + ) + + stream = result.get("events") + if stream: + self._stream_id_gen.advance(stream["position"]) + for row in stream["rows"]: + self._process_replication_row( + row, backfilled=False, state_resets=state_resets + ) + + stream = result.get("backfill") + if stream: + self._backfill_id_gen.advance(stream["position"]) + for row in stream["rows"]: + self._process_replication_row( + row, backfilled=True, state_resets=state_resets + ) + + stream = result.get("forward_ex_outliers") + if stream: + for row in stream["rows"]: + event_id = row[1] + self._invalidate_get_event_cache(event_id) + + stream = result.get("backward_ex_outliers") + if stream: + for row in stream["rows"]: + event_id = row[1] + self._invalidate_get_event_cache(event_id) + + return super(SlavedEventStore, self).process_replication(result) + + def _process_replication_row(self, row, backfilled, state_resets): + position = row[0] + internal = json.loads(row[1]) + event_json = json.loads(row[2]) + + event = FrozenEvent(event_json, internal_metadata_dict=internal) + self._invalidate_caches_for_event( + event, backfilled, reset_state=position in state_resets + ) + + def _invalidate_caches_for_event(self, event, backfilled, reset_state): + if reset_state: + self._get_current_state_for_key.invalidate_all() + self.get_rooms_for_user.invalidate_all() + self.get_users_in_room.invalidate((event.room_id,)) + # self.get_joined_hosts_for_room.invalidate((event.room_id,)) + self.get_room_name_and_aliases.invalidate((event.room_id,)) + + self._invalidate_get_event_cache(event.event_id) + + if not backfilled: + self._events_stream_cache.entity_has_changed( + event.room_id, event.internal_metadata.stream_ordering + ) + + # self.get_unread_event_push_actions_by_room_for_user.invalidate_many( + # (event.room_id,) + # ) + + if event.type == EventTypes.Redaction: + self._invalidate_get_event_cache(event.redacts) + + if event.type == EventTypes.Member: + self.get_rooms_for_user.invalidate((event.state_key,)) + # self.get_joined_hosts_for_room.invalidate((event.room_id,)) + self.get_users_in_room.invalidate((event.room_id,)) + # self._membership_stream_cache.entity_has_changed( + # event.state_key, event.internal_metadata.stream_ordering + # ) + + if not event.is_state(): + return + + if backfilled: + return + + if (not event.internal_metadata.is_invite_from_remote() + and event.internal_metadata.is_outlier()): + return + + self._get_current_state_for_key.invalidate(( + event.room_id, event.type, event.state_key + )) + + if event.type in [EventTypes.Name, EventTypes.Aliases]: + self.get_room_name_and_aliases.invalidate( + (event.room_id,) + ) + pass diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5d299a1132..ee87a71719 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1134,7 +1134,7 @@ class EventsStore(SQLBaseStore): upper_bound = current_forward_id sql = ( - "SELECT -event_stream_ordering FROM current_state_resets" + "SELECT event_stream_ordering FROM current_state_resets" " WHERE ? < event_stream_ordering" " AND event_stream_ordering <= ?" " ORDER BY event_stream_ordering ASC" @@ -1143,7 +1143,7 @@ class EventsStore(SQLBaseStore): state_resets = txn.fetchall() sql = ( - "SELECT -event_stream_ordering, event_id, state_group" + "SELECT event_stream_ordering, event_id, state_group" " FROM ex_outlier_stream" " WHERE ? > event_stream_ordering" " AND event_stream_ordering >= ?" diff --git a/tests/replication/slave/__init__.py b/tests/replication/slave/__init__.py new file mode 100644 index 0000000000..b7df13c9ee --- /dev/null +++ b/tests/replication/slave/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/replication/slave/storage/__init__.py b/tests/replication/slave/storage/__init__.py new file mode 100644 index 0000000000..b7df13c9ee --- /dev/null +++ b/tests/replication/slave/storage/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py new file mode 100644 index 0000000000..0f525a8943 --- /dev/null +++ b/tests/replication/slave/storage/_base.py @@ -0,0 +1,57 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer +from tests import unittest + +from synapse.replication.slave.storage.events import SlavedEventStore + +from mock import Mock, NonCallableMock +from tests.utils import setup_test_homeserver +from synapse.replication.resource import ReplicationResource + + +class BaseSlavedStoreTestCase(unittest.TestCase): + @defer.inlineCallbacks + def setUp(self): + self.hs = yield setup_test_homeserver( + "blue", + http_client=None, + replication_layer=Mock(), + ratelimiter=NonCallableMock(spec_set=[ + "send_message", + ]), + ) + self.hs.get_ratelimiter().send_message.return_value = (True, 0) + + self.replication = ReplicationResource(self.hs) + + self.master_store = self.hs.get_datastore() + self.slaved_store = SlavedEventStore(self.hs.get_db_conn(), self.hs) + self.event_id = 0 + + @defer.inlineCallbacks + def replicate(self): + streams = self.slaved_store.stream_positions() + result = yield self.replication.replicate(streams, 100) + yield self.slaved_store.process_replication(result) + + @defer.inlineCallbacks + def check(self, method, args, expected_result=None): + master_result = yield getattr(self.master_store, method)(*args) + slaved_result = yield getattr(self.slaved_store, method)(*args) + self.assertEqual(master_result, slaved_result) + if expected_result is not None: + self.assertEqual(master_result, expected_result) + self.assertEqual(slaved_result, expected_result) diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py new file mode 100644 index 0000000000..c30c7c6063 --- /dev/null +++ b/tests/replication/slave/storage/test_events.py @@ -0,0 +1,114 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStoreTestCase + +from synapse.types import UserID +from synapse.events import FrozenEvent +from synapse.events.snapshot import EventContext + +from twisted.internet import defer + +USER_ID = "@feeling:blue" +USER = UserID.from_string(USER_ID) +OUTLIER = {"outlier": True} +ROOM_ID = "!room:blue" + + +class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): + + @defer.inlineCallbacks + def test_room_name_and_aliases(self): + create = yield self.persist(type="m.room.create", key="", creator=USER_ID) + yield self.persist(type="m.room.member", key=USER_ID, membership="join") + yield self.persist(type="m.room.name", key="", name="name1") + yield self.persist( + type="m.room.aliases", key="blue", aliases=["#1:blue"] + ) + yield self.replicate() + yield self.check( + "get_room_name_and_aliases", (ROOM_ID,), ("name1", ["#1:blue"]) + ) + + # Set the room name. + yield self.persist(type="m.room.name", key="", name="name2") + yield self.replicate() + yield self.check( + "get_room_name_and_aliases", (ROOM_ID,), ("name2", ["#1:blue"]) + ) + + # Set the room aliases. + yield self.persist( + type="m.room.aliases", key="blue", aliases=["#2:blue"] + ) + yield self.replicate() + yield self.check( + "get_room_name_and_aliases", (ROOM_ID,), ("name2", ["#2:blue"]) + ) + + # Leave and join the room clobbering the state. + yield self.persist(type="m.room.member", key=USER_ID, membership="leave") + yield self.persist( + type="m.room.member", key=USER_ID, membership="join", + reset_state=[create] + ) + yield self.replicate() + + yield self.check( + "get_room_name_and_aliases", (ROOM_ID,), (None, []) + ) + + event_id = 0 + + @defer.inlineCallbacks + def persist( + self, sender=USER_ID, room_id=ROOM_ID, type={}, key=None, + internal={}, + state=None, reset_state=False, backfill=False, + depth=None, prev_events=[], auth_events=[], prev_state=[], + **content + ): + if depth is None: + depth = self.event_id + + event_dict = { + "sender": sender, + "type": type, + "content": content, + "event_id": "$%d:blue" % (self.event_id,), + "room_id": room_id, + "depth": depth, + "origin_server_ts": self.event_id, + "prev_events": prev_events, + "auth_events": auth_events, + } + if key is not None: + event_dict["state_key"] = key + event_dict["prev_state"] = prev_state + + event = FrozenEvent(event_dict, internal_metadata_dict=internal) + + self.event_id += 1 + + context = EventContext(current_state=state) + + if backfill: + yield self.master_store.persist_events( + [(event, context)], backfilled=True + ) + else: + yield self.master_store.persist_event( + event, context, current_state=reset_state + ) + defer.returnValue(event) -- cgit 1.4.1 From 6bfec56796132520ad864ad00f8dd6773512f9f4 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 6 Apr 2016 16:17:15 +0100 Subject: Test that room membership is replicated --- synapse/replication/slave/storage/events.py | 7 +-- tests/replication/slave/storage/test_events.py | 71 +++++++++++++++++++++++--- 2 files changed, 67 insertions(+), 11 deletions(-) (limited to 'tests') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 68b924e37b..680dc89536 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -71,9 +71,6 @@ class SlavedEventStore(BaseSlavedStore): get_current_state = DataStore.get_current_state.__func__ get_current_state_for_key = DataStore.get_current_state_for_key.__func__ - _get_rooms_for_user_where_membership_is_txn = ( - DataStore._get_rooms_for_user_where_membership_is_txn.__func__ - ) get_rooms_for_user_where_membership_is = ( DataStore.get_rooms_for_user_where_membership_is.__func__ ) @@ -95,6 +92,10 @@ class SlavedEventStore(BaseSlavedStore): _fetch_events_txn = DataStore._fetch_events_txn.__func__ _fetch_event_rows = DataStore._fetch_event_rows.__func__ _get_event_from_row_txn = DataStore._get_event_from_row_txn.__func__ + _get_rooms_for_user_where_membership_is_txn = ( + DataStore._get_rooms_for_user_where_membership_is_txn.__func__ + ) + _get_members_rows_txn = DataStore._get_members_rows_txn.__func__ def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index c30c7c6063..351d777fb2 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -14,14 +14,14 @@ from ._base import BaseSlavedStoreTestCase -from synapse.types import UserID from synapse.events import FrozenEvent from synapse.events.snapshot import EventContext +from synapse.storage.roommember import RoomsForUser from twisted.internet import defer USER_ID = "@feeling:blue" -USER = UserID.from_string(USER_ID) +USER_ID_2 = "@bright:blue" OUTLIER = {"outlier": True} ROOM_ID = "!room:blue" @@ -69,16 +69,66 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): "get_room_name_and_aliases", (ROOM_ID,), (None, []) ) + @defer.inlineCallbacks + def test_room_members(self): + create = yield self.persist(type="m.room.create", key="", creator=USER_ID) + yield self.replicate() + yield self.check("get_rooms_for_user", (USER_ID,), []) + yield self.check("get_users_in_room", (ROOM_ID,), []) + + # Join the room. + join = yield self.persist(type="m.room.member", key=USER_ID, membership="join") + yield self.replicate() + yield self.check("get_rooms_for_user", (USER_ID,), [RoomsForUser( + room_id=ROOM_ID, + sender=USER_ID, + membership="join", + event_id=join.event_id, + stream_ordering=join.internal_metadata.stream_ordering, + )]) + yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID]) + + # Leave the room. + yield self.persist(type="m.room.member", key=USER_ID, membership="leave") + yield self.replicate() + yield self.check("get_rooms_for_user", (USER_ID,), []) + yield self.check("get_users_in_room", (ROOM_ID,), []) + + # Add some other user to the room. + join = yield self.persist(type="m.room.member", key=USER_ID_2, membership="join") + yield self.replicate() + yield self.check("get_rooms_for_user", (USER_ID_2,), [RoomsForUser( + room_id=ROOM_ID, + sender=USER_ID, + membership="join", + event_id=join.event_id, + stream_ordering=join.internal_metadata.stream_ordering, + )]) + yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID_2]) + + # Join the room clobbering the state. + # This should remove any evidence of the other user being in the room. + yield self.persist( + type="m.room.member", key=USER_ID, membership="join", + reset_state=[create] + ) + yield self.replicate() + yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID]) + yield self.check("get_rooms_for_user", (USER_ID_2,), []) + event_id = 0 @defer.inlineCallbacks def persist( - self, sender=USER_ID, room_id=ROOM_ID, type={}, key=None, - internal={}, - state=None, reset_state=False, backfill=False, - depth=None, prev_events=[], auth_events=[], prev_state=[], - **content + self, sender=USER_ID, room_id=ROOM_ID, type={}, key=None, internal={}, + state=None, reset_state=False, backfill=False, + depth=None, prev_events=[], auth_events=[], prev_state=[], + **content ): + """ + Returns: + synapse.events.FrozenEvent: The event that was persisted. + """ if depth is None: depth = self.event_id @@ -103,12 +153,17 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): context = EventContext(current_state=state) + ordering = None if backfill: yield self.master_store.persist_events( [(event, context)], backfilled=True ) else: - yield self.master_store.persist_event( + ordering, _ = yield self.master_store.persist_event( event, context, current_state=reset_state ) + + if ordering: + event.internal_metadata.stream_ordering = ordering + defer.returnValue(event) -- cgit 1.4.1 From 60ec9793fb44ad445dd1233594957baeede60e4f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 7 Apr 2016 13:17:56 +0100 Subject: Add tests for get_latest_event_ids_in_room and get_current_state --- synapse/events/__init__.py | 9 ++++ synapse/replication/slave/storage/events.py | 5 +++ tests/replication/slave/storage/test_events.py | 62 ++++++++++++++++++++++++++ 3 files changed, 76 insertions(+) (limited to 'tests') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 13154b1723..81e2126202 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -36,6 +36,10 @@ class _EventInternalMetadata(object): def is_invite_from_remote(self): return getattr(self, "invite_from_remote", False) + def __eq__(self, other): + "Equality check for unit tests." + return self.__dict__ == other.__dict__ + def _event_dict_property(key): def getter(self): @@ -180,3 +184,8 @@ class FrozenEvent(EventBase): self.get("type", None), self.get("state_key", None), ) + + def __eq__(self, other): + """Equality check for unit tests. Compares internal_metadata as well + as the event fields""" + return self.__dict__ == other.__dict__ diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 680dc89536..707ddd248a 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -89,8 +89,11 @@ class SlavedEventStore(BaseSlavedStore): _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__ _parse_events_txn = DataStore._parse_events_txn.__func__ _get_events_txn = DataStore._get_events_txn.__func__ + _enqueue_events = DataStore._enqueue_events.__func__ + _do_fetch = DataStore._do_fetch.__func__ _fetch_events_txn = DataStore._fetch_events_txn.__func__ _fetch_event_rows = DataStore._fetch_event_rows.__func__ + _get_event_from_row = DataStore._get_event_from_row.__func__ _get_event_from_row_txn = DataStore._get_event_from_row_txn.__func__ _get_rooms_for_user_where_membership_is_txn = ( DataStore._get_rooms_for_user_where_membership_is_txn.__func__ @@ -158,6 +161,8 @@ class SlavedEventStore(BaseSlavedStore): self._invalidate_get_event_cache(event.event_id) + self.get_latest_event_ids_in_room.invalidate((event.room_id,)) + if not backfilled: self._events_stream_cache.entity_has_changed( event.room_id, event.internal_metadata.stream_ordering diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 351d777fb2..d5d0ef1148 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -116,6 +116,68 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID]) yield self.check("get_rooms_for_user", (USER_ID_2,), []) + @defer.inlineCallbacks + def test_get_latest_event_ids_in_room(self): + create = yield self.persist(type="m.room.create", key="", creator=USER_ID) + yield self.replicate() + yield self.check( + "get_latest_event_ids_in_room", (ROOM_ID,), [create.event_id] + ) + + join = yield self.persist( + type="m.room.member", key=USER_ID, membership="join", + prev_events=[(create.event_id, {})], + ) + yield self.replicate() + yield self.check( + "get_latest_event_ids_in_room", (ROOM_ID,), [join.event_id] + ) + + @defer.inlineCallbacks + def test_get_current_state(self): + # Create the room. + create = yield self.persist(type="m.room.create", key="", creator=USER_ID) + yield self.replicate() + yield self.check( + "get_current_state_for_key", (ROOM_ID, "m.room.member", USER_ID), [] + ) + + # Join the room. + join1 = yield self.persist( + type="m.room.member", key=USER_ID, membership="join", + ) + yield self.replicate() + yield self.check( + "get_current_state_for_key", (ROOM_ID, "m.room.member", USER_ID), + [join1] + ) + + # Add some other user to the room. + join2 = yield self.persist( + type="m.room.member", key=USER_ID_2, membership="join", + ) + yield self.replicate() + yield self.check( + "get_current_state_for_key", (ROOM_ID, "m.room.member", USER_ID_2), + [join2] + ) + + # Leave the room, then rejoin the room clobbering state. + yield self.persist(type="m.room.member", key=USER_ID, membership="leave") + join3 = yield self.persist( + type="m.room.member", key=USER_ID, membership="join", + reset_state=[create] + ) + yield self.replicate() + yield self.check( + "get_current_state_for_key", (ROOM_ID, "m.room.member", USER_ID_2), + [] + ) + yield self.check( + "get_current_state_for_key", (ROOM_ID, "m.room.member", USER_ID), + [join3] + ) + event_id = 0 @defer.inlineCallbacks -- cgit 1.4.1 From af03ecf35223f93971596f38393c62f4694705fa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Apr 2016 15:44:22 +0100 Subject: Deduplicate joins --- synapse/handlers/room_member.py | 31 ++++++++++++++++++++++++ synapse/util/async.py | 42 +++++++++++++++++++++++++++++++++ synapse/util/caches/response_cache.py | 2 +- tests/util/test_linearizer.py | 44 +++++++++++++++++++++++++++++++++++ 4 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 tests/util/test_linearizer.py (limited to 'tests') diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index fe2315df8f..0fcc9445a8 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -24,6 +24,7 @@ from synapse.api.constants import ( ) from synapse.api.errors import AuthError, SynapseError, Codes from synapse.util.logcontext import preserve_context_over_fn +from synapse.util.async import Linearizer from signedjson.sign import verify_signed_json from signedjson.key import decode_verify_key_bytes @@ -60,6 +61,8 @@ class RoomMemberHandler(BaseHandler): def __init__(self, hs): super(RoomMemberHandler, self).__init__(hs) + self.member_linearizer = Linearizer() + self.clock = hs.get_clock() self.distributor = hs.get_distributor() @@ -182,6 +185,34 @@ class RoomMemberHandler(BaseHandler): remote_room_hosts=None, third_party_signed=None, ratelimit=True, + ): + key = (target, room_id,) + + with (yield self.member_linearizer.queue(key)): + result = yield self._update_membership( + requester, + target, + room_id, + action, + txn_id=txn_id, + remote_room_hosts=remote_room_hosts, + third_party_signed=third_party_signed, + ratelimit=ratelimit, + ) + + defer.returnValue(result) + + @defer.inlineCallbacks + def _update_membership( + self, + requester, + target, + room_id, + action, + txn_id=None, + remote_room_hosts=None, + third_party_signed=None, + ratelimit=True, ): effective_membership_state = action if action in ["kick", "unban"]: diff --git a/synapse/util/async.py b/synapse/util/async.py index cd4d90f3cf..408c86be91 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -19,6 +19,8 @@ from twisted.internet import defer, reactor from .logcontext import PreserveLoggingContext, preserve_fn from synapse.util import unwrapFirstError +from contextlib import contextmanager + @defer.inlineCallbacks def sleep(seconds): @@ -137,3 +139,43 @@ def concurrently_execute(func, args, limit): preserve_fn(_concurrently_execute_inner)() for _ in xrange(limit) ], consumeErrors=True).addErrback(unwrapFirstError) + + +@contextmanager +def _trigger_defer_manager(d): + try: + yield + finally: + d.callback(None) + + +class Linearizer(object): + """Linearizes access to resources based on a key. Useful to ensure only one + thing is happening at a time on a given resource. + + Example: + + with (yield linearizer.queue("test_key")): + # do some work. + + """ + def __init__(self): + self.key_to_defer = {} + + @defer.inlineCallbacks + def queue(self, key): + current_defer = self.key_to_defer.get(key) + + new_defer = defer.Deferred() + self.key_to_defer[key] = new_defer + + def remove_if_current(_): + d = self.key_to_defer.get(key) + if d is new_defer: + self.key_to_defer.pop(key, None) + + new_defer.addBoth(remove_if_current) + + yield current_defer + + defer.returnValue(_trigger_defer_manager(new_defer)) diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index be310ba320..36686b479e 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -35,7 +35,7 @@ class ResponseCache(object): return None def set(self, key, deferred): - result = ObservableDeferred(deferred) + result = ObservableDeferred(deferred, consumeErrors=True) self.pending_result_cache[key] = result def remove(r): diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py new file mode 100644 index 0000000000..afcba482f9 --- /dev/null +++ b/tests/util/test_linearizer.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from tests import unittest + +from twisted.internet import defer + +from synapse.util.async import Linearizer + + +class LinearizerTestCase(unittest.TestCase): + + @defer.inlineCallbacks + def test_linearizer(self): + linearizer = Linearizer() + + key = object() + + d1 = linearizer.queue(key) + cm1 = yield d1 + + d2 = linearizer.queue(key) + self.assertFalse(d2.called) + + with cm1: + self.assertFalse(d2.called) + + self.assertTrue(d2.called) + + with (yield d2): + pass -- cgit 1.4.1 From 57fa1801c336603c6c710d6d868aaae596a7f5b8 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 7 Apr 2016 16:41:37 +0100 Subject: Add sensible __eq__ operators inside the tests. Rather than adding them globally. This limits the changes to only affect the tests. --- synapse/events/__init__.py | 9 -------- tests/replication/slave/storage/test_events.py | 29 +++++++++++++++++++++++++- 2 files changed, 28 insertions(+), 10 deletions(-) (limited to 'tests') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 81e2126202..13154b1723 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -36,10 +36,6 @@ class _EventInternalMetadata(object): def is_invite_from_remote(self): return getattr(self, "invite_from_remote", False) - def __eq__(self, other): - "Equality check for unit tests." - return self.__dict__ == other.__dict__ - def _event_dict_property(key): def getter(self): @@ -184,8 +180,3 @@ class FrozenEvent(EventBase): self.get("type", None), self.get("state_key", None), ) - - def __eq__(self, other): - """Equality check for unit tests. Compares internal_metadata as well - as the event fields""" - return self.__dict__ == other.__dict__ diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index d5d0ef1148..9af62702b3 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -14,20 +14,47 @@ from ._base import BaseSlavedStoreTestCase -from synapse.events import FrozenEvent +from synapse.events import FrozenEvent, _EventInternalMetadata from synapse.events.snapshot import EventContext from synapse.storage.roommember import RoomsForUser from twisted.internet import defer + USER_ID = "@feeling:blue" USER_ID_2 = "@bright:blue" OUTLIER = {"outlier": True} ROOM_ID = "!room:blue" +def dict_equals(self, other): + return self.__dict__ == other.__dict__ + + +def patch__eq__(cls): + eq = getattr(cls, "__eq__", None) + cls.__eq__ = dict_equals + + def unpatch(): + if eq is not None: + cls.__eq__ = eq + return unpatch + + class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): + def setUp(self): + # Patch up the equality operator for events so that we can check + # whether lists of events match using assertEquals + self.unpatches = [ + patch__eq__(_EventInternalMetadata), + patch__eq__(FrozenEvent), + ] + return super(SlavedEventStoreTestCase, self).setUp() + + def tearDown(self): + [unpatch() for unpatch in self.unpatches] + @defer.inlineCallbacks def test_room_name_and_aliases(self): create = yield self.persist(type="m.room.create", key="", creator=USER_ID) -- cgit 1.4.1 From ceb599e789ef34a3733a99d17701a75fc5409d17 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 7 Apr 2016 16:26:52 +0100 Subject: Add tests for redactions --- synapse/replication/slave/storage/events.py | 4 +- synapse/storage/util/id_generators.py | 2 +- tests/replication/slave/storage/_base.py | 2 +- tests/replication/slave/storage/test_events.py | 51 +++++++++++++++++++++++++- 4 files changed, 54 insertions(+), 5 deletions(-) (limited to 'tests') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 707ddd248a..cfc728a038 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -69,6 +69,7 @@ class SlavedEventStore(BaseSlavedStore): "_get_current_state_for_key" ] + get_event = DataStore.get_event.__func__ get_current_state = DataStore.get_current_state.__func__ get_current_state_for_key = DataStore.get_current_state_for_key.__func__ get_rooms_for_user_where_membership_is = ( @@ -103,7 +104,7 @@ class SlavedEventStore(BaseSlavedStore): def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() - result["backfilled"] = self._backfill_id_gen.get_current_token() + result["backfill"] = self._backfill_id_gen.get_current_token() return result def process_replication(self, result): @@ -145,7 +146,6 @@ class SlavedEventStore(BaseSlavedStore): position = row[0] internal = json.loads(row[1]) event_json = json.loads(row[2]) - event = FrozenEvent(event_json, internal_metadata_dict=internal) self._invalidate_caches_for_event( event, backfilled, reset_state=position in state_resets diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index f69f1cdad4..46cf93ff87 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -112,7 +112,7 @@ class StreamIdGenerator(object): self._current + self._step * (n + 1), self._step ) - self._current += n + self._current += n * self._step for next_id in next_ids: self._unfinished_ids.append(next_id) diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index 0f525a8943..983caafe8a 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -51,7 +51,7 @@ class BaseSlavedStoreTestCase(unittest.TestCase): def check(self, method, args, expected_result=None): master_result = yield getattr(self.master_store, method)(*args) slaved_result = yield getattr(self.slaved_store, method)(*args) - self.assertEqual(master_result, slaved_result) if expected_result is not None: self.assertEqual(master_result, expected_result) self.assertEqual(slaved_result, expected_result) + self.assertEqual(master_result, slaved_result) diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 9af62702b3..baa4a26eb5 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -205,13 +205,59 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): [join3] ) + @defer.inlineCallbacks + def test_redactions(self): + yield self.persist(type="m.room.create", key="", creator=USER_ID) + yield self.persist(type="m.room.member", key=USER_ID, membership="join") + + msg = yield self.persist( + type="m.room.message", msgtype="m.text", body="Hello" + ) + yield self.replicate() + yield self.check("get_event", [msg.event_id], msg) + + redaction = yield self.persist( + type="m.room.redaction", redacts=msg.event_id + ) + yield self.replicate() + + msg_dict = msg.get_dict() + msg_dict["content"] = {} + msg_dict["unsigned"]["redacted_by"] = redaction.event_id + msg_dict["unsigned"]["redacted_because"] = redaction + redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict()) + yield self.check("get_event", [msg.event_id], redacted) + + @defer.inlineCallbacks + def test_backfilled_redactions(self): + yield self.persist(type="m.room.create", key="", creator=USER_ID) + yield self.persist(type="m.room.member", key=USER_ID, membership="join") + + msg = yield self.persist( + type="m.room.message", msgtype="m.text", body="Hello" + ) + yield self.replicate() + yield self.check("get_event", [msg.event_id], msg) + + redaction = yield self.persist( + type="m.room.redaction", redacts=msg.event_id, backfill=True + ) + yield self.replicate() + + msg_dict = msg.get_dict() + msg_dict["content"] = {} + msg_dict["unsigned"]["redacted_by"] = redaction.event_id + msg_dict["unsigned"]["redacted_because"] = redaction + redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict()) + yield self.check("get_event", [msg.event_id], redacted) + event_id = 0 @defer.inlineCallbacks def persist( self, sender=USER_ID, room_id=ROOM_ID, type={}, key=None, internal={}, state=None, reset_state=False, backfill=False, - depth=None, prev_events=[], auth_events=[], prev_state=[], + depth=None, prev_events=[], auth_events=[], prev_state=[], redacts=None, **content ): """ @@ -236,6 +282,9 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): event_dict["state_key"] = key event_dict["prev_state"] = prev_state + if redacts is not None: + event_dict["redacts"] = redacts + event = FrozenEvent(event_dict, internal_metadata_dict=internal) self.event_id += 1 -- cgit 1.4.1 From 7e2f971c08250cf432d43dd6244faefb2074ff8c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 8 Apr 2016 14:01:56 +0100 Subject: Remove some unused functions (#711) * Remove some unused functions * get_room_events_stream is only used in tests * is_exclusive_room might actually be something we want --- synapse/appservice/api.py | 5 - synapse/handlers/message.py | 29 ------ synapse/handlers/room_member.py | 13 --- synapse/storage/_base.py | 6 -- synapse/storage/prepare_database.py | 12 --- synapse/storage/presence.py | 10 -- synapse/storage/roommember.py | 33 ------- synapse/storage/stream.py | 90 ------------------ synapse/util/__init__.py | 3 - synapse/util/ratelimitutils.py | 14 --- synapse/util/stringutils.py | 4 - tests/storage/test_presence.py | 27 ------ tests/storage/test_redaction.py | 51 +--------- tests/storage/test_roommember.py | 7 -- tests/storage/test_stream.py | 185 ------------------------------------ 15 files changed, 4 insertions(+), 485 deletions(-) delete mode 100644 tests/storage/test_stream.py (limited to 'tests') diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index bc90605324..6da6a1b62e 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -100,11 +100,6 @@ class ApplicationServiceApi(SimpleHttpClient): logger.warning("push_bulk to %s threw exception %s", uri, ex) defer.returnValue(False) - @defer.inlineCallbacks - def push(self, service, event, txn_id=None): - response = yield self.push_bulk(service, [event], txn_id) - defer.returnValue(response) - def _serialize(self, events): time_now = self.clock.time_msec() return [ diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index fa78d4acec..f51feda2f4 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -44,35 +44,6 @@ class MessageHandler(BaseHandler): self.validator = EventValidator() self.snapshot_cache = SnapshotCache() - @defer.inlineCallbacks - def get_message(self, msg_id=None, room_id=None, sender_id=None, - user_id=None): - """ Retrieve a message. - - Args: - msg_id (str): The message ID to obtain. - room_id (str): The room where the message resides. - sender_id (str): The user ID of the user who sent the message. - user_id (str): The user ID of the user making this request. - Returns: - The message, or None if no message exists. - Raises: - SynapseError if something went wrong. - """ - yield self.auth.check_joined_room(room_id, user_id) - - # Pull out the message from the db -# msg = yield self.store.get_message( -# room_id=room_id, -# msg_id=msg_id, -# user_id=sender_id -# ) - - # TODO (erikj): Once we work out the correct c-s api we need to think - # on how to do this. - - defer.returnValue(None) - @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, as_client_event=True): diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 753c75d9c1..b69f36aefe 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -392,19 +392,6 @@ class RoomMemberHandler(BaseHandler): and guest_access.content["guest_access"] == "can_join" ) - def _should_do_dance(self, current_state, inviter, room_hosts=None): - # TODO: Shouldn't this be remote_room_host? - room_hosts = room_hosts or [] - - is_host_in_room = self.is_host_in_room(current_state) - if is_host_in_room: - return False, room_hosts - - if inviter and not self.hs.is_mine(inviter): - room_hosts.append(inviter.domain) - - return True, room_hosts - @defer.inlineCallbacks def lookup_room_alias(self, room_alias): """ diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 04d7fcf6d6..1e27c2c0ce 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -810,12 +810,6 @@ class SQLBaseStore(object): return txn.execute(sql, keyvalues.values()) - def get_next_stream_id(self): - with self._next_stream_id_lock: - i = self._next_stream_id - self._next_stream_id += 1 - return i - def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value): # Fetch a mapping of room_id -> max stream position for "recent" rooms. diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 00833422af..57f14fd12b 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -30,18 +30,6 @@ SCHEMA_VERSION = 31 dir_path = os.path.abspath(os.path.dirname(__file__)) -def read_schema(path): - """ Read the named database schema. - - Args: - path: Path of the database schema. - Returns: - A string containing the database schema. - """ - with open(path) as schema_file: - return schema_file.read() - - class PrepareDatabaseException(Exception): pass diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 59b4ef5ce6..07f5fae8dd 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -176,16 +176,6 @@ class PresenceStore(SQLBaseStore): desc="disallow_presence_visible", ) - def is_presence_visible(self, observed_localpart, observer_userid): - return self._simple_select_one( - table="presence_allow_inbound", - keyvalues={"observed_user_id": observed_localpart, - "observer_user_id": observer_userid}, - retcols=["observed_user_id"], - allow_none=True, - desc="is_presence_visible", - ) - def add_presence_list_pending(self, observer_localpart, observed_userid): return self._simple_insert( table="presence_list", diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 66e7a40e3c..77518e893f 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -121,26 +121,6 @@ class RoomMemberStore(SQLBaseStore): with self._stream_id_gen.get_next() as stream_ordering: yield self.runInteraction("locally_reject_invite", f, stream_ordering) - def get_room_member(self, user_id, room_id): - """Retrieve the current state of a room member. - - Args: - user_id (str): The member's user ID. - room_id (str): The room the member is in. - Returns: - Deferred: Results in a MembershipEvent or None. - """ - return self.runInteraction( - "get_room_member", - self._get_members_events_txn, - room_id, - user_id=user_id, - ).addCallback( - self._get_events - ).addCallback( - lambda events: events[0] if events else None - ) - @cached(max_entries=5000) def get_users_in_room(self, room_id): def f(txn): @@ -203,19 +183,6 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(invite) defer.returnValue(None) - def get_leave_and_ban_events_for_user(self, user_id): - """ Get all the leave events for a user - Args: - user_id (str): The user ID. - Returns: - A deferred list of event objects. - """ - return self.get_rooms_for_user_where_membership_is( - user_id, (Membership.LEAVE, Membership.BAN) - ).addCallback(lambda leaves: self._get_events([ - leave.event_id for leave in leaves - ])) - def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user matches one in the membership list. diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 76bcd9cd00..95b12559a6 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -303,96 +303,6 @@ class StreamStore(SQLBaseStore): defer.returnValue(ret) - def get_room_events_stream( - self, - user_id, - from_key, - to_key, - limit=0, - is_guest=False, - room_ids=None - ): - room_ids = room_ids or [] - room_ids = [r for r in room_ids] - if is_guest: - current_room_membership_sql = ( - "SELECT c.room_id FROM history_visibility AS h" - " INNER JOIN current_state_events AS c" - " ON h.event_id = c.event_id" - " WHERE c.room_id IN (%s)" - " AND h.history_visibility = 'world_readable'" % ( - ",".join(map(lambda _: "?", room_ids)) - ) - ) - current_room_membership_args = room_ids - else: - current_room_membership_sql = ( - "SELECT m.room_id FROM room_memberships as m " - " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id AND c.state_key = m.user_id" - " WHERE m.user_id = ? AND m.membership = 'join'" - ) - current_room_membership_args = [user_id] - - # We also want to get any membership events about that user, e.g. - # invites or leave notifications. - membership_sql = ( - "SELECT m.event_id FROM room_memberships as m " - "INNER JOIN current_state_events as c ON m.event_id = c.event_id " - "WHERE m.user_id = ? " - ) - membership_args = [user_id] - - if limit: - limit = max(limit, MAX_STREAM_SIZE) - else: - limit = MAX_STREAM_SIZE - - # From and to keys should be integers from ordering. - from_id = RoomStreamToken.parse_stream_token(from_key) - to_id = RoomStreamToken.parse_stream_token(to_key) - - if from_key == to_key: - return defer.succeed(([], to_key)) - - sql = ( - "SELECT e.event_id, e.stream_ordering FROM events AS e WHERE " - "(e.outlier = ? AND (room_id IN (%(current)s)) OR " - "(event_id IN (%(invites)s))) " - "AND e.stream_ordering > ? AND e.stream_ordering <= ? " - "ORDER BY stream_ordering ASC LIMIT %(limit)d " - ) % { - "current": current_room_membership_sql, - "invites": membership_sql, - "limit": limit - } - - def f(txn): - args = ([False] + current_room_membership_args + membership_args + - [from_id.stream, to_id.stream]) - txn.execute(sql, args) - - rows = self.cursor_to_dict(txn) - - ret = self._get_events_txn( - txn, - [r["event_id"] for r in rows], - get_prev_content=True - ) - - self._set_before_and_after(ret, rows) - - if rows: - key = "s%d" % max(r["stream_ordering"] for r in rows) - else: - # Assume we didn't get anything because there was nothing to - # get. - key = to_key - - return ret, key - - return self.runInteraction("get_room_events_stream", f) - @defer.inlineCallbacks def paginate_room_events(self, room_id, from_key, to_key=None, direction='b', limit=-1): diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 3b9da5b34a..b462495eb8 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -49,9 +49,6 @@ class Clock(object): l.start(msec / 1000.0, now=False) return l - def stop_looping_call(self, loop): - loop.stop() - def call_later(self, delay, callback, *args, **kwargs): """Call something later diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 4076eed269..1101881a2d 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -100,20 +100,6 @@ class _PerHostRatelimiter(object): self.current_processing = set() self.request_times = [] - def is_empty(self): - time_now = self.clock.time_msec() - self.request_times[:] = [ - r for r in self.request_times - if time_now - r < self.window_size - ] - - return not ( - self.ready_request_queue - or self.sleeping_requests - or self.current_processing - or self.request_times - ) - @contextlib.contextmanager def ratelimit(self): # `contextlib.contextmanager` takes a generator and turns it into a diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py index b490bb8725..a100f151d4 100644 --- a/synapse/util/stringutils.py +++ b/synapse/util/stringutils.py @@ -21,10 +21,6 @@ _string_with_symbols = ( ) -def origin_from_ucid(ucid): - return ucid.split("@", 1)[1] - - def random_string(length): return ''.join(random.choice(string.ascii_letters) for _ in xrange(length)) diff --git a/tests/storage/test_presence.py b/tests/storage/test_presence.py index ec78f007ca..63203cea35 100644 --- a/tests/storage/test_presence.py +++ b/tests/storage/test_presence.py @@ -34,33 +34,6 @@ class PresenceStoreTestCase(unittest.TestCase): self.u_apple = UserID.from_string("@apple:test") self.u_banana = UserID.from_string("@banana:test") - @defer.inlineCallbacks - def test_visibility(self): - self.assertFalse((yield self.store.is_presence_visible( - observed_localpart=self.u_apple.localpart, - observer_userid=self.u_banana.to_string(), - ))) - - yield self.store.allow_presence_visible( - observed_localpart=self.u_apple.localpart, - observer_userid=self.u_banana.to_string(), - ) - - self.assertTrue((yield self.store.is_presence_visible( - observed_localpart=self.u_apple.localpart, - observer_userid=self.u_banana.to_string(), - ))) - - yield self.store.disallow_presence_visible( - observed_localpart=self.u_apple.localpart, - observer_userid=self.u_banana.to_string(), - ) - - self.assertFalse((yield self.store.is_presence_visible( - observed_localpart=self.u_apple.localpart, - observer_userid=self.u_banana.to_string(), - ))) - @defer.inlineCallbacks def test_presence_list(self): self.assertEquals( diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py index 5880409867..6afaca3a61 100644 --- a/tests/storage/test_redaction.py +++ b/tests/storage/test_redaction.py @@ -110,22 +110,10 @@ class RedactionTestCase(unittest.TestCase): self.room1, self.u_alice, Membership.JOIN ) - start = yield self.store.get_room_events_max_id() - msg_event = yield self.inject_message(self.room1, self.u_alice, u"t") - end = yield self.store.get_room_events_max_id() - - results, _ = yield self.store.get_room_events_stream( - self.u_alice.to_string(), - start, - end, - ) - - self.assertEqual(1, len(results)) - # Check event has not been redacted: - event = results[0] + event = yield self.store.get_event(msg_event.event_id) self.assertObjectHasAttributes( { @@ -144,17 +132,7 @@ class RedactionTestCase(unittest.TestCase): self.room1, msg_event.event_id, self.u_alice, reason ) - results, _ = yield self.store.get_room_events_stream( - self.u_alice.to_string(), - start, - end, - ) - - self.assertEqual(1, len(results)) - - # Check redaction - - event = results[0] + event = yield self.store.get_event(msg_event.event_id) self.assertEqual(msg_event.event_id, event.event_id) @@ -184,25 +162,12 @@ class RedactionTestCase(unittest.TestCase): self.room1, self.u_alice, Membership.JOIN ) - start = yield self.store.get_room_events_max_id() - msg_event = yield self.inject_room_member( self.room1, self.u_bob, Membership.JOIN, extra_content={"blue": "red"}, ) - end = yield self.store.get_room_events_max_id() - - results, _ = yield self.store.get_room_events_stream( - self.u_alice.to_string(), - start, - end, - ) - - self.assertEqual(1, len(results)) - - # Check event has not been redacted: - event = results[0] + event = yield self.store.get_event(msg_event.event_id) self.assertObjectHasAttributes( { @@ -221,17 +186,9 @@ class RedactionTestCase(unittest.TestCase): self.room1, msg_event.event_id, self.u_alice, reason ) - results, _ = yield self.store.get_room_events_stream( - self.u_alice.to_string(), - start, - end, - ) - - self.assertEqual(1, len(results)) - # Check redaction - event = results[0] + event = yield self.store.get_event(msg_event.event_id) self.assertTrue("redacted_because" in event.unsigned) diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py index b029ff0584..997090fe35 100644 --- a/tests/storage/test_roommember.py +++ b/tests/storage/test_roommember.py @@ -70,13 +70,6 @@ class RoomMemberStoreTestCase(unittest.TestCase): def test_one_member(self): yield self.inject_room_member(self.room, self.u_alice, Membership.JOIN) - self.assertEquals( - Membership.JOIN, - (yield self.store.get_room_member( - user_id=self.u_alice.to_string(), - room_id=self.room.to_string(), - )).membership - ) self.assertEquals( [self.u_alice.to_string()], [m.user_id for m in ( diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py deleted file mode 100644 index da322152c7..0000000000 --- a/tests/storage/test_stream.py +++ /dev/null @@ -1,185 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014-2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from tests import unittest -from twisted.internet import defer - -from synapse.api.constants import EventTypes, Membership -from synapse.types import UserID, RoomID -from tests.storage.event_injector import EventInjector - -from tests.utils import setup_test_homeserver - -from mock import Mock - - -class StreamStoreTestCase(unittest.TestCase): - - @defer.inlineCallbacks - def setUp(self): - hs = yield setup_test_homeserver( - resource_for_federation=Mock(), - http_client=None, - ) - - self.store = hs.get_datastore() - self.event_builder_factory = hs.get_event_builder_factory() - self.event_injector = EventInjector(hs) - self.handlers = hs.get_handlers() - self.message_handler = self.handlers.message_handler - - self.u_alice = UserID.from_string("@alice:test") - self.u_bob = UserID.from_string("@bob:test") - - self.room1 = RoomID.from_string("!abc123:test") - self.room2 = RoomID.from_string("!xyx987:test") - - @defer.inlineCallbacks - def test_event_stream_get_other(self): - # Both bob and alice joins the room - yield self.event_injector.inject_room_member( - self.room1, self.u_alice, Membership.JOIN - ) - yield self.event_injector.inject_room_member( - self.room1, self.u_bob, Membership.JOIN - ) - - # Initial stream key: - start = yield self.store.get_room_events_max_id() - - yield self.event_injector.inject_message(self.room1, self.u_alice, u"test") - - end = yield self.store.get_room_events_max_id() - - results, _ = yield self.store.get_room_events_stream( - self.u_bob.to_string(), - start, - end, - ) - - self.assertEqual(1, len(results)) - - event = results[0] - - self.assertObjectHasAttributes( - { - "type": EventTypes.Message, - "user_id": self.u_alice.to_string(), - "content": {"body": "test", "msgtype": "message"}, - }, - event, - ) - - @defer.inlineCallbacks - def test_event_stream_get_own(self): - # Both bob and alice joins the room - yield self.event_injector.inject_room_member( - self.room1, self.u_alice, Membership.JOIN - ) - yield self.event_injector.inject_room_member( - self.room1, self.u_bob, Membership.JOIN - ) - - # Initial stream key: - start = yield self.store.get_room_events_max_id() - - yield self.event_injector.inject_message(self.room1, self.u_alice, u"test") - - end = yield self.store.get_room_events_max_id() - - results, _ = yield self.store.get_room_events_stream( - self.u_alice.to_string(), - start, - end, - ) - - self.assertEqual(1, len(results)) - - event = results[0] - - self.assertObjectHasAttributes( - { - "type": EventTypes.Message, - "user_id": self.u_alice.to_string(), - "content": {"body": "test", "msgtype": "message"}, - }, - event, - ) - - @defer.inlineCallbacks - def test_event_stream_join_leave(self): - # Both bob and alice joins the room - yield self.event_injector.inject_room_member( - self.room1, self.u_alice, Membership.JOIN - ) - yield self.event_injector.inject_room_member( - self.room1, self.u_bob, Membership.JOIN - ) - - # Then bob leaves again. - yield self.event_injector.inject_room_member( - self.room1, self.u_bob, Membership.LEAVE - ) - - # Initial stream key: - start = yield self.store.get_room_events_max_id() - - yield self.event_injector.inject_message(self.room1, self.u_alice, u"test") - - end = yield self.store.get_room_events_max_id() - - results, _ = yield self.store.get_room_events_stream( - self.u_bob.to_string(), - start, - end, - ) - - # We should not get the message, as it happened *after* bob left. - self.assertEqual(0, len(results)) - - @defer.inlineCallbacks - def test_event_stream_prev_content(self): - yield self.event_injector.inject_room_member( - self.room1, self.u_bob, Membership.JOIN - ) - - yield self.event_injector.inject_room_member( - self.room1, self.u_alice, Membership.JOIN - ) - - start = yield self.store.get_room_events_max_id() - - yield self.event_injector.inject_room_member( - self.room1, self.u_alice, Membership.JOIN, - ) - - end = yield self.store.get_room_events_max_id() - - results, _ = yield self.store.get_room_events_stream( - self.u_bob.to_string(), - start, - end, - ) - - # We should not get the message, as it happened *after* bob left. - self.assertEqual(1, len(results)) - - event = results[0] - - self.assertTrue( - "prev_content" in event.unsigned, - msg="No prev_content key" - ) -- cgit 1.4.1 From eb8619e2562432cdd22a625bb612c907c63f723c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 Apr 2016 16:08:32 +0100 Subject: Create log context in Measure if one doesn't exist --- synapse/util/metrics.py | 23 +++++++++++++++++------ tests/test_state.py | 4 ++-- 2 files changed, 19 insertions(+), 8 deletions(-) (limited to 'tests') diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index c51b641125..e1f374807e 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -50,7 +50,7 @@ block_db_txn_duration = metrics.register_distribution( class Measure(object): __slots__ = [ "clock", "name", "start_context", "start", "new_context", "ru_utime", - "ru_stime", "db_txn_count", "db_txn_duration" + "ru_stime", "db_txn_count", "db_txn_duration", "created_context" ] def __init__(self, clock, name): @@ -58,14 +58,20 @@ class Measure(object): self.name = name self.start_context = None self.start = None + self.created_context = False def __enter__(self): self.start = self.clock.time_msec() self.start_context = LoggingContext.current_context() - if self.start_context: - self.ru_utime, self.ru_stime = self.start_context.get_resource_usage() - self.db_txn_count = self.start_context.db_txn_count - self.db_txn_duration = self.start_context.db_txn_duration + if not self.start_context: + logger.warn("Entered Measure without log context: %s", self.name) + self.start_context = LoggingContext("Measure") + self.start_context.__enter__() + self.created_context = True + + self.ru_utime, self.ru_stime = self.start_context.get_resource_usage() + self.db_txn_count = self.start_context.db_txn_count + self.db_txn_duration = self.start_context.db_txn_duration def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is not None or not self.start_context: @@ -91,7 +97,12 @@ class Measure(object): block_ru_utime.inc_by(ru_utime - self.ru_utime, self.name) block_ru_stime.inc_by(ru_stime - self.ru_stime, self.name) - block_db_txn_count.inc_by(context.db_txn_count - self.db_txn_count, self.name) + block_db_txn_count.inc_by( + context.db_txn_count - self.db_txn_count, self.name + ) block_db_txn_duration.inc_by( context.db_txn_duration - self.db_txn_duration, self.name ) + + if self.created_context: + self.start_context.__exit__(exc_type, exc_val, exc_tb) diff --git a/tests/test_state.py b/tests/test_state.py index a1ea7ef672..1a11bbcee0 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -140,13 +140,13 @@ class StateTestCase(unittest.TestCase): "add_event_hashes", ] ) - hs = Mock(spec=[ + hs = Mock(spec_set=[ "get_datastore", "get_auth", "get_state_handler", "get_clock", ]) hs.get_datastore.return_value = self.store hs.get_state_handler.return_value = None - hs.get_auth.return_value = Auth(hs) hs.get_clock.return_value = MockClock() + hs.get_auth.return_value = Auth(hs) self.state = StateHandler(hs) self.event_id = 0 -- cgit 1.4.1 From e99365f6015af6dc0c2c107f47bda3760ff1153e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 19 Apr 2016 15:22:14 +0100 Subject: Replicate get_invited_rooms_for_user --- synapse/replication/slave/storage/events.py | 9 +++++++-- tests/replication/slave/storage/test_events.py | 12 ++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) (limited to 'tests') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index cfc728a038..82f171c257 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -68,6 +68,9 @@ class SlavedEventStore(BaseSlavedStore): _get_current_state_for_key = StateStore.__dict__[ "_get_current_state_for_key" ] + get_invited_rooms_for_user = RoomMemberStore.__dict__[ + "get_invited_rooms_for_user" + ] get_event = DataStore.get_event.__func__ get_current_state = DataStore.get_current_state.__func__ @@ -82,6 +85,7 @@ class SlavedEventStore(BaseSlavedStore): get_room_events_stream_for_room = ( DataStore.get_room_events_stream_for_room.__func__ ) + _set_before_and_after = DataStore._set_before_and_after _get_events = DataStore._get_events.__func__ @@ -147,11 +151,11 @@ class SlavedEventStore(BaseSlavedStore): internal = json.loads(row[1]) event_json = json.loads(row[2]) event = FrozenEvent(event_json, internal_metadata_dict=internal) - self._invalidate_caches_for_event( + self.invalidate_caches_for_event( event, backfilled, reset_state=position in state_resets ) - def _invalidate_caches_for_event(self, event, backfilled, reset_state): + def invalidate_caches_for_event(self, event, backfilled, reset_state): if reset_state: self._get_current_state_for_key.invalidate_all() self.get_rooms_for_user.invalidate_all() @@ -182,6 +186,7 @@ class SlavedEventStore(BaseSlavedStore): # self._membership_stream_cache.entity_has_changed( # event.state_key, event.internal_metadata.stream_ordering # ) + self.get_invited_rooms_for_user.invalidate((event.state_key,)) if not event.is_state(): return diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index baa4a26eb5..88b8d08110 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -251,6 +251,18 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict()) yield self.check("get_event", [msg.event_id], redacted) + @defer.inlineCallbacks + def test_invites(self): + yield self.check("get_invited_rooms_for_user", [USER_ID_2], []) + event = yield self.persist( + type="m.room.member", key=USER_ID_2, membership="invite" + ) + yield self.replicate() + yield self.check("get_invited_rooms_for_user", [USER_ID_2], [RoomsForUser( + ROOM_ID, USER_ID, "invite", event.event_id, + event.internal_metadata.stream_ordering + )]) + event_id = 0 @defer.inlineCallbacks -- cgit 1.4.1 From 5bbd424ee0c983d305014df618fa1917ecd10d91 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 19 Apr 2016 17:11:44 +0100 Subject: Add a slaved receipts store --- synapse/replication/slave/storage/receipts.py | 61 ++++++++++++++++++++++++ tests/replication/slave/storage/_base.py | 4 +- tests/replication/slave/storage/test_events.py | 3 ++ tests/replication/slave/storage/test_receipts.py | 39 +++++++++++++++ 4 files changed, 104 insertions(+), 3 deletions(-) create mode 100644 synapse/replication/slave/storage/receipts.py create mode 100644 tests/replication/slave/storage/test_receipts.py (limited to 'tests') diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py new file mode 100644 index 0000000000..b55d5dfd08 --- /dev/null +++ b/synapse/replication/slave/storage/receipts.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker + +from synapse.storage import DataStore +from synapse.storage.receipts import ReceiptsStore + +# So, um, we want to borrow a load of functions intended for reading from +# a DataStore, but we don't want to take functions that either write to the +# DataStore or are cached and don't have cache invalidation logic. +# +# Rather than write duplicate versions of those functions, or lift them to +# a common base class, we going to grab the underlying __func__ object from +# the method descriptor on the DataStore and chuck them into our class. + + +class SlavedReceiptsStore(BaseSlavedStore): + + def __init__(self, db_conn, hs): + super(SlavedReceiptsStore, self).__init__(db_conn, hs) + + self._receipts_id_gen = SlavedIdTracker( + db_conn, "receipts_linearized", "stream_id" + ) + + get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"] + + get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__ + get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__ + + def stream_positions(self): + result = super(SlavedReceiptsStore, self).stream_positions() + result["receipts"] = self._receipts_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("receipts") + if stream: + self._receipts_id_gen.advance(stream["position"]) + for row in stream["rows"]: + room_id, receipt_type, user_id = row[1:4] + self.invalidate_caches_for_receipt(room_id, receipt_type, user_id) + + return super(SlavedReceiptsStore, self).process_replication(result) + + def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): + self.get_receipts_for_user.invalidate((user_id, receipt_type)) diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index 983caafe8a..1f13cd0bc0 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -15,8 +15,6 @@ from twisted.internet import defer from tests import unittest -from synapse.replication.slave.storage.events import SlavedEventStore - from mock import Mock, NonCallableMock from tests.utils import setup_test_homeserver from synapse.replication.resource import ReplicationResource @@ -38,7 +36,7 @@ class BaseSlavedStoreTestCase(unittest.TestCase): self.replication = ReplicationResource(self.hs) self.master_store = self.hs.get_datastore() - self.slaved_store = SlavedEventStore(self.hs.get_db_conn(), self.hs) + self.slaved_store = self.STORE_TYPE(self.hs.get_db_conn(), self.hs) self.event_id = 0 @defer.inlineCallbacks diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index baa4a26eb5..66f166047e 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -16,6 +16,7 @@ from ._base import BaseSlavedStoreTestCase from synapse.events import FrozenEvent, _EventInternalMetadata from synapse.events.snapshot import EventContext +from synapse.replication.slave.storage.events import SlavedEventStore from synapse.storage.roommember import RoomsForUser from twisted.internet import defer @@ -43,6 +44,8 @@ def patch__eq__(cls): class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): + STORE_TYPE = SlavedEventStore + def setUp(self): # Patch up the equality operator for events so that we can check # whether lists of events match using assertEquals diff --git a/tests/replication/slave/storage/test_receipts.py b/tests/replication/slave/storage/test_receipts.py new file mode 100644 index 0000000000..6624fe4eea --- /dev/null +++ b/tests/replication/slave/storage/test_receipts.py @@ -0,0 +1,39 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStoreTestCase + +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore + +from twisted.internet import defer + +USER_ID = "@feeling:blue" +ROOM_ID = "!room:blue" +EVENT_ID = "$event:blue" + + +class SlavedReceiptTestCase(BaseSlavedStoreTestCase): + + STORE_TYPE = SlavedReceiptsStore + + @defer.inlineCallbacks + def test_receipt(self): + yield self.check("get_receipts_for_user", [USER_ID, "m.read"], {}) + yield self.master_store.insert_receipt( + ROOM_ID, "m.read", USER_ID, [EVENT_ID], {} + ) + yield self.replicate() + yield self.check("get_receipts_for_user", [USER_ID, "m.read"], { + ROOM_ID: EVENT_ID + }) -- cgit 1.4.1 From c0d8e0eb6375d5b8754db420af43733a642c7f38 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 21 Apr 2016 15:25:47 +0100 Subject: Replicate push actions --- synapse/replication/slave/storage/events.py | 14 +++++++++ tests/replication/slave/storage/test_events.py | 43 ++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) (limited to 'tests') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 82f171c257..5f37ba6995 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -21,6 +21,7 @@ from synapse.storage import DataStore from synapse.storage.room import RoomStore from synapse.storage.roommember import RoomMemberStore from synapse.storage.event_federation import EventFederationStore +from synapse.storage.event_push_actions import EventPushActionsStore from synapse.storage.state import StateStore from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -71,7 +72,16 @@ class SlavedEventStore(BaseSlavedStore): get_invited_rooms_for_user = RoomMemberStore.__dict__[ "get_invited_rooms_for_user" ] + get_unread_event_push_actions_by_room_for_user = ( + EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"] + ) + get_unread_push_actions_for_user_in_range = ( + DataStore.get_unread_push_actions_for_user_in_range.__func__ + ) + get_push_action_users_in_range = ( + DataStore.get_push_action_users_in_range.__func__ + ) get_event = DataStore.get_event.__func__ get_current_state = DataStore.get_current_state.__func__ get_current_state_for_key = DataStore.get_current_state_for_key.__func__ @@ -167,6 +177,10 @@ class SlavedEventStore(BaseSlavedStore): self.get_latest_event_ids_in_room.invalidate((event.room_id,)) + self.get_unread_event_push_actions_by_room_for_user.invalidate_many( + (event.room_id,) + ) + if not backfilled: self._events_stream_cache.entity_has_changed( event.room_id, event.internal_metadata.stream_ordering diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 41a626cf70..17587fda00 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -266,6 +266,47 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): event.internal_metadata.stream_ordering )]) + @defer.inlineCallbacks + def test_push_actions_for_user(self): + yield self.persist(type="m.room.create", creator=USER_ID) + yield self.persist(type="m.room.join", key=USER_ID, membership="join") + yield self.persist( + type="m.room.join", sender=USER_ID, key=USER_ID_2, membership="join" + ) + event1 = yield self.persist( + type="m.room.message", msgtype="m.text", body="hello" + ) + yield self.replicate() + yield self.check( + "get_unread_event_push_actions_by_room_for_user", + [ROOM_ID, USER_ID_2, event1.event_id], + {"highlight_count": 0, "notify_count": 0} + ) + + yield self.persist( + type="m.room.message", msgtype="m.text", body="world", + push_actions=[(USER_ID_2, ["notify"])], + ) + yield self.replicate() + yield self.check( + "get_unread_event_push_actions_by_room_for_user", + [ROOM_ID, USER_ID_2, event1.event_id], + {"highlight_count": 0, "notify_count": 1} + ) + + yield self.persist( + type="m.room.message", msgtype="m.text", body="world", + push_actions=[(USER_ID_2, [ + "notify", {"set_tweak": "highlight", "value": True} + ])], + ) + yield self.replicate() + yield self.check( + "get_unread_event_push_actions_by_room_for_user", + [ROOM_ID, USER_ID_2, event1.event_id], + {"highlight_count": 1, "notify_count": 2} + ) + event_id = 0 @defer.inlineCallbacks @@ -273,6 +314,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): self, sender=USER_ID, room_id=ROOM_ID, type={}, key=None, internal={}, state=None, reset_state=False, backfill=False, depth=None, prev_events=[], auth_events=[], prev_state=[], redacts=None, + push_actions=[], **content ): """ @@ -305,6 +347,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): self.event_id += 1 context = EventContext(current_state=state) + context.push_actions = push_actions ordering = None if backfill: -- cgit 1.4.1 From 40aa6e8349b348802d6f87084c31c3895f728708 Mon Sep 17 00:00:00 2001 From: Negi Fazeli Date: Wed, 20 Apr 2016 16:21:40 +0200 Subject: Create user with expiry - Add unittests for client, api and handler Signed-off-by: Negar Fazeli --- synapse/api/auth.py | 3 +- synapse/config/key.py | 5 ++ synapse/config/registration.py | 6 +++ synapse/handlers/auth.py | 4 +- synapse/handlers/register.py | 53 +++++++++++++++++++++ synapse/rest/client/v1/register.py | 71 ++++++++++++++++++++++++++++ tests/api/test_auth.py | 12 ++--- tests/handlers/test_register.py | 67 ++++++++++++++++++++++++++ tests/rest/client/v1/test_register.py | 88 +++++++++++++++++++++++++++++++++++ tests/utils.py | 1 + 10 files changed, 301 insertions(+), 9 deletions(-) create mode 100644 tests/handlers/test_register.py create mode 100644 tests/rest/client/v1/test_register.py (limited to 'tests') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index d3e9837c81..44e38b777a 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -612,7 +612,8 @@ class Auth(object): def get_user_from_macaroon(self, macaroon_str): try: macaroon = pymacaroons.Macaroon.deserialize(macaroon_str) - self.validate_macaroon(macaroon, "access", False) + + self.validate_macaroon(macaroon, "access", self.hs.config.expire_access_token) user_prefix = "user_id = " user = None diff --git a/synapse/config/key.py b/synapse/config/key.py index a072aec714..6ee643793e 100644 --- a/synapse/config/key.py +++ b/synapse/config/key.py @@ -57,6 +57,8 @@ class KeyConfig(Config): seed = self.signing_key[0].seed self.macaroon_secret_key = hashlib.sha256(seed) + self.expire_access_token = config.get("expire_access_token", False) + def default_config(self, config_dir_path, server_name, is_generating_file=False, **kwargs): base_key_name = os.path.join(config_dir_path, server_name) @@ -69,6 +71,9 @@ class KeyConfig(Config): return """\ macaroon_secret_key: "%(macaroon_secret_key)s" + # Used to enable access token expiration. + expire_access_token: False + ## Signing Keys ## # Path to the signing key to sign messages with diff --git a/synapse/config/registration.py b/synapse/config/registration.py index 87e500c97a..cc3f879857 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -32,6 +32,7 @@ class RegistrationConfig(Config): ) self.registration_shared_secret = config.get("registration_shared_secret") + self.user_creation_max_duration = int(config["user_creation_max_duration"]) self.bcrypt_rounds = config.get("bcrypt_rounds", 12) self.trusted_third_party_id_servers = config["trusted_third_party_id_servers"] @@ -54,6 +55,11 @@ class RegistrationConfig(Config): # secret, even if registration is otherwise disabled. registration_shared_secret: "%(registration_shared_secret)s" + # Sets the expiry for the short term user creation in + # milliseconds. For instance the bellow duration is two weeks + # in milliseconds. + user_creation_max_duration: 1209600000 + # Set the number of bcrypt rounds used to generate password hash. # Larger numbers increase the work factor needed to generate the hash. # The default number of rounds is 12. diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 61fe56032a..3d36d3460e 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -521,11 +521,11 @@ class AuthHandler(BaseHandler): )) return m.serialize() - def generate_short_term_login_token(self, user_id): + def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)): macaroon = self._generate_base_macaroon(user_id) macaroon.add_first_party_caveat("type = login") now = self.hs.get_clock().time_msec() - expiry = now + (2 * 60 * 1000) + expiry = now + duration_in_ms macaroon.add_first_party_caveat("time < %d" % (expiry,)) return macaroon.serialize() diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index b0862067e1..5883b9111e 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -358,6 +358,59 @@ class RegistrationHandler(BaseHandler): ) defer.returnValue(data) + @defer.inlineCallbacks + def get_or_create_user(self, localpart, displayname, duration_seconds): + """Creates a new user or returns an access token for an existing one + + Args: + localpart : The local part of the user ID to register. If None, + one will be randomly generated. + Returns: + A tuple of (user_id, access_token). + Raises: + RegistrationError if there was a problem registering. + """ + yield run_on_reactor() + + if localpart is None: + raise SynapseError(400, "Request must include user id") + + need_register = True + + try: + yield self.check_username(localpart) + except SynapseError as e: + if e.errcode == Codes.USER_IN_USE: + need_register = False + else: + raise + + user = UserID(localpart, self.hs.hostname) + user_id = user.to_string() + auth_handler = self.hs.get_handlers().auth_handler + token = auth_handler.generate_short_term_login_token(user_id, duration_seconds) + + if need_register: + yield self.store.register( + user_id=user_id, + token=token, + password_hash=None + ) + + yield registered_user(self.distributor, user) + else: + yield self.store.flush_user(user_id=user_id) + yield self.store.add_access_token_to_user(user_id=user_id, token=token) + + if displayname is not None: + logger.info("setting user display name: %s -> %s", user_id, displayname) + profile_handler = self.hs.get_handlers().profile_handler + yield profile_handler.set_displayname( + user, user, displayname + ) + + defer.returnValue((user_id, token)) + def auth_handler(self): return self.hs.get_handlers().auth_handler diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index c6a2ef2ccc..e3f4fbb0bb 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -355,5 +355,76 @@ class RegisterRestServlet(ClientV1RestServlet): ) +class CreateUserRestServlet(ClientV1RestServlet): + """Handles user creation via a server-to-server interface + """ + + PATTERNS = client_path_patterns("/createUser$", releases=()) + + def __init__(self, hs): + super(CreateUserRestServlet, self).__init__(hs) + self.store = hs.get_datastore() + self.direct_user_creation_max_duration = hs.config.user_creation_max_duration + + @defer.inlineCallbacks + def on_POST(self, request): + user_json = parse_json_object_from_request(request) + + if "access_token" not in request.args: + raise SynapseError(400, "Expected application service token.") + + app_service = yield self.store.get_app_service_by_token( + request.args["access_token"][0] + ) + if not app_service: + raise SynapseError(403, "Invalid application service token.") + + logger.debug("creating user: %s", user_json) + + response = yield self._do_create(user_json) + + defer.returnValue((200, response)) + + def on_OPTIONS(self, request): + return 403, {} + + @defer.inlineCallbacks + def _do_create(self, user_json): + yield run_on_reactor() + + if "localpart" not in user_json: + raise SynapseError(400, "Expected 'localpart' key.") + + if "displayname" not in user_json: + raise SynapseError(400, "Expected 'displayname' key.") + + if "duration_seconds" not in user_json: + raise SynapseError(400, "Expected 'duration_seconds' key.") + + localpart = user_json["localpart"].encode("utf-8") + displayname = user_json["displayname"].encode("utf-8") + duration_seconds = 0 + try: + duration_seconds = int(user_json["duration_seconds"]) + except ValueError: + raise SynapseError(400, "Failed to parse 'duration_seconds'") + if duration_seconds > self.direct_user_creation_max_duration: + duration_seconds = self.direct_user_creation_max_duration + + handler = self.handlers.registration_handler + user_id, token = yield handler.get_or_create_user( + localpart=localpart, + displayname=displayname, + duration_seconds=duration_seconds + ) + + defer.returnValue({ + "user_id": user_id, + "access_token": token, + "home_server": self.hs.hostname, + }) + + def register_servlets(hs, http_server): RegisterRestServlet(hs).register(http_server) + CreateUserRestServlet(hs).register(http_server) diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py index 7e7b0b4b1d..ad269af0ec 100644 --- a/tests/api/test_auth.py +++ b/tests/api/test_auth.py @@ -284,12 +284,12 @@ class AuthTestCase(unittest.TestCase): macaroon.add_first_party_caveat("time < 1") # ms self.hs.clock.now = 5000 # seconds - - yield self.auth.get_user_from_macaroon(macaroon.serialize()) + self.hs.config.expire_access_token = True + # yield self.auth.get_user_from_macaroon(macaroon.serialize()) # TODO(daniel): Turn on the check that we validate expiration, when we # validate expiration (and remove the above line, which will start # throwing). - # with self.assertRaises(AuthError) as cm: - # yield self.auth.get_user_from_macaroon(macaroon.serialize()) - # self.assertEqual(401, cm.exception.code) - # self.assertIn("Invalid macaroon", cm.exception.msg) + with self.assertRaises(AuthError) as cm: + yield self.auth.get_user_from_macaroon(macaroon.serialize()) + self.assertEqual(401, cm.exception.code) + self.assertIn("Invalid macaroon", cm.exception.msg) diff --git a/tests/handlers/test_register.py b/tests/handlers/test_register.py new file mode 100644 index 0000000000..8b7be96bd9 --- /dev/null +++ b/tests/handlers/test_register.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer +from .. import unittest + +from synapse.handlers.register import RegistrationHandler + +from tests.utils import setup_test_homeserver + +from mock import Mock + + +class RegistrationHandlers(object): + def __init__(self, hs): + self.registration_handler = RegistrationHandler(hs) + + +class RegistrationTestCase(unittest.TestCase): + """ Tests the RegistrationHandler. """ + + @defer.inlineCallbacks + def setUp(self): + self.mock_distributor = Mock() + self.mock_distributor.declare("registered_user") + self.mock_captcha_client = Mock() + hs = yield setup_test_homeserver( + handlers=None, + http_client=None, + expire_access_token=True) + hs.handlers = RegistrationHandlers(hs) + self.handler = hs.get_handlers().registration_handler + hs.get_handlers().profile_handler = Mock() + self.mock_handler = Mock(spec=[ + "generate_short_term_login_token", + ]) + + hs.get_handlers().auth_handler = self.mock_handler + + @defer.inlineCallbacks + def test_user_is_created_and_logged_in_if_doesnt_exist(self): + """ + Returns: + The user doess not exist in this case so it will register and log it in + """ + duration_ms = 200 + local_part = "someone" + display_name = "someone" + user_id = "@someone:test" + mock_token = self.mock_handler.generate_short_term_login_token + mock_token.return_value = 'secret' + result_user_id, result_token = yield self.handler.get_or_create_user( + local_part, display_name, duration_ms) + self.assertEquals(result_user_id, user_id) + self.assertEquals(result_token, 'secret') diff --git a/tests/rest/client/v1/test_register.py b/tests/rest/client/v1/test_register.py new file mode 100644 index 0000000000..4a898a034f --- /dev/null +++ b/tests/rest/client/v1/test_register.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.rest.client.v1.register import CreateUserRestServlet +from twisted.internet import defer +from mock import Mock +from tests import unittest +import json + + +class CreateUserServletTestCase(unittest.TestCase): + + def setUp(self): + # do the dance to hook up request data to self.request_data + self.request_data = "" + self.request = Mock( + content=Mock(read=Mock(side_effect=lambda: self.request_data)), + path='/_matrix/client/api/v1/createUser' + ) + self.request.args = {} + + self.appservice = None + self.auth = Mock(get_appservice_by_req=Mock( + side_effect=lambda x: defer.succeed(self.appservice)) + ) + + self.auth_result = (False, None, None, None) + self.auth_handler = Mock( + check_auth=Mock(side_effect=lambda x, y, z: self.auth_result), + get_session_data=Mock(return_value=None) + ) + self.registration_handler = Mock() + self.identity_handler = Mock() + self.login_handler = Mock() + + # do the dance to hook it up to the hs global + self.handlers = Mock( + auth_handler=self.auth_handler, + registration_handler=self.registration_handler, + identity_handler=self.identity_handler, + login_handler=self.login_handler + ) + self.hs = Mock() + self.hs.hostname = "supergbig~testing~thing.com" + self.hs.get_auth = Mock(return_value=self.auth) + self.hs.get_handlers = Mock(return_value=self.handlers) + self.hs.config.enable_registration = True + # init the thing we're testing + self.servlet = CreateUserRestServlet(self.hs) + + @defer.inlineCallbacks + def test_POST_createuser_with_valid_user(self): + user_id = "@someone:interesting" + token = "my token" + self.request.args = { + "access_token": "i_am_an_app_service" + } + self.request_data = json.dumps({ + "localpart": "someone", + "displayname": "someone interesting", + "duration_seconds": 200 + }) + + self.registration_handler.get_or_create_user = Mock( + return_value=(user_id, token) + ) + + (code, result) = yield self.servlet.on_POST(self.request) + self.assertEquals(code, 200) + + det_data = { + "user_id": user_id, + "access_token": token, + "home_server": self.hs.hostname + } + self.assertDictContainsSubset(det_data, result) diff --git a/tests/utils.py b/tests/utils.py index c179df31ee..9d7978a642 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -49,6 +49,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): config.event_cache_size = 1 config.enable_registration = True config.macaroon_secret_key = "not even a little secret" + config.expire_access_token = False config.server_name = "server.under.test" config.trusted_third_party_id_servers = [] config.room_invite_state_types = [] -- cgit 1.4.1 From 3abab26458cc9fe8a77d5ccee664e87ce407ed58 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 13 May 2016 15:34:06 +0100 Subject: Add a slaved datastore for account data --- synapse/replication/slave/storage/account_data.py | 61 ++++++++++++++++++++++ .../replication/slave/storage/test_account_data.py | 56 ++++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 synapse/replication/slave/storage/account_data.py create mode 100644 tests/replication/slave/storage/test_account_data.py (limited to 'tests') diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py new file mode 100644 index 0000000000..f59b0eabbc --- /dev/null +++ b/synapse/replication/slave/storage/account_data.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage.account_data import AccountDataStore + + +class SlavedAccountDataStore(BaseSlavedStore): + + def __init__(self, db_conn, hs): + super(SlavedAccountDataStore, self).__init__(db_conn, hs) + self._account_data_id_gen = SlavedIdTracker( + db_conn, "account_data_max_stream_id", "stream_id", + ) + + get_global_account_data_by_type_for_users = ( + AccountDataStore.__dict__["get_global_account_data_by_type_for_users"] + ) + + get_global_account_data_by_type_for_user = ( + AccountDataStore.__dict__["get_global_account_data_by_type_for_user"] + ) + + def stream_positions(self): + result = super(SlavedAccountDataStore, self).stream_positions() + position = self._account_data_id_gen.get_current_token() + result["user_account_data"] = position + result["room_account_data"] = position + result["tag_account_data"] = position + return result + + def process_replication(self, result): + stream = result.get("user_account_data") + if stream: + self._account_data_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + user_id, data_type = row[1:3] + self.get_global_account_data_by_type_for_user.invalidate( + (data_type, user_id,) + ) + + stream = result.get("room_account_data") + if stream: + self._account_data_id_gen.advance(int(stream["position"])) + + stream = result.get("tag_account_data") + if stream: + self._account_data_id_gen.advance(int(stream["position"])) diff --git a/tests/replication/slave/storage/test_account_data.py b/tests/replication/slave/storage/test_account_data.py new file mode 100644 index 0000000000..da54d478ce --- /dev/null +++ b/tests/replication/slave/storage/test_account_data.py @@ -0,0 +1,56 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ._base import BaseSlavedStoreTestCase + +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore + +from twisted.internet import defer + +USER_ID = "@feeling:blue" +TYPE = "my.type" + + +class SlavedAccountDataStoreTestCase(BaseSlavedStoreTestCase): + + STORE_TYPE = SlavedAccountDataStore + + @defer.inlineCallbacks + def test_user_account_data(self): + yield self.master_store.add_account_data_for_user( + USER_ID, TYPE, {"a": 1} + ) + yield self.replicate() + yield self.check( + "get_global_account_data_by_type_for_user", + [TYPE, USER_ID], {"a": 1} + ) + yield self.check( + "get_global_account_data_by_type_for_users", + [TYPE, [USER_ID]], {USER_ID: {"a": 1}} + ) + + yield self.master_store.add_account_data_for_user( + USER_ID, TYPE, {"a": 2} + ) + yield self.replicate() + yield self.check( + "get_global_account_data_by_type_for_user", + [TYPE, USER_ID], {"a": 2} + ) + yield self.check( + "get_global_account_data_by_type_for_users", + [TYPE, [USER_ID]], {USER_ID: {"a": 2}} + ) -- cgit 1.4.1 From 3b86ecfa7965f4d29e0f5ce8fb663e5f018adf89 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 16 May 2016 18:56:37 +0100 Subject: Move the presence handler out of the Handlers object --- synapse/handlers/__init__.py | 2 -- synapse/handlers/events.py | 2 +- synapse/handlers/message.py | 4 ++-- synapse/handlers/presence.py | 2 +- synapse/handlers/sync.py | 2 +- synapse/replication/resource.py | 2 +- synapse/rest/client/v1/presence.py | 20 ++++++++++++++------ synapse/rest/client/v1/room.py | 2 +- synapse/rest/client/v2_alpha/receipts.py | 2 +- synapse/rest/client/v2_alpha/sync.py | 2 +- synapse/server.py | 5 +++++ tests/replication/test_resource.py | 2 +- 12 files changed, 29 insertions(+), 18 deletions(-) (limited to 'tests') diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index f4dbf47c1d..60e31b68ff 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -24,7 +24,6 @@ from .message import MessageHandler from .events import EventStreamHandler, EventHandler from .federation import FederationHandler from .profile import ProfileHandler -from .presence import PresenceHandler from .directory import DirectoryHandler from .typing import TypingNotificationHandler from .admin import AdminHandler @@ -53,7 +52,6 @@ class Handlers(object): self.event_handler = EventHandler(hs) self.federation_handler = FederationHandler(hs) self.profile_handler = ProfileHandler(hs) - self.presence_handler = PresenceHandler(hs) self.room_list_handler = RoomListHandler(hs) self.directory_handler = DirectoryHandler(hs) self.typing_notification_handler = TypingNotificationHandler(hs) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index f25a252523..3a3a1257d3 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -58,7 +58,7 @@ class EventStreamHandler(BaseHandler): If `only_keys` is not None, events from keys will be sent down. """ auth_user = UserID.from_string(auth_user_id) - presence_handler = self.hs.get_handlers().presence_handler + presence_handler = self.hs.get_presence_handler() context = yield presence_handler.user_syncing( auth_user_id, affect_presence=affect_presence, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 13154edb78..c4e38d0faa 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -236,7 +236,7 @@ class MessageHandler(BaseHandler): ) if event.type == EventTypes.Message: - presence = self.hs.get_handlers().presence_handler + presence = self.hs.get_presence_handler() yield presence.bump_presence_active_time(user) def deduplicate_state_event(self, event, context): @@ -674,7 +674,7 @@ class MessageHandler(BaseHandler): and m.content["membership"] == Membership.JOIN ] - presence_handler = self.hs.get_handlers().presence_handler + presence_handler = self.hs.get_presence_handler() @defer.inlineCallbacks def get_presence(): diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a8529cce42..8aaaec7030 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -860,7 +860,7 @@ class PresenceEventSource(object): from_key = int(from_key) room_ids = room_ids or [] - presence = self.hs.get_handlers().presence_handler + presence = self.hs.get_presence_handler() stream_change_cache = self.store.presence_stream_cache if not room_ids: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 921215469f..b30102f472 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -639,7 +639,7 @@ class SyncHandler(BaseHandler): # For each newly joined room, we want to send down presence of # existing users. - presence_handler = self.hs.get_handlers().presence_handler + presence_handler = self.hs.get_presence_handler() extra_presence_users = set() for room_id in newly_joined_rooms: users = yield self.store.get_users_in_room(event.room_id) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 0e983ae7fa..b0e7a17670 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -109,7 +109,7 @@ class ReplicationResource(Resource): self.version_string = hs.version_string self.store = hs.get_datastore() self.sources = hs.get_event_sources() - self.presence_handler = hs.get_handlers().presence_handler + self.presence_handler = hs.get_presence_handler() self.typing_handler = hs.get_handlers().typing_notification_handler self.notifier = hs.notifier self.clock = hs.get_clock() diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index 27d9ed586b..eafdce865e 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -30,20 +30,24 @@ logger = logging.getLogger(__name__) class PresenceStatusRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns("/presence/(?P[^/]*)/status") + def __init__(self, hs): + super(PresenceStatusRestServlet, self).__init__(hs) + self.presence_handler = hs.get_presence_handler() + @defer.inlineCallbacks def on_GET(self, request, user_id): requester = yield self.auth.get_user_by_req(request) user = UserID.from_string(user_id) if requester.user != user: - allowed = yield self.handlers.presence_handler.is_visible( + allowed = yield self.presence_handler.is_visible( observed_user=user, observer_user=requester.user, ) if not allowed: raise AuthError(403, "You are not allowed to see their presence.") - state = yield self.handlers.presence_handler.get_state(target_user=user) + state = yield self.presence_handler.get_state(target_user=user) defer.returnValue((200, state)) @@ -74,7 +78,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet): except: raise SynapseError(400, "Unable to parse state") - yield self.handlers.presence_handler.set_state(user, state) + yield self.presence_handler.set_state(user, state) defer.returnValue((200, {})) @@ -85,6 +89,10 @@ class PresenceStatusRestServlet(ClientV1RestServlet): class PresenceListRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns("/presence/list/(?P[^/]*)") + def __init__(self, hs): + super(PresenceListRestServlet, self).__init__(hs) + self.presence_handler = hs.get_presence_handler() + @defer.inlineCallbacks def on_GET(self, request, user_id): requester = yield self.auth.get_user_by_req(request) @@ -96,7 +104,7 @@ class PresenceListRestServlet(ClientV1RestServlet): if requester.user != user: raise SynapseError(400, "Cannot get another user's presence list") - presence = yield self.handlers.presence_handler.get_presence_list( + presence = yield self.presence_handler.get_presence_list( observer_user=user, accepted=True ) @@ -123,7 +131,7 @@ class PresenceListRestServlet(ClientV1RestServlet): if len(u) == 0: continue invited_user = UserID.from_string(u) - yield self.handlers.presence_handler.send_presence_invite( + yield self.presence_handler.send_presence_invite( observer_user=user, observed_user=invited_user ) @@ -134,7 +142,7 @@ class PresenceListRestServlet(ClientV1RestServlet): if len(u) == 0: continue dropped_user = UserID.from_string(u) - yield self.handlers.presence_handler.drop( + yield self.presence_handler.drop( observer_user=user, observed_user=dropped_user ) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index b223fb7e5f..9c89442ce6 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -570,7 +570,7 @@ class RoomTypingRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomTypingRestServlet, self).__init__(hs) - self.presence_handler = hs.get_handlers().presence_handler + self.presence_handler = hs.get_presence_handler() @defer.inlineCallbacks def on_PUT(self, request, room_id, user_id): diff --git a/synapse/rest/client/v2_alpha/receipts.py b/synapse/rest/client/v2_alpha/receipts.py index b831d8c95e..891cef99c6 100644 --- a/synapse/rest/client/v2_alpha/receipts.py +++ b/synapse/rest/client/v2_alpha/receipts.py @@ -37,7 +37,7 @@ class ReceiptRestServlet(RestServlet): self.hs = hs self.auth = hs.get_auth() self.receipts_handler = hs.get_handlers().receipts_handler - self.presence_handler = hs.get_handlers().presence_handler + self.presence_handler = hs.get_presence_handler() @defer.inlineCallbacks def on_POST(self, request, room_id, receipt_type, event_id): diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 60d3dc4030..812abe22b1 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -83,7 +83,7 @@ class SyncRestServlet(RestServlet): self.sync_handler = hs.get_handlers().sync_handler self.clock = hs.get_clock() self.filtering = hs.get_filtering() - self.presence_handler = hs.get_handlers().presence_handler + self.presence_handler = hs.get_presence_handler() @defer.inlineCallbacks def on_GET(self, request): diff --git a/synapse/server.py b/synapse/server.py index ee138de756..6d01b68bd4 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -27,6 +27,7 @@ from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFa from synapse.notifier import Notifier from synapse.api.auth import Auth from synapse.handlers import Handlers +from synapse.handlers.presence import PresenceHandler from synapse.state import StateHandler from synapse.storage import DataStore from synapse.util import Clock @@ -78,6 +79,7 @@ class HomeServer(object): 'auth', 'rest_servlet_factory', 'state_handler', + 'presence_handler', 'notifier', 'distributor', 'client_resource', @@ -164,6 +166,9 @@ class HomeServer(object): def build_state_handler(self): return StateHandler(self) + def build_presence_handler(self): + return PresenceHandler(self) + def build_event_sources(self): return EventSources(self) diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py index b1dd7b4a74..1258aaacb1 100644 --- a/tests/replication/test_resource.py +++ b/tests/replication/test_resource.py @@ -78,7 +78,7 @@ class ReplicationResourceCase(unittest.TestCase): @defer.inlineCallbacks def test_presence(self): get = self.get(presence="-1") - yield self.hs.get_handlers().presence_handler.set_state( + yield self.hs.get_presence_handler().set_state( self.user, {"presence": "online"} ) code, body = yield get -- cgit 1.4.1 From 821306120a846204e3fcd0cc93ab6df2f6d05f43 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 16 May 2016 19:48:07 +0100 Subject: Replaces calls to fetch_room_distributions_into with get_joined_hosts_for_room --- synapse/handlers/receipts.py | 11 ++++----- synapse/handlers/room_member.py | 29 ---------------------- synapse/handlers/typing.py | 54 +++++++++++++++++------------------------ tests/handlers/test_typing.py | 49 ++++--------------------------------- tests/utils.py | 2 +- 5 files changed, 33 insertions(+), 112 deletions(-) (limited to 'tests') diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index a390a1b8bd..e62722d78d 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -29,6 +29,8 @@ class ReceiptsHandler(BaseHandler): def __init__(self, hs): super(ReceiptsHandler, self).__init__(hs) + self.server_name = hs.config.server_name + self.store = hs.get_datastore() self.hs = hs self.federation = hs.get_replication_layer() self.federation.register_edu_handler( @@ -131,12 +133,9 @@ class ReceiptsHandler(BaseHandler): event_ids = receipt["event_ids"] data = receipt["data"] - remotedomains = set() - - rm_handler = self.hs.get_handlers().room_member_handler - yield rm_handler.fetch_room_distributions_into( - room_id, localusers=None, remotedomains=remotedomains - ) + remotedomains = yield self.store.get_joined_hosts_for_room(room_id) + remotedomains = remotedomains.copy() + remotedomains.discard(self.server_name) logger.debug("Sending receipt to: %r", remotedomains) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index b44e52a515..b785a8fa97 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -55,35 +55,6 @@ class RoomMemberHandler(BaseHandler): self.distributor.declare("user_joined_room") self.distributor.declare("user_left_room") - @defer.inlineCallbacks - def get_room_members(self, room_id): - users = yield self.store.get_users_in_room(room_id) - - defer.returnValue([UserID.from_string(u) for u in users]) - - @defer.inlineCallbacks - def fetch_room_distributions_into(self, room_id, localusers=None, - remotedomains=None, ignore_user=None): - """Fetch the distribution of a room, adding elements to either - 'localusers' or 'remotedomains', which should be a set() if supplied. - If ignore_user is set, ignore that user. - - This function returns nothing; its result is performed by the - side-effect on the two passed sets. This allows easy accumulation of - member lists of multiple rooms at once if required. - """ - members = yield self.get_room_members(room_id) - for member in members: - if ignore_user is not None and member == ignore_user: - continue - - if self.hs.is_mine(member): - if localusers is not None: - localusers.add(member) - else: - if remotedomains is not None: - remotedomains.add(member.domain) - @defer.inlineCallbacks def _local_membership_update( self, requester, target, room_id, membership, diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 8ce27f49ec..92eff534d6 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -39,7 +39,8 @@ class TypingNotificationHandler(BaseHandler): def __init__(self, hs): super(TypingNotificationHandler, self).__init__(hs) - self.homeserver = hs + self.store = hs.get_datastore() + self.server_name = hs.config.server_name self.clock = hs.get_clock() @@ -157,32 +158,26 @@ class TypingNotificationHandler(BaseHandler): @defer.inlineCallbacks def _push_update(self, room_id, user, typing): - localusers = set() - remotedomains = set() - - rm_handler = self.homeserver.get_handlers().room_member_handler - yield rm_handler.fetch_room_distributions_into( - room_id, localusers=localusers, remotedomains=remotedomains - ) - - if localusers: - self._push_update_local( - room_id=room_id, - user=user, - typing=typing - ) + domains = yield self.store.get_joined_hosts_for_room(room_id) deferreds = [] - for domain in remotedomains: - deferreds.append(self.federation.send_edu( - destination=domain, - edu_type="m.typing", - content={ - "room_id": room_id, - "user_id": user.to_string(), - "typing": typing, - }, - )) + for domain in domains: + if domain == self.server_name: + self._push_update_local( + room_id=room_id, + user=user, + typing=typing + ) + else: + deferreds.append(self.federation.send_edu( + destination=domain, + edu_type="m.typing", + content={ + "room_id": room_id, + "user_id": user.to_string(), + "typing": typing, + }, + )) yield defer.DeferredList(deferreds, consumeErrors=True) @@ -191,14 +186,9 @@ class TypingNotificationHandler(BaseHandler): room_id = content["room_id"] user = UserID.from_string(content["user_id"]) - localusers = set() - - rm_handler = self.homeserver.get_handlers().room_member_handler - yield rm_handler.fetch_room_distributions_into( - room_id, localusers=localusers - ) + domains = yield self.store.get_joined_hosts_for_room(room_id) - if localusers: + if self.server_name in domains: self._push_update_local( room_id=room_id, user=user, diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 3955e7f5b1..d38ca37d63 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -71,6 +71,7 @@ class TypingNotificationsTestCase(unittest.TestCase): self.auth = Mock(spec=[]) hs = yield setup_test_homeserver( + "test", auth=self.auth, clock=self.clock, datastore=Mock(spec=[ @@ -110,56 +111,16 @@ class TypingNotificationsTestCase(unittest.TestCase): self.room_id = "a-room" - # Mock the RoomMemberHandler - hs.handlers.room_member_handler = Mock(spec=[]) - self.room_member_handler = hs.handlers.room_member_handler - self.room_members = [] - def get_rooms_for_user(user): - if user in self.room_members: - return defer.succeed([self.room_id]) - else: - return defer.succeed([]) - self.room_member_handler.get_rooms_for_user = get_rooms_for_user - - def get_room_members(room_id): - if room_id == self.room_id: - return defer.succeed(self.room_members) - else: - return defer.succeed([]) - self.room_member_handler.get_room_members = get_room_members - - def get_joined_rooms_for_user(user): - if user in self.room_members: - return defer.succeed([self.room_id]) - else: - return defer.succeed([]) - self.room_member_handler.get_joined_rooms_for_user = get_joined_rooms_for_user - - @defer.inlineCallbacks - def fetch_room_distributions_into( - room_id, localusers=None, remotedomains=None, ignore_user=None - ): - members = yield get_room_members(room_id) - for member in members: - if ignore_user is not None and member == ignore_user: - continue - - if hs.is_mine(member): - if localusers is not None: - localusers.add(member) - else: - if remotedomains is not None: - remotedomains.add(member.domain) - self.room_member_handler.fetch_room_distributions_into = ( - fetch_room_distributions_into - ) - def check_joined_room(room_id, user_id): if user_id not in [u.to_string() for u in self.room_members]: raise AuthError(401, "User is not in the room") + def get_joined_hosts_for_room(room_id): + return set(member.domain for member in self.room_members) + self.datastore.get_joined_hosts_for_room = get_joined_hosts_for_room + self.auth.check_joined_room = check_joined_room # Some local users to test with diff --git a/tests/utils.py b/tests/utils.py index 9d7978a642..59d985b5f2 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -50,7 +50,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): config.enable_registration = True config.macaroon_secret_key = "not even a little secret" config.expire_access_token = False - config.server_name = "server.under.test" + config.server_name = name config.trusted_third_party_id_servers = [] config.room_invite_state_types = [] -- cgit 1.4.1 From 816df9f2678d35c33922f4dcdc343a848b16f082 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 16 May 2016 19:51:43 +0100 Subject: get_room_members is unused now --- synapse/storage/roommember.py | 18 ------------------ tests/storage/test_roommember.py | 18 ------------------ 2 files changed, 36 deletions(-) (limited to 'tests') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 223258785d..face685ed2 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -137,24 +137,6 @@ class RoomMemberStore(SQLBaseStore): return [r["user_id"] for r in rows] return self.runInteraction("get_users_in_room", f) - def get_room_members(self, room_id, membership=None): - """Retrieve the current room member list for a room. - - Args: - room_id (str): The room to get the list of members. - membership (synapse.api.constants.Membership): The filter to apply - to this list, or None to return all members with some state - associated with this room. - Returns: - list of namedtuples representing the members in this room. - """ - return self.runInteraction( - "get_room_members", - self._get_members_events_txn, - room_id, - membership=membership, - ).addCallback(self._get_events) - @cached() def get_invited_rooms_for_user(self, user_id): """ Get all the rooms the user is invited to diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py index 997090fe35..27b2b3d123 100644 --- a/tests/storage/test_roommember.py +++ b/tests/storage/test_roommember.py @@ -70,12 +70,6 @@ class RoomMemberStoreTestCase(unittest.TestCase): def test_one_member(self): yield self.inject_room_member(self.room, self.u_alice, Membership.JOIN) - self.assertEquals( - [self.u_alice.to_string()], - [m.user_id for m in ( - yield self.store.get_room_members(self.room.to_string()) - )] - ) self.assertEquals( [self.room.to_string()], [m.room_id for m in ( @@ -85,18 +79,6 @@ class RoomMemberStoreTestCase(unittest.TestCase): )] ) - @defer.inlineCallbacks - def test_two_members(self): - yield self.inject_room_member(self.room, self.u_alice, Membership.JOIN) - yield self.inject_room_member(self.room, self.u_bob, Membership.JOIN) - - self.assertEquals( - {self.u_alice.to_string(), self.u_bob.to_string()}, - {m.user_id for m in ( - yield self.store.get_room_members(self.room.to_string()) - )} - ) - @defer.inlineCallbacks def test_room_hosts(self): yield self.inject_room_member(self.room, self.u_alice, Membership.JOIN) -- cgit 1.4.1 From 0cb441fedd77b42f307745a441b804fee6386cb5 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 17 May 2016 15:58:46 +0100 Subject: Move typing handler out of the Handlers object --- synapse/handlers/__init__.py | 2 -- synapse/handlers/typing.py | 33 +++++++++++++++------------------ synapse/replication/resource.py | 2 +- synapse/rest/client/v1/room.py | 7 +++---- synapse/server.py | 5 +++++ tests/handlers/test_typing.py | 10 +--------- tests/replication/test_resource.py | 2 +- tests/rest/client/v1/test_typing.py | 2 +- 8 files changed, 27 insertions(+), 36 deletions(-) (limited to 'tests') diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index e1fc9a58ad..9442ae6f1d 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -25,7 +25,6 @@ from .events import EventStreamHandler, EventHandler from .federation import FederationHandler from .profile import ProfileHandler from .directory import DirectoryHandler -from .typing import TypingNotificationHandler from .admin import AdminHandler from .appservice import ApplicationServicesHandler from .auth import AuthHandler @@ -53,7 +52,6 @@ class Handlers(object): self.profile_handler = ProfileHandler(hs) self.room_list_handler = RoomListHandler(hs) self.directory_handler = DirectoryHandler(hs) - self.typing_notification_handler = TypingNotificationHandler(hs) self.admin_handler = AdminHandler(hs) self.receipts_handler = ReceiptsHandler(hs) asapi = ApplicationServiceApi(hs) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index fca8d25c3f..d46f05f426 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -15,8 +15,6 @@ from twisted.internet import defer -from ._base import BaseHandler - from synapse.api.errors import SynapseError, AuthError from synapse.util.logcontext import PreserveLoggingContext from synapse.util.metrics import Measure @@ -35,12 +33,13 @@ logger = logging.getLogger(__name__) RoomMember = namedtuple("RoomMember", ("room_id", "user")) -class TypingNotificationHandler(BaseHandler): +class TypingHandler(object): def __init__(self, hs): - super(TypingNotificationHandler, self).__init__(hs) - self.store = hs.get_datastore() self.server_name = hs.config.server_name + self.auth = hs.get_auth() + self.is_mine = hs.is_mine + self.notifier = hs.get_notifier() self.clock = hs.get_clock() @@ -68,7 +67,7 @@ class TypingNotificationHandler(BaseHandler): @defer.inlineCallbacks def started_typing(self, target_user, auth_user, room_id, timeout): - if not self.hs.is_mine(target_user): + if not self.is_mine(target_user): raise SynapseError(400, "User is not hosted on this Home Server") if target_user != auth_user: @@ -111,7 +110,7 @@ class TypingNotificationHandler(BaseHandler): @defer.inlineCallbacks def stopped_typing(self, target_user, auth_user, room_id): - if not self.hs.is_mine(target_user): + if not self.is_mine(target_user): raise SynapseError(400, "User is not hosted on this Home Server") if target_user != auth_user: @@ -133,7 +132,7 @@ class TypingNotificationHandler(BaseHandler): @defer.inlineCallbacks def user_left_room(self, user, room_id): - if self.hs.is_mine(user): + if self.is_mine(user): member = RoomMember(room_id=room_id, user=user) yield self._stopped_typing(member) @@ -228,16 +227,14 @@ class TypingNotificationEventSource(object): def __init__(self, hs): self.hs = hs self.clock = hs.get_clock() - self._handler = None - - def handler(self): - # Avoid cyclic dependency in handler setup - if not self._handler: - self._handler = self.hs.get_handlers().typing_notification_handler - return self._handler + # We can't call get_typing_handler here because there's a cycle: + # + # Typing -> Notifier -> TypingNotificationEventSource -> Typing + # + self.get_typing_handler = hs.get_typing_handler def _make_event_for(self, room_id): - typing = self.handler()._room_typing[room_id] + typing = self.get_typing_handler()._room_typing[room_id] return { "type": "m.typing", "room_id": room_id, @@ -249,7 +246,7 @@ class TypingNotificationEventSource(object): def get_new_events(self, from_key, room_ids, **kwargs): with Measure(self.clock, "typing.get_new_events"): from_key = int(from_key) - handler = self.handler() + handler = self.get_typing_handler() events = [] for room_id in room_ids: @@ -263,7 +260,7 @@ class TypingNotificationEventSource(object): return events, handler._latest_room_serial def get_current_key(self): - return self.handler()._latest_room_serial + return self.get_typing_handler()._latest_room_serial def get_pagination_rows(self, user, pagination_config, key): return ([], pagination_config.from_key) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index b0e7a17670..847f212a3d 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -110,7 +110,7 @@ class ReplicationResource(Resource): self.store = hs.get_datastore() self.sources = hs.get_event_sources() self.presence_handler = hs.get_presence_handler() - self.typing_handler = hs.get_handlers().typing_notification_handler + self.typing_handler = hs.get_typing_handler() self.notifier = hs.notifier self.clock = hs.get_clock() diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 9c89442ce6..cf478c6f79 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -571,6 +571,7 @@ class RoomTypingRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomTypingRestServlet, self).__init__(hs) self.presence_handler = hs.get_presence_handler() + self.typing_handler = hs.get_typing_handler() @defer.inlineCallbacks def on_PUT(self, request, room_id, user_id): @@ -581,19 +582,17 @@ class RoomTypingRestServlet(ClientV1RestServlet): content = parse_json_object_from_request(request) - typing_handler = self.handlers.typing_notification_handler - yield self.presence_handler.bump_presence_active_time(requester.user) if content["typing"]: - yield typing_handler.started_typing( + yield self.typing_handler.started_typing( target_user=target_user, auth_user=requester.user, room_id=room_id, timeout=content.get("timeout", 30000), ) else: - yield typing_handler.stopped_typing( + yield self.typing_handler.stopped_typing( target_user=target_user, auth_user=requester.user, room_id=room_id, diff --git a/synapse/server.py b/synapse/server.py index 785a087452..01f828819f 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -29,6 +29,7 @@ from synapse.api.auth import Auth from synapse.handlers import Handlers from synapse.handlers.presence import PresenceHandler from synapse.handlers.sync import SyncHandler +from synapse.handlers.typing import TypingHandler from synapse.state import StateHandler from synapse.storage import DataStore from synapse.util import Clock @@ -82,6 +83,7 @@ class HomeServer(object): 'state_handler', 'presence_handler', 'sync_handler', + 'typing_handler', 'notifier', 'distributor', 'client_resource', @@ -171,6 +173,9 @@ class HomeServer(object): def build_presence_handler(self): return PresenceHandler(self) + def build_typing_handler(self): + return TypingHandler(self) + def build_sync_handler(self): return SyncHandler(self) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index d38ca37d63..abb739ae52 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -25,8 +25,6 @@ from ..utils import ( ) from synapse.api.errors import AuthError -from synapse.handlers.typing import TypingNotificationHandler - from synapse.types import UserID @@ -49,11 +47,6 @@ def _make_edu_json(origin, edu_type, content): return json.dumps(_expect_edu("test", edu_type, content, origin=origin)) -class JustTypingNotificationHandlers(object): - def __init__(self, hs): - self.typing_notification_handler = TypingNotificationHandler(hs) - - class TypingNotificationsTestCase(unittest.TestCase): """Tests typing notifications to rooms.""" @defer.inlineCallbacks @@ -89,9 +82,8 @@ class TypingNotificationsTestCase(unittest.TestCase): http_client=self.mock_http_client, keyring=Mock(), ) - hs.handlers = JustTypingNotificationHandlers(hs) - self.handler = hs.get_handlers().typing_notification_handler + self.handler = hs.get_typing_handler() self.event_source = hs.get_event_sources().sources["typing"] diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py index 1258aaacb1..842e3d29d7 100644 --- a/tests/replication/test_resource.py +++ b/tests/replication/test_resource.py @@ -93,7 +93,7 @@ class ReplicationResourceCase(unittest.TestCase): def test_typing(self): room_id = yield self.create_room() get = self.get(typing="-1") - yield self.hs.get_handlers().typing_notification_handler.started_typing( + yield self.hs.get_typing_handler().started_typing( self.user, self.user, room_id, timeout=2 ) code, body = yield get diff --git a/tests/rest/client/v1/test_typing.py b/tests/rest/client/v1/test_typing.py index d0037a53ef..467f253ef6 100644 --- a/tests/rest/client/v1/test_typing.py +++ b/tests/rest/client/v1/test_typing.py @@ -106,7 +106,7 @@ class RoomTypingTestCase(RestTestCase): yield self.join(self.room_id, user="@jim:red") def tearDown(self): - self.hs.get_handlers().typing_notification_handler.tearDown() + self.hs.get_typing_handler().tearDown() @defer.inlineCallbacks def test_set_typing(self): -- cgit 1.4.1 From 6fe04ffef2fd44a3af61fe262266b25158c46db7 Mon Sep 17 00:00:00 2001 From: Negi Fazeli Date: Mon, 23 May 2016 11:14:11 +0200 Subject: Fix set profile error with Requester. Replace flush_user with delete access token due to function removal Add a new test case for if the user is already registered --- synapse/handlers/register.py | 9 +++++---- tests/handlers/test_register.py | 34 +++++++++++++++++++++++++--------- 2 files changed, 30 insertions(+), 13 deletions(-) (limited to 'tests') diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 5883b9111e..16f33f8371 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -16,7 +16,7 @@ """Contains functions for registering clients.""" from twisted.internet import defer -from synapse.types import UserID +from synapse.types import UserID, Requester from synapse.api.errors import ( AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError ) @@ -360,7 +360,8 @@ class RegistrationHandler(BaseHandler): @defer.inlineCallbacks def get_or_create_user(self, localpart, displayname, duration_seconds): - """Creates a new user or returns an access token for an existing one + """Creates a new user if the user does not exist, + else revokes all previous access tokens and generates a new one. Args: localpart : The local part of the user ID to register. If None, @@ -399,14 +400,14 @@ class RegistrationHandler(BaseHandler): yield registered_user(self.distributor, user) else: - yield self.store.flush_user(user_id=user_id) + yield self.store.user_delete_access_tokens(user_id=user_id) yield self.store.add_access_token_to_user(user_id=user_id, token=token) if displayname is not None: logger.info("setting user display name: %s -> %s", user_id, displayname) profile_handler = self.hs.get_handlers().profile_handler yield profile_handler.set_displayname( - user, user, displayname + user, Requester(user, token, False), displayname ) defer.returnValue((user_id, token)) diff --git a/tests/handlers/test_register.py b/tests/handlers/test_register.py index 8b7be96bd9..9d5c653b45 100644 --- a/tests/handlers/test_register.py +++ b/tests/handlers/test_register.py @@ -17,6 +17,7 @@ from twisted.internet import defer from .. import unittest from synapse.handlers.register import RegistrationHandler +from synapse.types import UserID from tests.utils import setup_test_homeserver @@ -36,25 +37,21 @@ class RegistrationTestCase(unittest.TestCase): self.mock_distributor = Mock() self.mock_distributor.declare("registered_user") self.mock_captcha_client = Mock() - hs = yield setup_test_homeserver( + self.hs = yield setup_test_homeserver( handlers=None, http_client=None, expire_access_token=True) - hs.handlers = RegistrationHandlers(hs) - self.handler = hs.get_handlers().registration_handler - hs.get_handlers().profile_handler = Mock() + self.hs.handlers = RegistrationHandlers(self.hs) + self.handler = self.hs.get_handlers().registration_handler + self.hs.get_handlers().profile_handler = Mock() self.mock_handler = Mock(spec=[ "generate_short_term_login_token", ]) - hs.get_handlers().auth_handler = self.mock_handler + self.hs.get_handlers().auth_handler = self.mock_handler @defer.inlineCallbacks def test_user_is_created_and_logged_in_if_doesnt_exist(self): - """ - Returns: - The user doess not exist in this case so it will register and log it in - """ duration_ms = 200 local_part = "someone" display_name = "someone" @@ -65,3 +62,22 @@ class RegistrationTestCase(unittest.TestCase): local_part, display_name, duration_ms) self.assertEquals(result_user_id, user_id) self.assertEquals(result_token, 'secret') + + @defer.inlineCallbacks + def test_if_user_exists(self): + store = self.hs.get_datastore() + frank = UserID.from_string("@frank:test") + yield store.register( + user_id=frank.to_string(), + token="jkv;g498752-43gj['eamb!-5", + password_hash=None) + duration_ms = 200 + local_part = "frank" + display_name = "Frank" + user_id = "@frank:test" + mock_token = self.mock_handler.generate_short_term_login_token + mock_token.return_value = 'secret' + result_user_id, result_token = yield self.handler.get_or_create_user( + local_part, display_name, duration_ms) + self.assertEquals(result_user_id, user_id) + self.assertEquals(result_token, 'secret') -- cgit 1.4.1 From c626fc576a87f401c594cbb070d5b0000e45b4e1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 31 May 2016 13:53:48 +0100 Subject: Move the AS handler out of the Handlers object. Access it directly from the homeserver itself. It already wasn't inheriting from BaseHandler storing it on the Handlers object was already somewhat dubious. --- synapse/appservice/scheduler.py | 14 +++++++------- synapse/handlers/__init__.py | 11 ----------- synapse/handlers/appservice.py | 15 +++++---------- synapse/handlers/directory.py | 3 ++- synapse/notifier.py | 10 ++++------ synapse/server.py | 15 +++++++++++++++ tests/handlers/test_appservice.py | 6 +++--- 7 files changed, 36 insertions(+), 38 deletions(-) (limited to 'tests') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 47a4e9f864..9afc8fd754 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -56,22 +56,22 @@ import logging logger = logging.getLogger(__name__) -class AppServiceScheduler(object): +class ApplicationServiceScheduler(object): """ Public facing API for this module. Does the required DI to tie the components together. This also serves as the "event_pool", which in this case is a simple array. """ - def __init__(self, clock, store, as_api): - self.clock = clock - self.store = store - self.as_api = as_api + def __init__(self, hs): + self.clock = hs.get_clock() + self.store = hs.get_datastore() + self.as_api = hs.get_application_service_api() def create_recoverer(service, callback): - return _Recoverer(clock, store, as_api, service, callback) + return _Recoverer(self.clock, self.store, self.as_api, service, callback) self.txn_ctrl = _TransactionController( - clock, store, as_api, create_recoverer + self.clock, self.store, self.as_api, create_recoverer ) self.queuer = _ServiceQueuer(self.txn_ctrl) diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 0ac5d3da3a..c0069e23d6 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.appservice.scheduler import AppServiceScheduler -from synapse.appservice.api import ApplicationServiceApi from .register import RegistrationHandler from .room import ( RoomCreationHandler, RoomContextHandler, @@ -26,7 +24,6 @@ from .federation import FederationHandler from .profile import ProfileHandler from .directory import DirectoryHandler from .admin import AdminHandler -from .appservice import ApplicationServicesHandler from .auth import AuthHandler from .identity import IdentityHandler from .receipts import ReceiptsHandler @@ -53,14 +50,6 @@ class Handlers(object): self.directory_handler = DirectoryHandler(hs) self.admin_handler = AdminHandler(hs) self.receipts_handler = ReceiptsHandler(hs) - asapi = ApplicationServiceApi(hs) - self.appservice_handler = ApplicationServicesHandler( - hs, asapi, AppServiceScheduler( - clock=hs.get_clock(), - store=hs.get_datastore(), - as_api=asapi - ) - ) self.auth_handler = AuthHandler(hs) self.identity_handler = IdentityHandler(hs) self.search_handler = SearchHandler(hs) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 75fc74c797..051ccdb380 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.appservice import ApplicationService -from synapse.types import UserID import logging @@ -35,16 +34,13 @@ def log_failure(failure): ) -# NB: Purposefully not inheriting BaseHandler since that contains way too much -# setup code which this handler does not need or use. This makes testing a lot -# easier. class ApplicationServicesHandler(object): - def __init__(self, hs, appservice_api, appservice_scheduler): + def __init__(self, hs): self.store = hs.get_datastore() - self.hs = hs - self.appservice_api = appservice_api - self.scheduler = appservice_scheduler + self.is_mine_id = hs.is_mine_id + self.appservice_api = hs.get_application_service_api() + self.scheduler = hs.get_application_service_scheduler() self.started_scheduler = False @defer.inlineCallbacks @@ -169,8 +165,7 @@ class ApplicationServicesHandler(object): @defer.inlineCallbacks def _is_unknown_user(self, user_id): - user = UserID.from_string(user_id) - if not self.hs.is_mine(user): + if not self.is_mine_id(user_id): # we don't know if they are unknown or not since it isn't one of our # users. We can't poke ASes. defer.returnValue(False) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 8eeb225811..4bea7f2b19 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -33,6 +33,7 @@ class DirectoryHandler(BaseHandler): super(DirectoryHandler, self).__init__(hs) self.state = hs.get_state_handler() + self.appservice_handler = hs.get_application_service_handler() self.federation = hs.get_replication_layer() self.federation.register_query_handler( @@ -281,7 +282,7 @@ class DirectoryHandler(BaseHandler): ) if not result: # Query AS to see if it exists - as_handler = self.hs.get_handlers().appservice_handler + as_handler = self.appservice_handler result = yield as_handler.query_room_alias_exists(room_alias) defer.returnValue(result) diff --git a/synapse/notifier.py b/synapse/notifier.py index 33b79c0ec7..cbec4d30ae 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -140,8 +140,6 @@ class Notifier(object): UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 def __init__(self, hs): - self.hs = hs - self.user_to_user_stream = {} self.room_to_user_streams = {} self.appservice_to_user_streams = {} @@ -151,6 +149,8 @@ class Notifier(object): self.pending_new_room_events = [] self.clock = hs.get_clock() + self.appservice_handler = hs.get_application_service_handler() + self.state_handler = hs.get_state_handler() hs.get_distributor().observe( "user_joined_room", self._user_joined_room @@ -232,9 +232,7 @@ class Notifier(object): def _on_new_room_event(self, event, room_stream_id, extra_users=[]): """Notify any user streams that are interested in this room event""" # poke any interested application service. - self.hs.get_handlers().appservice_handler.notify_interested_services( - event - ) + self.appservice_handler.notify_interested_services(event) app_streams = set() @@ -449,7 +447,7 @@ class Notifier(object): @defer.inlineCallbacks def _is_world_readable(self, room_id): - state = yield self.hs.get_state_handler().get_current_state( + state = yield self.state_handler.get_current_state( room_id, EventTypes.RoomHistoryVisibility ) diff --git a/synapse/server.py b/synapse/server.py index bfd5608b7d..7cf22b1eea 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -22,6 +22,8 @@ from twisted.web.client import BrowserLikePolicyForHTTPS from twisted.enterprise import adbapi +from synapse.appservice.scheduler import ApplicationServiceScheduler +from synapse.appservice.api import ApplicationServiceApi from synapse.federation import initialize_http_replication from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory from synapse.notifier import Notifier @@ -31,6 +33,7 @@ from synapse.handlers.presence import PresenceHandler from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import TypingHandler from synapse.handlers.room import RoomListHandler +from synapse.handlers.appservice import ApplicationServicesHandler from synapse.state import StateHandler from synapse.storage import DataStore from synapse.util import Clock @@ -86,6 +89,9 @@ class HomeServer(object): 'sync_handler', 'typing_handler', 'room_list_handler', + 'application_service_api', + 'application_service_scheduler', + 'application_service_handler', 'notifier', 'distributor', 'client_resource', @@ -184,6 +190,15 @@ class HomeServer(object): def build_room_list_handler(self): return RoomListHandler(self) + def build_application_service_api(self): + return ApplicationServiceApi(self) + + def build_application_service_scheduler(self): + return ApplicationServiceScheduler(self) + + def build_application_service_handler(self): + return ApplicationServicesHandler(self) + def build_event_sources(self): return EventSources(self) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 7ddbbb9b4a..a884c95f8d 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -30,9 +30,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): self.mock_scheduler = Mock() hs = Mock() hs.get_datastore = Mock(return_value=self.mock_store) - self.handler = ApplicationServicesHandler( - hs, self.mock_as_api, self.mock_scheduler - ) + hs.get_application_service_api = Mock(return_value=self.mock_as_api) + hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler) + self.handler = ApplicationServicesHandler(hs) @defer.inlineCallbacks def test_notify_interested_services(self): -- cgit 1.4.1 From 195254cae80f4748c3fc0ac3b46000047c2e6cc0 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 1 Jun 2016 11:14:16 +0100 Subject: Inject fake room list handler in tests Otherwise it tries to start the remote public room list updating looping call which breaks. --- tests/utils.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'tests') diff --git a/tests/utils.py b/tests/utils.py index 59d985b5f2..006abedbc1 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -67,6 +67,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): version_string="Synapse/tests", database_engine=create_engine(config.database_config), get_db_conn=db_pool.get_db_conn, + room_list_handler=object(), **kargs ) hs.setup() @@ -75,6 +76,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): name, db_pool=None, datastore=datastore, config=config, version_string="Synapse/tests", database_engine=create_engine(config.database_config), + room_list_handler=object(), **kargs ) -- cgit 1.4.1 From 4a10510cd5aff790127a185ecefc83b881a717cc Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 2 Jun 2016 13:31:45 +0100 Subject: Split out the auth handler --- synapse/handlers/__init__.py | 2 -- synapse/handlers/register.py | 2 +- synapse/rest/client/v1/login.py | 11 ++++++----- synapse/rest/client/v2_alpha/account.py | 4 ++-- synapse/rest/client/v2_alpha/auth.py | 2 +- synapse/rest/client/v2_alpha/register.py | 2 +- synapse/rest/client/v2_alpha/tokenrefresh.py | 2 +- synapse/server.py | 5 +++++ tests/rest/client/v2_alpha/test_register.py | 2 +- tests/utils.py | 15 +++++---------- 10 files changed, 23 insertions(+), 24 deletions(-) (limited to 'tests') diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index c0069e23d6..d28e07f0d9 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -24,7 +24,6 @@ from .federation import FederationHandler from .profile import ProfileHandler from .directory import DirectoryHandler from .admin import AdminHandler -from .auth import AuthHandler from .identity import IdentityHandler from .receipts import ReceiptsHandler from .search import SearchHandler @@ -50,7 +49,6 @@ class Handlers(object): self.directory_handler = DirectoryHandler(hs) self.admin_handler = AdminHandler(hs) self.receipts_handler = ReceiptsHandler(hs) - self.auth_handler = AuthHandler(hs) self.identity_handler = IdentityHandler(hs) self.search_handler = SearchHandler(hs) self.room_context_handler = RoomContextHandler(hs) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 16f33f8371..bbc07b045e 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -413,7 +413,7 @@ class RegistrationHandler(BaseHandler): defer.returnValue((user_id, token)) def auth_handler(self): - return self.hs.get_handlers().auth_handler + return self.hs.get_auth_handler() @defer.inlineCallbacks def guest_access_token_for(self, medium, address, inviter_user_id): diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 3b5544851b..8df9d10efa 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -58,6 +58,7 @@ class LoginRestServlet(ClientV1RestServlet): self.cas_required_attributes = hs.config.cas_required_attributes self.servername = hs.config.server_name self.http_client = hs.get_simple_http_client() + self.auth_handler = self.hs.get_auth_handler() def on_GET(self, request): flows = [] @@ -143,7 +144,7 @@ class LoginRestServlet(ClientV1RestServlet): user_id, self.hs.hostname ).to_string() - auth_handler = self.handlers.auth_handler + auth_handler = self.auth_handler user_id, access_token, refresh_token = yield auth_handler.login_with_password( user_id=user_id, password=login_submission["password"]) @@ -160,7 +161,7 @@ class LoginRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def do_token_login(self, login_submission): token = login_submission['token'] - auth_handler = self.handlers.auth_handler + auth_handler = self.auth_handler user_id = ( yield auth_handler.validate_short_term_login_token_and_get_user_id(token) ) @@ -194,7 +195,7 @@ class LoginRestServlet(ClientV1RestServlet): raise LoginError(401, "Unauthorized", errcode=Codes.UNAUTHORIZED) user_id = UserID.create(user, self.hs.hostname).to_string() - auth_handler = self.handlers.auth_handler + auth_handler = self.auth_handler user_exists = yield auth_handler.does_user_exist(user_id) if user_exists: user_id, access_token, refresh_token = ( @@ -243,7 +244,7 @@ class LoginRestServlet(ClientV1RestServlet): raise LoginError(401, "Invalid JWT", errcode=Codes.UNAUTHORIZED) user_id = UserID.create(user, self.hs.hostname).to_string() - auth_handler = self.handlers.auth_handler + auth_handler = self.auth_handler user_exists = yield auth_handler.does_user_exist(user_id) if user_exists: user_id, access_token, refresh_token = ( @@ -412,7 +413,7 @@ class CasTicketServlet(ClientV1RestServlet): raise LoginError(401, "Unauthorized", errcode=Codes.UNAUTHORIZED) user_id = UserID.create(user, self.hs.hostname).to_string() - auth_handler = self.handlers.auth_handler + auth_handler = self.auth_handler user_exists = yield auth_handler.does_user_exist(user_id) if not user_exists: user_id, _ = ( diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index c88c270537..9a84873a5f 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -35,7 +35,7 @@ class PasswordRestServlet(RestServlet): super(PasswordRestServlet, self).__init__() self.hs = hs self.auth = hs.get_auth() - self.auth_handler = hs.get_handlers().auth_handler + self.auth_handler = hs.get_auth_handler() @defer.inlineCallbacks def on_POST(self, request): @@ -97,7 +97,7 @@ class ThreepidRestServlet(RestServlet): self.hs = hs self.identity_handler = hs.get_handlers().identity_handler self.auth = hs.get_auth() - self.auth_handler = hs.get_handlers().auth_handler + self.auth_handler = hs.get_auth_handler() @defer.inlineCallbacks def on_GET(self, request): diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py index 78181b7b18..58d3cad6a1 100644 --- a/synapse/rest/client/v2_alpha/auth.py +++ b/synapse/rest/client/v2_alpha/auth.py @@ -104,7 +104,7 @@ class AuthRestServlet(RestServlet): super(AuthRestServlet, self).__init__() self.hs = hs self.auth = hs.get_auth() - self.auth_handler = hs.get_handlers().auth_handler + self.auth_handler = hs.get_auth_handler() self.registration_handler = hs.get_handlers().registration_handler @defer.inlineCallbacks diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 1ecc02d94d..2088c316d1 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -49,7 +49,7 @@ class RegisterRestServlet(RestServlet): self.hs = hs self.auth = hs.get_auth() self.store = hs.get_datastore() - self.auth_handler = hs.get_handlers().auth_handler + self.auth_handler = hs.get_auth_handler() self.registration_handler = hs.get_handlers().registration_handler self.identity_handler = hs.get_handlers().identity_handler diff --git a/synapse/rest/client/v2_alpha/tokenrefresh.py b/synapse/rest/client/v2_alpha/tokenrefresh.py index a158c2209a..8270e8787f 100644 --- a/synapse/rest/client/v2_alpha/tokenrefresh.py +++ b/synapse/rest/client/v2_alpha/tokenrefresh.py @@ -38,7 +38,7 @@ class TokenRefreshRestServlet(RestServlet): body = parse_json_object_from_request(request) try: old_refresh_token = body["refresh_token"] - auth_handler = self.hs.get_handlers().auth_handler + auth_handler = self.hs.get_auth_handler() (user_id, new_refresh_token) = yield self.store.exchange_refresh_token( old_refresh_token, auth_handler.generate_refresh_token) new_access_token = yield auth_handler.issue_access_token(user_id) diff --git a/synapse/server.py b/synapse/server.py index 7cf22b1eea..dd4b81c658 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -33,6 +33,7 @@ from synapse.handlers.presence import PresenceHandler from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import TypingHandler from synapse.handlers.room import RoomListHandler +from synapse.handlers.auth import AuthHandler from synapse.handlers.appservice import ApplicationServicesHandler from synapse.state import StateHandler from synapse.storage import DataStore @@ -89,6 +90,7 @@ class HomeServer(object): 'sync_handler', 'typing_handler', 'room_list_handler', + 'auth_handler', 'application_service_api', 'application_service_scheduler', 'application_service_handler', @@ -190,6 +192,9 @@ class HomeServer(object): def build_room_list_handler(self): return RoomListHandler(self) + def build_auth_handler(self): + return AuthHandler(self) + def build_application_service_api(self): return ApplicationServiceApi(self) diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index affd42c015..cda0a2b27c 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -33,7 +33,6 @@ class RegisterRestServletTestCase(unittest.TestCase): # do the dance to hook it up to the hs global self.handlers = Mock( - auth_handler=self.auth_handler, registration_handler=self.registration_handler, identity_handler=self.identity_handler, login_handler=self.login_handler @@ -42,6 +41,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.hs.hostname = "superbig~testing~thing.com" self.hs.get_auth = Mock(return_value=self.auth) self.hs.get_handlers = Mock(return_value=self.handlers) + self.hs.get_auth_handler = Mock(return_value=self.auth_handler) self.hs.config.enable_registration = True # init the thing we're testing diff --git a/tests/utils.py b/tests/utils.py index 006abedbc1..e19ae581e0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -81,16 +81,11 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): ) # bcrypt is far too slow to be doing in unit tests - def swap_out_hash_for_testing(old_build_handlers): - def build_handlers(): - handlers = old_build_handlers() - auth_handler = handlers.auth_handler - auth_handler.hash = lambda p: hashlib.md5(p).hexdigest() - auth_handler.validate_hash = lambda p, h: hashlib.md5(p).hexdigest() == h - return handlers - return build_handlers - - hs.build_handlers = swap_out_hash_for_testing(hs.build_handlers) + # Need to let the HS build an auth handler and then mess with it + # because AuthHandler's constructor requires the HS, so we can't make one + # beforehand and pass it in to the HS's constructor (chicken / egg) + hs.get_auth_handler().hash = lambda p: hashlib.md5(p).hexdigest() + hs.get_auth_handler().validate_hash = lambda p, h: hashlib.md5(p).hexdigest() == h fed = kargs.get("resource_for_federation", None) if fed: -- cgit 1.4.1 From 70599ce9252997d32d0bf9f26a4e02c99bbe474d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 2 Jun 2016 15:20:15 +0100 Subject: Allow external processes to mark a user as syncing. (#812) * Add infrastructure to the presence handler to track sync requests in external processes * Expire stale entries for dead external processes * Add an http endpoint for making users as syncing Add some docstrings and comments. * Fixes --- synapse/handlers/presence.py | 119 +++++++++++++++++++++++++++---- synapse/replication/presence_resource.py | 59 +++++++++++++++ synapse/replication/resource.py | 2 + tests/handlers/test_presence.py | 16 ++--- 4 files changed, 174 insertions(+), 22 deletions(-) create mode 100644 synapse/replication/presence_resource.py (limited to 'tests') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 37f57301fb..fc8538b41e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -68,6 +68,10 @@ FEDERATION_TIMEOUT = 30 * 60 * 1000 # How often to resend presence to remote servers FEDERATION_PING_INTERVAL = 25 * 60 * 1000 +# How long we will wait before assuming that the syncs from an external process +# are dead. +EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000 + assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER @@ -158,10 +162,21 @@ class PresenceHandler(object): self.serial_to_user = {} self._next_serial = 1 - # Keeps track of the number of *ongoing* syncs. While this is non zero - # a user will never go offline. + # Keeps track of the number of *ongoing* syncs on this process. While + # this is non zero a user will never go offline. self.user_to_num_current_syncs = {} + # Keeps track of the number of *ongoing* syncs on other processes. + # While any sync is ongoing on another process the user will never + # go offline. + # Each process has a unique identifier and an update frequency. If + # no update is received from that process within the update period then + # we assume that all the sync requests on that process have stopped. + # Stored as a dict from process_id to set of user_id, and a dict of + # process_id to millisecond timestamp last updated. + self.external_process_to_current_syncs = {} + self.external_process_last_updated_ms = {} + # Start a LoopingCall in 30s that fires every 5s. # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. @@ -272,13 +287,26 @@ class PresenceHandler(object): # Fetch the list of users that *may* have timed out. Things may have # changed since the timeout was set, so we won't necessarily have to # take any action. - users_to_check = self.wheel_timer.fetch(now) + users_to_check = set(self.wheel_timer.fetch(now)) + + # Check whether the lists of syncing processes from an external + # process have expired. + expired_process_ids = [ + process_id for process_id, last_update + in self.external_process_last_update.items() + if now - last_update > EXTERNAL_PROCESS_EXPIRY + ] + for process_id in expired_process_ids: + users_to_check.update( + self.external_process_to_current_syncs.pop(process_id, ()) + ) + self.external_process_last_update.pop(process_id) states = [ self.user_to_current_state.get( user_id, UserPresenceState.default(user_id) ) - for user_id in set(users_to_check) + for user_id in users_to_check ] timers_fired_counter.inc_by(len(states)) @@ -286,7 +314,7 @@ class PresenceHandler(object): changes = handle_timeouts( states, is_mine_fn=self.is_mine_id, - user_to_num_current_syncs=self.user_to_num_current_syncs, + syncing_users=self.get_syncing_users(), now=now, ) @@ -363,6 +391,73 @@ class PresenceHandler(object): defer.returnValue(_user_syncing()) + def get_currently_syncing_users(self): + """Get the set of user ids that are currently syncing on this HS. + Returns: + set(str): A set of user_id strings. + """ + syncing_user_ids = { + user_id for user_id, count in self.user_to_num_current_syncs.items() + if count + } + syncing_user_ids.update(self.external_process_to_current_syncs.values()) + return syncing_user_ids + + @defer.inlineCallbacks + def update_external_syncs(self, process_id, syncing_user_ids): + """Update the syncing users for an external process + + Args: + process_id(str): An identifier for the process the users are + syncing against. This allows synapse to process updates + as user start and stop syncing against a given process. + syncing_user_ids(set(str)): The set of user_ids that are + currently syncing on that server. + """ + + # Grab the previous list of user_ids that were syncing on that process + prev_syncing_user_ids = ( + self.external_process_to_current_syncs.get(process_id, set()) + ) + # Grab the current presence state for both the users that are syncing + # now and the users that were syncing before this update. + prev_states = yield self.current_state_for_users( + syncing_user_ids | prev_syncing_user_ids + ) + updates = [] + time_now_ms = self.clock.time_msec() + + # For each new user that is syncing check if we need to mark them as + # being online. + for new_user_id in syncing_user_ids - prev_syncing_user_ids: + prev_state = prev_states[new_user_id] + if prev_state.state == PresenceState.OFFLINE: + updates.append(prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=time_now_ms, + last_user_sync_ts=time_now_ms, + )) + else: + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + )) + + # For each user that is still syncing or stopped syncing update the + # last sync time so that we will correctly apply the grace period when + # they stop syncing. + for old_user_id in prev_syncing_user_ids: + prev_state = prev_states[old_user_id] + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + )) + + yield self._update_states(updates) + + # Update the last updated time for the process. We expire the entries + # if we don't receive an update in the given timeframe. + self.external_process_last_updated_ms[process_id] = self.clock.time_msec() + self.external_process_to_current_syncs[process_id] = syncing_user_ids + @defer.inlineCallbacks def current_state_for_user(self, user_id): """Get the current presence state for a user. @@ -935,15 +1030,14 @@ class PresenceEventSource(object): return self.get_new_events(user, from_key=None, include_offline=False) -def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now): +def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now): """Checks the presence of users that have timed out and updates as appropriate. Args: user_states(list): List of UserPresenceState's to check. is_mine_fn (fn): Function that returns if a user_id is ours - user_to_num_current_syncs (dict): Mapping of user_id to number of currently - active syncs. + syncing_user_ids (set): Set of user_ids with active syncs. now (int): Current time in ms. Returns: @@ -954,21 +1048,20 @@ def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now): for state in user_states: is_mine = is_mine_fn(state.user_id) - new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now) + new_state = handle_timeout(state, is_mine, syncing_user_ids, now) if new_state: changes[state.user_id] = new_state return changes.values() -def handle_timeout(state, is_mine, user_to_num_current_syncs, now): +def handle_timeout(state, is_mine, syncing_user_ids, now): """Checks the presence of the user to see if any of the timers have elapsed Args: state (UserPresenceState) is_mine (bool): Whether the user is ours - user_to_num_current_syncs (dict): Mapping of user_id to number of currently - active syncs. + syncing_user_ids (set): Set of user_ids with active syncs. now (int): Current time in ms. Returns: @@ -1002,7 +1095,7 @@ def handle_timeout(state, is_mine, user_to_num_current_syncs, now): # If there are have been no sync for a while (and none ongoing), # set presence to offline - if not user_to_num_current_syncs.get(user_id, 0): + if user_id not in syncing_user_ids: if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT: state = state.copy_and_replace( state=PresenceState.OFFLINE, diff --git a/synapse/replication/presence_resource.py b/synapse/replication/presence_resource.py new file mode 100644 index 0000000000..fc18130ab4 --- /dev/null +++ b/synapse/replication/presence_resource.py @@ -0,0 +1,59 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.http.server import respond_with_json_bytes, request_handler +from synapse.http.servlet import parse_json_object_from_request + +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET +from twisted.internet import defer + + +class PresenceResource(Resource): + """ + HTTP endpoint for marking users as syncing. + + POST /_synapse/replication/presence HTTP/1.1 + Content-Type: application/json + + { + "process_id": "", + "syncing_users": [""] + } + """ + + def __init__(self, hs): + Resource.__init__(self) # Resource is old-style, so no super() + + self.version_string = hs.version_string + self.clock = hs.get_clock() + self.presence_handler = hs.get_presence_handler() + + def render_POST(self, request): + self._async_render_POST(request) + return NOT_DONE_YET + + @request_handler() + @defer.inlineCallbacks + def _async_render_POST(self, request): + content = parse_json_object_from_request(request) + + process_id = content["process_id"] + syncing_user_ids = content["syncing_users"] + + yield self.presence_handler.update_external_syncs( + process_id, set(syncing_user_ids) + ) + + respond_with_json_bytes(request, 200, "{}") diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 847f212a3d..8c2d487ff4 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -16,6 +16,7 @@ from synapse.http.servlet import parse_integer, parse_string from synapse.http.server import request_handler, finish_request from synapse.replication.pusher_resource import PusherResource +from synapse.replication.presence_resource import PresenceResource from twisted.web.resource import Resource from twisted.web.server import NOT_DONE_YET @@ -115,6 +116,7 @@ class ReplicationResource(Resource): self.clock = hs.get_clock() self.putChild("remove_pushers", PusherResource(hs)) + self.putChild("syncing_users", PresenceResource(hs)) def render_GET(self, request): self._async_render_GET(request) diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 87c795fcfa..b531ba8540 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -264,7 +264,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={}, now=now + state, is_mine=True, syncing_user_ids=set(), now=now ) self.assertIsNotNone(new_state) @@ -282,7 +282,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={}, now=now + state, is_mine=True, syncing_user_ids=set(), now=now ) self.assertIsNotNone(new_state) @@ -300,9 +300,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={ - user_id: 1, - }, now=now + state, is_mine=True, syncing_user_ids=set([user_id]), now=now ) self.assertIsNotNone(new_state) @@ -321,7 +319,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={}, now=now + state, is_mine=True, syncing_user_ids=set(), now=now ) self.assertIsNotNone(new_state) @@ -340,7 +338,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={}, now=now + state, is_mine=True, syncing_user_ids=set(), now=now ) self.assertIsNone(new_state) @@ -358,7 +356,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=False, user_to_num_current_syncs={}, now=now + state, is_mine=False, syncing_user_ids=set(), now=now ) self.assertIsNotNone(new_state) @@ -377,7 +375,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): ) new_state = handle_timeout( - state, is_mine=True, user_to_num_current_syncs={}, now=now + state, is_mine=True, syncing_user_ids=set(), now=now ) self.assertIsNotNone(new_state) -- cgit 1.4.1 From 56d15a05306896555ded3025f8a808bda04872fa Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 2 Jun 2016 16:28:54 +0100 Subject: Store the typing users as user_id strings. (#819) Rather than storing them as UserID objects. --- synapse/handlers/typing.py | 64 ++++++++++++++++++++++++------------------- tests/handlers/test_typing.py | 4 +-- 2 files changed, 38 insertions(+), 30 deletions(-) (limited to 'tests') diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index d46f05f426..3c54307bed 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -30,7 +30,7 @@ logger = logging.getLogger(__name__) # A tiny object useful for storing a user's membership in a room, as a mapping # key -RoomMember = namedtuple("RoomMember", ("room_id", "user")) +RoomMember = namedtuple("RoomMember", ("room_id", "user_id")) class TypingHandler(object): @@ -38,7 +38,7 @@ class TypingHandler(object): self.store = hs.get_datastore() self.server_name = hs.config.server_name self.auth = hs.get_auth() - self.is_mine = hs.is_mine + self.is_mine_id = hs.is_mine_id self.notifier = hs.get_notifier() self.clock = hs.get_clock() @@ -67,20 +67,23 @@ class TypingHandler(object): @defer.inlineCallbacks def started_typing(self, target_user, auth_user, room_id, timeout): - if not self.is_mine(target_user): + target_user_id = target_user.to_string() + auth_user_id = auth_user.to_string() + + if not self.is_mine_id(target_user_id): raise SynapseError(400, "User is not hosted on this Home Server") - if target_user != auth_user: + if target_user_id != auth_user_id: raise AuthError(400, "Cannot set another user's typing state") - yield self.auth.check_joined_room(room_id, target_user.to_string()) + yield self.auth.check_joined_room(room_id, target_user_id) logger.debug( - "%s has started typing in %s", target_user.to_string(), room_id + "%s has started typing in %s", target_user_id, room_id ) until = self.clock.time_msec() + timeout - member = RoomMember(room_id=room_id, user=target_user) + member = RoomMember(room_id=room_id, user_id=target_user_id) was_present = member in self._member_typing_until @@ -104,25 +107,28 @@ class TypingHandler(object): yield self._push_update( room_id=room_id, - user=target_user, + user_id=target_user_id, typing=True, ) @defer.inlineCallbacks def stopped_typing(self, target_user, auth_user, room_id): - if not self.is_mine(target_user): + target_user_id = target_user.to_string() + auth_user_id = auth_user.to_string() + + if not self.is_mine_id(target_user_id): raise SynapseError(400, "User is not hosted on this Home Server") - if target_user != auth_user: + if target_user_id != auth_user_id: raise AuthError(400, "Cannot set another user's typing state") - yield self.auth.check_joined_room(room_id, target_user.to_string()) + yield self.auth.check_joined_room(room_id, target_user_id) logger.debug( - "%s has stopped typing in %s", target_user.to_string(), room_id + "%s has stopped typing in %s", target_user_id, room_id ) - member = RoomMember(room_id=room_id, user=target_user) + member = RoomMember(room_id=room_id, user_id=target_user_id) if member in self._member_typing_timer: self.clock.cancel_call_later(self._member_typing_timer[member]) @@ -132,8 +138,9 @@ class TypingHandler(object): @defer.inlineCallbacks def user_left_room(self, user, room_id): - if self.is_mine(user): - member = RoomMember(room_id=room_id, user=user) + user_id = user.to_string() + if self.is_mine_id(user_id): + member = RoomMember(room_id=room_id, user=user_id) yield self._stopped_typing(member) @defer.inlineCallbacks @@ -144,7 +151,7 @@ class TypingHandler(object): yield self._push_update( room_id=member.room_id, - user=member.user, + user_id=member.user_id, typing=False, ) @@ -156,7 +163,7 @@ class TypingHandler(object): del self._member_typing_timer[member] @defer.inlineCallbacks - def _push_update(self, room_id, user, typing): + def _push_update(self, room_id, user_id, typing): domains = yield self.store.get_joined_hosts_for_room(room_id) deferreds = [] @@ -164,7 +171,7 @@ class TypingHandler(object): if domain == self.server_name: self._push_update_local( room_id=room_id, - user=user, + user_id=user_id, typing=typing ) else: @@ -173,7 +180,7 @@ class TypingHandler(object): edu_type="m.typing", content={ "room_id": room_id, - "user_id": user.to_string(), + "user_id": user_id, "typing": typing, }, )) @@ -183,23 +190,26 @@ class TypingHandler(object): @defer.inlineCallbacks def _recv_edu(self, origin, content): room_id = content["room_id"] - user = UserID.from_string(content["user_id"]) + user_id = content["user_id"] + + # Check that the string is a valid user id + UserID.from_string(user_id) domains = yield self.store.get_joined_hosts_for_room(room_id) if self.server_name in domains: self._push_update_local( room_id=room_id, - user=user, + user_id=user_id, typing=content["typing"] ) - def _push_update_local(self, room_id, user, typing): + def _push_update_local(self, room_id, user_id, typing): room_set = self._room_typing.setdefault(room_id, set()) if typing: - room_set.add(user) + room_set.add(user_id) else: - room_set.discard(user) + room_set.discard(user_id) self._latest_room_serial += 1 self._room_serials[room_id] = self._latest_room_serial @@ -215,9 +225,7 @@ class TypingHandler(object): for room_id, serial in self._room_serials.items(): if last_id < serial and serial <= current_id: typing = self._room_typing[room_id] - typing_bytes = json.dumps([ - u.to_string() for u in typing - ], ensure_ascii=False) + typing_bytes = json.dumps(list(typing), ensure_ascii=False) rows.append((serial, room_id, typing_bytes)) rows.sort() return rows @@ -239,7 +247,7 @@ class TypingNotificationEventSource(object): "type": "m.typing", "room_id": room_id, "content": { - "user_ids": [u.to_string() for u in typing], + "user_ids": list(typing), }, } diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index abb739ae52..ab9899b7d5 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -251,12 +251,12 @@ class TypingNotificationsTestCase(unittest.TestCase): # Gut-wrenching from synapse.handlers.typing import RoomMember - member = RoomMember(self.room_id, self.u_apple) + member = RoomMember(self.room_id, self.u_apple.to_string()) self.handler._member_typing_until[member] = 1002000 self.handler._member_typing_timer[member] = ( self.clock.call_later(1002, lambda: 0) ) - self.handler._room_typing[self.room_id] = set((self.u_apple,)) + self.handler._room_typing[self.room_id] = set((self.u_apple.to_string(),)) self.assertEquals(self.event_source.get_current_key(), 0) -- cgit 1.4.1 From 73c711243382a48b9b67fddf5ed9df2d1ee1be43 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Jun 2016 11:29:44 +0100 Subject: Change CacheMetrics to be quicker We change it so that each cache has an individual CacheMetric, instead of having one global CacheMetric. This means that when a cache tries to increment a counter it does not need to go through so many indirections. --- synapse/metrics/__init__.py | 16 ++++------- synapse/metrics/metric.py | 44 +++++++++++++++--------------- synapse/util/caches/__init__.py | 20 ++++++++++---- synapse/util/caches/descriptors.py | 17 +++++++++--- synapse/util/caches/dictionary_cache.py | 8 +++--- synapse/util/caches/expiringcache.py | 8 +++--- synapse/util/caches/stream_change_cache.py | 16 +++++------ tests/metrics/test_metric.py | 23 +++++++--------- 8 files changed, 82 insertions(+), 70 deletions(-) (limited to 'tests') diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 5664d5a381..c38f24485a 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -33,11 +33,7 @@ from .metric import ( logger = logging.getLogger(__name__) -# We'll keep all the available metrics in a single toplevel dict, one shared -# for the entire process. We don't currently support per-HomeServer instances -# of metrics, because in practice any one python VM will host only one -# HomeServer anyway. This makes a lot of implementation neater -all_metrics = {} +all_metrics = [] class Metrics(object): @@ -53,7 +49,7 @@ class Metrics(object): metric = metric_class(full_name, *args, **kwargs) - all_metrics[full_name] = metric + all_metrics.append(metric) return metric def register_counter(self, *args, **kwargs): @@ -84,12 +80,12 @@ def render_all(): # TODO(paul): Internal hack update_resource_metrics() - for name in sorted(all_metrics.keys()): + for metric in all_metrics: try: - strs += all_metrics[name].render() + strs += metric.render() except Exception: - strs += ["# FAILED to render %s" % name] - logger.exception("Failed to render %s metric", name) + strs += ["# FAILED to render"] + logger.exception("Failed to render metric") strs.append("") # to generate a final CRLF diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index 368fc24984..341043952a 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -47,9 +47,6 @@ class BaseMetric(object): for k, v in zip(self.labels, values)]) ) - def render(self): - return map_concat(self.render_item, sorted(self.counts.keys())) - class CounterMetric(BaseMetric): """The simplest kind of metric; one that stores a monotonically-increasing @@ -83,6 +80,9 @@ class CounterMetric(BaseMetric): def render_item(self, k): return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])] + def render(self): + return map_concat(self.render_item, sorted(self.counts.keys())) + class CallbackMetric(BaseMetric): """A metric that returns the numeric value returned by a callback whenever @@ -126,30 +126,30 @@ class DistributionMetric(object): class CacheMetric(object): - """A combination of two CounterMetrics, one to count cache hits and one to - count a total, and a callback metric to yield the current size. + __slots__ = ("name", "cache_name", "hits", "misses", "size_callback") - This metric generates standard metric name pairs, so that monitoring rules - can easily be applied to measure hit ratio.""" - - def __init__(self, name, size_callback, labels=[]): + def __init__(self, name, size_callback, cache_name): self.name = name + self.cache_name = cache_name - self.hits = CounterMetric(name + ":hits", labels=labels) - self.total = CounterMetric(name + ":total", labels=labels) + self.hits = 0 + self.misses = 0 - self.size = CallbackMetric( - name + ":size", - callback=size_callback, - labels=labels, - ) + self.size_callback = size_callback - def inc_hits(self, *values): - self.hits.inc(*values) - self.total.inc(*values) + def inc_hits(self): + self.hits += 1 - def inc_misses(self, *values): - self.total.inc(*values) + def inc_misses(self): + self.misses += 1 def render(self): - return self.hits.render() + self.total.render() + self.size.render() + size = self.size_callback() + hits = self.hits + total = self.misses + self.hits + + return [ + """%s:hits{name="%s"} %d""" % (self.name, self.cache_name, hits), + """%s:total{name="%s"} %d""" % (self.name, self.cache_name, total), + """%s:size{name="%s"} %d""" % (self.name, self.cache_name, size), + ] diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index d53569ca49..ebd715c5dc 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -24,11 +24,21 @@ DEBUG_CACHES = False metrics = synapse.metrics.get_metrics_for("synapse.util.caches") caches_by_name = {} -cache_counter = metrics.register_cache( - "cache", - lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, - labels=["name"], -) +# cache_counter = metrics.register_cache( +# "cache", +# lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, +# labels=["name"], +# ) + + +def register_cache(name, cache): + caches_by_name[name] = cache + return metrics.register_cache( + "cache", + lambda: len(cache), + name, + ) + _string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR)) caches_by_name["string_cache"] = _string_cache diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 758f5982b0..5d25c9e762 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -22,7 +22,7 @@ from synapse.util.logcontext import ( PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn ) -from . import caches_by_name, DEBUG_CACHES, cache_counter +from . import DEBUG_CACHES, register_cache from twisted.internet import defer @@ -43,6 +43,15 @@ CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) class Cache(object): + __slots__ = ( + "cache", + "max_entries", + "name", + "keylen", + "sequence", + "thread", + "metrics", + ) def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False): if lru: @@ -59,7 +68,7 @@ class Cache(object): self.keylen = keylen self.sequence = 0 self.thread = None - caches_by_name[name] = self.cache + self.metrics = register_cache(name, self.cache) def check_thread(self): expected_thread = self.thread @@ -74,10 +83,10 @@ class Cache(object): def get(self, key, default=_CacheSentinel): val = self.cache.get(key, _CacheSentinel) if val is not _CacheSentinel: - cache_counter.inc_hits(self.name) + self.metrics.inc_hits() return val - cache_counter.inc_misses(self.name) + self.metrics.inc_misses() if default is _CacheSentinel: raise KeyError() diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index f92d80542b..b0ca1bb79d 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -15,7 +15,7 @@ from synapse.util.caches.lrucache import LruCache from collections import namedtuple -from . import caches_by_name, cache_counter +from . import register_cache import threading import logging @@ -43,7 +43,7 @@ class DictionaryCache(object): __slots__ = [] self.sentinel = Sentinel() - caches_by_name[name] = self.cache + self.metrics = register_cache(name, self.cache) def check_thread(self): expected_thread = self.thread @@ -58,7 +58,7 @@ class DictionaryCache(object): def get(self, key, dict_keys=None): entry = self.cache.get(key, self.sentinel) if entry is not self.sentinel: - cache_counter.inc_hits(self.name) + self.metrics.inc_hits() if dict_keys is None: return DictionaryEntry(entry.full, dict(entry.value)) @@ -69,7 +69,7 @@ class DictionaryCache(object): if k in entry.value }) - cache_counter.inc_misses(self.name) + self.metrics.inc_misses() return DictionaryEntry(False, {}) def invalidate(self, key): diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 2b68c1ac93..080388958f 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.caches import cache_counter, caches_by_name +from synapse.util.caches import register_cache import logging @@ -49,7 +49,7 @@ class ExpiringCache(object): self._cache = {} - caches_by_name[cache_name] = self._cache + self.metrics = register_cache(cache_name, self._cache) def start(self): if not self._expiry_ms: @@ -78,9 +78,9 @@ class ExpiringCache(object): def __getitem__(self, key): try: entry = self._cache[key] - cache_counter.inc_hits(self._cache_name) + self.metrics.inc_hits() except KeyError: - cache_counter.inc_misses(self._cache_name) + self.metrics.inc_misses() raise if self._reset_expiry_on_get: diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index ea8a74ca69..3c051dabc4 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.caches import cache_counter, caches_by_name +from synapse.util.caches import register_cache from blist import sorteddict @@ -42,7 +42,7 @@ class StreamChangeCache(object): self._cache = sorteddict() self._earliest_known_stream_pos = current_stream_pos self.name = name - caches_by_name[self.name] = self._cache + self.metrics = register_cache(self.name, self._cache) for entity, stream_pos in prefilled_cache.items(): self.entity_has_changed(entity, stream_pos) @@ -53,19 +53,19 @@ class StreamChangeCache(object): assert type(stream_pos) is int if stream_pos < self._earliest_known_stream_pos: - cache_counter.inc_misses(self.name) + self.metrics.inc_misses() return True latest_entity_change_pos = self._entity_to_key.get(entity, None) if latest_entity_change_pos is None: - cache_counter.inc_hits(self.name) + self.metrics.inc_hits() return False if stream_pos < latest_entity_change_pos: - cache_counter.inc_misses(self.name) + self.metrics.inc_misses() return True - cache_counter.inc_hits(self.name) + self.metrics.inc_hits() return False def get_entities_changed(self, entities, stream_pos): @@ -82,10 +82,10 @@ class StreamChangeCache(object): self._cache[k] for k in keys[i:] ).intersection(entities) - cache_counter.inc_hits(self.name) + self.metrics.inc_hits() else: result = entities - cache_counter.inc_misses(self.name) + self.metrics.inc_misses() return result diff --git a/tests/metrics/test_metric.py b/tests/metrics/test_metric.py index f3c1927ce1..f85455a5af 100644 --- a/tests/metrics/test_metric.py +++ b/tests/metrics/test_metric.py @@ -61,9 +61,6 @@ class CounterMetricTestCase(unittest.TestCase): 'vector{method="PUT"} 1', ]) - # Check that passing too few values errors - self.assertRaises(ValueError, counter.inc) - class CallbackMetricTestCase(unittest.TestCase): @@ -138,27 +135,27 @@ class CacheMetricTestCase(unittest.TestCase): def test_cache(self): d = dict() - metric = CacheMetric("cache", lambda: len(d)) + metric = CacheMetric("cache", lambda: len(d), "cache_name") self.assertEquals(metric.render(), [ - 'cache:hits 0', - 'cache:total 0', - 'cache:size 0', + 'cache:hits{name="cache_name"} 0', + 'cache:total{name="cache_name"} 0', + 'cache:size{name="cache_name"} 0', ]) metric.inc_misses() d["key"] = "value" self.assertEquals(metric.render(), [ - 'cache:hits 0', - 'cache:total 1', - 'cache:size 1', + 'cache:hits{name="cache_name"} 0', + 'cache:total{name="cache_name"} 1', + 'cache:size{name="cache_name"} 1', ]) metric.inc_hits() self.assertEquals(metric.render(), [ - 'cache:hits 1', - 'cache:total 2', - 'cache:size 1', + 'cache:hits{name="cache_name"} 1', + 'cache:total{name="cache_name"} 2', + 'cache:size{name="cache_name"} 1', ]) -- cgit 1.4.1 From 05e01f21d7012c1853ff566c8a76aa66087bfbd7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 3 Jun 2016 17:12:48 +0100 Subject: Remove event fetching from DB threads --- synapse/replication/slave/storage/events.py | 5 - synapse/storage/appservice.py | 21 +++-- synapse/storage/events.py | 138 ---------------------------- synapse/storage/room.py | 46 ++++++---- synapse/storage/search.py | 29 +++--- synapse/storage/stream.py | 34 +++---- tests/storage/test_appservice.py | 2 +- 7 files changed, 75 insertions(+), 200 deletions(-) (limited to 'tests') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index cbc1ae4190..877c68508c 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -131,15 +131,10 @@ class SlavedEventStore(BaseSlavedStore): _get_events_from_cache = DataStore._get_events_from_cache.__func__ _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__ - _parse_events_txn = DataStore._parse_events_txn.__func__ - _get_events_txn = DataStore._get_events_txn.__func__ - _get_event_txn = DataStore._get_event_txn.__func__ _enqueue_events = DataStore._enqueue_events.__func__ _do_fetch = DataStore._do_fetch.__func__ - _fetch_events_txn = DataStore._fetch_events_txn.__func__ _fetch_event_rows = DataStore._fetch_event_rows.__func__ _get_event_from_row = DataStore._get_event_from_row.__func__ - _get_event_from_row_txn = DataStore._get_event_from_row_txn.__func__ _get_rooms_for_user_where_membership_is_txn = ( DataStore._get_rooms_for_user_where_membership_is_txn.__func__ ) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index feb9d228ae..ffb7d4a25b 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -298,6 +298,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): dict(txn_id=txn_id, as_id=service.id) ) + @defer.inlineCallbacks def get_oldest_unsent_txn(self, service): """Get the oldest transaction which has not been sent for this service. @@ -308,12 +309,23 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to an AppServiceTransaction or None. """ - return self.runInteraction( + entry = yield self.runInteraction( "get_oldest_unsent_appservice_txn", self._get_oldest_unsent_txn, service ) + if not entry: + defer.returnValue(None) + + event_ids = json.loads(entry["event_ids"]) + + events = yield self.get_events(event_ids) + + defer.returnValue(AppServiceTransaction( + service=service, id=entry["txn_id"], events=events + )) + def _get_oldest_unsent_txn(self, txn, service): # Monotonically increasing txn ids, so just select the smallest # one in the txns table (we delete them when they are sent) @@ -328,12 +340,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): entry = rows[0] - event_ids = json.loads(entry["event_ids"]) - events = self._get_events_txn(txn, event_ids) - - return AppServiceTransaction( - service=service, id=entry["txn_id"], events=events - ) + return entry def _get_last_txn(self, txn, service_id): txn.execute( diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2b3f79577b..b710505a7e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -762,41 +762,6 @@ class EventsStore(SQLBaseStore): if e_id in event_map and event_map[e_id] ]) - def _get_events_txn(self, txn, event_ids, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not event_ids: - return [] - - event_map = self._get_events_from_cache( - event_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - missing_events_ids = [e for e in event_ids if e not in event_map] - - if not missing_events_ids: - return [ - event_map[e_id] for e_id in event_ids - if e_id in event_map and event_map[e_id] - ] - - missing_events = self._fetch_events_txn( - txn, - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - event_map.update(missing_events) - - return [ - event_map[e_id] for e_id in event_ids - if e_id in event_map and event_map[e_id] - ] - def _invalidate_get_event_cache(self, event_id): for check_redacted in (False, True): for get_prev_content in (False, True): @@ -804,18 +769,6 @@ class EventsStore(SQLBaseStore): (event_id, check_redacted, get_prev_content) ) - def _get_event_txn(self, txn, event_id, check_redacted=True, - get_prev_content=False, allow_rejected=False): - - events = self._get_events_txn( - txn, [event_id], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - return events[0] if events else None - def _get_events_from_cache(self, events, check_redacted, get_prev_content, allow_rejected): event_map = {} @@ -981,34 +934,6 @@ class EventsStore(SQLBaseStore): return rows - def _fetch_events_txn(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not events: - return {} - - rows = self._fetch_event_rows( - txn, events, - ) - - if not allow_rejected: - rows[:] = [r for r in rows if not r["rejects"]] - - res = [ - self._get_event_from_row_txn( - txn, - row["internal_metadata"], row["json"], row["redacts"], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=row["rejects"], - ) - for row in rows - ] - - return { - r.event_id: r - for r in res - } - @defer.inlineCallbacks def _get_event_from_row(self, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False, @@ -1070,69 +995,6 @@ class EventsStore(SQLBaseStore): defer.returnValue(ev) - def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False, - rejected_reason=None): - d = json.loads(js) - internal_metadata = json.loads(internal_metadata) - - if rejected_reason: - rejected_reason = self._simple_select_one_onecol_txn( - txn, - table="rejections", - keyvalues={"event_id": rejected_reason}, - retcol="reason", - ) - - ev = FrozenEvent( - d, - internal_metadata_dict=internal_metadata, - rejected_reason=rejected_reason, - ) - - if check_redacted and redacted: - ev = prune_event(ev) - - redaction_id = self._simple_select_one_onecol_txn( - txn, - table="redactions", - keyvalues={"redacts": ev.event_id}, - retcol="event_id", - ) - - ev.unsigned["redacted_by"] = redaction_id - # Get the redaction event. - - because = self._get_event_txn( - txn, - redaction_id, - check_redacted=False - ) - - if because: - ev.unsigned["redacted_because"] = because - - if get_prev_content and "replaces_state" in ev.unsigned: - prev = self._get_event_txn( - txn, - ev.unsigned["replaces_state"], - get_prev_content=False, - ) - if prev: - ev.unsigned["prev_content"] = prev.content - ev.unsigned["prev_sender"] = prev.sender - - self._get_event_cache.prefill( - (ev.event_id, check_redacted, get_prev_content), ev - ) - - return ev - - def _parse_events_txn(self, txn, rows): - event_ids = [r["event_id"] for r in rows] - - return self._get_events_txn(txn, event_ids) - @defer.inlineCallbacks def count_daily_messages(self): """ diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 26933e593a..97f9f1929c 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -194,32 +194,44 @@ class RoomStore(SQLBaseStore): @cachedInlineCallbacks() def get_room_name_and_aliases(self, room_id): - def f(txn): + def get_room_name(txn): sql = ( - "SELECT event_id FROM current_state_events " - "WHERE room_id = ? " + "SELECT name FROM room_names" + " INNER JOIN current_state_events USING (room_id, event_id)" + " WHERE room_id = ?" + " LIMIT 1" ) - sql += " AND ((type = 'm.room.name' AND state_key = '')" - sql += " OR type = 'm.room.aliases')" - txn.execute(sql, (room_id,)) - results = self.cursor_to_dict(txn) + rows = txn.fetchall() + if rows: + return rows[0][0] + else: + return None - return self._parse_events_txn(txn, results) + return [row[0] for row in txn.fetchall()] - events = yield self.runInteraction("get_room_name_and_aliases", f) + def get_room_aliases(txn): + sql = ( + "SELECT content FROM current_state_events" + " INNER JOIN events USING (room_id, event_id)" + " WHERE room_id = ?" + ) + txn.execute(sql, (room_id,)) + return [row[0] for row in txn.fetchall()] + + name = yield self.runInteraction("get_room_name", get_room_name) + alias_contents = yield self.runInteraction("get_room_aliases", get_room_aliases) - name = None aliases = [] - for e in events: - if e.type == 'm.room.name': - if 'name' in e.content: - name = e.content['name'] - elif e.type == 'm.room.aliases': - if 'aliases' in e.content: - aliases.extend(e.content['aliases']) + for c in alias_contents: + try: + content = json.loads(c) + except: + continue + + aliases.extend(content.get('aliases', [])) defer.returnValue((name, aliases)) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 0224299625..12941d1775 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -21,6 +21,7 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging import re +import ujson as json logger = logging.getLogger(__name__) @@ -52,7 +53,7 @@ class SearchStore(BackgroundUpdateStore): def reindex_search_txn(txn): sql = ( - "SELECT stream_ordering, event_id FROM events" + "SELECT stream_ordering, event_id, room_id, type, content FROM events" " WHERE ? <= stream_ordering AND stream_ordering < ?" " AND (%s)" " ORDER BY stream_ordering DESC" @@ -61,28 +62,30 @@ class SearchStore(BackgroundUpdateStore): txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) - rows = txn.fetchall() + rows = self.cursor_to_dict(txn) if not rows: return 0 - min_stream_id = rows[-1][0] - event_ids = [row[1] for row in rows] - - events = self._get_events_txn(txn, event_ids) + min_stream_id = rows[-1]["stream_ordering"] event_search_rows = [] - for event in events: + for row in rows: try: - event_id = event.event_id - room_id = event.room_id - content = event.content - if event.type == "m.room.message": + event_id = row["event_id"] + room_id = row["room_id"] + etype = row["type"] + try: + content = json.loads(row["content"]) + except: + continue + + if etype == "m.room.message": key = "content.body" value = content["body"] - elif event.type == "m.room.topic": + elif etype == "m.room.topic": key = "content.topic" value = content["topic"] - elif event.type == "m.room.name": + elif etype == "m.room.name": key = "content.name" value = content["name"] except (KeyError, AttributeError): diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 95b12559a6..b9ad965fd6 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -132,29 +132,25 @@ class StreamStore(SQLBaseStore): return True return False - ret = self._get_events_txn( - txn, - # apply the filter on the room id list - [ - r["event_id"] for r in rows - if app_service_interested(r) - ], - get_prev_content=True - ) + return [r for r in rows if app_service_interested(r)] - self._set_before_and_after(ret, rows) + rows = yield self.runInteraction("get_appservice_room_stream", f) - if rows: - key = "s%d" % max(r["stream_ordering"] for r in rows) - else: - # Assume we didn't get anything because there was nothing to - # get. - key = to_key + ret = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) - return ret, key + self._set_before_and_after(ret, rows, topo_order=from_id is None) - results = yield self.runInteraction("get_appservice_room_stream", f) - defer.returnValue(results) + if rows: + key = "s%d" % max(r["stream_ordering"] for r in rows) + else: + # Assume we didn't get anything because there was nothing to + # get. + key = to_key + + defer.returnValue((ret, key)) @defer.inlineCallbacks def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0, diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 5734198121..f44c4870e3 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -357,7 +357,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): other_events = [Mock(event_id="e5"), Mock(event_id="e6")] # we aren't testing store._base stuff here, so mock this out - self.store._get_events_txn = Mock(return_value=events) + self.store.get_events = Mock(return_value=events) yield self._insert_txn(self.as_list[1]["id"], 9, other_events) yield self._insert_txn(service.id, 10, events) -- cgit 1.4.1 From 310197bab5cf8ed2c26fae522f15f092dbcdff58 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 09:34:50 +0100 Subject: Fix AS retries --- synapse/storage/appservice.py | 4 ++-- tests/storage/test_appservice.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) (limited to 'tests') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index ffb7d4a25b..a281571637 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -320,10 +320,10 @@ class ApplicationServiceTransactionStore(SQLBaseStore): event_ids = json.loads(entry["event_ids"]) - events = yield self.get_events(event_ids) + event_map = yield self.get_events(event_ids) defer.returnValue(AppServiceTransaction( - service=service, id=entry["txn_id"], events=events + service=service, id=entry["txn_id"], events=event_map.values() )) def _get_oldest_unsent_txn(self, txn, service): diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index f44c4870e3..6db4b966db 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -353,21 +353,21 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_oldest_unsent_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = [Mock(event_id="e1"), Mock(event_id="e2")] + events = {"e1": Mock(event_id="e1"), "e2": Mock(event_id="e2")} other_events = [Mock(event_id="e5"), Mock(event_id="e6")] # we aren't testing store._base stuff here, so mock this out self.store.get_events = Mock(return_value=events) yield self._insert_txn(self.as_list[1]["id"], 9, other_events) - yield self._insert_txn(service.id, 10, events) + yield self._insert_txn(service.id, 10, events.values()) yield self._insert_txn(service.id, 11, other_events) yield self._insert_txn(service.id, 12, other_events) txn = yield self.store.get_oldest_unsent_txn(service) self.assertEquals(service, txn.service) self.assertEquals(10, txn.id) - self.assertEquals(events, txn.events) + self.assertEquals(events.values(), txn.events) @defer.inlineCallbacks def test_get_appservices_by_state_single(self): -- cgit 1.4.1 From 84379062f9ec259abc302af321d4ed8f5a958c01 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 10:24:50 +0100 Subject: Fix AS retries, but with correct ordering --- synapse/storage/appservice.py | 4 ++-- tests/storage/test_appservice.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) (limited to 'tests') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index a281571637..d1ee533fac 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -320,10 +320,10 @@ class ApplicationServiceTransactionStore(SQLBaseStore): event_ids = json.loads(entry["event_ids"]) - event_map = yield self.get_events(event_ids) + events = yield self._get_events(event_ids) defer.returnValue(AppServiceTransaction( - service=service, id=entry["txn_id"], events=event_map.values() + service=service, id=entry["txn_id"], events=events )) def _get_oldest_unsent_txn(self, txn, service): diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 6db4b966db..3e2862daae 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -353,21 +353,21 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_oldest_unsent_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = {"e1": Mock(event_id="e1"), "e2": Mock(event_id="e2")} + events = [Mock(event_id="e1"), Mock(event_id="e2")] other_events = [Mock(event_id="e5"), Mock(event_id="e6")] # we aren't testing store._base stuff here, so mock this out - self.store.get_events = Mock(return_value=events) + self.store._get_events = Mock(return_value=events) yield self._insert_txn(self.as_list[1]["id"], 9, other_events) - yield self._insert_txn(service.id, 10, events.values()) + yield self._insert_txn(service.id, 10, events) yield self._insert_txn(service.id, 11, other_events) yield self._insert_txn(service.id, 12, other_events) txn = yield self.store.get_oldest_unsent_txn(service) self.assertEquals(service, txn.service) self.assertEquals(10, txn.id) - self.assertEquals(events.values(), txn.events) + self.assertEquals(events, txn.events) @defer.inlineCallbacks def test_get_appservices_by_state_single(self): -- cgit 1.4.1