From 43253c10b894902bcaed90dd2fbf44424b647f45 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Sep 2016 11:47:48 +0100 Subject: Remove redundant event_auth index --- synapse/storage/schema/delta/35/remove_auth_idx.sql | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 synapse/storage/schema/delta/35/remove_auth_idx.sql (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/35/remove_auth_idx.sql b/synapse/storage/schema/delta/35/remove_auth_idx.sql new file mode 100644 index 0000000000..a5bf3895b0 --- /dev/null +++ b/synapse/storage/schema/delta/35/remove_auth_idx.sql @@ -0,0 +1,16 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +DROP INDEX IF EXISTS evauth_edges_id; -- cgit 1.4.1 From 748d8fdc7bcdb43719e99a48cc74bf078f22396f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 23 Sep 2016 15:31:47 +0100 Subject: Reduce DB hits for replication Some streams will occasionally advance their positions without actually having any new rows to send over replication. Currently this means that the token will not advance on the workers, leading to them repeatedly sending a slightly out-of-date token. This in turn requires the master to hit the DB to check if there are any new rows, rather than hitting the no-op logic where we check if the given token matches the current token. This commit changes the API to always return an entry if the position for a stream has changed, allowing workers to advance their tokens correctly.
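In outline, the change below makes the master report a stream's new position even when there are no rows to send, so that workers can advance their tokens instead of re-requesting the same range. A minimal sketch of that logic, using illustrative names rather than Synapse's actual replication API:

def replicate_stream(request_token, current_token, rows):
    # Sketch only: the names and return shape here are illustrative,
    # not Synapse's actual API.
    if request_token == current_token:
        # Worker is fully caught up: the cheap no-op path, send nothing.
        return None
    if rows:
        # Report the position of the last row sent; this may still trail
        # current_token when the number of rows returned was limited.
        position = rows[-1][0]
    else:
        # The token advanced but produced no rows: report the new position
        # anyway so the worker stops asking the master about this range.
        position = current_token
    return {"position": position, "rows": rows}

For example, a worker holding token 10 for a stream whose position has moved to 12 with no new rows now receives {"position": 12, "rows": []} and advances to 12, where previously it received nothing and kept sending 10.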
--- .gitignore | 8 +- synapse/app/pusher.py | 4 +- synapse/replication/resource.py | 139 +++++++++++++++++++++++-------- synapse/storage/room.py | 3 + tests/replication/slave/storage/_base.py | 3 +- tests/replication/test_resource.py | 3 +- 6 files changed, 115 insertions(+), 45 deletions(-) (limited to 'synapse/storage') diff --git a/.gitignore b/.gitignore index f8c4000134..491047c352 100644 --- a/.gitignore +++ b/.gitignore @@ -24,10 +24,10 @@ homeserver*.yaml .coverage htmlcov -demo/*.db -demo/*.log -demo/*.log.* -demo/*.pid +demo/*/*.db +demo/*/*.log +demo/*/*.log.* +demo/*/*.pid demo/media_store.* demo/etc diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index d59f4a571c..1a6f5507a9 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -197,7 +197,7 @@ class PusherServer(HomeServer): yield start_pusher(user_id, app_id, pushkey) stream = results.get("events") - if stream: + if stream and stream["rows"]: min_stream_id = stream["rows"][0][0] max_stream_id = stream["position"] preserve_fn(pusher_pool.on_new_notifications)( @@ -205,7 +205,7 @@ class PusherServer(HomeServer): ) stream = results.get("receipts") - if stream: + if stream and stream["rows"]: rows = stream["rows"] affected_room_ids = set(row[1] for row in rows) min_stream_id = rows[0][0] diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 9aab3ce23c..585bd1c4ad 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -17,6 +17,7 @@ from synapse.http.servlet import parse_integer, parse_string from synapse.http.server import request_handler, finish_request from synapse.replication.pusher_resource import PusherResource from synapse.replication.presence_resource import PresenceResource +from synapse.api.errors import SynapseError from twisted.web.resource import Resource from twisted.web.server import NOT_DONE_YET @@ -166,7 +167,8 @@ class ReplicationResource(Resource): def replicate(): return self.replicate(request_streams, limit) - result = yield self.notifier.wait_for_replication(replicate, timeout) + writer = yield self.notifier.wait_for_replication(replicate, timeout) + result = writer.finish() for stream_name, stream_content in result.items(): logger.info( @@ -186,6 +188,9 @@ class ReplicationResource(Resource): current_token = yield self.current_replication_token() logger.debug("Replicating up to %r", current_token) + if limit == 0: + raise SynapseError(400, "Limit cannot be 0") + yield self.account_data(writer, current_token, limit, request_streams) yield self.events(writer, current_token, limit, request_streams) # TODO: implement limit @@ -200,7 +205,7 @@ class ReplicationResource(Resource): self.streams(writer, current_token, request_streams) logger.debug("Replicated %d rows", writer.total) - defer.returnValue(writer.finish()) + defer.returnValue(writer) def streams(self, writer, current_token, request_streams): request_token = request_streams.get("streams") @@ -233,31 +238,52 @@ class ReplicationResource(Resource): request_backfill = request_streams.get("backfill") if request_events is not None or request_backfill is not None: - if request_events is None: + if request_backfill is None: request_events = current_token.events if request_backfill is None: request_backfill = current_token.backfill + + no_new_tokens = ( + request_events == current_token.events + and request_backfill == current_token.backfill + ) + if no_new_tokens: + return + res = yield self.store.get_all_new_events( request_backfill, request_events, current_token.backfill, 
current_token.events, limit ) - writer.write_header_and_rows("events", res.new_forward_events, ( - "position", "internal", "json", "state_group" - )) - writer.write_header_and_rows("backfill", res.new_backfill_events, ( - "position", "internal", "json", "state_group" - )) + + upto_events_token = _position_from_rows( + res.new_forward_events, current_token.events + ) + + upto_backfill_token = _position_from_rows( + res.new_backfill_events, current_token.backfill + ) + + if request_events != upto_events_token: + writer.write_header_and_rows("events", res.new_forward_events, ( + "position", "internal", "json", "state_group" + ), position=upto_events_token) + + if request_backfill != upto_backfill_token: + writer.write_header_and_rows("backfill", res.new_backfill_events, ( + "position", "internal", "json", "state_group", + ), position=upto_backfill_token) + writer.write_header_and_rows( "forward_ex_outliers", res.forward_ex_outliers, - ("position", "event_id", "state_group") + ("position", "event_id", "state_group"), ) writer.write_header_and_rows( "backward_ex_outliers", res.backward_ex_outliers, - ("position", "event_id", "state_group") + ("position", "event_id", "state_group"), ) writer.write_header_and_rows( - "state_resets", res.state_resets, ("position",) + "state_resets", res.state_resets, ("position",), ) @defer.inlineCallbacks @@ -266,15 +292,16 @@ class ReplicationResource(Resource): request_presence = request_streams.get("presence") - if request_presence is not None: + if request_presence is not None and request_presence != current_position: presence_rows = yield self.presence_handler.get_all_presence_updates( request_presence, current_position ) + upto_token = _position_from_rows(presence_rows, current_position) writer.write_header_and_rows("presence", presence_rows, ( "position", "user_id", "state", "last_active_ts", "last_federation_update_ts", "last_user_sync_ts", "status_msg", "currently_active", - )) + ), position=upto_token) @defer.inlineCallbacks def typing(self, writer, current_token, request_streams): @@ -282,7 +309,7 @@ class ReplicationResource(Resource): request_typing = request_streams.get("typing") - if request_typing is not None: + if request_typing is not None and request_typing != current_position: # If they have a higher token than current max, we can assume that # they had been talking to a previous instance of the master. 
Since # we reset the token on restart, the best (but hacky) thing we can @@ -293,9 +320,10 @@ class ReplicationResource(Resource): typing_rows = yield self.typing_handler.get_all_typing_updates( request_typing, current_position ) + upto_token = _position_from_rows(typing_rows, current_position) writer.write_header_and_rows("typing", typing_rows, ( "position", "room_id", "typing" - )) + ), position=upto_token) @defer.inlineCallbacks def receipts(self, writer, current_token, limit, request_streams): @@ -303,13 +331,14 @@ class ReplicationResource(Resource): request_receipts = request_streams.get("receipts") - if request_receipts is not None: + if request_receipts is not None and request_receipts != current_position: receipts_rows = yield self.store.get_all_updated_receipts( request_receipts, current_position, limit ) + upto_token = _position_from_rows(receipts_rows, current_position) writer.write_header_and_rows("receipts", receipts_rows, ( "position", "room_id", "receipt_type", "user_id", "event_id", "data" - )) + ), position=upto_token) @defer.inlineCallbacks def account_data(self, writer, current_token, limit, request_streams): @@ -324,23 +353,36 @@ class ReplicationResource(Resource): user_account_data = current_position if room_account_data is None: room_account_data = current_position + + no_new_tokens = ( + user_account_data == current_position + and room_account_data == current_position + ) + if no_new_tokens: + return + user_rows, room_rows = yield self.store.get_all_updated_account_data( user_account_data, room_account_data, current_position, limit ) + + upto_users_token = _position_from_rows(user_rows, current_position) + upto_rooms_token = _position_from_rows(room_rows, current_position) + writer.write_header_and_rows("user_account_data", user_rows, ( "position", "user_id", "type", "content" - )) + ), position=upto_users_token) writer.write_header_and_rows("room_account_data", room_rows, ( "position", "user_id", "room_id", "type", "content" - )) + ), position=upto_rooms_token) if tag_account_data is not None: tag_rows = yield self.store.get_all_updated_tags( tag_account_data, current_position, limit ) + upto_tag_token = _position_from_rows(tag_rows, current_position) writer.write_header_and_rows("tag_account_data", tag_rows, ( "position", "user_id", "room_id", "tags" - )) + ), position=upto_tag_token) @defer.inlineCallbacks def push_rules(self, writer, current_token, limit, request_streams): @@ -348,14 +390,15 @@ class ReplicationResource(Resource): push_rules = request_streams.get("push_rules") - if push_rules is not None: + if push_rules is not None and push_rules != current_position: rows = yield self.store.get_all_push_rule_updates( push_rules, current_position, limit ) + upto_token = _position_from_rows(rows, current_position) writer.write_header_and_rows("push_rules", rows, ( "position", "event_stream_ordering", "user_id", "rule_id", "op", "priority_class", "priority", "conditions", "actions" - )) + ), position=upto_token) @defer.inlineCallbacks def pushers(self, writer, current_token, limit, request_streams): @@ -363,18 +406,19 @@ class ReplicationResource(Resource): pushers = request_streams.get("pushers") - if pushers is not None: + if pushers is not None and pushers != current_position: updated, deleted = yield self.store.get_all_updated_pushers( pushers, current_position, limit ) + upto_token = _position_from_rows(updated, current_position) writer.write_header_and_rows("pushers", updated, ( "position", "user_id", "access_token", "profile_tag", "kind", "app_id", 
"app_display_name", "device_display_name", "pushkey", "ts", "lang", "data" - )) + ), position=upto_token) writer.write_header_and_rows("deleted_pushers", deleted, ( "position", "user_id", "app_id", "pushkey" - )) + ), position=upto_token) @defer.inlineCallbacks def caches(self, writer, current_token, limit, request_streams): @@ -382,13 +426,14 @@ class ReplicationResource(Resource): caches = request_streams.get("caches") - if caches is not None: + if caches is not None and caches != current_position: updated_caches = yield self.store.get_all_updated_caches( caches, current_position, limit ) + upto_token = _position_from_rows(updated_caches, current_position) writer.write_header_and_rows("caches", updated_caches, ( "position", "cache_func", "keys", "invalidation_ts" - )) + ), position=upto_token) @defer.inlineCallbacks def to_device(self, writer, current_token, limit, request_streams): @@ -396,13 +441,14 @@ class ReplicationResource(Resource): to_device = request_streams.get("to_device") - if to_device is not None: + if to_device is not None and to_device != current_position: to_device_rows = yield self.store.get_all_new_device_messages( to_device, current_position, limit ) + upto_token = _position_from_rows(to_device_rows, current_position) writer.write_header_and_rows("to_device", to_device_rows, ( "position", "user_id", "device_id", "message_json" - )) + ), position=upto_token) @defer.inlineCallbacks def public_rooms(self, writer, current_token, limit, request_streams): @@ -410,13 +456,14 @@ class ReplicationResource(Resource): public_rooms = request_streams.get("public_rooms") - if public_rooms is not None: + if public_rooms is not None and public_rooms != current_position: public_rooms_rows = yield self.store.get_all_new_public_rooms( public_rooms, current_position, limit ) + upto_token = _position_from_rows(public_rooms_rows, current_position) writer.write_header_and_rows("public_rooms", public_rooms_rows, ( "position", "room_id", "visibility" - )) + ), position=upto_token) class _Writer(object): @@ -426,11 +473,11 @@ class _Writer(object): self.total = 0 def write_header_and_rows(self, name, rows, fields, position=None): - if not rows: - return - if position is None: - position = rows[-1][0] + if rows: + position = rows[-1][0] + else: + return self.streams[name] = { "position": position if type(position) is int else str(position), @@ -440,6 +487,9 @@ class _Writer(object): self.total += len(rows) + def __nonzero__(self): + return bool(self.total) + def finish(self): return self.streams @@ -461,3 +511,20 @@ class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( def __str__(self): return "_".join(str(value) for value in self) + + +def _position_from_rows(rows, current_position): + """Calculates a position to return for a stream. Ideally we want to return the + position of the last row, as that will be the most correct. However, if there + are no rows we fall back to using the current position to stop us from + repeatedly hitting the storage layer unncessarily thinking there are updates. + (Not all advances of the token correspond to an actual update) + + We can't just always return the current position, as we often limit the + number of rows we replicate, and so the stream may lag. The assumption is + that if the storage layer returns no new rows then we are not lagging and + we are at the `current_position`. 
+ """ + if rows: + return rows[-1][0] + return current_position diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 2ef13d7403..11813b44f6 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -320,6 +320,9 @@ class RoomStore(SQLBaseStore): txn.execute(sql, (prev_id, current_id, limit,)) return txn.fetchall() + if prev_id == current_id: + return defer.succeed([]) + return self.runInteraction( "get_all_new_public_rooms", get_all_new_public_rooms ) diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index 1f13cd0bc0..b82868054d 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -42,7 +42,8 @@ class BaseSlavedStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def replicate(self): streams = self.slaved_store.stream_positions() - result = yield self.replication.replicate(streams, 100) + writer = yield self.replication.replicate(streams, 100) + result = writer.finish() yield self.slaved_store.process_replication(result) @defer.inlineCallbacks diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py index b69832cc1b..f406934a62 100644 --- a/tests/replication/test_resource.py +++ b/tests/replication/test_resource.py @@ -120,7 +120,7 @@ class ReplicationResourceCase(unittest.TestCase): self.hs.clock.advance_time_msec(1) code, body = yield get self.assertEquals(code, 200) - self.assertEquals(body, {}) + self.assertEquals(body.get("rows", []), []) test_timeout.__name__ = "test_timeout_%s" % (stream) return test_timeout @@ -195,7 +195,6 @@ class ReplicationResourceCase(unittest.TestCase): self.assertIn("field_names", stream) field_names = stream["field_names"] self.assertIn("rows", stream) - self.assertTrue(stream["rows"]) for row in stream["rows"]: self.assertEquals( len(row), len(field_names), -- cgit 1.4.1 From 9bfc61779111624e972939491a0b5c02190d3463 Mon Sep 17 00:00:00 2001 From: Patrik Oldsberg Date: Thu, 6 Oct 2016 10:43:32 +0200 Subject: storage/appservice: make appservice methods only relying on the cache synchronous --- synapse/api/auth.py | 7 +++---- synapse/handlers/appservice.py | 20 +++++++++----------- synapse/handlers/directory.py | 11 ++++------- synapse/handlers/register.py | 5 ++--- synapse/handlers/room.py | 2 +- synapse/handlers/sync.py | 2 +- synapse/rest/client/v1/register.py | 2 +- synapse/storage/appservice.py | 12 ++++++------ tests/rest/client/v2_alpha/test_register.py | 2 +- tests/storage/test_appservice.py | 9 +++------ 10 files changed, 31 insertions(+), 41 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index e75fd518be..27599124d2 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -653,7 +653,7 @@ class Auth(object): @defer.inlineCallbacks def _get_appservice_user_id(self, request): - app_service = yield self.store.get_app_service_by_token( + app_service = self.store.get_app_service_by_token( get_access_token_from_request( request, self.TOKEN_NOT_FOUND_HTTP_STATUS ) @@ -855,13 +855,12 @@ class Auth(object): } defer.returnValue(user_info) - @defer.inlineCallbacks def get_appservice_by_req(self, request): try: token = get_access_token_from_request( request, self.TOKEN_NOT_FOUND_HTTP_STATUS ) - service = yield self.store.get_app_service_by_token(token) + service = self.store.get_app_service_by_token(token) if not service: logger.warn("Unrecognised appservice access token: %s" % (token,)) raise AuthError( @@ -870,7 +869,7 @@ class Auth(object): 
errcode=Codes.UNKNOWN_TOKEN ) request.authenticated_entity = service.sender - defer.returnValue(service) + return defer.succeed(service) except KeyError: raise AuthError( self.TOKEN_NOT_FOUND_HTTP_STATUS, "Missing access token." diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 88fa0bb2e4..05af54d31b 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -59,7 +59,7 @@ class ApplicationServicesHandler(object): Args: current_id(int): The current maximum ID. """ - services = yield self.store.get_app_services() + services = self.store.get_app_services() if not services or not self.notify_appservices: return @@ -142,7 +142,7 @@ class ApplicationServicesHandler(object): association can be found. """ room_alias_str = room_alias.to_string() - services = yield self.store.get_app_services() + services = self.store.get_app_services() alias_query_services = [ s for s in services if ( s.is_interested_in_alias(room_alias_str) @@ -177,7 +177,7 @@ class ApplicationServicesHandler(object): @defer.inlineCallbacks def get_3pe_protocols(self, only_protocol=None): - services = yield self.store.get_app_services() + services = self.store.get_app_services() protocols = {} # Collect up all the individual protocol responses out of the ASes @@ -224,7 +224,7 @@ class ApplicationServicesHandler(object): list: A list of services interested in this event based on the service regex. """ - services = yield self.store.get_app_services() + services = self.store.get_app_services() interested_list = [ s for s in services if ( yield s.is_interested(event, self.store) @@ -232,23 +232,21 @@ class ApplicationServicesHandler(object): ] defer.returnValue(interested_list) - @defer.inlineCallbacks def _get_services_for_user(self, user_id): - services = yield self.store.get_app_services() + services = self.store.get_app_services() interested_list = [ s for s in services if ( s.is_interested_in_user(user_id) ) ] - defer.returnValue(interested_list) + return defer.succeed(interested_list) - @defer.inlineCallbacks def _get_services_for_3pn(self, protocol): - services = yield self.store.get_app_services() + services = self.store.get_app_services() interested_list = [ s for s in services if s.is_interested_in_protocol(protocol) ] - defer.returnValue(interested_list) + return defer.succeed(interested_list) @defer.inlineCallbacks def _is_unknown_user(self, user_id): @@ -264,7 +262,7 @@ class ApplicationServicesHandler(object): return # user not found; could be the AS though, so check. - services = yield self.store.get_app_services() + services = self.store.get_app_services() service_list = [s for s in services if s.sender == user_id] defer.returnValue(len(service_list) == 0) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 14352985e2..c00274afc3 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -288,13 +288,12 @@ class DirectoryHandler(BaseHandler): result = yield as_handler.query_room_alias_exists(room_alias) defer.returnValue(result) - @defer.inlineCallbacks def can_modify_alias(self, alias, user_id=None): # Any application service "interested" in an alias they are regexing on # can modify the alias. 
# Users can only modify the alias if ALL the interested services have # non-exclusive locks on the alias (or there are no interested services) - services = yield self.store.get_app_services() + services = self.store.get_app_services() interested_services = [ s for s in services if s.is_interested_in_alias(alias.to_string()) ] @@ -302,14 +301,12 @@ class DirectoryHandler(BaseHandler): for service in interested_services: if user_id == service.sender: # this user IS the app service so they can do whatever they like - defer.returnValue(True) - return + return defer.succeed(True) elif service.is_exclusive_alias(alias.to_string()): # another service has an exclusive lock on this alias. - defer.returnValue(False) - return + return defer.succeed(False) # either no interested services, or no service with an exclusive lock - defer.returnValue(True) + return defer.succeed(True) @defer.inlineCallbacks def _user_can_delete_alias(self, alias, user_id): diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index dd75c4fecf..19329057d5 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -194,7 +194,7 @@ class RegistrationHandler(BaseHandler): def appservice_register(self, user_localpart, as_token): user = UserID(user_localpart, self.hs.hostname) user_id = user.to_string() - service = yield self.store.get_app_service_by_token(as_token) + service = self.store.get_app_service_by_token(as_token) if not service: raise AuthError(403, "Invalid application service token.") if not service.is_interested_in_user(user_id): @@ -305,11 +305,10 @@ class RegistrationHandler(BaseHandler): # XXX: This should be a deferred list, shouldn't it? yield identity_handler.bind_threepid(c, user_id) - @defer.inlineCallbacks def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None): # valid user IDs must not clash with any user ID namespaces claimed by # application services. - services = yield self.store.get_app_services() + services = self.store.get_app_services() interested_services = [ s for s in services if s.is_interested_in_user(user_id) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index cbd26f8f95..a7f533f7be 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -437,7 +437,7 @@ class RoomEventSource(object): logger.warn("Stream has topological part!!!! 
%r", from_key) from_key = "s%s" % (from_token.stream,) - app_service = yield self.store.get_app_service_by_user_id( + app_service = self.store.get_app_service_by_user_id( user.to_string() ) if app_service: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b5962f4f5a..1f910ff814 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -788,7 +788,7 @@ class SyncHandler(object): assert since_token - app_service = yield self.store.get_app_service_by_user_id(user_id) + app_service = self.store.get_app_service_by_user_id(user_id) if app_service: rooms = yield self.store.get_app_service_rooms(app_service) joined_room_ids = set(r.room_id for r in rooms) diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index 3046da7aec..fe4480b363 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -391,7 +391,7 @@ class CreateUserRestServlet(ClientV1RestServlet): user_json = parse_json_object_from_request(request) access_token = get_access_token_from_request(request) - app_service = yield self.store.get_app_service_by_token( + app_service = self.store.get_app_service_by_token( access_token ) if not app_service: diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index a854a87eab..3d5994a580 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -37,7 +37,7 @@ class ApplicationServiceStore(SQLBaseStore): ) def get_app_services(self): - return defer.succeed(self.services_cache) + return self.services_cache def get_app_service_by_user_id(self, user_id): """Retrieve an application service from their user ID. @@ -54,8 +54,8 @@ class ApplicationServiceStore(SQLBaseStore): """ for service in self.services_cache: if service.sender == user_id: - return defer.succeed(service) - return defer.succeed(None) + return service + return None def get_app_service_by_token(self, token): """Get the application service with the given appservice token. @@ -67,8 +67,8 @@ class ApplicationServiceStore(SQLBaseStore): """ for service in self.services_cache: if service.token == token: - return defer.succeed(service) - return defer.succeed(None) + return service + return None def get_app_service_rooms(self, service): """Get a list of RoomsForUser for this application service. 
@@ -163,7 +163,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): ["as_id"] ) # NB: This assumes this class is linked with ApplicationServiceStore - as_list = yield self.get_app_services() + as_list = self.get_app_services() services = [] for res in results: diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index 8ac56a1fb2..e9cb416e4b 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -19,7 +19,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.appservice = None self.auth = Mock(get_appservice_by_req=Mock( - side_effect=lambda x: defer.succeed(self.appservice)) + side_effect=lambda x: self.appservice) ) self.auth_result = (False, None, None, None) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 3e2862daae..f3df8302da 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -71,14 +71,12 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): outfile.write(yaml.dump(as_yaml)) self.as_yaml_files.append(as_token) - @defer.inlineCallbacks def test_retrieve_unknown_service_token(self): - service = yield self.store.get_app_service_by_token("invalid_token") + service = self.store.get_app_service_by_token("invalid_token") self.assertEquals(service, None) - @defer.inlineCallbacks def test_retrieval_of_service(self): - stored_service = yield self.store.get_app_service_by_token( + stored_service = self.store.get_app_service_by_token( self.as_token ) self.assertEquals(stored_service.token, self.as_token) @@ -97,9 +95,8 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): [] ) - @defer.inlineCallbacks def test_retrieval_of_all_services(self): - services = yield self.store.get_app_services() + services = self.store.get_app_services() self.assertEquals(len(services), 3) -- cgit 1.4.1 From 62073992c523cc9b40c342c8443966cda7d8b5a4 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 14 Oct 2016 13:56:53 +0100 Subject: Make password reset email field case insensitive --- synapse/storage/registration.py | 30 ++++++++++++++-------- .../36/user_threepids_medium_address_index.sql | 16 ++++++++++++ 2 files changed, 36 insertions(+), 10 deletions(-) create mode 100644 synapse/storage/schema/delta/36/user_threepids_medium_address_index.sql (limited to 'synapse/storage') diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index e404fa72de..a6aa64f9fb 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -458,17 +458,27 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): @defer.inlineCallbacks def get_user_id_by_threepid(self, medium, address): - ret = yield self._simple_select_one( - "user_threepids", - { - "medium": medium, - "address": address - }, - ['user_id'], True, 'get_user_id_by_threepid' + def f(txn): + sql = ( + "SELECT user_id" + " FROM user_threepids" + " WHERE medium = ? 
AND LOWER(address) = LOWER(?)" + ) + txn.execute(sql, (medium, address)) + row = txn.fetchone() + if not row: + return None + if txn.rowcount > 1: + raise StoreError(500, "More than one row matched") + return { + "user_id": row[0] + } + + res = yield self.runInteraction( + "get_user_id_by_threepid", f ) - if ret: - defer.returnValue(ret['user_id']) - defer.returnValue(None) + + defer.returnValue(res) def user_delete_threepids(self, user_id): return self._simple_delete( diff --git a/synapse/storage/schema/delta/36/user_threepids_medium_address_index.sql b/synapse/storage/schema/delta/36/user_threepids_medium_address_index.sql new file mode 100644 index 0000000000..702a872784 --- /dev/null +++ b/synapse/storage/schema/delta/36/user_threepids_medium_address_index.sql @@ -0,0 +1,16 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE INDEX user_threepids_medium_address on user_threepids (medium, LOWER(address)); -- cgit 1.4.1 From 29c592202136a3bdb04f78a49d02b7b53893a973 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 14 Oct 2016 16:20:24 +0100 Subject: Revert part of 6207399 older SQLite doesn't support indexes on expressions, so let's just store things lowercase in the db --- synapse/storage/registration.py | 30 ++++++++++-------------------- 1 file changed, 10 insertions(+), 20 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index a6aa64f9fb..e404fa72de 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -458,27 +458,17 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): @defer.inlineCallbacks def get_user_id_by_threepid(self, medium, address): - def f(txn): - sql = ( - "SELECT user_id" - " FROM user_threepids" - " WHERE medium = ? 
AND LOWER(address) = LOWER(?)" - ) - txn.execute(sql, (medium, address)) - row = txn.fetchone() - if not row: - return None - if txn.rowcount > 1: - raise StoreError(500, "More than one row matched") - return { - "user_id": row[0] - } - - res = yield self.runInteraction( - "get_user_id_by_threepid", f + ret = yield self._simple_select_one( + "user_threepids", + { + "medium": medium, + "address": address + }, + ['user_id'], True, 'get_user_id_by_threepid' ) - - defer.returnValue(res) + if ret: + defer.returnValue(ret['user_id']) + defer.returnValue(None) def user_delete_threepids(self, user_id): return self._simple_delete( -- cgit 1.4.1 From 2869a29fd7d049d7a4dd95f1992e294187884bd3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Sep 2016 13:59:07 +0100 Subject: Drop some unused indices --- synapse/storage/events.py | 1 - .../storage/schema/delta/35/remove_auth_idx.sql | 16 ----- synapse/storage/schema/delta/36/remove_auth_idx.py | 83 ++++++++++++++++++++++ 3 files changed, 83 insertions(+), 17 deletions(-) delete mode 100644 synapse/storage/schema/delta/35/remove_auth_idx.sql create mode 100644 synapse/storage/schema/delta/36/remove_auth_idx.py (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 6dc46fa50f..65b3775f1f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -603,7 +603,6 @@ class EventsStore(SQLBaseStore): "rejections", "redactions", "room_memberships", - "state_events", "topics" ): txn.executemany( diff --git a/synapse/storage/schema/delta/35/remove_auth_idx.sql b/synapse/storage/schema/delta/35/remove_auth_idx.sql deleted file mode 100644 index a5bf3895b0..0000000000 --- a/synapse/storage/schema/delta/35/remove_auth_idx.sql +++ /dev/null @@ -1,16 +0,0 @@ -/* Copyright 2016 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -DROP INDEX IF EXISTS evauth_edges_id; diff --git a/synapse/storage/schema/delta/36/remove_auth_idx.py b/synapse/storage/schema/delta/36/remove_auth_idx.py new file mode 100644 index 0000000000..deac8ea39f --- /dev/null +++ b/synapse/storage/schema/delta/36/remove_auth_idx.py @@ -0,0 +1,83 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from synapse.storage.prepare_database import get_statements +from synapse.storage.engines import PostgresEngine, Sqlite3Engine + +import logging + +logger = logging.getLogger(__name__) + +DROP_INDICES = """ +-- We only ever query based on event_id +DROP INDEX IF EXISTS state_events_room_id; +DROP INDEX IF EXISTS state_events_type; +DROP INDEX IF EXISTS state_events_state_key; + +-- room_id is indexed elsewhere +DROP INDEX IF EXISTS current_state_events_room_id; +DROP INDEX IF EXISTS current_state_events_state_key; +DROP INDEX IF EXISTS current_state_events_type; + +DROP INDEX IF EXISTS transactions_have_ref; + +-- (topological_ordering, stream_ordering, room_id) seems like a strange index, +-- and is used incredibly rarely. +DROP INDEX IF EXISTS events_order_topo_stream_room; + +-- TODO: Drop event_auth stuff. + +DROP INDEX IF EXISTS event_search_ev_idx; +""" + +POSTGRES_DROP_CONSTRAINT = """ +ALTER TABLE event_auth DROP CONSTRAINT IF EXISTS event_auth_event_id_auth_id_room_id_key; +""" + +SQLITE_DROP_CONSTRAINT = """ +DROP INDEX IF EXISTS evauth_edges_id; + +CREATE TABLE IF NOT EXISTS event_auth_new( + event_id TEXT NOT NULL, + auth_id TEXT NOT NULL, + room_id TEXT NOT NULL +); + +INSERT INTO event_auth_new + SELECT event_id, auth_id, room_id + FROM event_auth; + +DROP TABLE event_auth; + +ALTER TABLE event_auth_new RENAME TO event_auth; + +CREATE INDEX evauth_edges_id ON event_auth(event_id); +""" + + +def run_create(cur, database_engine, *args, **kwargs): + for statement in get_statements(DROP_INDICES.splitlines()): + cur.execute(statement) + + if isinstance(database_engine, PostgresEngine): + drop_constraint = POSTGRES_DROP_CONSTRAINT + else: + drop_constraint = SQLITE_DROP_CONSTRAINT + + for statement in get_statements(drop_constraint.splitlines()): + cur.execute(statement) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass -- cgit 1.4.1 From b59994b45423d60b44a907e834db7e7702eaa6e2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 17 Oct 2016 11:17:02 +0100 Subject: Remove TODO --- synapse/storage/schema/delta/36/remove_auth_idx.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/36/remove_auth_idx.py b/synapse/storage/schema/delta/36/remove_auth_idx.py index 4be72a86bc..784f3b348f 100644 --- a/synapse/storage/schema/delta/36/remove_auth_idx.py +++ b/synapse/storage/schema/delta/36/remove_auth_idx.py @@ -36,8 +36,6 @@ DROP INDEX IF EXISTS transactions_have_ref; -- and is used incredibly rarely. DROP INDEX IF EXISTS events_order_topo_stream_room; --- TODO: Drop event_auth stuff. 
- DROP INDEX IF EXISTS event_search_ev_idx; """ -- cgit 1.4.1 From 6942d68247976e572fc4c56171da9303bfd7319f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 17 Oct 2016 11:17:45 +0100 Subject: Bump schema version --- synapse/storage/prepare_database.py | 2 +- synapse/storage/schema/delta/36/remove_auth_idx.py | 81 ---------------------- synapse/storage/schema/delta/37/remove_auth_idx.py | 81 ++++++++++++++++++++++ 3 files changed, 82 insertions(+), 82 deletions(-) delete mode 100644 synapse/storage/schema/delta/36/remove_auth_idx.py create mode 100644 synapse/storage/schema/delta/37/remove_auth_idx.py (limited to 'synapse/storage') diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 08de3cc4c1..d2c0aebe48 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 36 +SCHEMA_VERSION = 37 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/36/remove_auth_idx.py b/synapse/storage/schema/delta/36/remove_auth_idx.py deleted file mode 100644 index 784f3b348f..0000000000 --- a/synapse/storage/schema/delta/36/remove_auth_idx.py +++ /dev/null @@ -1,81 +0,0 @@ -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.storage.prepare_database import get_statements -from synapse.storage.engines import PostgresEngine - -import logging - -logger = logging.getLogger(__name__) - -DROP_INDICES = """ --- We only ever query based on event_id -DROP INDEX IF EXISTS state_events_room_id; -DROP INDEX IF EXISTS state_events_type; -DROP INDEX IF EXISTS state_events_state_key; - --- room_id is indexed elsewhere -DROP INDEX IF EXISTS current_state_events_room_id; -DROP INDEX IF EXISTS current_state_events_state_key; -DROP INDEX IF EXISTS current_state_events_type; - -DROP INDEX IF EXISTS transactions_have_ref; - --- (topological_ordering, stream_ordering, room_id) seems like a strange index, --- and is used incredibly rarely. 
-DROP INDEX IF EXISTS events_order_topo_stream_room; - -DROP INDEX IF EXISTS event_search_ev_idx; -""" - -POSTGRES_DROP_CONSTRAINT = """ -ALTER TABLE event_auth DROP CONSTRAINT IF EXISTS event_auth_event_id_auth_id_room_id_key; -""" - -SQLITE_DROP_CONSTRAINT = """ -DROP INDEX IF EXISTS evauth_edges_id; - -CREATE TABLE IF NOT EXISTS event_auth_new( - event_id TEXT NOT NULL, - auth_id TEXT NOT NULL, - room_id TEXT NOT NULL -); - -INSERT INTO event_auth_new - SELECT event_id, auth_id, room_id - FROM event_auth; - -DROP TABLE event_auth; - -ALTER TABLE event_auth_new RENAME TO event_auth; - -CREATE INDEX evauth_edges_id ON event_auth(event_id); -""" - - -def run_create(cur, database_engine, *args, **kwargs): - for statement in get_statements(DROP_INDICES.splitlines()): - cur.execute(statement) - - if isinstance(database_engine, PostgresEngine): - drop_constraint = POSTGRES_DROP_CONSTRAINT - else: - drop_constraint = SQLITE_DROP_CONSTRAINT - - for statement in get_statements(drop_constraint.splitlines()): - cur.execute(statement) - - -def run_upgrade(cur, database_engine, *args, **kwargs): - pass diff --git a/synapse/storage/schema/delta/37/remove_auth_idx.py b/synapse/storage/schema/delta/37/remove_auth_idx.py new file mode 100644 index 0000000000..784f3b348f --- /dev/null +++ b/synapse/storage/schema/delta/37/remove_auth_idx.py @@ -0,0 +1,81 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage.prepare_database import get_statements +from synapse.storage.engines import PostgresEngine + +import logging + +logger = logging.getLogger(__name__) + +DROP_INDICES = """ +-- We only ever query based on event_id +DROP INDEX IF EXISTS state_events_room_id; +DROP INDEX IF EXISTS state_events_type; +DROP INDEX IF EXISTS state_events_state_key; + +-- room_id is indexed elsewhere +DROP INDEX IF EXISTS current_state_events_room_id; +DROP INDEX IF EXISTS current_state_events_state_key; +DROP INDEX IF EXISTS current_state_events_type; + +DROP INDEX IF EXISTS transactions_have_ref; + +-- (topological_ordering, stream_ordering, room_id) seems like a strange index, +-- and is used incredibly rarely. 
+DROP INDEX IF EXISTS events_order_topo_stream_room; + +DROP INDEX IF EXISTS event_search_ev_idx; +""" + +POSTGRES_DROP_CONSTRAINT = """ +ALTER TABLE event_auth DROP CONSTRAINT IF EXISTS event_auth_event_id_auth_id_room_id_key; +""" + +SQLITE_DROP_CONSTRAINT = """ +DROP INDEX IF EXISTS evauth_edges_id; + +CREATE TABLE IF NOT EXISTS event_auth_new( + event_id TEXT NOT NULL, + auth_id TEXT NOT NULL, + room_id TEXT NOT NULL +); + +INSERT INTO event_auth_new + SELECT event_id, auth_id, room_id + FROM event_auth; + +DROP TABLE event_auth; + +ALTER TABLE event_auth_new RENAME TO event_auth; + +CREATE INDEX evauth_edges_id ON event_auth(event_id); +""" + + +def run_create(cur, database_engine, *args, **kwargs): + for statement in get_statements(DROP_INDICES.splitlines()): + cur.execute(statement) + + if isinstance(database_engine, PostgresEngine): + drop_constraint = POSTGRES_DROP_CONSTRAINT + else: + drop_constraint = SQLITE_DROP_CONSTRAINT + + for statement in get_statements(drop_constraint.splitlines()): + cur.execute(statement) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass -- cgit 1.4.1 From e8b1d2a45200d88a576360a83e3dff1aac4ad679 Mon Sep 17 00:00:00 2001 From: pik Date: Tue, 18 Oct 2016 12:17:38 -0500 Subject: Refactor test_filter to use real DataStore * add tests for filter api errors --- synapse/rest/client/v2_alpha/filter.py | 4 +- synapse/storage/_base.py | 1 - tests/rest/client/v2_alpha/test_filter.py | 124 +++++++++++++++++++----------- 3 files changed, 83 insertions(+), 46 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/rest/client/v2_alpha/filter.py b/synapse/rest/client/v2_alpha/filter.py index f7758fc68c..b4084fec62 100644 --- a/synapse/rest/client/v2_alpha/filter.py +++ b/synapse/rest/client/v2_alpha/filter.py @@ -74,6 +74,7 @@ class CreateFilterRestServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request, user_id): + target_user = UserID.from_string(user_id) requester = yield self.auth.get_user_by_req(request) @@ -81,10 +82,9 @@ class CreateFilterRestServlet(RestServlet): raise AuthError(403, "Cannot create filters for other users") if not self.hs.is_mine(target_user): - raise SynapseError(400, "Can only create filters for local users") + raise AuthError(403, "Can only create filters for local users") content = parse_json_object_from_request(request) - filter_id = yield self.filtering.add_user_filter( user_localpart=target_user.localpart, user_filter=content, diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 49fa8614f2..d828d6ee1d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -85,7 +85,6 @@ class LoggingTransaction(object): sql_logger.debug("[SQL] {%s} %s", self.name, sql) sql = self.database_engine.convert_param_style(sql) - if args: try: sql_logger.debug( diff --git a/tests/rest/client/v2_alpha/test_filter.py b/tests/rest/client/v2_alpha/test_filter.py index 47ca5e8c8a..3d27d03cbf 100644 --- a/tests/rest/client/v2_alpha/test_filter.py +++ b/tests/rest/client/v2_alpha/test_filter.py @@ -15,87 +15,125 @@ from twisted.internet import defer -from . 
import V2AlphaRestTestCase +from tests import unittest from synapse.rest.client.v2_alpha import filter -from synapse.api.errors import StoreError, Codes +from synapse.api.errors import Codes +import synapse.types + +from synapse.types import UserID + +from ....utils import MockHttpResource, setup_test_homeserver + +PATH_PREFIX = "/_matrix/client/v2_alpha" + + +class FilterTestCase(unittest.TestCase): -class FilterTestCase(V2AlphaRestTestCase): USER_ID = "@apple:test" + EXAMPLE_FILTER = {"type": ["m.*"]} + EXAMPLE_FILTER_JSON = '{"type": ["m.*"]}' TO_REGISTER = [filter] - def make_datastore_mock(self): - datastore = super(FilterTestCase, self).make_datastore_mock() + @defer.inlineCallbacks + def setUp(self): + self.mock_resource = MockHttpResource(prefix=PATH_PREFIX) + + self.hs = yield setup_test_homeserver( + http_client=None, + resource_for_client=self.mock_resource, + resource_for_federation=self.mock_resource, + ) + + self.auth = self.hs.get_auth() - self._user_filters = {} + def get_user_by_access_token(token=None, allow_guest=False): + return { + "user": UserID.from_string(self.USER_ID), + "token_id": 1, + "is_guest": False, + } - def add_user_filter(user_localpart, definition): - filters = self._user_filters.setdefault(user_localpart, []) - filter_id = len(filters) - filters.append(definition) - return defer.succeed(filter_id) - datastore.add_user_filter = add_user_filter + def get_user_by_req(request, allow_guest=False, rights="access"): + return synapse.types.create_requester( + UserID.from_string(self.USER_ID), 1, False, None) - def get_user_filter(user_localpart, filter_id): - if user_localpart not in self._user_filters: - raise StoreError(404, "No user") - filters = self._user_filters[user_localpart] - if filter_id >= len(filters): - raise StoreError(404, "No filter") - return defer.succeed(filters[filter_id]) - datastore.get_user_filter = get_user_filter + self.auth.get_user_by_access_token = get_user_by_access_token + self.auth.get_user_by_req = get_user_by_req - return datastore + self.store = self.hs.get_datastore() + self.filtering = self.hs.get_filtering() + + for r in self.TO_REGISTER: + r.register_servlets(self.hs, self.mock_resource) @defer.inlineCallbacks def test_add_filter(self): (code, response) = yield self.mock_resource.trigger( - "POST", "/user/%s/filter" % (self.USER_ID), '{"type": ["m.*"]}' + "POST", "/user/%s/filter" % (self.USER_ID), self.EXAMPLE_FILTER_JSON ) self.assertEquals(200, code) self.assertEquals({"filter_id": "0"}, response) + filter = yield self.store.get_user_filter( + user_localpart='apple', + filter_id=0, + ) + self.assertEquals(filter, self.EXAMPLE_FILTER) + + @defer.inlineCallbacks + def test_add_filter_for_other_user(self): + (code, response) = yield self.mock_resource.trigger( + "POST", "/user/%s/filter" % ('@watermelon:test'), self.EXAMPLE_FILTER_JSON + ) + self.assertEquals(403, code) + self.assertEquals(response['errcode'], Codes.FORBIDDEN) - self.assertIn("apple", self._user_filters) - self.assertEquals(len(self._user_filters["apple"]), 1) - self.assertEquals({"type": ["m.*"]}, self._user_filters["apple"][0]) + @defer.inlineCallbacks + def test_add_filter_non_local_user(self): + _is_mine = self.hs.is_mine + self.hs.is_mine = lambda target_user: False + (code, response) = yield self.mock_resource.trigger( + "POST", "/user/%s/filter" % (self.USER_ID), self.EXAMPLE_FILTER_JSON + ) + self.hs.is_mine = _is_mine + self.assertEquals(403, code) + self.assertEquals(response['errcode'], Codes.FORBIDDEN) @defer.inlineCallbacks def 
test_get_filter(self): - self._user_filters["apple"] = [ - {"type": ["m.*"]} - ] - + filter_id = yield self.filtering.add_user_filter( + user_localpart='apple', + user_filter=self.EXAMPLE_FILTER + ) (code, response) = yield self.mock_resource.trigger_get( - "/user/%s/filter/0" % (self.USER_ID) + "/user/%s/filter/%s" % (self.USER_ID, filter_id) ) self.assertEquals(200, code) - self.assertEquals({"type": ["m.*"]}, response) + self.assertEquals(self.EXAMPLE_FILTER, response) @defer.inlineCallbacks - def test_get_filter_no_id(self): - self._user_filters["apple"] = [ - {"type": ["m.*"]} - ] - + def test_get_filter_non_existant(self): (code, response) = yield self.mock_resource.trigger_get( - "/user/%s/filter/2" % (self.USER_ID) + "/user/%s/filter/12382148321" % (self.USER_ID) ) self.assertEquals(400, code) + self.assertEquals(response['errcode'], Codes.NOT_FOUND) + # Currently invalid params do not have an appropriate errcode + # in errors.py @defer.inlineCallbacks - def test_get_filter_no_user(self): + def test_get_filter_invalid_id(self): (code, response) = yield self.mock_resource.trigger_get( - "/user/%s/filter/0" % (self.USER_ID) + "/user/%s/filter/foobar" % (self.USER_ID) ) self.assertEquals(400, code) - self.assertEquals(response['errcode'], Codes.FORBIDDEN) + # No ID also returns an invalid_id error @defer.inlineCallbacks - def test_get_filter_missing_id(self): + def test_get_filter_no_id(self): (code, response) = yield self.mock_resource.trigger_get( - "/user/%s/filter/0" % (self.USER_ID) + "/user/%s/filter/" % (self.USER_ID) ) self.assertEquals(400, code) - self.assertEquals(response['errcode'], Codes.NOT_FOUND) -- cgit 1.4.1 From df2a616c7b028a6eb8b50c57e7e73847287a6feb Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 19 Oct 2016 11:13:55 +0100 Subject: Convert emails to lowercase when storing Add db migration SQL to convert existing addresses. --- synapse/handlers/auth.py | 12 +++++++++++ synapse/storage/schema/delta/36/user_threepids.sql | 23 ++++++++++++++++++++++ .../36/user_threepids_medium_address_index.sql | 16 --------------- 3 files changed, 35 insertions(+), 16 deletions(-) create mode 100644 synapse/storage/schema/delta/36/user_threepids.sql delete mode 100644 synapse/storage/schema/delta/36/user_threepids_medium_address_index.sql (limited to 'synapse/storage') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index dc0fe60e1b..3635521230 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -611,6 +611,18 @@ class AuthHandler(BaseHandler): @defer.inlineCallbacks def add_threepid(self, user_id, medium, address, validated_at): + # 'Canonicalise' email addresses down to lower case. + # We're now moving towards the Home Server being the entity that + # is responsible for validating threepids used for resetting passwords + # on accounts, so in future Synapse will gain knowledge of specific + # types (mediums) of threepid. For now, we still use the existing + # infrastructure, but this is the start of synapse gaining knowledge + # of specific types of threepid (and fixes the fact that checking + # for the presence of an email address during password reset was + # case sensitive). 
+ if medium == 'email': + address = address.lower() + yield self.store.user_add_threepid( user_id, medium, address, validated_at, self.hs.get_clock().time_msec() diff --git a/synapse/storage/schema/delta/36/user_threepids.sql b/synapse/storage/schema/delta/36/user_threepids.sql new file mode 100644 index 0000000000..ef8813e72a --- /dev/null +++ b/synapse/storage/schema/delta/36/user_threepids.sql @@ -0,0 +1,23 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Update any email addresses that were stored with mixed case into all + * lowercase + */ +UPDATE user_threepids SET address = LOWER(address) where medium = 'email'; + +/* Add an index for the select we do on password reset */ +CREATE INDEX user_threepids_medium_address on user_threepids (medium, address); diff --git a/synapse/storage/schema/delta/36/user_threepids_medium_address_index.sql b/synapse/storage/schema/delta/36/user_threepids_medium_address_index.sql deleted file mode 100644 index 702a872784..0000000000 --- a/synapse/storage/schema/delta/36/user_threepids_medium_address_index.sql +++ /dev/null @@ -1,16 +0,0 @@ -/* Copyright 2016 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -CREATE INDEX user_threepids_medium_address on user_threepids (medium, LOWER(address)); -- cgit 1.4.1 From 0108ed8ae6430b551a6d8e9f05820c615631a84b Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 19 Oct 2016 11:40:35 +0100 Subject: Latest delta is now 37 --- synapse/storage/schema/delta/36/user_threepids.sql | 23 ---------------------- synapse/storage/schema/delta/37/user_threepids.sql | 23 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 23 deletions(-) delete mode 100644 synapse/storage/schema/delta/36/user_threepids.sql create mode 100644 synapse/storage/schema/delta/37/user_threepids.sql (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/36/user_threepids.sql b/synapse/storage/schema/delta/36/user_threepids.sql deleted file mode 100644 index ef8813e72a..0000000000 --- a/synapse/storage/schema/delta/36/user_threepids.sql +++ /dev/null @@ -1,23 +0,0 @@ -/* Copyright 2016 OpenMarket Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Update any email addresses that were stored with mixed case into all - * lowercase - */ -UPDATE user_threepids SET address = LOWER(address) where medium = 'email'; - -/* Add an index for the select we do on password reset */ -CREATE INDEX user_threepids_medium_address on user_threepids (medium, address); diff --git a/synapse/storage/schema/delta/37/user_threepids.sql b/synapse/storage/schema/delta/37/user_threepids.sql new file mode 100644 index 0000000000..ef8813e72a --- /dev/null +++ b/synapse/storage/schema/delta/37/user_threepids.sql @@ -0,0 +1,23 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Update any email addresses that were stored with mixed case into all + * lowercase + */ +UPDATE user_threepids SET address = LOWER(address) where medium = 'email'; + +/* Add an index for the select we do on password reset */ +CREATE INDEX user_threepids_medium_address on user_threepids (medium, address); -- cgit 1.4.1 From d04e2ff3a43cca3f7d393a4770f022c7bf1a372c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Oct 2016 13:35:51 +0100 Subject: Fix incredibly slow back pagination query If a client didn't specify a from token when paginating backwards synapse would attempt to query the (global) maximum topological token. This a) doesn't make much sense since they're room-specific and b) there are no indices that let Postgres do this efficiently. 
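The patch below distinguishes the two token forms the rooms stream hands out: a global stream token, rendered "s<stream>", and a room-scoped topological token, rendered "t<topological>-<stream>", which is what backwards pagination needs. A toy illustration of just those two formats (the helper names are made up; the "%d" templates match the code in the diff):

def global_stream_token(stream_ordering):
    # Global position in the rooms stream, e.g. "s3456".
    return "s%d" % (stream_ordering,)

def room_topological_token(topological_ordering, stream_ordering):
    # Room-scoped position, e.g. "t12-3456". The topological part is only
    # meaningful within a single room, which is why computing it needs a
    # room_id.
    return "t%d-%d" % (topological_ordering, stream_ordering)

The performance point is that SELECT MAX(topological_ordering) over the whole events table has no index to lean on, while restricting the query to a single room_id lets it use the room-scoped indexes the table already has.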
---
 synapse/handlers/message.py |  4 ++--
 synapse/handlers/room.py    |  7 +++++--
 synapse/storage/stream.py   | 19 +++++++++++++------
 synapse/streams/events.py   | 30 ++++++++++++++++++++++++++++--
 4 files changed, 48 insertions(+), 12 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 59eb26beaf..abfa8c65a4 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -82,8 +82,8 @@ class MessageHandler(BaseHandler):
             room_token = pagin_config.from_token.room_key
         else:
             pagin_config.from_token = (
-                yield self.hs.get_event_sources().get_current_token(
-                    direction='b'
+                yield self.hs.get_event_sources().get_current_token_for_room(
+                    room_id=room_id
                 )
             )
             room_token = pagin_config.from_token.room_key
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index a7f533f7be..59e4d1cd15 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -475,8 +475,11 @@ class RoomEventSource(object):

         defer.returnValue((events, end_key))

-    def get_current_key(self, direction='f'):
-        return self.store.get_room_events_max_id(direction)
+    def get_current_key(self):
+        return self.store.get_room_events_max_id()
+
+    def get_current_key_for_room(self, room_id):
+        return self.store.get_room_events_max_id(room_id)

     @defer.inlineCallbacks
     def get_pagination_rows(self, user, config, key):
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 07ea969d4d..888b1cb35d 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -521,13 +521,20 @@ class StreamStore(SQLBaseStore):
         )

     @defer.inlineCallbacks
-    def get_room_events_max_id(self, direction='f'):
+    def get_room_events_max_id(self, room_id=None):
+        """Returns the current token for the rooms stream.
+
+        By default, it returns the current global stream token. Specifying a
+        `room_id` causes it to return the current room-specific topological
+        token.
+ """ token = yield self._stream_id_gen.get_current_token() - if direction != 'b': + if room_id is None: defer.returnValue("s%d" % (token,)) else: topo = yield self.runInteraction( - "_get_max_topological_txn", self._get_max_topological_txn + "_get_max_topological_txn", self._get_max_topological_txn, + room_id, ) defer.returnValue("t%d-%d" % (topo, token)) @@ -579,11 +586,11 @@ class StreamStore(SQLBaseStore): lambda r: r[0][0] if r else 0 ) - def _get_max_topological_txn(self, txn): + def _get_max_topological_txn(self, txn, room_id): txn.execute( "SELECT MAX(topological_ordering) FROM events" - " WHERE outlier = ?", - (False,) + " WHERE room_id = ?", + (room_id,) ) rows = txn.fetchall() diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 6bf21d6f5e..4018dbde56 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -41,13 +41,39 @@ class EventSources(object): self.store = hs.get_datastore() @defer.inlineCallbacks - def get_current_token(self, direction='f'): + def get_current_token(self): push_rules_key, _ = self.store.get_push_rules_stream_token() to_device_key = self.store.get_to_device_stream_token() token = StreamToken( room_key=( - yield self.sources["room"].get_current_key(direction) + yield self.sources["room"].get_current_key() + ), + presence_key=( + yield self.sources["presence"].get_current_key() + ), + typing_key=( + yield self.sources["typing"].get_current_key() + ), + receipt_key=( + yield self.sources["receipt"].get_current_key() + ), + account_data_key=( + yield self.sources["account_data"].get_current_key() + ), + push_rules_key=push_rules_key, + to_device_key=to_device_key, + ) + defer.returnValue(token) + + @defer.inlineCallbacks + def get_current_token_for_room(self, room_id): + push_rules_key, _ = self.store.get_push_rules_stream_token() + to_device_key = self.store.get_to_device_stream_token() + + token = StreamToken( + room_key=( + yield self.sources["room"].get_current_key() ), presence_key=( yield self.sources["presence"].get_current_key() -- cgit 1.4.1 From 1fc1bc2a51b025e46e3d44056ea1836dfc48cc6e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 27 Oct 2016 14:14:44 +0100 Subject: Fix user_threepids schema delta The delta `37/user_threepids.sql` aimed to update all the email addresses to be lower case, however duplicate emails may exist in the table already. This commit adds a step where the delta moves the duplicate emails to a new `medium` `email_old`. Only the most recently used account keeps the binding intact. We move rather than delete so that we retain some record of which emails were associated with which account. --- synapse/storage/schema/delta/37/user_threepids.sql | 39 +++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/37/user_threepids.sql b/synapse/storage/schema/delta/37/user_threepids.sql index ef8813e72a..5f483a2ed7 100644 --- a/synapse/storage/schema/delta/37/user_threepids.sql +++ b/synapse/storage/schema/delta/37/user_threepids.sql @@ -17,7 +17,44 @@ * Update any email addresses that were stored with mixed case into all * lowercase */ -UPDATE user_threepids SET address = LOWER(address) where medium = 'email'; + + -- There may be "duplicate" emails (with different case) already in the table, + -- so we find them and move all but the most recently used account. 
+ UPDATE user_threepids
+    SET medium = 'email_old'
+ WHERE medium = 'email'
+    AND address IN (
+        -- `user_last_seen` maps user_ids to the last time we saw them
+        WITH user_last_seen AS (
+            SELECT user_id, max(last_seen) AS ts FROM user_ips GROUP BY user_id
+        ),
+        -- `duplicate_addresses` is a table of all the email addresses that
+        -- appear multiple times and the most recent time we saw any of their users
+        duplicate_addresses AS (
+            SELECT lower(u1.address) AS address, max(ts.ts) AS max_ts
+            FROM user_threepids AS u1
+            INNER JOIN user_threepids AS u2 ON u1.medium = u2.medium AND lower(u1.address) = lower(u2.address) AND u1.address != u2.address
+            INNER JOIN user_last_seen as ts ON ts.user_id = u1.user_id
+            WHERE u1.medium = 'email' AND u2.medium = 'email'
+            GROUP BY lower(u1.address)
+        )
+        -- We select all the addresses that are linked to the user_id that is NOT
+        -- the most recently seen.
+        SELECT u.address
+        FROM
+            user_threepids AS u,
+            duplicate_addresses,
+            user_last_seen AS ts
+        WHERE
+            lower(u.address) = duplicate_addresses.address
+            AND u.user_id = ts.user_id
+            AND ts.ts != max_ts -- NOT the most recently used
+    );
+
+
+-- This update is now safe since we've removed the duplicate addresses.
+UPDATE user_threepids SET address = LOWER(address) WHERE medium = 'email';

 /* Add an index for the select we do on password reset */
 CREATE INDEX user_threepids_medium_address on user_threepids (medium, address);
-- cgit 1.4.1


From a9111786f9fb848a8012976ba3c310492b20d0e2 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 27 Oct 2016 14:32:45 +0100
Subject: Use most recently added binding, not most recently seen user.

---
 synapse/storage/schema/delta/37/user_threepids.sql | 30 ++++++++--------------
 1 file changed, 11 insertions(+), 19 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/schema/delta/37/user_threepids.sql b/synapse/storage/schema/delta/37/user_threepids.sql
index 5f483a2ed7..cf7a90dd10 100644
--- a/synapse/storage/schema/delta/37/user_threepids.sql
+++ b/synapse/storage/schema/delta/37/user_threepids.sql
@@ -24,31 +24,23 @@
     SET medium = 'email_old'
  WHERE medium = 'email'
     AND address IN (
-        -- `user_last_seen` maps user_ids to the last time we saw them
-        WITH user_last_seen AS (
-            SELECT user_id, max(last_seen) AS ts FROM user_ips GROUP BY user_id
-        ),
-        -- `duplicate_addresses` is a table of all the email addresses that
-        -- appear multiple times and the most recent time we saw any of their users
-        duplicate_addresses AS (
-            SELECT lower(u1.address) AS address, max(ts.ts) AS max_ts
-            FROM user_threepids AS u1
-            INNER JOIN user_threepids AS u2 ON u1.medium = u2.medium AND lower(u1.address) = lower(u2.address) AND u1.address != u2.address
-            INNER JOIN user_last_seen as ts ON ts.user_id = u1.user_id
-            WHERE u1.medium = 'email' AND u2.medium = 'email'
-            GROUP BY lower(u1.address)
-        )
         -- We select all the addresses that are linked to the user_id that is NOT
-        -- the most recently seen.
+        -- the most recently created.
         SELECT u.address
         FROM
             user_threepids AS u,
-            duplicate_addresses,
-            user_last_seen AS ts
+            -- `duplicate_addresses` is a table of all the email addresses that
+            -- appear multiple times and the most recent time a binding for them was added
+            (
+                SELECT lower(u1.address) AS address, max(u1.added_at) AS max_ts
+                FROM user_threepids AS u1
+                INNER JOIN user_threepids AS u2 ON u1.medium = u2.medium AND lower(u1.address) = lower(u2.address) AND u1.address != u2.address
+                WHERE u1.medium = 'email' AND u2.medium = 'email'
+                GROUP BY lower(u1.address)
+            ) AS duplicate_addresses
         WHERE
             lower(u.address) = duplicate_addresses.address
-            AND u.user_id = ts.user_id
-            AND ts.ts != max_ts -- NOT the most recently used
+            AND u.added_at != max_ts -- NOT the most recently created
     );
-- cgit 1.4.1
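As a sanity check on the final version of the delta, the following
self-contained sketch exercises the same dedup logic on a toy user_threepids
table (the column set is trimmed to what the delta touches, and the sample
rows are invented for illustration):

-- Toy version of user_threepids; the real table has more columns.
CREATE TABLE user_threepids (
    user_id TEXT NOT NULL,
    medium TEXT NOT NULL,
    address TEXT NOT NULL,
    added_at BIGINT NOT NULL
);

-- Two bindings for the same email differing only in case; @b's is newer.
INSERT INTO user_threepids VALUES
    ('@a:example.com', 'email', 'Foo@Example.com', 1000),
    ('@b:example.com', 'email', 'foo@example.com', 2000),
    ('@c:example.com', 'email', 'unique@example.com', 1500);

-- The delta's self-join: every duplicate address whose binding is not the
-- most recently added one is moved to medium 'email_old'.
UPDATE user_threepids
    SET medium = 'email_old'
WHERE medium = 'email'
    AND address IN (
        SELECT u.address
        FROM
            user_threepids AS u,
            (
                SELECT lower(u1.address) AS address, max(u1.added_at) AS max_ts
                FROM user_threepids AS u1
                INNER JOIN user_threepids AS u2 ON u1.medium = u2.medium AND lower(u1.address) = lower(u2.address) AND u1.address != u2.address
                WHERE u1.medium = 'email' AND u2.medium = 'email'
                GROUP BY lower(u1.address)
            ) AS duplicate_addresses
        WHERE
            lower(u.address) = duplicate_addresses.address
            AND u.added_at != max_ts -- NOT the most recently added binding
    );

-- Now safe: no two 'email' rows collide once lowercased.
UPDATE user_threepids SET address = LOWER(address) WHERE medium = 'email';

-- Expected result: @a's binding is moved to 'email_old' (case preserved),
-- @b keeps the 'email' binding as 'foo@example.com', and @c is untouched.
SELECT user_id, medium, address FROM user_threepids ORDER BY user_id;

Moving rather than deleting mirrors the commit message's rationale: the losing
bindings remain queryable under medium = 'email_old'.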