From 9e982750ee5d0872c2157a444070878f2e3a6e4f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 15 Mar 2016 13:24:31 +0000 Subject: Persist rejection of invites over federation --- synapse/handlers/federation.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 27f2b40bfe..86ed37e9f3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -813,7 +813,23 @@ class FederationHandler(BaseHandler): target_hosts, signed_event ) - defer.returnValue(None) + + context = yield self.state_handler.compute_event_context(event) + + event_stream_id, max_stream_id = yield self.store.persist_event( + event, + context=context, + backfilled=False, + ) + + target_user = UserID.from_string(event.state_key) + with PreserveLoggingContext(): + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=[target_user], + ) + + defer.returnValue(event) @defer.inlineCallbacks def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, -- cgit 1.5.1 From e5f0e5893127b9474ed8ea38827a9d143cbff1e8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 15 Mar 2016 13:48:40 +0000 Subject: Remove needless PreserveLoggingContext --- synapse/handlers/federation.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 86ed37e9f3..f599e817aa 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -823,11 +823,10 @@ class FederationHandler(BaseHandler): ) target_user = UserID.from_string(event.state_key) - with PreserveLoggingContext(): - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=[target_user], - ) + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=[target_user], + ) defer.returnValue(event) -- cgit 1.5.1 From b6e8420aeed9921ba7d0fd4c8ebaf1b64d5f677c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 15 Mar 2016 17:01:43 +0000 Subject: Add replication stream for pushers --- synapse/replication/resource.py | 25 ++++++++- synapse/storage/__init__.py | 5 +- synapse/storage/pusher.py | 63 ++++++++++++++++------ .../storage/schema/delta/30/deleted_pushers.sql | 24 +++++++++ synapse/storage/util/id_generators.py | 7 ++- tests/replication/test_resource.py | 1 + 6 files changed, 107 insertions(+), 18 deletions(-) create mode 100644 synapse/storage/schema/delta/30/deleted_pushers.sql diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index adc1eb1d0b..8c1ae0fbc7 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -37,6 +37,7 @@ STREAM_NAMES = ( ("user_account_data", "room_account_data", "tag_account_data",), ("backfill",), ("push_rules",), + ("pushers",), ) @@ -65,6 +66,7 @@ class ReplicationResource(Resource): * "tag_account_data": Per room per user tags. * "backfill": Old events that have been backfilled from other servers. * "push_rules": Per user changes to push rules. + * "pushers": Per user changes to their pushers. The API takes two additional query parameters: @@ -120,6 +122,7 @@ class ReplicationResource(Resource): stream_token = yield self.sources.get_current_token() backfill_token = yield self.store.get_current_backfill_token() push_rules_token, room_stream_token = self.store.get_push_rules_stream_token() + pushers_token = self.store.get_pushers_stream_token() defer.returnValue(_ReplicationToken( room_stream_token, @@ -129,6 +132,7 @@ class ReplicationResource(Resource): int(stream_token.account_data_key), backfill_token, push_rules_token, + pushers_token, )) @request_handler @@ -151,6 +155,7 @@ class ReplicationResource(Resource): yield self.typing(writer, current_token) # TODO: implement limit yield self.receipts(writer, current_token, limit) yield self.push_rules(writer, current_token, limit) + yield self.pushers(writer, current_token, limit) self.streams(writer, current_token) logger.info("Replicated %d rows", writer.total) @@ -297,6 +302,24 @@ class ReplicationResource(Resource): "priority_class", "priority", "conditions", "actions" )) + @defer.inlineCallbacks + def pushers(self, writer, current_token, limit): + current_position = current_token.pushers + + pushers = parse_integer(writer.request, "pushers") + if pushers is not None: + updated, deleted = yield self.store.get_all_updated_pushers( + pushers, current_position, limit + ) + writer.write_header_and_rows("pushers", updated, ( + "position", "user_id", "access_token", "profile_tag", "kind", + "app_id", "app_display_name", "device_display_name", "pushkey", + "ts", "lang", "data" + )) + writer.write_header_and_rows("deleted", deleted, ( + "position", "user_id", "app_id", "pushkey" + )) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" @@ -327,7 +350,7 @@ class _Writer(object): class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", - "push_rules" + "push_rules", "pushers" ))): __slots__ = [] diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 168eb27b03..250ba536ea 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -119,12 +119,15 @@ class DataStore(RoomMemberStore, RoomStore, self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id") self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id") - self._pushers_id_gen = IdGenerator(db_conn, "pushers", "id") self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id") self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id") self._push_rules_stream_id_gen = ChainedIdGenerator( self._stream_id_gen, db_conn, "push_rules_stream", "stream_id" ) + self._pushers_id_gen = StreamIdGenerator( + db_conn, "pushers", "id", + extra_tables=[("deleted_pushers", "stream_id")], + ) events_max = self._stream_id_gen.get_max_token() event_cache_prefill, min_event_val = self._get_cache_dict( diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 7693ab9082..29da3bbd13 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -16,8 +16,6 @@ from ._base import SQLBaseStore from twisted.internet import defer -from synapse.api.errors import StoreError - from canonicaljson import encode_canonical_json import logging @@ -79,12 +77,41 @@ class PusherStore(SQLBaseStore): rows = yield self.runInteraction("get_all_pushers", get_pushers) defer.returnValue(rows) + def get_pushers_stream_token(self): + return self._pushers_id_gen.get_max_token() + + def get_all_updated_pushers(self, last_id, current_id, limit): + def get_all_updated_pushers_txn(txn): + sql = ( + "SELECT id, user_name, access_token, profile_tag, kind," + " app_id, app_display_name, device_display_name, pushkey, ts," + " lang, data" + " FROM pushers" + " WHERE ? < id AND id <= ?" + " ORDER BY id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + updated = txn.fetchall() + + sql = ( + "SELECT stream_id, user_id, app_id, pushkey" + " FROM deleted_pushers" + " WHERE ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + deleted = txn.fetchall() + + return (updated, deleted) + return self.runInteraction( + "get_all_updated_pushers", get_all_updated_pushers_txn + ) + @defer.inlineCallbacks def add_pusher(self, user_id, access_token, kind, app_id, app_display_name, device_display_name, pushkey, pushkey_ts, lang, data, profile_tag=""): - try: - next_id = self._pushers_id_gen.get_next() + with self._pushers_id_gen.get_next() as stream_id: yield self._simple_upsert( "pushers", dict( @@ -101,23 +128,29 @@ class PusherStore(SQLBaseStore): lang=lang, data=encode_canonical_json(data), profile_tag=profile_tag, - ), - insertion_values=dict( - id=next_id, + id=stream_id, ), desc="add_pusher", ) - except Exception as e: - logger.error("create_pusher with failed: %s", e) - raise StoreError(500, "Problem creating pusher.") @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): - yield self._simple_delete_one( - "pushers", - {"app_id": app_id, "pushkey": pushkey, 'user_name': user_id}, - desc="delete_pusher_by_app_id_pushkey_user_id", - ) + def delete_pusher_txn(txn, stream_id): + self._simple_delete_one( + txn, + "pushers", + {"app_id": app_id, "pushkey": pushkey, "user_name": user_id} + ) + self._simple_upsert_txn( + txn, + "deleted_pushers", + {"app_id": app_id, "pushkey": pushkey, "user_id": user_id}, + {"stream_id", stream_id}, + ) + with self._pushers_id_gen.get_next() as stream_id: + yield self.runInteraction( + "delete_pusher", delete_pusher_txn, stream_id + ) @defer.inlineCallbacks def update_pusher_last_token(self, app_id, pushkey, user_id, last_token): diff --git a/synapse/storage/schema/delta/30/deleted_pushers.sql b/synapse/storage/schema/delta/30/deleted_pushers.sql new file mode 100644 index 0000000000..cdcf79ac81 --- /dev/null +++ b/synapse/storage/schema/delta/30/deleted_pushers.sql @@ -0,0 +1,24 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS deleted_pushers( + stream_id BIGINT NOT NULL, + app_id TEXT NOT NULL, + pushkey TEXT NOT NULL, + user_id TEXT NOT NULL, + UNIQUE (app_id, pushkey, user_id) +); + +CREATE INDEX deleted_pushers_stream_id ON deleted_pushers (stream_id); diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 610ddad423..a02dfc7d58 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -49,9 +49,14 @@ class StreamIdGenerator(object): with stream_id_gen.get_next() as stream_id: # ... persist event ... """ - def __init__(self, db_conn, table, column): + def __init__(self, db_conn, table, column, extra_tables=[]): self._lock = threading.Lock() self._current_max = _load_max_id(db_conn, table, column) + for table, column in extra_tables: + self._current_max = max( + self._current_max, + _load_max_id(db_conn, table, column) + ) self._unfinished_ids = deque() def get_next(self): diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py index 4a42eb3365..f4b5fb3328 100644 --- a/tests/replication/test_resource.py +++ b/tests/replication/test_resource.py @@ -131,6 +131,7 @@ class ReplicationResourceCase(unittest.TestCase): test_timeout_tag_account_data = _test_timeout("tag_account_data") test_timeout_backfill = _test_timeout("backfill") test_timeout_push_rules = _test_timeout("push_rules") + test_timeout_pushers = _test_timeout("pushers") @defer.inlineCallbacks def send_text_message(self, room_id, message): -- cgit 1.5.1 From 12904932c41c73714543b817157f09073fcc2625 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 15 Mar 2016 17:41:06 +0000 Subject: Hook up adding a pusher to the notifier for replication. --- synapse/notifier.py | 6 ++++++ synapse/rest/client/v1/pusher.py | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/synapse/notifier.py b/synapse/notifier.py index 9b69b0333a..f00cd8c588 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -282,6 +282,12 @@ class Notifier(object): self.notify_replication() + def on_new_replication_data(self): + """Used to inform replication listeners that something has happend + without waking up any of the normal user event streams""" + with PreserveLoggingContext(): + self.notify_replication() + @defer.inlineCallbacks def wait_for_events(self, user_id, timeout, callback, room_ids=None, from_token=StreamToken.START): diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py index ee029b4f77..9881f068c3 100644 --- a/synapse/rest/client/v1/pusher.py +++ b/synapse/rest/client/v1/pusher.py @@ -29,6 +29,10 @@ logger = logging.getLogger(__name__) class PusherRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns("/pushers/set$") + def __init__(self, hs): + super(PusherRestServlet, self).__init__(hs) + self.notifier = hs.get_notifier() + @defer.inlineCallbacks def on_POST(self, request): requester = yield self.auth.get_user_by_req(request) @@ -87,6 +91,8 @@ class PusherRestServlet(ClientV1RestServlet): raise SynapseError(400, "Config Error: " + pce.message, errcode=Codes.MISSING_PARAM) + self.notifier.on_new_replication_data() + defer.returnValue((200, {})) def on_OPTIONS(self, _): -- cgit 1.5.1 From ee32d622cec56f2ab7b11577d15e4b805477d13f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 15 Mar 2016 17:47:36 +0000 Subject: Fix a couple of errors when deleting pushers --- synapse/storage/pusher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 29da3bbd13..87b2ac5773 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -136,7 +136,7 @@ class PusherStore(SQLBaseStore): @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): def delete_pusher_txn(txn, stream_id): - self._simple_delete_one( + self._simple_delete_one_txn( txn, "pushers", {"app_id": app_id, "pushkey": pushkey, "user_name": user_id} @@ -145,7 +145,7 @@ class PusherStore(SQLBaseStore): txn, "deleted_pushers", {"app_id": app_id, "pushkey": pushkey, "user_id": user_id}, - {"stream_id", stream_id}, + {"stream_id": stream_id}, ) with self._pushers_id_gen.get_next() as stream_id: yield self.runInteraction( -- cgit 1.5.1 From a877209c8b0c7c476ee6676c6d00c4cacdc83207 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 16 Mar 2016 09:45:37 +0000 Subject: Password reset docs and script Replace the bash/perl gen_password script with a python one, and write a note on how to use it. --- README.rst | 20 ++++++++++++++++++++ scripts/gen_password | 1 - scripts/hash_password | 39 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 59 insertions(+), 1 deletion(-) delete mode 100644 scripts/gen_password create mode 100755 scripts/hash_password diff --git a/README.rst b/README.rst index 8a745259bf..a48a0802b2 100644 --- a/README.rst +++ b/README.rst @@ -525,6 +525,26 @@ Logging In To An Existing Account Just enter the ``@localpart:my.domain.here`` Matrix user ID and password into the form and click the Login button. +Password reset +============== + +Synapse does not yet support a password-reset function (see +https://matrix.org/jira/browse/SYN-11). In the meantime, it is possible to +manually reset a user's password via direct database access. + +First calculate the hash of the new password: + + $ source ~/.synapse/bin/activate + $ ./scripts/hash_password + Password: + Confirm password: + $2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + +Then update the `users` table in the database: + + UPDATE users SET password_hash='$2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' + WHERE name='@test:test.com'; + Identity Servers ================ diff --git a/scripts/gen_password b/scripts/gen_password deleted file mode 100644 index 7afd3a5dfd..0000000000 --- a/scripts/gen_password +++ /dev/null @@ -1 +0,0 @@ -perl -MCrypt::Random -MCrypt::Eksblowfish::Bcrypt -e 'print Crypt::Eksblowfish::Bcrypt::bcrypt("secret", "\$2\$12\$" . Crypt::Eksblowfish::Bcrypt::en_base64(Crypt::Random::makerandom_octet(Length=>16)))."\n"' diff --git a/scripts/hash_password b/scripts/hash_password new file mode 100755 index 0000000000..e784600989 --- /dev/null +++ b/scripts/hash_password @@ -0,0 +1,39 @@ +#!/usr/bin/env python + +import argparse +import bcrypt +import getpass + +bcrypt_rounds=12 + +def prompt_for_pass(): + password = getpass.getpass("Password: ") + + if not password: + raise Exception("Password cannot be blank.") + + confirm_password = getpass.getpass("Confirm password: ") + + if password != confirm_password: + raise Exception("Passwords do not match.") + + return password + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Calculate the hash of a new password, so that passwords" + " can be reset") + parser.add_argument( + "-p", "--password", + default=None, + help="New password for user. Will prompt if omitted.", + ) + + args = parser.parse_args() + password = args.password + + if not password: + password = prompt_for_pass() + + print bcrypt.hashpw(password, bcrypt.gensalt(bcrypt_rounds)) + -- cgit 1.5.1 From ba660ecde20544ac1cfc163a5586f4f202627afa Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 16 Mar 2016 10:35:00 +0000 Subject: Add a comment to offer a hint to an explanation for why we have a unique constraint on (app_id, pushkey, user_id) --- synapse/storage/schema/delta/30/deleted_pushers.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/schema/delta/30/deleted_pushers.sql b/synapse/storage/schema/delta/30/deleted_pushers.sql index cdcf79ac81..712c454aa1 100644 --- a/synapse/storage/schema/delta/30/deleted_pushers.sql +++ b/synapse/storage/schema/delta/30/deleted_pushers.sql @@ -18,6 +18,7 @@ CREATE TABLE IF NOT EXISTS deleted_pushers( app_id TEXT NOT NULL, pushkey TEXT NOT NULL, user_id TEXT NOT NULL, + /* We only track the most recent delete for each app_id, pushkey and user_id. */ UNIQUE (app_id, pushkey, user_id) ); -- cgit 1.5.1 From 660ae8e0f3c7f667b3a24b02f095d60c2b09531f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 16 Mar 2016 10:40:38 +0000 Subject: Clarify that we do have reset functionality via the IS --- README.rst | 41 ++++++++++++++++++++--------------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/README.rst b/README.rst index a48a0802b2..285fc5aa8a 100644 --- a/README.rst +++ b/README.rst @@ -525,27 +525,6 @@ Logging In To An Existing Account Just enter the ``@localpart:my.domain.here`` Matrix user ID and password into the form and click the Login button. -Password reset -============== - -Synapse does not yet support a password-reset function (see -https://matrix.org/jira/browse/SYN-11). In the meantime, it is possible to -manually reset a user's password via direct database access. - -First calculate the hash of the new password: - - $ source ~/.synapse/bin/activate - $ ./scripts/hash_password - Password: - Confirm password: - $2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx - -Then update the `users` table in the database: - - UPDATE users SET password_hash='$2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' - WHERE name='@test:test.com'; - - Identity Servers ================ @@ -565,6 +544,26 @@ as the primary means of identity and E2E encryption is not complete. As such, we are running a single identity server (https://matrix.org) at the current time. +Password reset +============== + +If a user has registered an email address to their account using an identity +server, they can request a password-reset token via clients such as Vector. + +A manual password reset can be done via direct database access as follows. + +First calculate the hash of the new password: + + $ source ~/.synapse/bin/activate + $ ./scripts/hash_password + Password: + Confirm password: + $2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + +Then update the `users` table in the database: + + UPDATE users SET password_hash='$2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' + WHERE name='@test:test.com'; Where's the spec?! ================== -- cgit 1.5.1 From c12b9d719a3cf1eeb9c4c8d354dbaecab5e76233 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 11:56:24 +0000 Subject: Make registration idempotent: if you specify the same session, make it give you an access token for the user that was registered on previous uses of that session. Tweak the UI auth layer to not delete sessions when their auth has completed and hence expire themn so they don't hang around until server restart. Allow server-side data to be associated with UI auth sessions. --- synapse/handlers/auth.py | 60 +++++++++++++++++++++++++------- synapse/rest/client/v2_alpha/register.py | 27 +++++++++++++- 2 files changed, 74 insertions(+), 13 deletions(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 5c0ea636bc..5dc9d91757 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -27,6 +27,7 @@ import logging import bcrypt import pymacaroons import simplejson +import time import synapse.util.stringutils as stringutils @@ -35,6 +36,7 @@ logger = logging.getLogger(__name__) class AuthHandler(BaseHandler): + SESSION_EXPIRE_SECS = 48 * 60 * 60 def __init__(self, hs): super(AuthHandler, self).__init__(hs) @@ -66,15 +68,18 @@ class AuthHandler(BaseHandler): 'auth' key: this method prompts for auth if none is sent. clientip (str): The IP address of the client. Returns: - A tuple of (authed, dict, dict) where authed is true if the client - has successfully completed an auth flow. If it is true, the first - dict contains the authenticated credentials of each stage. + A tuple of (authed, dict, dict, session_id) where authed is true if + the client has successfully completed an auth flow. If it is true + the first dict contains the authenticated credentials of each stage. If authed is false, the first dictionary is the server response to the login request and should be passed back to the client. In either case, the second dict contains the parameters for this request (which may have been given only in a previous call). + + session_id is the ID of this session, either passed in by the client + or assigned by the call to check_auth """ authdict = None @@ -103,7 +108,10 @@ class AuthHandler(BaseHandler): if not authdict: defer.returnValue( - (False, self._auth_dict_for_flows(flows, session), clientdict) + ( + False, self._auth_dict_for_flows(flows, session), + clientdict, session['id'] + ) ) if 'creds' not in session: @@ -122,12 +130,11 @@ class AuthHandler(BaseHandler): for f in flows: if len(set(f) - set(creds.keys())) == 0: logger.info("Auth completed with creds: %r", creds) - self._remove_session(session) - defer.returnValue((True, creds, clientdict)) + defer.returnValue((True, creds, clientdict, session['id'])) ret = self._auth_dict_for_flows(flows, session) ret['completed'] = creds.keys() - defer.returnValue((False, ret, clientdict)) + defer.returnValue((False, ret, clientdict, session['id'])) @defer.inlineCallbacks def add_oob_auth(self, stagetype, authdict, clientip): @@ -154,6 +161,29 @@ class AuthHandler(BaseHandler): defer.returnValue(True) defer.returnValue(False) + def set_session_data(self, session_id, key, value): + """ + Store a key-value pair into the sessions data associated with this + request. This data is stored server-side and cannot be modified by + the client. + :param session_id: (string) The ID of this session as returned from check_auth + :param key: (string) The key to store the data under + :param value: (any) The data to store + """ + sess = self._get_session_info(session_id) + sess.setdefault('serverdict', {})[key] = value + self._save_session(sess) + + def get_session_data(self, session_id, key, default=None): + """ + Retrieve data stored with set_session_data + :param session_id: (string) The ID of this session as returned from check_auth + :param key: (string) The key to store the data under + :param default: (any) Value to return if the key has not been set + """ + sess = self._get_session_info(session_id) + return sess.setdefault('serverdict', {}).get(key, default) + @defer.inlineCallbacks def _check_password_auth(self, authdict, _): if "user" not in authdict or "password" not in authdict: @@ -263,7 +293,7 @@ class AuthHandler(BaseHandler): if not session_id: # create a new session while session_id is None or session_id in self.sessions: - session_id = stringutils.random_string(24) + session_id = stringutils.random_string_with_symbols(24) self.sessions[session_id] = { "id": session_id, } @@ -455,11 +485,17 @@ class AuthHandler(BaseHandler): def _save_session(self, session): # TODO: Persistent storage logger.debug("Saving session %s", session) + session["last_used"] = time.time() self.sessions[session["id"]] = session - - def _remove_session(self, session): - logger.debug("Removing session %s", session) - del self.sessions[session["id"]] + self._prune_sessions() + + def _prune_sessions(self): + for sid,sess in self.sessions.items(): + last_used = 0 + if 'last_used' in sess: + last_used = sess['last_used'] + if last_used < time.time() - AuthHandler.SESSION_EXPIRE_SECS: + del self.sessions[sid] def hash(self, password): """Computes a secure hash of password. diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 533ff136eb..649491bdf6 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -139,7 +139,7 @@ class RegisterRestServlet(RestServlet): [LoginType.EMAIL_IDENTITY] ] - authed, result, params = yield self.auth_handler.check_auth( + authed, result, params, session_id = yield self.auth_handler.check_auth( flows, body, self.hs.get_ip_from_request(request) ) @@ -147,6 +147,24 @@ class RegisterRestServlet(RestServlet): defer.returnValue((401, result)) return + # have we already registered a user for this session + registered_user_id = self.auth_handler.get_session_data( + session_id, "registered_user_id", None + ) + if registered_user_id is not None: + logger.info( + "Already registered user ID %r for this session", + registered_user_id + ) + access_token = yield self.auth_handler.issue_access_token(registered_user_id) + refresh_token = yield self.auth_handler.issue_refresh_token(registered_user_id) + defer.returnValue((200, { + "user_id": registered_user_id, + "access_token": access_token, + "home_server": self.hs.hostname, + "refresh_token": refresh_token, + })) + # NB: This may be from the auth handler and NOT from the POST if 'password' not in params: raise SynapseError(400, "Missing password.", Codes.MISSING_PARAM) @@ -161,6 +179,13 @@ class RegisterRestServlet(RestServlet): guest_access_token=guest_access_token, ) + # remember that we've now registered that user account, and with what + # user ID (since the user may not have specified) + logger.info("%r", body) + self.auth_handler.set_session_data( + session_id, "registered_user_id", user_id + ) + if result and LoginType.EMAIL_IDENTITY in result: threepid = result[LoginType.EMAIL_IDENTITY] -- cgit 1.5.1 From 99797947aa5a7cdf8fe12043b4f25a155bcf4555 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 12:51:34 +0000 Subject: pep8 & remove debug logging --- synapse/handlers/auth.py | 2 +- synapse/rest/client/v2_alpha/register.py | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 5dc9d91757..a9f5e3710b 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -490,7 +490,7 @@ class AuthHandler(BaseHandler): self._prune_sessions() def _prune_sessions(self): - for sid,sess in self.sessions.items(): + for sid, sess in self.sessions.items(): last_used = 0 if 'last_used' in sess: last_used = sess['last_used'] diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 649491bdf6..c440430e25 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -149,7 +149,7 @@ class RegisterRestServlet(RestServlet): # have we already registered a user for this session registered_user_id = self.auth_handler.get_session_data( - session_id, "registered_user_id", None + session_id, "registered_user_id", None ) if registered_user_id is not None: logger.info( @@ -157,7 +157,9 @@ class RegisterRestServlet(RestServlet): registered_user_id ) access_token = yield self.auth_handler.issue_access_token(registered_user_id) - refresh_token = yield self.auth_handler.issue_refresh_token(registered_user_id) + refresh_token = yield self.auth_handler.issue_refresh_token( + registered_user_id + ) defer.returnValue((200, { "user_id": registered_user_id, "access_token": access_token, @@ -181,9 +183,8 @@ class RegisterRestServlet(RestServlet): # remember that we've now registered that user account, and with what # user ID (since the user may not have specified) - logger.info("%r", body) self.auth_handler.set_session_data( - session_id, "registered_user_id", user_id + session_id, "registered_user_id", user_id ) if result and LoginType.EMAIL_IDENTITY in result: -- cgit 1.5.1 From ff7d3dc3a03538b08f36342b15492a348b2b0391 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 14:25:14 +0000 Subject: Fix tests --- tests/rest/client/v2_alpha/test_register.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index 9a202e9dd7..affd42c015 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -22,9 +22,10 @@ class RegisterRestServletTestCase(unittest.TestCase): side_effect=lambda x: defer.succeed(self.appservice)) ) - self.auth_result = (False, None, None) + self.auth_result = (False, None, None, None) self.auth_handler = Mock( - check_auth=Mock(side_effect=lambda x, y, z: self.auth_result) + check_auth=Mock(side_effect=lambda x, y, z: self.auth_result), + get_session_data=Mock(return_value=None) ) self.registration_handler = Mock() self.identity_handler = Mock() @@ -112,7 +113,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.auth_result = (True, None, { "username": "kermit", "password": "monkey" - }) + }, None) self.registration_handler.register = Mock(return_value=(user_id, token)) (code, result) = yield self.servlet.on_POST(self.request) @@ -135,7 +136,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.auth_result = (True, None, { "username": "kermit", "password": "monkey" - }) + }, None) self.registration_handler.register = Mock(return_value=("@user:id", "t")) d = self.servlet.on_POST(self.request) return self.assertFailure(d, SynapseError) -- cgit 1.5.1 From f5e90422f5d70afaf9bdf97cc620b563cf31a8eb Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 14:33:19 +0000 Subject: take extra return val from check_auth in account too --- synapse/rest/client/v2_alpha/account.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index dd4ea45588..7f8a6a4cf7 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -43,7 +43,7 @@ class PasswordRestServlet(RestServlet): body = parse_json_object_from_request(request) - authed, result, params = yield self.auth_handler.check_auth([ + authed, result, params, _ = yield self.auth_handler.check_auth([ [LoginType.PASSWORD], [LoginType.EMAIL_IDENTITY] ], body, self.hs.get_ip_from_request(request)) -- cgit 1.5.1 From 742b6c6d158f46a71724821ce13b8ad535df08bc Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 15:42:35 +0000 Subject: Use hs get_clock instead of time.time() --- synapse/handlers/auth.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index a9f5e3710b..dba6c76dfc 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -36,7 +36,7 @@ logger = logging.getLogger(__name__) class AuthHandler(BaseHandler): - SESSION_EXPIRE_SECS = 48 * 60 * 60 + SESSION_EXPIRE_MS = 48 * 60 * 60 * 1000 def __init__(self, hs): super(AuthHandler, self).__init__(hs) @@ -494,7 +494,7 @@ class AuthHandler(BaseHandler): last_used = 0 if 'last_used' in sess: last_used = sess['last_used'] - if last_used < time.time() - AuthHandler.SESSION_EXPIRE_SECS: + if last_used < self.hs.get_clock().time() - AuthHandler.SESSION_EXPIRE_MS: del self.sessions[sid] def hash(self, password): -- cgit 1.5.1 From 9671e6750c1756c7f76888b87eedfe7516ef748b Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 15:51:28 +0000 Subject: Replace other time.time(). --- synapse/handlers/auth.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index dba6c76dfc..9fa8347098 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -27,7 +27,6 @@ import logging import bcrypt import pymacaroons import simplejson -import time import synapse.util.stringutils as stringutils @@ -485,7 +484,7 @@ class AuthHandler(BaseHandler): def _save_session(self, session): # TODO: Persistent storage logger.debug("Saving session %s", session) - session["last_used"] = time.time() + session["last_used"] = self.hs.get_clock().time_msec() self.sessions[session["id"]] = session self._prune_sessions() -- cgit 1.5.1 From 3176aebf9d827eeb939438deea49e12ceddc5b3e Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 15:55:49 +0000 Subject: string with symbols is a bit too symboly. --- synapse/handlers/auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 9fa8347098..85f9b82712 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -292,7 +292,7 @@ class AuthHandler(BaseHandler): if not session_id: # create a new session while session_id is None or session_id in self.sessions: - session_id = stringutils.random_string_with_symbols(24) + session_id = stringutils.random_string(24) self.sessions[session_id] = { "id": session_id, } -- cgit 1.5.1 From 3ee7d7dc7f1793beefee433f780af81a64dfa590 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 16:18:52 +0000 Subject: time_msec() --- synapse/handlers/auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 85f9b82712..cdf9e9000c 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -493,7 +493,7 @@ class AuthHandler(BaseHandler): last_used = 0 if 'last_used' in sess: last_used = sess['last_used'] - if last_used < self.hs.get_clock().time() - AuthHandler.SESSION_EXPIRE_MS: + if last_used < self.hs.get_clock().time_msec() - AuthHandler.SESSION_EXPIRE_MS: del self.sessions[sid] def hash(self, password): -- cgit 1.5.1 From b58d10a87595bd64305f46c2ac86252a67d7b0e4 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 16:22:20 +0000 Subject: pep8 --- synapse/handlers/auth.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index cdf9e9000c..d7233cd0d6 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -493,7 +493,8 @@ class AuthHandler(BaseHandler): last_used = 0 if 'last_used' in sess: last_used = sess['last_used'] - if last_used < self.hs.get_clock().time_msec() - AuthHandler.SESSION_EXPIRE_MS: + now = self.hs.get_clock().time_msec() + if last_used < now - AuthHandler.SESSION_EXPIRE_MS: del self.sessions[sid] def hash(self, password): -- cgit 1.5.1 From a7daa5ae131cc860769d859cf03b48cefdc0500a Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 19:36:57 +0000 Subject: Make registration idempotent, part 2: be idempotent if the client specifies a username. --- synapse/handlers/auth.py | 14 ++++++++++++++ synapse/handlers/register.py | 12 +++++++++++- synapse/rest/client/v2_alpha/register.py | 22 +++++++++++++++++----- 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index d7233cd0d6..82d458b424 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -160,6 +160,20 @@ class AuthHandler(BaseHandler): defer.returnValue(True) defer.returnValue(False) + def get_session_id(self, clientdict): + """ + Gets the session ID for a client given the client dictionary + :param clientdict: The dictionary sent by the client in the request + :return: The string session ID the client sent. If the client did not + send a session ID, returns None. + """ + sid = None + if clientdict and 'auth' in clientdict: + authdict = clientdict['auth'] + if 'session' in authdict: + sid = authdict['session'] + return sid + def set_session_data(self, session_id, key, value): """ Store a key-value pair into the sessions data associated with this diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 6ffb8c0da6..f287ee247b 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -47,7 +47,8 @@ class RegistrationHandler(BaseHandler): self._next_generated_user_id = None @defer.inlineCallbacks - def check_username(self, localpart, guest_access_token=None): + def check_username(self, localpart, guest_access_token=None, + assigned_user_id=None): yield run_on_reactor() if urllib.quote(localpart.encode('utf-8')) != localpart: @@ -60,6 +61,15 @@ class RegistrationHandler(BaseHandler): user = UserID(localpart, self.hs.hostname) user_id = user.to_string() + if assigned_user_id: + if user_id == assigned_user_id: + return + else: + raise SynapseError( + 400, + "A different user ID has already been registered for this session", + ) + yield self.check_user_id_not_appservice_exclusive(user_id) users = yield self.store.get_users_by_id_case_insensitive(user_id) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index c440430e25..b8590560d3 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -16,6 +16,7 @@ from twisted.internet import defer from synapse.api.constants import LoginType +from synapse.types import UserID from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError from synapse.http.servlet import RestServlet, parse_json_object_from_request @@ -122,10 +123,25 @@ class RegisterRestServlet(RestServlet): guest_access_token = body.get("guest_access_token", None) + session_id = self.auth_handler.get_session_id(body) + logger.error("session id: %r", session_id) + registered_user_id = None + if session_id: + # if we get a registered user id out of here, it means we previously + # registered a user for this session, so we could just return the + # user here. We carry on and go through the auth checks though, + # for paranoia. + registered_user_id = self.auth_handler.get_session_data( + session_id, "registered_user_id", None + ) + logger.error("already regged: %r", registered_user_id) + logger.error("check: %r", desired_username) + if desired_username is not None: yield self.registration_handler.check_username( desired_username, - guest_access_token=guest_access_token + guest_access_token=guest_access_token, + assigned_user_id=registered_user_id, ) if self.hs.config.enable_registration_captcha: @@ -147,10 +163,6 @@ class RegisterRestServlet(RestServlet): defer.returnValue((401, result)) return - # have we already registered a user for this session - registered_user_id = self.auth_handler.get_session_data( - session_id, "registered_user_id", None - ) if registered_user_id is not None: logger.info( "Already registered user ID %r for this session", -- cgit 1.5.1 From f984decd6636baa4974a136e2ce8d4fecab3146f Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 19:40:48 +0000 Subject: Unused import --- synapse/rest/client/v2_alpha/register.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index b8590560d3..d3e66740ad 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -16,7 +16,6 @@ from twisted.internet import defer from synapse.api.constants import LoginType -from synapse.types import UserID from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError from synapse.http.servlet import RestServlet, parse_json_object_from_request -- cgit 1.5.1 From 5670205e2a0e4b87005be743eb6cdfd817fe89ae Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 16 Mar 2016 19:49:42 +0000 Subject: remove debug logging --- synapse/rest/client/v2_alpha/register.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index d3e66740ad..d32c06c882 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -123,7 +123,6 @@ class RegisterRestServlet(RestServlet): guest_access_token = body.get("guest_access_token", None) session_id = self.auth_handler.get_session_id(body) - logger.error("session id: %r", session_id) registered_user_id = None if session_id: # if we get a registered user id out of here, it means we previously @@ -133,8 +132,6 @@ class RegisterRestServlet(RestServlet): registered_user_id = self.auth_handler.get_session_data( session_id, "registered_user_id", None ) - logger.error("already regged: %r", registered_user_id) - logger.error("check: %r", desired_username) if desired_username is not None: yield self.registration_handler.check_username( -- cgit 1.5.1 From 4ebb688f4ffcef2784df0aa7f5b088e72b28c07b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 17 Mar 2016 10:59:12 +0000 Subject: Add option to definitions.py to search for functions a function refers to --- scripts-dev/definitions.py | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/scripts-dev/definitions.py b/scripts-dev/definitions.py index 8340c72618..47dac7772d 100755 --- a/scripts-dev/definitions.py +++ b/scripts-dev/definitions.py @@ -86,9 +86,12 @@ def used_names(prefix, item, defs, names): for name, funcs in defs.get('class', {}).items(): used_names(prefix + name + ".", name, funcs, names) + path = prefix.rstrip('.') for used in defs.get('uses', ()): if used in names: - names[used].setdefault('used', {}).setdefault(item, []).append(prefix.rstrip('.')) + if item: + names[item].setdefault('uses', []).append(used) + names[used].setdefault('used', {}).setdefault(item, []).append(path) if __name__ == '__main__': @@ -113,6 +116,10 @@ if __name__ == '__main__': "--referrers", default=0, type=int, help="Include referrers up to the given depth" ) + parser.add_argument( + "--referred", default=0, type=int, + help="Include referred down to the given depth" + ) parser.add_argument( "--format", default="yaml", help="Output format, one of 'yaml' or 'dot'" @@ -161,6 +168,20 @@ if __name__ == '__main__': continue result[name] = definition + referred_depth = args.referred + referred = set() + while referred_depth: + referred_depth -= 1 + for entry in result.values(): + for uses in entry.get("uses", ()): + referred.add(uses) + for name, definition in names.items(): + if not name in referred: + continue + if ignore and any(pattern.match(name) for pattern in ignore): + continue + result[name] = definition + if args.format == 'yaml': yaml.dump(result, sys.stdout, default_flow_style=False) elif args.format == 'dot': -- cgit 1.5.1 From 673c96ce97052126f5bfd11c7dcc19880614ec25 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 17 Mar 2016 11:01:28 +0000 Subject: Remove dead code left over from presence changes --- synapse/handlers/events.py | 70 ---------------------------------------- synapse/handlers/presence.py | 4 --- synapse/storage/roommember.py | 24 -------------- tests/storage/test_roommember.py | 10 ------ 4 files changed, 108 deletions(-) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 72a31a9755..f25a252523 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -18,7 +18,6 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.types import UserID from synapse.events.utils import serialize_event -from synapse.util.logcontext import preserve_context_over_fn from synapse.api.constants import Membership, EventTypes from synapse.events import EventBase @@ -31,20 +30,6 @@ import random logger = logging.getLogger(__name__) -def started_user_eventstream(distributor, user): - return preserve_context_over_fn( - distributor.fire, - "started_user_eventstream", user - ) - - -def stopped_user_eventstream(distributor, user): - return preserve_context_over_fn( - distributor.fire, - "stopped_user_eventstream", user - ) - - class EventStreamHandler(BaseHandler): def __init__(self, hs): @@ -63,61 +48,6 @@ class EventStreamHandler(BaseHandler): self.notifier = hs.get_notifier() - @defer.inlineCallbacks - def started_stream(self, user): - """Tells the presence handler that we have started an eventstream for - the user: - - Args: - user (User): The user who started a stream. - Returns: - A deferred that completes once their presence has been updated. - """ - if user not in self._streams_per_user: - # Make sure we set the streams per user to 1 here rather than - # setting it to zero and incrementing the value below. - # Otherwise this may race with stopped_stream causing the - # user to be erased from the map before we have a chance - # to increment it. - self._streams_per_user[user] = 1 - if user in self._stop_timer_per_user: - try: - self.clock.cancel_call_later( - self._stop_timer_per_user.pop(user) - ) - except: - logger.exception("Failed to cancel event timer") - else: - yield started_user_eventstream(self.distributor, user) - else: - self._streams_per_user[user] += 1 - - def stopped_stream(self, user): - """If there are no streams for a user this starts a timer that will - notify the presence handler that we haven't got an event stream for - the user unless the user starts a new stream in 30 seconds. - - Args: - user (User): The user who stopped a stream. - """ - self._streams_per_user[user] -= 1 - if not self._streams_per_user[user]: - del self._streams_per_user[user] - - # 30 seconds of grace to allow the client to reconnect again - # before we think they're gone - def _later(): - logger.debug("_later stopped_user_eventstream %s", user) - - self._stop_timer_per_user.pop(user, None) - - return stopped_user_eventstream(self.distributor, user) - - logger.debug("Scheduling _later: for %s", user) - self._stop_timer_per_user[user] = ( - self.clock.call_later(30, _later) - ) - @defer.inlineCallbacks @log_function def get_stream(self, auth_user_id, pagin_config, timeout=0, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index f6cf343174..cfbcf2d32c 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -73,10 +73,6 @@ FEDERATION_PING_INTERVAL = 25 * 60 * 1000 assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER -def user_presence_changed(distributor, user, statuscache): - return distributor.fire("user_presence_changed", user, statuscache) - - def collect_presencelike_data(distributor, user, content): return distributor.fire("collect_presencelike_data", user, content) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 3065b0c1a5..0cd89260f2 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -251,30 +251,6 @@ class RoomMemberStore(SQLBaseStore): user_id, membership_list=[Membership.JOIN], ) - @defer.inlineCallbacks - def user_rooms_intersect(self, user_id_list): - """ Checks whether all the users whose IDs are given in a list share a - room. - - This is a "hot path" function that's called a lot, e.g. by presence for - generating the event stream. As such, it is implemented locally by - wrapping logic around heavily-cached database queries. - """ - if len(user_id_list) < 2: - defer.returnValue(True) - - deferreds = [self.get_rooms_for_user(u) for u in user_id_list] - - results = yield defer.DeferredList(deferreds, consumeErrors=True) - - # A list of sets of strings giving room IDs for each user - room_id_lists = [set([r.room_id for r in result[1]]) for result in results] - - # There isn't a setintersection(*list_of_sets) - ret = len(room_id_lists.pop(0).intersection(*room_id_lists)) > 0 - - defer.returnValue(ret) - @defer.inlineCallbacks def forget(self, user_id, room_id): """Indicate that user_id wishes to discard history for room_id.""" diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py index 677d11f68d..b029ff0584 100644 --- a/tests/storage/test_roommember.py +++ b/tests/storage/test_roommember.py @@ -91,11 +91,6 @@ class RoomMemberStoreTestCase(unittest.TestCase): ) )] ) - self.assertFalse( - (yield self.store.user_rooms_intersect( - [self.u_alice.to_string(), self.u_bob.to_string()] - )) - ) @defer.inlineCallbacks def test_two_members(self): @@ -108,11 +103,6 @@ class RoomMemberStoreTestCase(unittest.TestCase): yield self.store.get_room_members(self.room.to_string()) )} ) - self.assertTrue(( - yield self.store.user_rooms_intersect([ - self.u_alice.to_string(), self.u_bob.to_string() - ]) - )) @defer.inlineCallbacks def test_room_hosts(self): -- cgit 1.5.1 From 2cd9260500efa82713edd365f54d491ac0328fb0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 17 Mar 2016 11:09:03 +0000 Subject: Update aliases event after deletion Attempt to update the appropriate `m.room.aliases` event after deleting an alias. This may fail due to the deleter not being in the room. Will also check if the canonical alias of the event is set to the deleted alias, and if so will attempt to delete it. --- synapse/handlers/directory.py | 52 ++++++++++++++++++++++++++++++++----- synapse/rest/client/v1/directory.py | 3 ++- 2 files changed, 48 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index c4aaa11918..be9f2a21b2 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -32,6 +32,8 @@ class DirectoryHandler(BaseHandler): def __init__(self, hs): super(DirectoryHandler, self).__init__(hs) + self.state = hs.get_state_handler() + self.federation = hs.get_replication_layer() self.federation.register_query_handler( "directory", self.on_directory_query @@ -93,7 +95,7 @@ class DirectoryHandler(BaseHandler): yield self._create_association(room_alias, room_id, servers) @defer.inlineCallbacks - def delete_association(self, user_id, room_alias): + def delete_association(self, requester, user_id, room_alias): # association deletion for human users can_delete = yield self._user_can_delete_alias(room_alias, user_id) @@ -112,7 +114,25 @@ class DirectoryHandler(BaseHandler): errcode=Codes.EXCLUSIVE ) - yield self._delete_association(room_alias) + room_id = yield self._delete_association(room_alias) + + try: + yield self.send_room_alias_update_event( + requester, + requester.user.to_string(), + room_id + ) + + yield self._update_canonical_alias( + requester, + requester.user.to_string(), + room_id, + room_alias, + ) + except AuthError as e: + logger.info("Failed to update alias events: %s", e) + + defer.returnValue(room_id) @defer.inlineCallbacks def delete_appservice_association(self, service, room_alias): @@ -129,11 +149,9 @@ class DirectoryHandler(BaseHandler): if not self.hs.is_mine(room_alias): raise SynapseError(400, "Room alias must be local") - yield self.store.delete_room_alias(room_alias) + room_id = yield self.store.delete_room_alias(room_alias) - # TODO - Looks like _update_room_alias_event has never been implemented - # if room_id: - # yield self._update_room_alias_events(user_id, room_id) + defer.returnValue(room_id) @defer.inlineCallbacks def get_association(self, room_alias): @@ -233,6 +251,28 @@ class DirectoryHandler(BaseHandler): ratelimit=False ) + @defer.inlineCallbacks + def _update_canonical_alias(self, requester, user_id, room_id, room_alias): + alias_event = yield self.state.get_current_state( + room_id, EventTypes.CanonicalAlias, "" + ) + + if alias_event.content.get("alias", "") != room_alias.to_string(): + return + + msg_handler = self.hs.get_handlers().message_handler + yield msg_handler.create_and_send_nonmember_event( + requester, + { + "type": EventTypes.CanonicalAlias, + "state_key": "", + "room_id": room_id, + "sender": user_id, + "content": {}, + }, + ratelimit=False + ) + @defer.inlineCallbacks def get_association_from_room_alias(self, room_alias): result = yield self.store.get_association_from_room_alias( diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index 60c5ec77aa..59a23d6cb6 100644 --- a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -127,8 +127,9 @@ class ClientDirectoryServer(ClientV1RestServlet): room_alias = RoomAlias.from_string(room_alias) yield dir_handler.delete_association( - user.to_string(), room_alias + requester, user.to_string(), room_alias ) + logger.info( "User %s deleted alias %s", user.to_string(), -- cgit 1.5.1 From 7a386126206d4fea2b2c561dde6577e5cff107f3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 17 Mar 2016 11:54:19 +0000 Subject: Remove another unused function from presence --- synapse/handlers/presence.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index cfbcf2d32c..d0c8f1328b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -73,10 +73,6 @@ FEDERATION_PING_INTERVAL = 25 * 60 * 1000 assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER -def collect_presencelike_data(distributor, user, content): - return distributor.fire("collect_presencelike_data", user, content) - - class PresenceHandler(BaseHandler): def __init__(self, hs): -- cgit 1.5.1 From 56aa4e7a9a6846a72e9031e29555b05ed119e679 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 17 Mar 2016 15:24:19 +0000 Subject: Check canonical alias event exists --- synapse/handlers/directory.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index be9f2a21b2..6bcc5a5e2b 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -257,7 +257,8 @@ class DirectoryHandler(BaseHandler): room_id, EventTypes.CanonicalAlias, "" ) - if alias_event.content.get("alias", "") != room_alias.to_string(): + alias_str = room_alias.to_string() + if not alias_event or alias_event.content.get("alias", "") != alias_str: return msg_handler = self.hs.get_handlers().message_handler -- cgit 1.5.1 From 3c5f25507b7a62165a782420add4f9dedcec0b3e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 18 Mar 2016 13:55:16 +0000 Subject: Yield on EDU handling --- synapse/federation/federation_server.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index e8bfbe7cb5..a961b17aea 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -137,8 +137,8 @@ class FederationServer(FederationBase): logger.exception("Failed to handle PDU") if hasattr(transaction, "edus"): - for edu in [Edu(**x) for x in transaction.edus]: - self.received_edu( + for edu in (Edu(**x) for x in transaction.edus): + yield self.received_edu( transaction.origin, edu.edu_type, edu.content @@ -161,11 +161,12 @@ class FederationServer(FederationBase): ) defer.returnValue((200, response)) + @defer.inlineCallbacks def received_edu(self, origin, edu_type, content): received_edus_counter.inc() if edu_type in self.edu_handlers: - self.edu_handlers[edu_type](origin, content) + yield self.edu_handlers[edu_type](origin, content) else: logger.warn("Received EDU of type %s with no handler", edu_type) -- cgit 1.5.1 From 67ed8065dba960055c2e3d1740af12229b7d19a4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 18 Mar 2016 14:31:31 +0000 Subject: Dedupe requested event list in _get_events --- synapse/storage/events.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 552e7ca35b..285c586cfe 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -526,6 +526,9 @@ class EventsStore(SQLBaseStore): if not event_ids: defer.returnValue([]) + event_id_list = event_ids + event_ids = set(event_ids) + event_map = self._get_events_from_cache( event_ids, check_redacted=check_redacted, @@ -535,23 +538,18 @@ class EventsStore(SQLBaseStore): missing_events_ids = [e for e in event_ids if e not in event_map] - if not missing_events_ids: - defer.returnValue([ - event_map[e_id] for e_id in event_ids - if e_id in event_map and event_map[e_id] - ]) - - missing_events = yield self._enqueue_events( - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) + if missing_events_ids: + missing_events = yield self._enqueue_events( + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) - event_map.update(missing_events) + event_map.update(missing_events) defer.returnValue([ - event_map[e_id] for e_id in event_ids + event_map[e_id] for e_id in event_id_list if e_id in event_map and event_map[e_id] ]) -- cgit 1.5.1 From 58e207cd77b3b68b1908078418c9c9e9c3830ea5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 18 Mar 2016 14:31:44 +0000 Subject: Don't assume existence of event_id in __str__ --- synapse/events/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index bbfa5a7265..abed6b5e6b 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -168,5 +168,7 @@ class FrozenEvent(EventBase): def __repr__(self): return "" % ( - self.event_id, self.type, self.get("state_key", None), + self.get("event_id", None), + self.get("type", None), + self.get("state_key", None), ) -- cgit 1.5.1 From 9adf0e92bc3dbe4305beaf602406fc5ca51ea37e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 18 Mar 2016 15:12:50 +0000 Subject: Catch exceptions from EDU handling --- synapse/federation/federation_server.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index a961b17aea..76820b924b 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -166,7 +166,12 @@ class FederationServer(FederationBase): received_edus_counter.inc() if edu_type in self.edu_handlers: - yield self.edu_handlers[edu_type](origin, content) + try: + yield self.edu_handlers[edu_type](origin, content) + except SynapseError as e: + logger.info("Failed to handle edu %r: %r", edu_type, e) + except Exception as e: + logger.exception("Failed to handle edu %r", edu_type, e) else: logger.warn("Received EDU of type %s with no handler", edu_type) -- cgit 1.5.1 From 58f8226c7f2aaf9ebe39703edc91ad5cf1b01112 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 21 Mar 2016 14:20:34 +0000 Subject: remove unused current_state variable from on_receive_pdu --- synapse/handlers/federation.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f599e817aa..c172877bde 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -123,7 +123,6 @@ class FederationHandler(BaseHandler): # FIXME (erikj): Awful hack to make the case where we are not currently # in the room work - current_state = None is_in_room = yield self.auth.check_host_in_room( event.room_id, self.server_name @@ -187,7 +186,6 @@ class FederationHandler(BaseHandler): event, state=state, backfilled=backfilled, - current_state=current_state, ) except AuthError as e: raise FederationError( -- cgit 1.5.1 From 3e7fac0d56dca5b389ef7a671c1cd6b0795724c8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 21 Mar 2016 14:03:20 +0000 Subject: Add published room list edit API --- synapse/api/auth.py | 54 ++++++++++++++++++++++++++++++++++--- synapse/handlers/directory.py | 16 +++++++++++ synapse/rest/client/v1/directory.py | 42 +++++++++++++++++++++++++++++ synapse/storage/room.py | 8 ++++++ 4 files changed, 116 insertions(+), 4 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 3038df4ab8..4f9c3c9db8 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -814,17 +814,16 @@ class Auth(object): return auth_ids - @log_function - def _can_send_event(self, event, auth_events): + def _get_send_level(self, etype, state_key, auth_events): key = (EventTypes.PowerLevels, "", ) send_level_event = auth_events.get(key) send_level = None if send_level_event: send_level = send_level_event.content.get("events", {}).get( - event.type + etype ) if send_level is None: - if hasattr(event, "state_key"): + if state_key is not None: send_level = send_level_event.content.get( "state_default", 50 ) @@ -838,6 +837,13 @@ class Auth(object): else: send_level = 0 + return send_level + + @log_function + def _can_send_event(self, event, auth_events): + send_level = self._get_send_level( + event.type, event.get("state_key", None), auth_events + ) user_level = self._get_user_power_level(event.user_id, auth_events) if user_level < send_level: @@ -982,3 +988,43 @@ class Auth(object): "You don't have permission to add ops level greater " "than your own" ) + + @defer.inlineCallbacks + def check_can_change_room_list(self, room_id, user): + """Check if the user is allowed to edit the room's entry in the + published room list. + + Args: + room_id (str) + user (UserID) + """ + + is_admin = yield self.is_server_admin(user) + if is_admin: + defer.returnValue(True) + + user_id = user.to_string() + yield self.check_joined_room(room_id, user_id) + + # We currently require the user is a "moderator" in the room. We do this + # by checking if they would (theoretically) be able to change the + # m.room.aliases events + power_level_event = yield self.state.get_current_state( + room_id, EventTypes.PowerLevels, "" + ) + + auth_events = {} + if power_level_event: + auth_events[(EventTypes.PowerLevels, "")] = power_level_event + + send_level = self._get_send_level( + EventTypes.Aliases, "", auth_events + ) + user_level = self._get_user_power_level(user_id, auth_events) + + if user_level < send_level: + raise AuthError( + 403, + "This server requires you to be a moderator in the room to" + " edit its room list entry" + ) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 6bcc5a5e2b..b2617c8898 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -317,3 +317,19 @@ class DirectoryHandler(BaseHandler): is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id)) defer.returnValue(is_admin) + + @defer.inlineCallbacks + def edit_published_room_list(self, requester, room_id, visibility): + if requester.is_guest: + raise AuthError(403, "Guests cannot edit the published room list") + + if visibility not in ["public", "private"]: + raise SynapseError(400, "Invalide visibility setting") + + room = yield self.store.get_room(room_id) + if room is None: + raise SynapseError(400, "Unknown room") + + yield self.auth.check_can_change_room_list(room_id, requester.user) + + yield self.store.set_room_is_public(room_id, visibility == "public") diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index 59a23d6cb6..8ac09419dc 100644 --- a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -30,6 +30,7 @@ logger = logging.getLogger(__name__) def register_servlets(hs, http_server): ClientDirectoryServer(hs).register(http_server) + ClientDirectoryListServer(hs).register(http_server) class ClientDirectoryServer(ClientV1RestServlet): @@ -137,3 +138,44 @@ class ClientDirectoryServer(ClientV1RestServlet): ) defer.returnValue((200, {})) + + +class ClientDirectoryListServer(ClientV1RestServlet): + PATTERNS = client_path_patterns("/directory/list/room/(?P[^/]*)$") + + def __init__(self, hs): + super(ClientDirectoryListServer, self).__init__(hs) + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_GET(self, request, room_id): + room = yield self.store.get_room(room_id) + if room is None: + raise SynapseError(400, "Unknown room") + + defer.returnValue((200, { + "visibility": "public" if room["is_public"] else "private" + })) + + @defer.inlineCallbacks + def on_PUT(self, request, room_id): + requester = yield self.auth.get_user_by_req(request) + + content = parse_json_object_from_request(request) + visibility = content.get("visibility", "public") + + yield self.handlers.directory_handler.edit_published_room_list( + requester, room_id, visibility, + ) + + defer.returnValue((200, {})) + + @defer.inlineCallbacks + def on_DELETE(self, request, room_id): + requester = yield self.auth.get_user_by_req(request) + + yield self.handlers.directory_handler.edit_published_room_list( + requester, room_id, "private", + ) + + defer.returnValue((200, {})) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 46ab38a313..9be977f387 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -77,6 +77,14 @@ class RoomStore(SQLBaseStore): allow_none=True, ) + def set_room_is_public(self, room_id, is_public): + return self._simple_update_one( + table="rooms", + keyvalues={"room_id": room_id}, + updatevalues={"is_public": is_public}, + desc="set_room_is_public", + ) + def get_public_room_ids(self): return self._simple_select_onecol( table="rooms", -- cgit 1.5.1 From 5244c0b48ebb86273bbb79a1935cf5893aa6f310 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 21 Mar 2016 18:03:08 +0000 Subject: Remove unused backfilled parameter from persist_event --- synapse/federation/federation_server.py | 1 - synapse/handlers/federation.py | 38 +++++++++++++-------------------- synapse/storage/events.py | 22 ++++--------------- 3 files changed, 19 insertions(+), 42 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 76820b924b..429ab6ddec 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -531,7 +531,6 @@ class FederationServer(FederationBase): yield self.handler.on_receive_pdu( origin, pdu, - backfilled=False, state=state, auth_chain=auth_chain, ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c172877bde..267fedf114 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -102,7 +102,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def on_receive_pdu(self, origin, pdu, backfilled, state=None, + def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it through the StateHandler. @@ -185,7 +185,6 @@ class FederationHandler(BaseHandler): origin, event, state=state, - backfilled=backfilled, ) except AuthError as e: raise FederationError( @@ -214,18 +213,17 @@ class FederationHandler(BaseHandler): except StoreError: logger.exception("Failed to store room.") - if not backfilled: - extra_users = [] - if event.type == EventTypes.Member: - target_user_id = event.state_key - target_user = UserID.from_string(target_user_id) - extra_users.append(target_user) + extra_users = [] + if event.type == EventTypes.Member: + target_user_id = event.state_key + target_user = UserID.from_string(target_user_id) + extra_users.append(target_user) - with PreserveLoggingContext(): - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) + with PreserveLoggingContext(): + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) if event.type == EventTypes.Member: if event.membership == Membership.JOIN: @@ -645,7 +643,7 @@ class FederationHandler(BaseHandler): continue try: - self.on_receive_pdu(origin, p, backfilled=False) + self.on_receive_pdu(origin, p) except: logger.exception("Couldn't handle pdu") @@ -777,7 +775,6 @@ class FederationHandler(BaseHandler): event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, - backfilled=False, ) target_user = UserID.from_string(event.state_key) @@ -817,7 +814,6 @@ class FederationHandler(BaseHandler): event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, - backfilled=False, ) target_user = UserID.from_string(event.state_key) @@ -1072,8 +1068,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def _handle_new_event(self, origin, event, state=None, backfilled=False, - current_state=None, auth_events=None): + def _handle_new_event(self, origin, event, state=None, auth_events=None): outlier = event.internal_metadata.is_outlier() @@ -1083,7 +1078,7 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) - if not backfilled and not event.internal_metadata.is_outlier(): + if not event.internal_metadata.is_outlier(): action_generator = ActionGenerator(self.hs) yield action_generator.handle_push_actions_for_event( event, context, self @@ -1092,9 +1087,7 @@ class FederationHandler(BaseHandler): event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, - backfilled=backfilled, - is_new_state=(not outlier and not backfilled), - current_state=current_state, + is_new_state=not outlier, ) defer.returnValue((context, event_stream_id, max_stream_id)) @@ -1192,7 +1185,6 @@ class FederationHandler(BaseHandler): event_stream_id, max_stream_id = yield self.store.persist_event( event, new_event_context, - backfilled=False, is_new_state=True, current_state=state, ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 285c586cfe..e444b64cee 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -101,30 +101,16 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks @log_function - def persist_event(self, event, context, backfilled=False, + def persist_event(self, event, context, is_new_state=True, current_state=None): - stream_ordering = None - if backfilled: - self.min_stream_token -= 1 - stream_ordering = self.min_stream_token - - if stream_ordering is None: - stream_ordering_manager = self._stream_id_gen.get_next() - else: - @contextmanager - def stream_ordering_manager(): - yield stream_ordering - stream_ordering_manager = stream_ordering_manager() - try: - with stream_ordering_manager as stream_ordering: + with self._stream_id_gen.get_next() as stream_ordering: event.internal_metadata.stream_ordering = stream_ordering yield self.runInteraction( "persist_event", self._persist_event_txn, event=event, context=context, - backfilled=backfilled, is_new_state=is_new_state, current_state=current_state, ) @@ -166,7 +152,7 @@ class EventsStore(SQLBaseStore): defer.returnValue(events[0] if events else None) @log_function - def _persist_event_txn(self, txn, event, context, backfilled, + def _persist_event_txn(self, txn, event, context, is_new_state=True, current_state=None): # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table @@ -198,7 +184,7 @@ class EventsStore(SQLBaseStore): return self._persist_events_txn( txn, [(event, context)], - backfilled=backfilled, + backfilled=False, is_new_state=is_new_state, ) -- cgit 1.5.1 From d3654694d09ccbc672c8a2373f09b4a3a24442b8 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 22 Mar 2016 00:52:31 +0000 Subject: an invalide is something else... --- synapse/handlers/directory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index b2617c8898..f6143521fb 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -324,7 +324,7 @@ class DirectoryHandler(BaseHandler): raise AuthError(403, "Guests cannot edit the published room list") if visibility not in ["public", "private"]: - raise SynapseError(400, "Invalide visibility setting") + raise SynapseError(400, "Invalid visibility setting") room = yield self.store.get_room(room_id) if room is None: -- cgit 1.5.1 From b5912776207b328ad1c2722ef01f836b54e5383e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 10:32:50 +0000 Subject: Make stateGroupCache honour CACHE_SIZE_FACTOR --- synapse/storage/_base.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7dc67ecd57..583b77a835 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -26,6 +26,10 @@ from twisted.internet import defer import sys import time import threading +import os + + +CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) logger = logging.getLogger(__name__) @@ -163,7 +167,9 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) - self._state_group_cache = DictionaryCache("*stateGroupCache*", 2000) + self._state_group_cache = DictionaryCache( + "*stateGroupCache*", 2000 * CACHE_SIZE_FACTOR + ) self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] -- cgit 1.5.1 From 97785bfc0fe42619183e73432b897d2740fa74f8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 10:41:44 +0000 Subject: Doc string --- synapse/handlers/directory.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index f6143521fb..8eeb225811 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -320,6 +320,12 @@ class DirectoryHandler(BaseHandler): @defer.inlineCallbacks def edit_published_room_list(self, requester, room_id, visibility): + """Edit the entry of the room in the published room list. + + requester + room_id (str) + visibility (str): "public" or "private" + """ if requester.is_guest: raise AuthError(403, "Guests cannot edit the published room list") -- cgit 1.5.1 From 2c86187a1bd61901ecf4adca3dcced9f68cf26de Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 11:59:31 +0000 Subject: Don't cache events in _state_group_cache Instead, simply cache the event ids, relying on the event cache to cache the actual events. The problem was that while the state groups cache was limited in the number of groups it could hold, each individual group could consist of thousands of events. --- synapse/storage/state.py | 108 +++++++++++++++++++++-------------------------- 1 file changed, 48 insertions(+), 60 deletions(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 8ed8a21b0a..f06c734c4e 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -172,7 +172,7 @@ class StateStore(SQLBaseStore): defer.returnValue(events) def _get_state_groups_from_groups(self, groups, types): - """Returns dictionary state_group -> state event ids + """Returns dictionary state_group -> (dict of (type, state_key) -> event id) """ def f(txn, groups): if types is not None: @@ -183,7 +183,8 @@ class StateStore(SQLBaseStore): where_clause = "" sql = ( - "SELECT state_group, event_id FROM state_groups_state WHERE" + "SELECT state_group, event_id, type, state_key" + " FROM state_groups_state WHERE" " state_group IN (%s) %s" % ( ",".join("?" for _ in groups), where_clause, @@ -199,7 +200,8 @@ class StateStore(SQLBaseStore): results = {} for row in rows: - results.setdefault(row["state_group"], []).append(row["event_id"]) + key = (row["type"], row["state_key"]) + results.setdefault(row["state_group"], {})[key] = row["event_id"] return results chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)] @@ -296,7 +298,7 @@ class StateStore(SQLBaseStore): where a `state_key` of `None` matches all state_keys for the `type`. """ - is_all, state_dict = self._state_group_cache.get(group) + is_all, state_dict_ids = self._state_group_cache.get(group) type_to_key = {} missing_types = set() @@ -308,7 +310,7 @@ class StateStore(SQLBaseStore): if type_to_key.get(typ, object()) is not None: type_to_key.setdefault(typ, set()).add(state_key) - if (typ, state_key) not in state_dict: + if (typ, state_key) not in state_dict_ids: missing_types.add((typ, state_key)) sentinel = object() @@ -326,7 +328,7 @@ class StateStore(SQLBaseStore): got_all = not (missing_types or types is None) return { - k: v for k, v in state_dict.items() + k: v for k, v in state_dict_ids.items() if include(k[0], k[1]) }, missing_types, got_all @@ -340,8 +342,9 @@ class StateStore(SQLBaseStore): Args: group: The state group to lookup """ - is_all, state_dict = self._state_group_cache.get(group) - return state_dict, is_all + is_all, state_dict_ids = self._state_group_cache.get(group) + + return state_dict_ids, is_all @defer.inlineCallbacks def _get_state_for_groups(self, groups, types=None): @@ -354,84 +357,69 @@ class StateStore(SQLBaseStore): missing_groups = [] if types is not None: for group in set(groups): - state_dict, missing_types, got_all = self._get_some_state_from_cache( + state_dict_ids, missing_types, got_all = self._get_some_state_from_cache( group, types ) - results[group] = state_dict + results[group] = state_dict_ids if not got_all: missing_groups.append(group) else: for group in set(groups): - state_dict, got_all = self._get_all_state_from_cache( + state_dict_ids, got_all = self._get_all_state_from_cache( group ) - results[group] = state_dict + + results[group] = state_dict_ids if not got_all: missing_groups.append(group) - if not missing_groups: - defer.returnValue({ - group: { - type_tuple: event - for type_tuple, event in state.items() - if event - } - for group, state in results.items() - }) + if missing_groups: + # Okay, so we have some missing_types, lets fetch them. + cache_seq_num = self._state_group_cache.sequence - # Okay, so we have some missing_types, lets fetch them. - cache_seq_num = self._state_group_cache.sequence + group_to_state_dict = yield self._get_state_groups_from_groups( + missing_groups, types + ) - group_state_dict = yield self._get_state_groups_from_groups( - missing_groups, types - ) + # Now we want to update the cache with all the things we fetched + # from the database. + for group, group_state_dict in group_to_state_dict.items(): + if types: + # We delibrately put key -> None mappings into the cache to + # cache absence of the key, on the assumption that if we've + # explicitly asked for some types then we will probably ask + # for them again. + state_dict = {key: None for key in types} + state_dict.update(results[group]) + results[group] = state_dict + else: + state_dict = results[group] + + state_dict.update(group_state_dict) + + self._state_group_cache.update( + cache_seq_num, + key=group, + value=state_dict, + full=(types is None), + ) state_events = yield self._get_events( - [e_id for l in group_state_dict.values() for e_id in l], + [ev_id for sd in results.values() for ev_id in sd.values()], get_prev_content=False ) state_events = {e.event_id: e for e in state_events} - # Now we want to update the cache with all the things we fetched - # from the database. - for group, state_ids in group_state_dict.items(): - if types: - # We delibrately put key -> None mappings into the cache to - # cache absence of the key, on the assumption that if we've - # explicitly asked for some types then we will probably ask - # for them again. - state_dict = {key: None for key in types} - state_dict.update(results[group]) - results[group] = state_dict - else: - state_dict = results[group] - - for event_id in state_ids: - try: - state_event = state_events[event_id] - state_dict[(state_event.type, state_event.state_key)] = state_event - except KeyError: - # Hmm. So we do don't have that state event? Interesting. - logger.warn( - "Can't find state event %r for state group %r", - event_id, group, - ) - - self._state_group_cache.update( - cache_seq_num, - key=group, - value=state_dict, - full=(types is None), - ) - # Remove all the entries with None values. The None values were just # used for bookkeeping in the cache. for group, state_dict in results.items(): results[group] = { - key: event for key, event in state_dict.items() if event + key: state_events[event_id] + for key, event_id in state_dict.items() + if event_id and event_id in state_events } defer.returnValue(results) -- cgit 1.5.1 From 5defb25ac622d979c13d9e9d311e69c2ef7c15a5 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 22 Mar 2016 13:52:45 +0000 Subject: Use get_users_in_room to count the number of room members rather than using read_receipts --- synapse/push/bulk_push_rule_evaluator.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 87d5061fb0..76d7eb7ce0 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -107,7 +107,9 @@ class BulkPushRuleEvaluator: users_dict.items(), [event], {event.event_id: current_state} ) - evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room)) + room_members = yield self.store.get_users_in_room(self.room_id) + + evaluator = PushRuleEvaluatorForEvent(event, len(room_members)) condition_cache = {} -- cgit 1.5.1 From 76d18a577661257abe0d047129383dbd09bbe9b1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 14:08:13 +0000 Subject: Bump get_aliases_for_room cache --- synapse/storage/directory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 012a0b414a..ef231a04dc 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -155,7 +155,7 @@ class DirectoryStore(SQLBaseStore): return room_id - @cached() + @cached(max_entries=5000) def get_aliases_for_room(self, room_id): return self._simple_select_onecol( "room_aliases", -- cgit 1.5.1 From 6cf0ba14663c263ea5f423a4b24b3915b8349e36 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 14:18:21 +0000 Subject: Bump get_unread_event_push_actions_by_room_for_user cache --- synapse/storage/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 5820539a92..dc5830450a 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -49,7 +49,7 @@ class EventPushActionsStore(SQLBaseStore): ) self._simple_insert_many_txn(txn, "event_push_actions", values) - @cachedInlineCallbacks(num_args=3, lru=True, tree=True) + @cachedInlineCallbacks(num_args=3, lru=True, tree=True, max_entries=5000) def get_unread_event_push_actions_by_room_for_user( self, room_id, user_id, last_read_event_id ): -- cgit 1.5.1 From d787e41b2046572d42c13b6aa9b4636f98f7f9e9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 14:44:48 +0000 Subject: Measure StateHandler._resolve_events --- synapse/state.py | 78 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 40 insertions(+), 38 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index b9a1387520..e096329721 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.metrics import Measure from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.api.auth import AuthEventTypes @@ -263,48 +264,49 @@ class StateHandler(object): from (type, state_key) to event. prev_states is a list of event_ids. :rtype: (dict[(str, str), synapse.events.FrozenEvent], list[str]) """ - state = {} - for st in state_sets: - for e in st: - state.setdefault( - (e.type, e.state_key), - {} - )[e.event_id] = e - - unconflicted_state = { - k: v.values()[0] for k, v in state.items() - if len(v.values()) == 1 - } - - conflicted_state = { - k: v.values() - for k, v in state.items() - if len(v.values()) > 1 - } + with Measure(self.clock, "state._resolve_events"): + state = {} + for st in state_sets: + for e in st: + state.setdefault( + (e.type, e.state_key), + {} + )[e.event_id] = e + + unconflicted_state = { + k: v.values()[0] for k, v in state.items() + if len(v.values()) == 1 + } - if event_type: - prev_states_events = conflicted_state.get( - (event_type, state_key), [] - ) - prev_states = [s.event_id for s in prev_states_events] - else: - prev_states = [] + conflicted_state = { + k: v.values() + for k, v in state.items() + if len(v.values()) > 1 + } - auth_events = { - k: e for k, e in unconflicted_state.items() - if k[0] in AuthEventTypes - } + if event_type: + prev_states_events = conflicted_state.get( + (event_type, state_key), [] + ) + prev_states = [s.event_id for s in prev_states_events] + else: + prev_states = [] - try: - resolved_state = self._resolve_state_events( - conflicted_state, auth_events - ) - except: - logger.exception("Failed to resolve state") - raise + auth_events = { + k: e for k, e in unconflicted_state.items() + if k[0] in AuthEventTypes + } + + try: + resolved_state = self._resolve_state_events( + conflicted_state, auth_events + ) + except: + logger.exception("Failed to resolve state") + raise - new_state = unconflicted_state - new_state.update(resolved_state) + new_state = unconflicted_state + new_state.update(resolved_state) return new_state, prev_states -- cgit 1.5.1 From 99f929f36b396b7152b3840c11e8debc5505f673 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 15:31:13 +0000 Subject: Make StateHandler._state_cache only store event_ids. --- synapse/state.py | 24 +++++++++++++++++------- synapse/storage/events.py | 25 +++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index e096329721..9d90a437d3 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -28,6 +28,7 @@ from collections import namedtuple import logging import hashlib +import os logger = logging.getLogger(__name__) @@ -35,8 +36,11 @@ logger = logging.getLogger(__name__) KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key")) -SIZE_OF_CACHE = 1000 -EVICTION_TIMEOUT_SECONDS = 20 +CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) + + +SIZE_OF_CACHE = int(5000 * CACHE_SIZE_FACTOR) +EVICTION_TIMEOUT_SECONDS = 60 * 60 class _StateCacheEntry(object): @@ -92,7 +96,9 @@ class StateHandler(object): if cache: cache.ts = self.clock.time_msec() - state = cache.state + + event_dict = yield self.store.get_events(cache.state.values()) + state = {(e.type, e.state_key): e for e in event_dict.values()} else: res = yield self.resolve_state_groups(room_id, event_ids) state = res[1] @@ -191,14 +197,18 @@ class StateHandler(object): cache = self._state_cache.get(frozenset(event_ids), None) if cache and cache.state_group: cache.ts = self.clock.time_msec() - prev_state = cache.state.get((event_type, state_key), None) + + event_dict = yield self.store.get_events(cache.state.values()) + state = {(e.type, e.state_key): e for e in event_dict.values()} + + prev_state = state.get((event_type, state_key), None) if prev_state: prev_state = prev_state.event_id prev_states = [prev_state] else: prev_states = [] defer.returnValue( - (cache.state_group, cache.state, prev_states) + (cache.state_group, state, prev_states) ) state_groups = yield self.store.get_state_groups( @@ -226,7 +236,7 @@ class StateHandler(object): if self._state_cache is not None: cache = _StateCacheEntry( - state=state, + state={key: event.event_id for key, event in state.items()}, state_group=name, ts=self.clock.time_msec() ) @@ -241,7 +251,7 @@ class StateHandler(object): if self._state_cache is not None: cache = _StateCacheEntry( - state=new_state, + state={key: event.event_id for key, event in new_state.items()}, state_group=None, ts=self.clock.time_msec() ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e444b64cee..584e659d4a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -151,6 +151,31 @@ class EventsStore(SQLBaseStore): defer.returnValue(events[0] if events else None) + @defer.inlineCallbacks + def get_events(self, event_ids, check_redacted=True, + get_prev_content=False, allow_rejected=False): + """Get events from the database + + Args: + event_ids (list): The event_ids of the events to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + + Returns: + Deferred : Dict from event_id to event. + """ + events = yield self._get_events( + event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + defer.returnValue({e.event_id: e for e in events}) + @log_function def _persist_event_txn(self, txn, event, context, is_new_state=True, current_state=None): -- cgit 1.5.1 From c4a8cbd15a471d2a658de96abcc3254fc95de1bf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 16:06:21 +0000 Subject: Make LruCache use a dedicated _Node class --- synapse/util/caches/lrucache.py | 73 +++++++++++++++++++++++------------------ 1 file changed, 41 insertions(+), 32 deletions(-) diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index f7423f2fab..f9df445a8d 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -29,6 +29,16 @@ def enumerate_leaves(node, depth): yield m +class _Node(object): + __slots__ = ["prev_node", "next_node", "key", "value"] + + def __init__(self, prev_node, next_node, key, value): + self.prev_node = prev_node + self.next_node = next_node + self.key = key + self.value = value + + class LruCache(object): """ Least-recently-used cache. @@ -38,10 +48,9 @@ class LruCache(object): def __init__(self, max_size, keylen=1, cache_type=dict): cache = cache_type() self.cache = cache # Used for introspection. - list_root = [] - list_root[:] = [list_root, list_root, None, None] - - PREV, NEXT, KEY, VALUE = 0, 1, 2, 3 + list_root = _Node(None, None, None, None) + list_root.next_node = list_root + list_root.prev_node = list_root lock = threading.Lock() @@ -55,36 +64,36 @@ class LruCache(object): def add_node(key, value): prev_node = list_root - next_node = prev_node[NEXT] - node = [prev_node, next_node, key, value] - prev_node[NEXT] = node - next_node[PREV] = node + next_node = prev_node.next_node + node = _Node(prev_node, next_node, key, value) + prev_node.next_node = node + next_node.prev_node = node cache[key] = node def move_node_to_front(node): - prev_node = node[PREV] - next_node = node[NEXT] - prev_node[NEXT] = next_node - next_node[PREV] = prev_node + prev_node = node.prev_node + next_node = node.next_node + prev_node.next_node = next_node + next_node.prev_node = prev_node prev_node = list_root - next_node = prev_node[NEXT] - node[PREV] = prev_node - node[NEXT] = next_node - prev_node[NEXT] = node - next_node[PREV] = node + next_node = prev_node.next_node + node.prev_node = prev_node + node.next_node = next_node + prev_node.next_node = node + next_node.prev_node = node def delete_node(node): - prev_node = node[PREV] - next_node = node[NEXT] - prev_node[NEXT] = next_node - next_node[PREV] = prev_node + prev_node = node.prev_node + next_node = node.next_node + prev_node.next_node = next_node + next_node.prev_node = prev_node @synchronized def cache_get(key, default=None): node = cache.get(key, None) if node is not None: move_node_to_front(node) - return node[VALUE] + return node.value else: return default @@ -93,25 +102,25 @@ class LruCache(object): node = cache.get(key, None) if node is not None: move_node_to_front(node) - node[VALUE] = value + node.value = value else: add_node(key, value) if len(cache) > max_size: - todelete = list_root[PREV] + todelete = list_root.prev_node delete_node(todelete) - cache.pop(todelete[KEY], None) + cache.pop(todelete.key, None) @synchronized def cache_set_default(key, value): node = cache.get(key, None) if node is not None: - return node[VALUE] + return node.value else: add_node(key, value) if len(cache) > max_size: - todelete = list_root[PREV] + todelete = list_root.prev_node delete_node(todelete) - cache.pop(todelete[KEY], None) + cache.pop(todelete.key, None) return value @synchronized @@ -119,8 +128,8 @@ class LruCache(object): node = cache.get(key, None) if node: delete_node(node) - cache.pop(node[KEY], None) - return node[VALUE] + cache.pop(node.key, None) + return node.value else: return default @@ -137,8 +146,8 @@ class LruCache(object): @synchronized def cache_clear(): - list_root[NEXT] = list_root - list_root[PREV] = list_root + list_root.next_node = list_root + list_root.prev_node = list_root cache.clear() @synchronized -- cgit 1.5.1 From d531ebcb57de61bad0ac2e4231280d41d8db4404 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 18:02:36 +0000 Subject: Key StateHandler._state_cache off of state groups --- synapse/state.py | 61 +++++++++++++++++++------------------------------------- 1 file changed, 21 insertions(+), 40 deletions(-) diff --git a/synapse/state.py b/synapse/state.py index 9d90a437d3..14c0430017 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -90,18 +90,8 @@ class StateHandler(object): """ event_ids = yield self.store.get_latest_event_ids_in_room(room_id) - cache = None - if self._state_cache is not None: - cache = self._state_cache.get(frozenset(event_ids), None) - - if cache: - cache.ts = self.clock.time_msec() - - event_dict = yield self.store.get_events(cache.state.values()) - state = {(e.type, e.state_key): e for e in event_dict.values()} - else: - res = yield self.resolve_state_groups(room_id, event_ids) - state = res[1] + res = yield self.resolve_state_groups(room_id, event_ids) + state = res[1] if event_type: defer.returnValue(state.get((event_type, state_key))) @@ -193,24 +183,6 @@ class StateHandler(object): """ logger.debug("resolve_state_groups event_ids %s", event_ids) - if self._state_cache is not None: - cache = self._state_cache.get(frozenset(event_ids), None) - if cache and cache.state_group: - cache.ts = self.clock.time_msec() - - event_dict = yield self.store.get_events(cache.state.values()) - state = {(e.type, e.state_key): e for e in event_dict.values()} - - prev_state = state.get((event_type, state_key), None) - if prev_state: - prev_state = prev_state.event_id - prev_states = [prev_state] - else: - prev_states = [] - defer.returnValue( - (cache.state_group, state, prev_states) - ) - state_groups = yield self.store.get_state_groups( room_id, event_ids ) @@ -220,7 +192,7 @@ class StateHandler(object): state_groups.keys() ) - group_names = set(state_groups.keys()) + group_names = frozenset(state_groups.keys()) if len(group_names) == 1: name, state_list = state_groups.items().pop() state = { @@ -234,16 +206,25 @@ class StateHandler(object): else: prev_states = [] - if self._state_cache is not None: - cache = _StateCacheEntry( - state={key: event.event_id for key, event in state.items()}, - state_group=name, - ts=self.clock.time_msec() - ) + defer.returnValue((name, state, prev_states)) + + if self._state_cache is not None: + cache = self._state_cache.get(group_names, None) + if cache and cache.state_group: + cache.ts = self.clock.time_msec() - self._state_cache[frozenset(event_ids)] = cache + event_dict = yield self.store.get_events(cache.state.values()) + state = {(e.type, e.state_key): e for e in event_dict.values()} - defer.returnValue((name, state, prev_states)) + prev_state = state.get((event_type, state_key), None) + if prev_state: + prev_state = prev_state.event_id + prev_states = [prev_state] + else: + prev_states = [] + defer.returnValue( + (cache.state_group, state, prev_states) + ) new_state, prev_states = self._resolve_events( state_groups.values(), event_type, state_key @@ -256,7 +237,7 @@ class StateHandler(object): ts=self.clock.time_msec() ) - self._state_cache[frozenset(event_ids)] = cache + self._state_cache[group_names] = cache defer.returnValue((None, new_state, prev_states)) -- cgit 1.5.1 From 9e2e994395327956f846113566fd18c01f12441a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 09:28:07 +0000 Subject: Reduce cache size --- synapse/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/state.py b/synapse/state.py index 14c0430017..41d32e664a 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -39,7 +39,7 @@ KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key")) CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) -SIZE_OF_CACHE = int(5000 * CACHE_SIZE_FACTOR) +SIZE_OF_CACHE = int(1000 * CACHE_SIZE_FACTOR) EVICTION_TIMEOUT_SECONDS = 60 * 60 -- cgit 1.5.1 From b6507869cdd5da18117dbbd0fbf78f4bdd4391f7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 10:32:10 +0000 Subject: Make get_invites return RoomsForUser --- synapse/push/__init__.py | 2 +- synapse/storage/roommember.py | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 65ef1b68a3..296c4447ec 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -317,7 +317,7 @@ class Pusher(object): @defer.inlineCallbacks def _get_badge_count(self): invites, joins = yield defer.gatherResults([ - self.store.get_invites_for_user(self.user_id), + self.store.get_invited_rooms_for_user(self.user_id), self.store.get_rooms_for_user(self.user_id), ], consumeErrors=True) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 0cd89260f2..430b49c12e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -115,19 +115,17 @@ class RoomMemberStore(SQLBaseStore): ).addCallback(self._get_events) @cached() - def get_invites_for_user(self, user_id): - """ Get all the invite events for a user + def get_invited_rooms_for_user(self, user_id): + """ Get all the rooms the user is invited to Args: user_id (str): The user ID. Returns: - A deferred list of event objects. + A deferred list of RoomsForUser. """ return self.get_rooms_for_user_where_membership_is( user_id, [Membership.INVITE] - ).addCallback(lambda invites: self._get_events([ - invite.event_id for invite in invites - ])) + ) def get_leave_and_ban_events_for_user(self, user_id): """ Get all the leave events for a user -- cgit 1.5.1 From 34473a9c7f3e42db5154d2558e737fcab2546a81 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 10:42:19 +0000 Subject: Don't require alias in public room list. Rooms now no longer require an alias to be published. Also, changes the way we pull out state of each room to not require fetching all state events. --- synapse/handlers/room.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 051468989f..a07c0ee431 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -946,53 +946,52 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def handle_room(room_id): aliases = yield self.store.get_aliases_for_room(room_id) - if not aliases: - defer.returnValue(None) - state = yield self.state_handler.get_current_state(room_id) + def get_state(etype, state_key): + return self.state_handler.get_current_state(room_id, etype, state_key) - result = {"aliases": aliases, "room_id": room_id} + result = {"room_id": room_id} + if aliases: + result["aliases"] = aliases - name_event = state.get((EventTypes.Name, ""), None) + name_event = yield get_state(EventTypes.Name, "") if name_event: name = name_event.content.get("name", None) if name: result["name"] = name - topic_event = state.get((EventTypes.Topic, ""), None) + topic_event = yield get_state(EventTypes.Topic, "") if topic_event: topic = topic_event.content.get("topic", None) if topic: result["topic"] = topic - canonical_event = state.get((EventTypes.CanonicalAlias, ""), None) + canonical_event = yield get_state(EventTypes.CanonicalAlias, "") if canonical_event: canonical_alias = canonical_event.content.get("alias", None) if canonical_alias: result["canonical_alias"] = canonical_alias - visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None) + visibility_event = yield get_state(EventTypes.RoomHistoryVisibility, "") visibility = None if visibility_event: visibility = visibility_event.content.get("history_visibility", None) result["world_readable"] = visibility == "world_readable" - guest_event = state.get((EventTypes.GuestAccess, ""), None) + guest_event = yield get_state(EventTypes.GuestAccess, "") guest = None if guest_event: guest = guest_event.content.get("guest_access", None) result["guest_can_join"] = guest == "can_join" - avatar_event = state.get(("m.room.avatar", ""), None) + avatar_event = yield get_state("m.room.avatar", "") if avatar_event: avatar_url = avatar_event.content.get("url", None) if avatar_url: result["avatar_url"] = avatar_url - result["num_joined_members"] = sum( - 1 for (event_type, _), ev in state.items() - if event_type == EventTypes.Member and ev.membership == Membership.JOIN - ) + joined_users = yield self.store.get_users_in_room(room_id) + result["num_joined_members"] = len(joined_users) defer.returnValue(result) -- cgit 1.5.1 From 8b0dfc9fc4e98ea9c029c5f7b49efaf011ce1979 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 11:37:58 +0000 Subject: Don't cache events in get_current_state_for_key --- synapse/storage/events.py | 4 ++-- synapse/storage/state.py | 16 +++++++++------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e444b64cee..7bcddf1f31 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -157,7 +157,7 @@ class EventsStore(SQLBaseStore): # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: - txn.call_after(self.get_current_state_for_key.invalidate_all) + txn.call_after(self._get_current_state_for_key.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all) txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) @@ -441,7 +441,7 @@ class EventsStore(SQLBaseStore): for event, _ in state_events_and_contexts: if not context.rejected: txn.call_after( - self.get_current_state_for_key.invalidate, + self._get_current_state_for_key.invalidate, (event.room_id, event.type, event.state_key,) ) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index f06c734c4e..eab2c5a8ce 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -14,9 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.descriptors import ( - cached, cachedInlineCallbacks, cachedList -) +from synapse.util.caches.descriptors import cached, cachedList from twisted.internet import defer @@ -155,8 +153,14 @@ class StateStore(SQLBaseStore): events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) - @cachedInlineCallbacks(num_args=3) + @defer.inlineCallbacks def get_current_state_for_key(self, room_id, event_type, state_key): + event_ids = yield self._get_current_state_for_key(room_id, event_type, state_key) + events = yield self._get_events(event_ids, get_prev_content=False) + defer.returnValue(events) + + @cached(num_args=3) + def _get_current_state_for_key(self, room_id, event_type, state_key): def f(txn): sql = ( "SELECT event_id FROM current_state_events" @@ -167,9 +171,7 @@ class StateStore(SQLBaseStore): txn.execute(sql, args) results = txn.fetchall() return [r[0] for r in results] - event_ids = yield self.runInteraction("get_current_state_for_key", f) - events = yield self._get_events(event_ids, get_prev_content=False) - defer.returnValue(events) + return self.runInteraction("get_current_state_for_key", f) def _get_state_groups_from_groups(self, groups, types): """Returns dictionary state_group -> (dict of (type, state_key) -> event id) -- cgit 1.5.1 From d87a846ebceb81f78660c4900126eff6e3998b8a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 11:42:50 +0000 Subject: Don't cache events in get_recent_events_for_room --- synapse/storage/stream.py | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 7f4a827528..cf84938be5 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -36,7 +36,7 @@ what sort order was used: from twisted.internet import defer from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logcontext import preserve_fn @@ -465,9 +465,25 @@ class StreamStore(SQLBaseStore): defer.returnValue((events, token)) - @cachedInlineCallbacks(num_args=4) + @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): + rows, token = yield self.get_recent_event_ids_for_room( + room_id, limit, end_token, from_token + ) + + logger.debug("stream before") + events = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) + logger.debug("stream after") + + self._set_before_and_after(events, rows) + + defer.returnValue((events, token)) + @cached(num_args=4) + def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None): end_token = RoomStreamToken.parse_stream_token(end_token) if from_token is None: @@ -517,21 +533,10 @@ class StreamStore(SQLBaseStore): return rows, token - rows, token = yield self.runInteraction( + return self.runInteraction( "get_recent_events_for_room", get_recent_events_for_room_txn ) - logger.debug("stream before") - events = yield self._get_events( - [r["event_id"] for r in rows], - get_prev_content=True - ) - logger.debug("stream after") - - self._set_before_and_after(events, rows) - - defer.returnValue((events, token)) - @defer.inlineCallbacks def get_room_events_max_id(self, direction='f'): token = yield self._stream_id_gen.get_max_token() -- cgit 1.5.1 From 0677fc1c4e48fb0b91a2f91b348d16e5ce676125 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 13:24:27 +0000 Subject: Comment --- synapse/handlers/room.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a07c0ee431..25225ea1c4 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -947,6 +947,9 @@ class RoomListHandler(BaseHandler): def handle_room(room_id): aliases = yield self.store.get_aliases_for_room(room_id) + # We pull each bit of state out indvidually to avoid pulling the + # full state into memory. Due to how the caching works this should + # be fairly quick, even if not originally in the cache. def get_state(etype, state_key): return self.state_handler.get_current_state(room_id, etype, state_key) -- cgit 1.5.1 From b2802a1351cf5dbfb68d0e0f96ad9fb16df98fe8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 13:45:57 +0000 Subject: Ensure published rooms have public join rules --- synapse/handlers/room.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 25225ea1c4..7062414adf 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -953,6 +953,13 @@ class RoomListHandler(BaseHandler): def get_state(etype, state_key): return self.state_handler.get_current_state(room_id, etype, state_key) + # Double check that this is actually a public room. + join_rules_event = yield get_state(EventTypes.JoinRules, "") + if join_rules_event: + join_rule = join_rules_event.content.get("join_rule", None) + if join_rule and join_rule != JoinRules.PUBLIC: + defer.returnValue(None) + result = {"room_id": room_id} if aliases: result["aliases"] = aliases -- cgit 1.5.1 From 84afeb41f32acdab22036b8c1efbb402eef31cd7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 13:49:10 +0000 Subject: Ensure all old public rooms have aliases --- synapse/handlers/room.py | 9 +++++---- synapse/storage/schema/delta/30/public_rooms.sql | 21 +++++++++++++++++++++ 2 files changed, 26 insertions(+), 4 deletions(-) create mode 100644 synapse/storage/schema/delta/30/public_rooms.sql diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 7062414adf..d5c56ce0d6 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -119,7 +119,8 @@ class RoomCreationHandler(BaseHandler): invite_3pid_list = config.get("invite_3pid", []) - is_public = config.get("visibility", None) == "public" + visibility = config.get("visibility", None) + is_public = visibility == "public" # autogen room IDs and try to create it. We may clash, so just # try a few times till one goes through, giving up eventually. @@ -155,9 +156,9 @@ class RoomCreationHandler(BaseHandler): preset_config = config.get( "preset", - RoomCreationPreset.PUBLIC_CHAT - if is_public - else RoomCreationPreset.PRIVATE_CHAT + RoomCreationPreset.PRIVATE_CHAT + if visibility == "private" + else RoomCreationPreset.PUBLIC_CHAT ) raw_initial_state = config.get("initial_state", []) diff --git a/synapse/storage/schema/delta/30/public_rooms.sql b/synapse/storage/schema/delta/30/public_rooms.sql new file mode 100644 index 0000000000..a48604faa8 --- /dev/null +++ b/synapse/storage/schema/delta/30/public_rooms.sql @@ -0,0 +1,21 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +/* This release removes the restriction that published rooms must have an alias, + * so we go back and ensure the only 'public' rooms are ones with an alias.*/ +UPDATE rooms SET is_public = 0 WHERE is_public = 1 AND room_id not in ( + SELECT room_id FROM room_aliases +); -- cgit 1.5.1 From 0c1a27b7877c08643ced4bdabe9843d69b0bcea1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 14:10:49 +0000 Subject: SQLite and postgres doesn't share a true literal --- synapse/storage/schema/delta/30/public_rooms.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/schema/delta/30/public_rooms.sql b/synapse/storage/schema/delta/30/public_rooms.sql index a48604faa8..3400898ed0 100644 --- a/synapse/storage/schema/delta/30/public_rooms.sql +++ b/synapse/storage/schema/delta/30/public_rooms.sql @@ -16,6 +16,6 @@ /* This release removes the restriction that published rooms must have an alias, * so we go back and ensure the only 'public' rooms are ones with an alias.*/ -UPDATE rooms SET is_public = 0 WHERE is_public = 1 AND room_id not in ( +UPDATE rooms SET is_public = (1 = 0) WHERE is_public = (1 = 1) AND room_id not in ( SELECT room_id FROM room_aliases ); -- cgit 1.5.1 From b2757655455fb2bf485c66affeb5a294eb9459c2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 14:15:32 +0000 Subject: Comment about weird SQL --- synapse/storage/schema/delta/30/public_rooms.sql | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/storage/schema/delta/30/public_rooms.sql b/synapse/storage/schema/delta/30/public_rooms.sql index 3400898ed0..f09db4faa6 100644 --- a/synapse/storage/schema/delta/30/public_rooms.sql +++ b/synapse/storage/schema/delta/30/public_rooms.sql @@ -15,7 +15,9 @@ /* This release removes the restriction that published rooms must have an alias, - * so we go back and ensure the only 'public' rooms are ones with an alias.*/ + * so we go back and ensure the only 'public' rooms are ones with an alias. + * We use (1 = 0) and (1 = 1) so that it works in both postgres and sqlite + */ UPDATE rooms SET is_public = (1 = 0) WHERE is_public = (1 = 1) AND room_id not in ( SELECT room_id FROM room_aliases ); -- cgit 1.5.1 From 75daede92f041500347a5f446229be5ca50c2b8e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Mar 2016 18:22:52 +0000 Subject: String intern --- synapse/storage/state.py | 12 +++++++++--- synapse/util/caches/__init__.py | 8 ++++++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index eab2c5a8ce..1982b1c607 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -15,6 +15,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.caches import intern_string from twisted.internet import defer @@ -155,7 +156,9 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def get_current_state_for_key(self, room_id, event_type, state_key): - event_ids = yield self._get_current_state_for_key(room_id, event_type, state_key) + event_ids = yield self._get_current_state_for_key( + room_id, intern_string(event_type), intern_string(state_key) + ) events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) @@ -202,7 +205,7 @@ class StateStore(SQLBaseStore): results = {} for row in rows: - key = (row["type"], row["state_key"]) + key = (intern_string(row["type"]), intern_string(row["state_key"])) results.setdefault(row["state_group"], {})[key] = row["event_id"] return results @@ -393,7 +396,10 @@ class StateStore(SQLBaseStore): # cache absence of the key, on the assumption that if we've # explicitly asked for some types then we will probably ask # for them again. - state_dict = {key: None for key in types} + state_dict = { + (intern_string(etype), intern_string(state_key)): None + for (etype, state_key) in types + } state_dict.update(results[group]) results[group] = state_dict else: diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 1a14904194..9d450fade5 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -14,6 +14,7 @@ # limitations under the License. import synapse.metrics +from lrucache import LruCache DEBUG_CACHES = False @@ -25,3 +26,10 @@ cache_counter = metrics.register_cache( lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, labels=["name"], ) + +_string_cache = LruCache(5000) +caches_by_name["string_cache"] = _string_cache + + +def intern_string(string): + return _string_cache.setdefault(string, string) -- cgit 1.5.1 From fe9794706ab817fcedc17f99693eb0823b339d93 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 14:58:08 +0000 Subject: Intern type and state_key on events --- synapse/events/__init__.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index abed6b5e6b..2ceac19adb 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -14,6 +14,7 @@ # limitations under the License. from synapse.util.frozenutils import freeze +from synapse.util.caches import intern_string # Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents @@ -140,6 +141,12 @@ class FrozenEvent(EventBase): unsigned = dict(event_dict.pop("unsigned", {})) + # We intern these strings because they turn up a lot (especially when + # caching). + event_dict["type"] = intern_string(event_dict["type"]) + if "state_key" in event_dict: + event_dict["state_key"] = intern_string(event_dict["state_key"]) + if USE_FROZEN_DICTS: frozen_dict = freeze(event_dict) else: -- cgit 1.5.1 From f96526ffc23fdd99ab47abda67fb579a1ad764f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 15:01:05 +0000 Subject: Intern sender, event_id and room_id in events --- synapse/events/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 2ceac19adb..63004eaf04 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -146,6 +146,11 @@ class FrozenEvent(EventBase): event_dict["type"] = intern_string(event_dict["type"]) if "state_key" in event_dict: event_dict["state_key"] = intern_string(event_dict["state_key"]) + if "sender" in event_dict: + event_dict["sender"] = intern_string(event_dict["sender"]) + + event_dict["event_id"] = intern(event_dict["event_id"].encode('ascii')) + event_dict["room_id"] = intern(event_dict["room_id"].encode('ascii')) if USE_FROZEN_DICTS: frozen_dict = freeze(event_dict) -- cgit 1.5.1 From acdfef7b1443a8260c43e31e9944b74dfdf286dc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 16:13:05 +0000 Subject: Intern all the things --- synapse/events/__init__.py | 11 ++----- synapse/federation/federation_client.py | 1 + synapse/federation/transport/server.py | 28 ++++++++-------- synapse/http/server.py | 10 +++--- synapse/storage/_base.py | 3 +- synapse/storage/receipts.py | 21 ++++++------ synapse/storage/state.py | 10 +++--- synapse/util/caches/__init__.py | 58 ++++++++++++++++++++++++++++++++- 8 files changed, 97 insertions(+), 45 deletions(-) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 63004eaf04..23f8b612ae 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.util.frozenutils import freeze -from synapse.util.caches import intern_string +from synapse.util.caches import intern_dict # Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents @@ -143,14 +143,7 @@ class FrozenEvent(EventBase): # We intern these strings because they turn up a lot (especially when # caching). - event_dict["type"] = intern_string(event_dict["type"]) - if "state_key" in event_dict: - event_dict["state_key"] = intern_string(event_dict["state_key"]) - if "sender" in event_dict: - event_dict["sender"] = intern_string(event_dict["sender"]) - - event_dict["event_id"] = intern(event_dict["event_id"].encode('ascii')) - event_dict["room_id"] = intern(event_dict["room_id"].encode('ascii')) + event_dict = intern_dict(event_dict) if USE_FROZEN_DICTS: frozen_dict = freeze(event_dict) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 83c1f46586..37ee469fa2 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -418,6 +418,7 @@ class FederationClient(FederationBase): "Failed to make_%s via %s: %s", membership, destination, e.message ) + raise raise RuntimeError("Failed to send to any server.") diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 208bff8d4f..d65a7893d8 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -175,7 +175,7 @@ class BaseFederationServlet(object): class FederationSendServlet(BaseFederationServlet): - PATH = "/send/([^/]*)/" + PATH = "/send/(?P[^/]*)/" def __init__(self, handler, server_name, **kwargs): super(FederationSendServlet, self).__init__( @@ -250,7 +250,7 @@ class FederationPullServlet(BaseFederationServlet): class FederationEventServlet(BaseFederationServlet): - PATH = "/event/([^/]*)/" + PATH = "/event/(?P[^/]*)/" # This is when someone asks for a data item for a given server data_id pair. def on_GET(self, origin, content, query, event_id): @@ -258,7 +258,7 @@ class FederationEventServlet(BaseFederationServlet): class FederationStateServlet(BaseFederationServlet): - PATH = "/state/([^/]*)/" + PATH = "/state/(?P[^/]*)/" # This is when someone asks for all data for a given context. def on_GET(self, origin, content, query, context): @@ -270,7 +270,7 @@ class FederationStateServlet(BaseFederationServlet): class FederationBackfillServlet(BaseFederationServlet): - PATH = "/backfill/([^/]*)/" + PATH = "/backfill/(?P[^/]*)/" def on_GET(self, origin, content, query, context): versions = query["v"] @@ -285,7 +285,7 @@ class FederationBackfillServlet(BaseFederationServlet): class FederationQueryServlet(BaseFederationServlet): - PATH = "/query/([^/]*)" + PATH = "/query/(?P[^/]*)" # This is when we receive a server-server Query def on_GET(self, origin, content, query, query_type): @@ -296,7 +296,7 @@ class FederationQueryServlet(BaseFederationServlet): class FederationMakeJoinServlet(BaseFederationServlet): - PATH = "/make_join/([^/]*)/([^/]*)" + PATH = "/make_join/(?P[^/]*)/(?P[^/]*)" @defer.inlineCallbacks def on_GET(self, origin, content, query, context, user_id): @@ -305,7 +305,7 @@ class FederationMakeJoinServlet(BaseFederationServlet): class FederationMakeLeaveServlet(BaseFederationServlet): - PATH = "/make_leave/([^/]*)/([^/]*)" + PATH = "/make_leave/(?P[^/]*)/(?P[^/]*)" @defer.inlineCallbacks def on_GET(self, origin, content, query, context, user_id): @@ -314,7 +314,7 @@ class FederationMakeLeaveServlet(BaseFederationServlet): class FederationSendLeaveServlet(BaseFederationServlet): - PATH = "/send_leave/([^/]*)/([^/]*)" + PATH = "/send_leave/(?P[^/]*)/(?P[^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, room_id, txid): @@ -323,14 +323,14 @@ class FederationSendLeaveServlet(BaseFederationServlet): class FederationEventAuthServlet(BaseFederationServlet): - PATH = "/event_auth/([^/]*)/([^/]*)" + PATH = "/event_auth(?P[^/]*)/(?P[^/]*)" def on_GET(self, origin, content, query, context, event_id): return self.handler.on_event_auth(origin, context, event_id) class FederationSendJoinServlet(BaseFederationServlet): - PATH = "/send_join/([^/]*)/([^/]*)" + PATH = "/send_join/(?P[^/]*)/(?P[^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, context, event_id): @@ -341,7 +341,7 @@ class FederationSendJoinServlet(BaseFederationServlet): class FederationInviteServlet(BaseFederationServlet): - PATH = "/invite/([^/]*)/([^/]*)" + PATH = "/invite/(?P[^/]*)/(?P[^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, context, event_id): @@ -352,7 +352,7 @@ class FederationInviteServlet(BaseFederationServlet): class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet): - PATH = "/exchange_third_party_invite/([^/]*)" + PATH = "/exchange_third_party_invite/(?P[^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, room_id): @@ -381,7 +381,7 @@ class FederationClientKeysClaimServlet(BaseFederationServlet): class FederationQueryAuthServlet(BaseFederationServlet): - PATH = "/query_auth/([^/]*)/([^/]*)" + PATH = "/query_auth/(?P[^/]*)/(?P[^/]*)" @defer.inlineCallbacks def on_POST(self, origin, content, query, context, event_id): @@ -394,7 +394,7 @@ class FederationQueryAuthServlet(BaseFederationServlet): class FederationGetMissingEventsServlet(BaseFederationServlet): # TODO(paul): Why does this path alone end with "/?" optional? - PATH = "/get_missing_events/([^/]*)/?" + PATH = "/get_missing_events/(?P[^/]*)/?" @defer.inlineCallbacks def on_POST(self, origin, content, query, room_id): diff --git a/synapse/http/server.py b/synapse/http/server.py index b17b190ee5..b82196fd5e 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -18,6 +18,7 @@ from synapse.api.errors import ( cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes ) from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.caches import intern_dict import synapse.metrics import synapse.events @@ -229,11 +230,12 @@ class JsonResource(HttpServer, resource.Resource): else: servlet_classname = "%r" % callback - args = [ - urllib.unquote(u).decode("UTF-8") if u else u for u in m.groups() - ] + kwargs = intern_dict({ + name: urllib.unquote(value).decode("UTF-8") if value else value + for name, value in m.groupdict().items() + }) - callback_return = yield callback(request, *args) + callback_return = yield callback(request, **kwargs) if callback_return is not None: code, response = callback_return self._send_response(request, code, response) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 583b77a835..b75b79df36 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -18,6 +18,7 @@ from synapse.api.errors import StoreError from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.descriptors import Cache +from synapse.util.caches import intern_dict import synapse.metrics @@ -350,7 +351,7 @@ class SQLBaseStore(object): """ col_headers = list(column[0] for column in cursor.description) results = list( - dict(zip(col_headers, row)) for row in cursor.fetchall() + intern_dict(dict(zip(col_headers, row))) for row in cursor.fetchall() ) return results diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index dbc074d6b5..6b9d848eaa 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -62,18 +62,17 @@ class ReceiptsStore(SQLBaseStore): @cachedInlineCallbacks(num_args=2) def get_receipts_for_user(self, user_id, receipt_type): - def f(txn): - sql = ( - "SELECT room_id,event_id " - "FROM receipts_linearized " - "WHERE user_id = ? AND receipt_type = ? " - ) - txn.execute(sql, (user_id, receipt_type)) - return txn.fetchall() + rows = yield self._simple_select_list( + table="receipts_linearized", + keyvalues={ + "user_id": user_id, + "receipt_type": receipt_type, + }, + retcols=("room_id", "event_id"), + desc="get_receipts_for_user", + ) - defer.returnValue(dict( - (yield self.runInteraction("get_receipts_for_user", f)) - )) + defer.returnValue({row["room_id"]: row["event_id"] for row in rows}) @defer.inlineCallbacks def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 1982b1c607..03eecbbbb6 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -156,9 +156,7 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def get_current_state_for_key(self, room_id, event_type, state_key): - event_ids = yield self._get_current_state_for_key( - room_id, intern_string(event_type), intern_string(state_key) - ) + event_ids = yield self._get_current_state_for_key(room_id, event_type, state_key) events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) @@ -205,7 +203,7 @@ class StateStore(SQLBaseStore): results = {} for row in rows: - key = (intern_string(row["type"]), intern_string(row["state_key"])) + key = (row["type"], row["state_key"]) results.setdefault(row["state_group"], {})[key] = row["event_id"] return results @@ -286,7 +284,9 @@ class StateStore(SQLBaseStore): desc="_get_state_group_for_events", ) - defer.returnValue({row["event_id"]: row["state_group"] for row in rows}) + defer.returnValue({ + intern(row["event_id"].encode('ascii')): row["state_group"] for row in rows + }) def _get_some_state_from_cache(self, group, types): """Checks if group is in cache. See `_get_state_for_groups` diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 9d450fade5..838cec45fe 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -15,6 +15,9 @@ import synapse.metrics from lrucache import LruCache +import os + +CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) DEBUG_CACHES = False @@ -27,9 +30,62 @@ cache_counter = metrics.register_cache( labels=["name"], ) -_string_cache = LruCache(5000) +_string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR)) caches_by_name["string_cache"] = _string_cache +KNOWN_KEYS = { + key: key for key in + ( + "auth_events", + "content", + "depth", + "event_id", + "hashes", + "origin", + "origin_server_ts", + "prev_events", + "room_id", + "sender", + "signatures", + "state_key", + "type", + "unsigned", + "user_id", + ) +} + + def intern_string(string): + """Takes a (potentially) unicode string and interns using custom cache + """ return _string_cache.setdefault(string, string) + + +def intern_dict(dictionary): + """Takes a dictionary and interns well known keys and their values + """ + return _intern_known_values({ + _intern_key(key): value for key, value in dictionary.items() + }) + + +def _intern_known_values(dictionary): + intern_str_keys = ("event_id", "room_id") + intern_unicode_keys = ("sender", "user_id", "type", "state_key") + + for key in intern_str_keys: + val = dictionary.get(key, None) + if val is not None: + dictionary[key] = intern(val.encode('ascii')) + + for key in intern_unicode_keys: + val = dictionary.get(key, None) + if val is not None: + dictionary[key] = intern_string(val) + + return dictionary + + +def _intern_key(key): + return KNOWN_KEYS.get(key, key) -- cgit 1.5.1 From 2f0180b09e2a2afeed418a5840ae6b4fffcb4be4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 16:29:46 +0000 Subject: Don't bother interning keys that are already interned --- synapse/storage/state.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 03eecbbbb6..02cefdff26 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -284,9 +284,7 @@ class StateStore(SQLBaseStore): desc="_get_state_group_for_events", ) - defer.returnValue({ - intern(row["event_id"].encode('ascii')): row["state_group"] for row in rows - }) + defer.returnValue({row["event_id"]: row["state_group"] for row in rows}) def _get_some_state_from_cache(self, group, types): """Checks if group is in cache. See `_get_state_for_groups` -- cgit 1.5.1 From 8122ad7bab74b1a52188e350ca605033a9eca28e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Mar 2016 16:34:59 +0000 Subject: Simplify intern_dict --- synapse/util/caches/__init__.py | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 838cec45fe..d53569ca49 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -65,27 +65,20 @@ def intern_string(string): def intern_dict(dictionary): """Takes a dictionary and interns well known keys and their values """ - return _intern_known_values({ - _intern_key(key): value for key, value in dictionary.items() - }) + return { + KNOWN_KEYS.get(key, key): _intern_known_values(key, value) + for key, value in dictionary.items() + } -def _intern_known_values(dictionary): +def _intern_known_values(key, value): intern_str_keys = ("event_id", "room_id") intern_unicode_keys = ("sender", "user_id", "type", "state_key") - for key in intern_str_keys: - val = dictionary.get(key, None) - if val is not None: - dictionary[key] = intern(val.encode('ascii')) + if key in intern_str_keys: + return intern(value.encode('ascii')) - for key in intern_unicode_keys: - val = dictionary.get(key, None) - if val is not None: - dictionary[key] = intern_string(val) + if key in intern_unicode_keys: + return intern_string(value) - return dictionary - - -def _intern_key(key): - return KNOWN_KEYS.get(key, key) + return value -- cgit 1.5.1