diff options
author | Paul "LeoNerd" Evans <paul@matrix.org> | 2016-08-18 14:21:01 +0100 |
---|---|---|
committer | Paul "LeoNerd" Evans <paul@matrix.org> | 2016-08-18 14:21:01 +0100 |
commit | d5bf7a4a991b0bfe2134bb5e5c7e194f33f037aa (patch) | |
tree | 2187b123096728d0ff329d354368ef48daed1fdb /synapse/storage | |
parent | Since empty lookups now return 200/empty list not 404, we can safely log fail... (diff) | |
parent | Merge pull request #1025 from matrix-org/erikj/appservice_stream (diff) | |
download | synapse-d5bf7a4a991b0bfe2134bb5e5c7e194f33f037aa.tar.xz |
Merge remote-tracking branch 'origin/develop' into paul/thirdpartylookup
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 8 | ||||
-rw-r--r-- | synapse/storage/_base.py | 72 | ||||
-rw-r--r-- | synapse/storage/appservice.py | 39 | ||||
-rw-r--r-- | synapse/storage/directory.py | 37 | ||||
-rw-r--r-- | synapse/storage/events.py | 3 | ||||
-rw-r--r-- | synapse/storage/prepare_database.py | 2 | ||||
-rw-r--r-- | synapse/storage/presence.py | 32 | ||||
-rw-r--r-- | synapse/storage/registration.py | 70 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 12 | ||||
-rw-r--r-- | synapse/storage/schema/delta/34/appservice_stream.sql | 23 | ||||
-rw-r--r-- | synapse/storage/schema/delta/34/cache_stream.py | 46 | ||||
-rw-r--r-- | synapse/storage/schema/delta/34/push_display_name_rename.sql | 20 |
12 files changed, 287 insertions, 77 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 73fb334dd6..7efc5bfeef 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -50,6 +50,7 @@ from .openid import OpenIdStore from .client_ips import ClientIpStore from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator +from .engines import PostgresEngine from synapse.api.constants import PresenceState from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -123,6 +124,13 @@ class DataStore(RoomMemberStore, RoomStore, extra_tables=[("deleted_pushers", "stream_id")], ) + if isinstance(self.database_engine, PostgresEngine): + self._cache_id_gen = StreamIdGenerator( + db_conn, "cache_invalidation_stream", "stream_id", + ) + else: + self._cache_id_gen = None + events_max = self._stream_id_gen.get_current_token() event_cache_prefill, min_event_val = self._get_cache_dict( db_conn, "events", diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 0117fdc639..029f6612e6 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -19,6 +19,7 @@ from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.descriptors import Cache from synapse.util.caches import intern_dict +from synapse.storage.engines import PostgresEngine import synapse.metrics @@ -305,13 +306,14 @@ class SQLBaseStore(object): func, *args, **kwargs ) - with PreserveLoggingContext(): - result = yield self._db_pool.runWithConnection( - inner_func, *args, **kwargs - ) - - for after_callback, after_args in after_callbacks: - after_callback(*after_args) + try: + with PreserveLoggingContext(): + result = yield self._db_pool.runWithConnection( + inner_func, *args, **kwargs + ) + finally: + for after_callback, after_args in after_callbacks: + after_callback(*after_args) defer.returnValue(result) @defer.inlineCallbacks @@ -860,6 +862,62 @@ class SQLBaseStore(object): return cache, min_val + def _invalidate_cache_and_stream(self, txn, cache_func, keys): + """Invalidates the cache and adds it to the cache stream so slaves + will know to invalidate their caches. + + This should only be used to invalidate caches where slaves won't + otherwise know from other replication streams that the cache should + be invalidated. + """ + txn.call_after(cache_func.invalidate, keys) + + if isinstance(self.database_engine, PostgresEngine): + # get_next() returns a context manager which is designed to wrap + # the transaction. However, we want to only get an ID when we want + # to use it, here, so we need to call __enter__ manually, and have + # __exit__ called after the transaction finishes. + ctx = self._cache_id_gen.get_next() + stream_id = ctx.__enter__() + txn.call_after(ctx.__exit__, None, None, None) + txn.call_after(self.hs.get_notifier().on_new_replication_data) + + self._simple_insert_txn( + txn, + table="cache_invalidation_stream", + values={ + "stream_id": stream_id, + "cache_func": cache_func.__name__, + "keys": list(keys), + "invalidation_ts": self.clock.time_msec(), + } + ) + + def get_all_updated_caches(self, last_id, current_id, limit): + if last_id == current_id: + return defer.succeed([]) + + def get_all_updated_caches_txn(txn): + # We purposefully don't bound by the current token, as we want to + # send across cache invalidations as quickly as possible. Cache + # invalidations are idempotent, so duplicates are fine. + sql = ( + "SELECT stream_id, cache_func, keys, invalidation_ts" + " FROM cache_invalidation_stream" + " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, limit,)) + return txn.fetchall() + return self.runInteraction( + "get_all_updated_caches", get_all_updated_caches_txn + ) + + def get_cache_stream_token(self): + if self._cache_id_gen: + return self._cache_id_gen.get_current_token() + else: + return 0 + class _RollbackButIsFineException(Exception): """ This exception is used to rollback a transaction without implying diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index d1ee533fac..f0c88e05cd 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -352,3 +352,42 @@ class ApplicationServiceTransactionStore(SQLBaseStore): return 0 else: return int(last_txn_id[0]) # select 'last_txn' col + + def set_appservice_last_pos(self, pos): + def set_appservice_last_pos_txn(txn): + txn.execute( + "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,) + ) + return self.runInteraction( + "set_appservice_last_pos", set_appservice_last_pos_txn + ) + + @defer.inlineCallbacks + def get_new_events_for_appservice(self, current_id, limit): + """Get all new evnets""" + + def get_new_events_for_appservice_txn(txn): + sql = ( + "SELECT e.stream_ordering, e.event_id" + " FROM events AS e, appservice_stream_position AS a" + " WHERE a.stream_ordering < e.stream_ordering AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + " LIMIT ?" + ) + + txn.execute(sql, (current_id, limit)) + rows = txn.fetchall() + + upper_bound = current_id + if len(rows) == limit: + upper_bound = rows[-1][0] + + return upper_bound, [row[1] for row in rows] + + upper_bound, event_ids = yield self.runInteraction( + "get_new_events_for_appservice", get_new_events_for_appservice_txn, + ) + + events = yield self._get_events(event_ids) + + defer.returnValue((upper_bound, events)) diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index ef231a04dc..9caaf81f2c 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -82,32 +82,39 @@ class DirectoryStore(SQLBaseStore): Returns: Deferred """ - try: - yield self._simple_insert( + def alias_txn(txn): + self._simple_insert_txn( + txn, "room_aliases", { "room_alias": room_alias.to_string(), "room_id": room_id, "creator": creator, }, - desc="create_room_alias_association", - ) - except self.database_engine.module.IntegrityError: - raise SynapseError( - 409, "Room alias %s already exists" % room_alias.to_string() ) - for server in servers: - # TODO(erikj): Fix this to bulk insert - yield self._simple_insert( - "room_alias_servers", - { + self._simple_insert_many_txn( + txn, + table="room_alias_servers", + values=[{ "room_alias": room_alias.to_string(), "server": server, - }, - desc="create_room_alias_association", + } for server in servers], ) - self.get_aliases_for_room.invalidate((room_id,)) + + self._invalidate_cache_and_stream( + txn, self.get_aliases_for_room, (room_id,) + ) + + try: + ret = yield self.runInteraction( + "create_room_alias_association", alias_txn + ) + except self.database_engine.module.IntegrityError: + raise SynapseError( + 409, "Room alias %s already exists" % room_alias.to_string() + ) + defer.returnValue(ret) def get_room_alias_creator(self, room_alias): return self._simple_select_one_onecol( diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d2feee8dbb..ad026b5e0b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -600,7 +600,8 @@ class EventsStore(SQLBaseStore): "rejections", "redactions", "room_memberships", - "state_events" + "state_events", + "topics" ): txn.executemany( "DELETE FROM %s WHERE event_id = ?" % (table,), diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 8801669a6b..b94ce7bea1 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 33 +SCHEMA_VERSION = 34 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index d03f7c541e..21d0696640 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -189,18 +189,30 @@ class PresenceStore(SQLBaseStore): desc="add_presence_list_pending", ) - @defer.inlineCallbacks def set_presence_list_accepted(self, observer_localpart, observed_userid): - result = yield self._simple_update_one( - table="presence_list", - keyvalues={"user_id": observer_localpart, - "observed_user_id": observed_userid}, - updatevalues={"accepted": True}, - desc="set_presence_list_accepted", + def update_presence_list_txn(txn): + result = self._simple_update_one_txn( + txn, + table="presence_list", + keyvalues={ + "user_id": observer_localpart, + "observed_user_id": observed_userid + }, + updatevalues={"accepted": True}, + ) + + self._invalidate_cache_and_stream( + txn, self.get_presence_list_accepted, (observer_localpart,) + ) + self._invalidate_cache_and_stream( + txn, self.get_presence_list_observers_accepted, (observed_userid,) + ) + + return result + + return self.runInteraction( + "set_presence_list_accepted", update_presence_list_txn, ) - self.get_presence_list_accepted.invalidate((observer_localpart,)) - self.get_presence_list_observers_accepted.invalidate((observed_userid,)) - defer.returnValue(result) def get_presence_list(self, observer_localpart, accepted=None): if accepted: diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 7e7d32eb66..19cb3b31c6 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -251,7 +251,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): self.get_user_by_id.invalidate((user_id,)) @defer.inlineCallbacks - def user_delete_access_tokens(self, user_id, except_token_ids=[], + def user_delete_access_tokens(self, user_id, except_token_id=None, device_id=None, delete_refresh_tokens=False): """ @@ -259,7 +259,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Args: user_id (str): ID of user the tokens belong to - except_token_ids (list[str]): list of access_tokens which should + except_token_id (str): list of access_tokens IDs which should *not* be deleted device_id (str|None): ID of device the tokens are associated with. If None, tokens associated with any device (or no device) will @@ -269,53 +269,45 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Returns: defer.Deferred: """ - def f(txn, table, except_tokens, call_after_delete): - sql = "SELECT token FROM %s WHERE user_id = ?" % table - clauses = [user_id] - + def f(txn): + keyvalues = { + "user_id": user_id, + } if device_id is not None: - sql += " AND device_id = ?" - clauses.append(device_id) + keyvalues["device_id"] = device_id - if except_tokens: - sql += " AND id NOT IN (%s)" % ( - ",".join(["?" for _ in except_tokens]), + if delete_refresh_tokens: + self._simple_delete_txn( + txn, + table="refresh_tokens", + keyvalues=keyvalues, ) - clauses += except_tokens - - txn.execute(sql, clauses) - rows = txn.fetchall() + items = keyvalues.items() + where_clause = " AND ".join(k + " = ?" for k, _ in items) + values = [v for _, v in items] + if except_token_id: + where_clause += " AND id != ?" + values.append(except_token_id) - n = 100 - chunks = [rows[i:i + n] for i in xrange(0, len(rows), n)] - for chunk in chunks: - if call_after_delete: - for row in chunk: - txn.call_after(call_after_delete, (row[0],)) + txn.execute( + "SELECT token FROM access_tokens WHERE %s" % where_clause, + values + ) + rows = self.cursor_to_dict(txn) - txn.execute( - "DELETE FROM %s WHERE token in (%s)" % ( - table, - ",".join(["?" for _ in chunk]), - ), [r[0] for r in chunk] + for row in rows: + self._invalidate_cache_and_stream( + txn, self.get_user_by_access_token, (row["token"],) ) - # delete refresh tokens first, to stop new access tokens being - # allocated while our backs are turned - if delete_refresh_tokens: - yield self.runInteraction( - "user_delete_access_tokens", f, - table="refresh_tokens", - except_tokens=[], - call_after_delete=None, + txn.execute( + "DELETE FROM access_tokens WHERE %s" % where_clause, + values ) yield self.runInteraction( "user_delete_access_tokens", f, - table="access_tokens", - except_tokens=except_token_ids, - call_after_delete=self.get_user_by_access_token.invalidate, ) def delete_access_token(self, access_token): @@ -328,7 +320,9 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): }, ) - txn.call_after(self.get_user_by_access_token.invalidate, (access_token,)) + self._invalidate_cache_and_stream( + txn, self.get_user_by_access_token, (access_token,) + ) return self.runInteraction("delete_access_token", f) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 8bd693be72..a422ddf633 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -277,7 +277,6 @@ class RoomMemberStore(SQLBaseStore): user_id, membership_list=[Membership.JOIN], ) - @defer.inlineCallbacks def forget(self, user_id, room_id): """Indicate that user_id wishes to discard history for room_id.""" def f(txn): @@ -292,10 +291,13 @@ class RoomMemberStore(SQLBaseStore): " room_id = ?" ) txn.execute(sql, (user_id, room_id)) - yield self.runInteraction("forget_membership", f) - self.was_forgotten_at.invalidate_all() - self.who_forgot_in_room.invalidate_all() - self.did_forget.invalidate((user_id, room_id)) + + txn.call_after(self.was_forgotten_at.invalidate_all) + txn.call_after(self.did_forget.invalidate, (user_id, room_id)) + self._invalidate_cache_and_stream( + txn, self.who_forgot_in_room, (room_id,) + ) + return self.runInteraction("forget_membership", f) @cachedInlineCallbacks(num_args=2) def did_forget(self, user_id, room_id): diff --git a/synapse/storage/schema/delta/34/appservice_stream.sql b/synapse/storage/schema/delta/34/appservice_stream.sql new file mode 100644 index 0000000000..69e16eda0f --- /dev/null +++ b/synapse/storage/schema/delta/34/appservice_stream.sql @@ -0,0 +1,23 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS appservice_stream_position( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_ordering BIGINT, + CHECK (Lock='X') +); + +INSERT INTO appservice_stream_position (stream_ordering) + SELECT COALESCE(MAX(stream_ordering), 0) FROM events; diff --git a/synapse/storage/schema/delta/34/cache_stream.py b/synapse/storage/schema/delta/34/cache_stream.py new file mode 100644 index 0000000000..3b63a1562d --- /dev/null +++ b/synapse/storage/schema/delta/34/cache_stream.py @@ -0,0 +1,46 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage.prepare_database import get_statements +from synapse.storage.engines import PostgresEngine + +import logging + +logger = logging.getLogger(__name__) + + +# This stream is used to notify replication slaves that some caches have +# been invalidated that they cannot infer from the other streams. +CREATE_TABLE = """ +CREATE TABLE cache_invalidation_stream ( + stream_id BIGINT, + cache_func TEXT, + keys TEXT[], + invalidation_ts BIGINT +); + +CREATE INDEX cache_invalidation_stream_id ON cache_invalidation_stream(stream_id); +""" + + +def run_create(cur, database_engine, *args, **kwargs): + if not isinstance(database_engine, PostgresEngine): + return + + for statement in get_statements(CREATE_TABLE.splitlines()): + cur.execute(statement) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass diff --git a/synapse/storage/schema/delta/34/push_display_name_rename.sql b/synapse/storage/schema/delta/34/push_display_name_rename.sql new file mode 100644 index 0000000000..0d9fe1a99a --- /dev/null +++ b/synapse/storage/schema/delta/34/push_display_name_rename.sql @@ -0,0 +1,20 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +DELETE FROM push_rules WHERE rule_id = 'global/override/.m.rule.contains_display_name'; +UPDATE push_rules SET rule_id = 'global/override/.m.rule.contains_display_name' WHERE rule_id = 'global/underride/.m.rule.contains_display_name'; + +DELETE FROM push_rules_enable WHERE rule_id = 'global/override/.m.rule.contains_display_name'; +UPDATE push_rules_enable SET rule_id = 'global/override/.m.rule.contains_display_name' WHERE rule_id = 'global/underride/.m.rule.contains_display_name'; |