From a7b65bdedf512f646a3ca2478fb96a914856de35 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 May 2015 12:17:33 +0100 Subject: Add config option to turn off freezing events. Use new encode_json api and ujson.loads --- synapse/storage/events.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d2a010bd88..20a8d81794 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -17,7 +17,7 @@ from _base import SQLBaseStore, _RollbackButIsFineException from twisted.internet import defer, reactor -from synapse.events import FrozenEvent +from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event from synapse.util.logcontext import preserve_context_over_deferred @@ -26,11 +26,11 @@ from synapse.api.constants import EventTypes from synapse.crypto.event_signing import compute_event_reference_hash from syutil.base64util import decode_base64 -from syutil.jsonutil import encode_canonical_json +from syutil.jsonutil import encode_json from contextlib import contextmanager import logging -import simplejson as json +import ujson as json logger = logging.getLogger(__name__) @@ -166,8 +166,9 @@ class EventsStore(SQLBaseStore): allow_none=True, ) - metadata_json = encode_canonical_json( - event.internal_metadata.get_dict() + metadata_json = encode_json( + event.internal_metadata.get_dict(), + using_frozen_dicts=USE_FROZEN_DICTS ).decode("UTF-8") # If we have already persisted this event, we don't need to do any @@ -235,12 +236,14 @@ class EventsStore(SQLBaseStore): "event_id": event.event_id, "room_id": event.room_id, "internal_metadata": metadata_json, - "json": encode_canonical_json(event_dict).decode("UTF-8"), + "json": encode_json( + event_dict, using_frozen_dicts=USE_FROZEN_DICTS + ).decode("UTF-8"), }, ) - content = encode_canonical_json( - event.content + content = encode_json( + event.content, using_frozen_dicts=USE_FROZEN_DICTS ).decode("UTF-8") vals = { @@ -266,8 +269,8 @@ class EventsStore(SQLBaseStore): ] } - vals["unrecognized_keys"] = encode_canonical_json( - unrec + vals["unrecognized_keys"] = encode_json( + unrec, using_frozen_dicts=USE_FROZEN_DICTS ).decode("UTF-8") sql = ( -- cgit 1.5.1 From 6e174632289330aa494b15c3814743daebbf2127 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 1 Jun 2015 16:38:55 +0100 Subject: Don't explode if we don't have the event --- synapse/storage/events.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 20a8d81794..2caf0aae80 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -736,7 +736,8 @@ class EventsStore(SQLBaseStore): because = yield self.get_event( redaction_id, - check_redacted=False + check_redacted=False, + allow_none=True, ) if because: @@ -746,6 +747,7 @@ class EventsStore(SQLBaseStore): prev = yield self.get_event( ev.unsigned["replaces_state"], get_prev_content=False, + allow_none=True, ) if prev: ev.unsigned["prev_content"] = prev.get_dict()["content"] -- cgit 1.5.1 From d8866d72771fa04bd58a77a03aded45bf3ff2293 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Jun 2015 14:45:17 +0100 Subject: Caches should be bound to instances. Before, caches were global and so different instances of the stores would share caches. This caused problems in the unit tests. --- synapse/storage/_base.py | 45 +++++++++++++------- tests/storage/test__base.py | 84 ++++++++++++++++++++++---------------- tests/storage/test_registration.py | 2 +- 3 files changed, 81 insertions(+), 50 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 39884c2afe..8d33def6c6 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -127,7 +127,7 @@ class Cache(object): self.cache.clear() -def cached(max_entries=1000, num_args=1, lru=False): +class CacheDescriptor(object): """ A method decorator that applies a memoizing cache around the function. The function is presumed to take zero or more arguments, which are used in @@ -141,25 +141,32 @@ def cached(max_entries=1000, num_args=1, lru=False): which can be used to insert values into the cache specifically, without calling the calculation function. """ - def wrap(orig): + def __init__(self, orig, max_entries=1000, num_args=1, lru=False): + self.orig = orig + + self.max_entries = max_entries + self.num_args = num_args + self.lru = lru + + def __get__(self, obj, objtype=None): cache = Cache( - name=orig.__name__, - max_entries=max_entries, - keylen=num_args, - lru=lru, + name=self.orig.__name__, + max_entries=self.max_entries, + keylen=self.num_args, + lru=self.lru, ) - @functools.wraps(orig) + @functools.wraps(self.orig) @defer.inlineCallbacks - def wrapped(self, *keyargs): + def wrapped(*keyargs): try: - cached_result = cache.get(*keyargs) + cached_result = cache.get(*keyargs[:self.num_args]) if DEBUG_CACHES: - actual_result = yield orig(self, *keyargs) + actual_result = yield self.orig(obj, *keyargs) if actual_result != cached_result: logger.error( "Stale cache entry %s%r: cached: %r, actual %r", - orig.__name__, keyargs, + self.orig.__name__, keyargs, cached_result, actual_result, ) raise ValueError("Stale cache entry") @@ -170,18 +177,28 @@ def cached(max_entries=1000, num_args=1, lru=False): # while the SELECT is executing (SYN-369) sequence = cache.sequence - ret = yield orig(self, *keyargs) + ret = yield self.orig(obj, *keyargs) - cache.update(sequence, *keyargs + (ret,)) + cache.update(sequence, *keyargs[:self.num_args] + (ret,)) defer.returnValue(ret) wrapped.invalidate = cache.invalidate wrapped.invalidate_all = cache.invalidate_all wrapped.prefill = cache.prefill + + obj.__dict__[self.orig.__name__] = wrapped + return wrapped - return wrap + +def cached(max_entries=1000, num_args=1, lru=False): + return lambda orig: CacheDescriptor( + orig, + max_entries=max_entries, + num_args=num_args, + lru=lru + ) class LoggingTransaction(object): diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index 96caf8c4c1..8c3d2952bd 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -96,73 +96,84 @@ class CacheDecoratorTestCase(unittest.TestCase): @defer.inlineCallbacks def test_passthrough(self): - @cached() - def func(self, key): - return key + class A(object): + @cached() + def func(self, key): + return key - self.assertEquals((yield func(self, "foo")), "foo") - self.assertEquals((yield func(self, "bar")), "bar") + a = A() + + self.assertEquals((yield a.func("foo")), "foo") + self.assertEquals((yield a.func("bar")), "bar") @defer.inlineCallbacks def test_hit(self): callcount = [0] - @cached() - def func(self, key): - callcount[0] += 1 - return key + class A(object): + @cached() + def func(self, key): + callcount[0] += 1 + return key - yield func(self, "foo") + a = A() + yield a.func("foo") self.assertEquals(callcount[0], 1) - self.assertEquals((yield func(self, "foo")), "foo") + self.assertEquals((yield a.func("foo")), "foo") self.assertEquals(callcount[0], 1) @defer.inlineCallbacks def test_invalidate(self): callcount = [0] - @cached() - def func(self, key): - callcount[0] += 1 - return key + class A(object): + @cached() + def func(self, key): + callcount[0] += 1 + return key - yield func(self, "foo") + a = A() + yield a.func("foo") self.assertEquals(callcount[0], 1) - func.invalidate("foo") + a.func.invalidate("foo") - yield func(self, "foo") + yield a.func("foo") self.assertEquals(callcount[0], 2) def test_invalidate_missing(self): - @cached() - def func(self, key): - return key + class A(object): + @cached() + def func(self, key): + return key - func.invalidate("what") + A().func.invalidate("what") @defer.inlineCallbacks def test_max_entries(self): callcount = [0] - @cached(max_entries=10) - def func(self, key): - callcount[0] += 1 - return key + class A(object): + @cached(max_entries=10) + def func(self, key): + callcount[0] += 1 + return key - for k in range(0,12): - yield func(self, k) + a = A() + + for k in range(0, 12): + yield a.func(k) self.assertEquals(callcount[0], 12) # There must have been at least 2 evictions, meaning if we calculate # all 12 values again, we must get called at least 2 more times for k in range(0,12): - yield func(self, k) + yield a.func(k) self.assertTrue(callcount[0] >= 14, msg="Expected callcount >= 14, got %d" % (callcount[0])) @@ -171,12 +182,15 @@ class CacheDecoratorTestCase(unittest.TestCase): def test_prefill(self): callcount = [0] - @cached() - def func(self, key): - callcount[0] += 1 - return key + class A(object): + @cached() + def func(self, key): + callcount[0] += 1 + return key + + a = A() - func.prefill("foo", 123) + a.func.prefill("foo", 123) - self.assertEquals((yield func(self, "foo")), 123) + self.assertEquals((yield a.func("foo")), 123) self.assertEquals(callcount[0], 0) diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py index 78f6004204..2702291178 100644 --- a/tests/storage/test_registration.py +++ b/tests/storage/test_registration.py @@ -46,7 +46,7 @@ class RegistrationStoreTestCase(unittest.TestCase): (yield self.store.get_user_by_id(self.user_id)) ) - result = yield self.store.get_user_by_token(self.tokens[1]) + result = yield self.store.get_user_by_token(self.tokens[0]) self.assertDictContainsSubset( { -- cgit 1.5.1 From 0d7d9c37b67bc9956524faf2a1b60c6fae07ae42 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Jun 2015 14:45:55 +0100 Subject: Add cache to get_state_groups --- synapse/storage/state.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index b24de34f23..f2b17f29ea 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -81,19 +81,23 @@ class StateStore(SQLBaseStore): f, ) - @defer.inlineCallbacks - def c(vals): - vals[:] = yield self._get_events(vals, get_prev_content=False) - - yield defer.gatherResults( + state_list = yield defer.gatherResults( [ - c(vals) - for vals in states.values() + self._fetch_events_for_group(group, vals) + for group, vals in states.items() ], consumeErrors=True, ) - defer.returnValue(states) + defer.returnValue(dict(state_list)) + + @cached(num_args=1) + def _fetch_events_for_group(self, state_group, events): + return self._get_events( + events, get_prev_content=False + ).addCallback( + lambda evs: (state_group, evs) + ) def _store_state_groups_txn(self, txn, event, context): if context.current_state is None: -- cgit 1.5.1 From 63a7b3ad1eb6f5e959ead1bae36d9037cad888a2 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 4 Jun 2015 16:16:01 +0100 Subject: Add script to (re)convert the pushers table to changing the unique key. Also give the python db upgrade scripts the database engine so they can convert parameter strings, and add *args **kwargs to the upgrade function so we can add more args in future and previous scripts will ignore them. --- synapse/storage/__init__.py | 2 +- .../schema/delta/14/upgrade_appservice_db.py | 2 +- synapse/storage/schema/delta/20/pushers.py | 77 ++++++++++++++++++++++ 3 files changed, 79 insertions(+), 2 deletions(-) create mode 100644 synapse/storage/schema/delta/20/pushers.py (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 75af44d787..71757c393a 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -348,7 +348,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, module_name, absolute_path, python_file ) logger.debug("Running script %s", relative_path) - module.run_upgrade(cur) + module.run_upgrade(cur, database_engine) elif ext == ".sql": # A plain old .sql file, just read and execute it logger.debug("Applying schema %s", relative_path) diff --git a/synapse/storage/schema/delta/14/upgrade_appservice_db.py b/synapse/storage/schema/delta/14/upgrade_appservice_db.py index 9f3a4dd4c5..61232f9757 100644 --- a/synapse/storage/schema/delta/14/upgrade_appservice_db.py +++ b/synapse/storage/schema/delta/14/upgrade_appservice_db.py @@ -18,7 +18,7 @@ import logging logger = logging.getLogger(__name__) -def run_upgrade(cur): +def run_upgrade(cur, *args, **kwargs): cur.execute("SELECT id, regex FROM application_services_regex") for row in cur.fetchall(): try: diff --git a/synapse/storage/schema/delta/20/pushers.py b/synapse/storage/schema/delta/20/pushers.py new file mode 100644 index 0000000000..f09b41559c --- /dev/null +++ b/synapse/storage/schema/delta/20/pushers.py @@ -0,0 +1,77 @@ +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +Main purpose of this upgrade is to change the unique key on the +pushers table again (it was missed when the v16 full schema was +made) but this also changes the pushkey and data columns to text. +When selecting a bytea column into a text column, postgres inserts +the hex encoded data, and there's no portable way of getting the +UTF-8 bytes, so we have to do it in Python. +""" + +import logging + +logger = logging.getLogger(__name__) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + logger.info("Porting pushers table...") + cur.execute(""" + CREATE TABLE IF NOT EXISTS pushers2 ( + id BIGINT PRIMARY KEY, + user_name TEXT NOT NULL, + access_token BIGINT DEFAULT NULL, + profile_tag VARCHAR(32) NOT NULL, + kind VARCHAR(8) NOT NULL, + app_id VARCHAR(64) NOT NULL, + app_display_name VARCHAR(64) NOT NULL, + device_display_name VARCHAR(128) NOT NULL, + pushkey TEXT NOT NULL, + ts BIGINT NOT NULL, + lang VARCHAR(8), + data TEXT, + last_token TEXT, + last_success BIGINT, + failing_since BIGINT, + UNIQUE (app_id, pushkey, user_name) + ) + """) + cur.execute("""SELECT + id, user_name, access_token, profile_tag, kind, + app_id, app_display_name, device_display_name, + pushkey, ts, lang, data, last_token, last_success, + failing_since + FROM pushers + """) + count = 0 + for row in cur.fetchall(): + row = list(row) + row[8] = bytes(row[8]).decode("utf-8") + row[11] = bytes(row[11]).decode("utf-8") + cur.execute(database_engine.convert_param_style(""" + INSERT into pushers2 ( + id, user_name, access_token, profile_tag, kind, + app_id, app_display_name, device_display_name, + pushkey, ts, lang, data, last_token, last_success, + failing_since + ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))), + row + ) + count += 1 + cur.execute("DROP TABLE pushers") + cur.execute("ALTER TABLE pushers2 RENAME TO pushers") + logger.info("Moved %d pushers to new table", count) + -- cgit 1.5.1 From da84946de416aff67eea67550cc2b68c95d2868d Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 4 Jun 2015 16:43:45 +0100 Subject: pep8 --- synapse/storage/schema/delta/20/pushers.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/20/pushers.py b/synapse/storage/schema/delta/20/pushers.py index f09b41559c..543e57bbe2 100644 --- a/synapse/storage/schema/delta/20/pushers.py +++ b/synapse/storage/schema/delta/20/pushers.py @@ -74,4 +74,3 @@ def run_upgrade(cur, database_engine, *args, **kwargs): cur.execute("DROP TABLE pushers") cur.execute("ALTER TABLE pushers2 RENAME TO pushers") logger.info("Moved %d pushers to new table", count) - -- cgit 1.5.1 From 1e365e88bd9496e60de56e45ea4603eb3b81d8c9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 11 Jun 2015 15:50:39 +0100 Subject: Bump schema version --- synapse/storage/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 71757c393a..c137f47820 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -51,7 +51,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 19 +SCHEMA_VERSION = 20 dir_path = os.path.abspath(os.path.dirname(__file__)) -- cgit 1.5.1