diff options
author | David Baker <dave@matrix.org> | 2016-08-18 17:15:26 +0100 |
---|---|---|
committer | David Baker <dave@matrix.org> | 2016-08-18 17:15:26 +0100 |
commit | 602c84cd9c7d8d14fa758c94301a946a02e17494 (patch) | |
tree | 0ef6783ebdd7bde9760bf7eaf6a683e7246815e4 /synapse/storage | |
parent | Merge remote-tracking branch 'origin/develop' into dbkr/notifications_api (diff) | |
parent | Merge pull request #1027 from matrix-org/erikj/appservice_stream (diff) | |
download | synapse-602c84cd9c7d8d14fa758c94301a946a02e17494.tar.xz |
Merge remote-tracking branch 'origin/develop' into dbkr/notifications_api
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 8 | ||||
-rw-r--r-- | synapse/storage/_base.py | 72 | ||||
-rw-r--r-- | synapse/storage/appservice.py | 188 | ||||
-rw-r--r-- | synapse/storage/directory.py | 37 | ||||
-rw-r--r-- | synapse/storage/events.py | 3 | ||||
-rw-r--r-- | synapse/storage/prepare_database.py | 2 | ||||
-rw-r--r-- | synapse/storage/presence.py | 32 | ||||
-rw-r--r-- | synapse/storage/registration.py | 103 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 12 | ||||
-rw-r--r-- | synapse/storage/schema/delta/34/appservice_stream.sql | 23 | ||||
-rw-r--r-- | synapse/storage/schema/delta/34/cache_stream.py | 46 | ||||
-rw-r--r-- | synapse/storage/schema/delta/34/push_display_name_rename.sql | 20 |
12 files changed, 382 insertions, 164 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 73fb334dd6..7efc5bfeef 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -50,6 +50,7 @@ from .openid import OpenIdStore from .client_ips import ClientIpStore from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator +from .engines import PostgresEngine from synapse.api.constants import PresenceState from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -123,6 +124,13 @@ class DataStore(RoomMemberStore, RoomStore, extra_tables=[("deleted_pushers", "stream_id")], ) + if isinstance(self.database_engine, PostgresEngine): + self._cache_id_gen = StreamIdGenerator( + db_conn, "cache_invalidation_stream", "stream_id", + ) + else: + self._cache_id_gen = None + events_max = self._stream_id_gen.get_current_token() event_cache_prefill, min_event_val = self._get_cache_dict( db_conn, "events", diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 0117fdc639..029f6612e6 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -19,6 +19,7 @@ from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.descriptors import Cache from synapse.util.caches import intern_dict +from synapse.storage.engines import PostgresEngine import synapse.metrics @@ -305,13 +306,14 @@ class SQLBaseStore(object): func, *args, **kwargs ) - with PreserveLoggingContext(): - result = yield self._db_pool.runWithConnection( - inner_func, *args, **kwargs - ) - - for after_callback, after_args in after_callbacks: - after_callback(*after_args) + try: + with PreserveLoggingContext(): + result = yield self._db_pool.runWithConnection( + inner_func, *args, **kwargs + ) + finally: + for after_callback, after_args in after_callbacks: + after_callback(*after_args) defer.returnValue(result) @defer.inlineCallbacks @@ -860,6 +862,62 @@ class SQLBaseStore(object): return cache, min_val + def _invalidate_cache_and_stream(self, txn, cache_func, keys): + """Invalidates the cache and adds it to the cache stream so slaves + will know to invalidate their caches. + + This should only be used to invalidate caches where slaves won't + otherwise know from other replication streams that the cache should + be invalidated. + """ + txn.call_after(cache_func.invalidate, keys) + + if isinstance(self.database_engine, PostgresEngine): + # get_next() returns a context manager which is designed to wrap + # the transaction. However, we want to only get an ID when we want + # to use it, here, so we need to call __enter__ manually, and have + # __exit__ called after the transaction finishes. + ctx = self._cache_id_gen.get_next() + stream_id = ctx.__enter__() + txn.call_after(ctx.__exit__, None, None, None) + txn.call_after(self.hs.get_notifier().on_new_replication_data) + + self._simple_insert_txn( + txn, + table="cache_invalidation_stream", + values={ + "stream_id": stream_id, + "cache_func": cache_func.__name__, + "keys": list(keys), + "invalidation_ts": self.clock.time_msec(), + } + ) + + def get_all_updated_caches(self, last_id, current_id, limit): + if last_id == current_id: + return defer.succeed([]) + + def get_all_updated_caches_txn(txn): + # We purposefully don't bound by the current token, as we want to + # send across cache invalidations as quickly as possible. Cache + # invalidations are idempotent, so duplicates are fine. + sql = ( + "SELECT stream_id, cache_func, keys, invalidation_ts" + " FROM cache_invalidation_stream" + " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, limit,)) + return txn.fetchall() + return self.runInteraction( + "get_all_updated_caches", get_all_updated_caches_txn + ) + + def get_cache_stream_token(self): + if self._cache_id_gen: + return self._cache_id_gen.get_current_token() + else: + return 0 + class _RollbackButIsFineException(Exception): """ This exception is used to rollback a transaction without implying diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index d1ee533fac..b496b918b7 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -218,38 +218,37 @@ class ApplicationServiceTransactionStore(SQLBaseStore): Returns: AppServiceTransaction: A new transaction. """ + def _create_appservice_txn(txn): + # work out new txn id (highest txn id for this service += 1) + # The highest id may be the last one sent (in which case it is last_txn) + # or it may be the highest in the txns list (which are waiting to be/are + # being sent) + last_txn_id = self._get_last_txn(txn, service.id) + + txn.execute( + "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", + (service.id,) + ) + highest_txn_id = txn.fetchone()[0] + if highest_txn_id is None: + highest_txn_id = 0 + + new_txn_id = max(highest_txn_id, last_txn_id) + 1 + + # Insert new txn into txn table + event_ids = json.dumps([e.event_id for e in events]) + txn.execute( + "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " + "VALUES(?,?,?)", + (service.id, new_txn_id, event_ids) + ) + return AppServiceTransaction( + service=service, id=new_txn_id, events=events + ) + return self.runInteraction( "create_appservice_txn", - self._create_appservice_txn, - service, events - ) - - def _create_appservice_txn(self, txn, service, events): - # work out new txn id (highest txn id for this service += 1) - # The highest id may be the last one sent (in which case it is last_txn) - # or it may be the highest in the txns list (which are waiting to be/are - # being sent) - last_txn_id = self._get_last_txn(txn, service.id) - - txn.execute( - "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", - (service.id,) - ) - highest_txn_id = txn.fetchone()[0] - if highest_txn_id is None: - highest_txn_id = 0 - - new_txn_id = max(highest_txn_id, last_txn_id) + 1 - - # Insert new txn into txn table - event_ids = json.dumps([e.event_id for e in events]) - txn.execute( - "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " - "VALUES(?,?,?)", - (service.id, new_txn_id, event_ids) - ) - return AppServiceTransaction( - service=service, id=new_txn_id, events=events + _create_appservice_txn, ) def complete_appservice_txn(self, txn_id, service): @@ -263,39 +262,38 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves if this transaction was stored successfully. """ - return self.runInteraction( - "complete_appservice_txn", - self._complete_appservice_txn, - txn_id, service - ) - - def _complete_appservice_txn(self, txn, txn_id, service): txn_id = int(txn_id) - # Debugging query: Make sure the txn being completed is EXACTLY +1 from - # what was there before. If it isn't, we've got problems (e.g. the AS - # has probably missed some events), so whine loudly but still continue, - # since it shouldn't fail completion of the transaction. - last_txn_id = self._get_last_txn(txn, service.id) - if (last_txn_id + 1) != txn_id: - logger.error( - "appservice: Completing a transaction which has an ID > 1 from " - "the last ID sent to this AS. We've either dropped events or " - "sent it to the AS out of order. FIX ME. last_txn=%s " - "completing_txn=%s service_id=%s", last_txn_id, txn_id, - service.id + def _complete_appservice_txn(txn): + # Debugging query: Make sure the txn being completed is EXACTLY +1 from + # what was there before. If it isn't, we've got problems (e.g. the AS + # has probably missed some events), so whine loudly but still continue, + # since it shouldn't fail completion of the transaction. + last_txn_id = self._get_last_txn(txn, service.id) + if (last_txn_id + 1) != txn_id: + logger.error( + "appservice: Completing a transaction which has an ID > 1 from " + "the last ID sent to this AS. We've either dropped events or " + "sent it to the AS out of order. FIX ME. last_txn=%s " + "completing_txn=%s service_id=%s", last_txn_id, txn_id, + service.id + ) + + # Set current txn_id for AS to 'txn_id' + self._simple_upsert_txn( + txn, "application_services_state", dict(as_id=service.id), + dict(last_txn=txn_id) ) - # Set current txn_id for AS to 'txn_id' - self._simple_upsert_txn( - txn, "application_services_state", dict(as_id=service.id), - dict(last_txn=txn_id) - ) + # Delete txn + self._simple_delete_txn( + txn, "application_services_txns", + dict(txn_id=txn_id, as_id=service.id) + ) - # Delete txn - self._simple_delete_txn( - txn, "application_services_txns", - dict(txn_id=txn_id, as_id=service.id) + return self.runInteraction( + "complete_appservice_txn", + _complete_appservice_txn, ) @defer.inlineCallbacks @@ -309,10 +307,25 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to an AppServiceTransaction or None. """ + def _get_oldest_unsent_txn(txn): + # Monotonically increasing txn ids, so just select the smallest + # one in the txns table (we delete them when they are sent) + txn.execute( + "SELECT * FROM application_services_txns WHERE as_id=?" + " ORDER BY txn_id ASC LIMIT 1", + (service.id,) + ) + rows = self.cursor_to_dict(txn) + if not rows: + return None + + entry = rows[0] + + return entry + entry = yield self.runInteraction( "get_oldest_unsent_appservice_txn", - self._get_oldest_unsent_txn, - service + _get_oldest_unsent_txn, ) if not entry: @@ -326,22 +339,6 @@ class ApplicationServiceTransactionStore(SQLBaseStore): service=service, id=entry["txn_id"], events=events )) - def _get_oldest_unsent_txn(self, txn, service): - # Monotonically increasing txn ids, so just select the smallest - # one in the txns table (we delete them when they are sent) - txn.execute( - "SELECT * FROM application_services_txns WHERE as_id=?" - " ORDER BY txn_id ASC LIMIT 1", - (service.id,) - ) - rows = self.cursor_to_dict(txn) - if not rows: - return None - - entry = rows[0] - - return entry - def _get_last_txn(self, txn, service_id): txn.execute( "SELECT last_txn FROM application_services_state WHERE as_id=?", @@ -352,3 +349,42 @@ class ApplicationServiceTransactionStore(SQLBaseStore): return 0 else: return int(last_txn_id[0]) # select 'last_txn' col + + def set_appservice_last_pos(self, pos): + def set_appservice_last_pos_txn(txn): + txn.execute( + "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,) + ) + return self.runInteraction( + "set_appservice_last_pos", set_appservice_last_pos_txn + ) + + @defer.inlineCallbacks + def get_new_events_for_appservice(self, current_id, limit): + """Get all new evnets""" + + def get_new_events_for_appservice_txn(txn): + sql = ( + "SELECT e.stream_ordering, e.event_id" + " FROM events AS e, appservice_stream_position AS a" + " WHERE a.stream_ordering < e.stream_ordering AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + " LIMIT ?" + ) + + txn.execute(sql, (current_id, limit)) + rows = txn.fetchall() + + upper_bound = current_id + if len(rows) == limit: + upper_bound = rows[-1][0] + + return upper_bound, [row[1] for row in rows] + + upper_bound, event_ids = yield self.runInteraction( + "get_new_events_for_appservice", get_new_events_for_appservice_txn, + ) + + events = yield self._get_events(event_ids) + + defer.returnValue((upper_bound, events)) diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index ef231a04dc..9caaf81f2c 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -82,32 +82,39 @@ class DirectoryStore(SQLBaseStore): Returns: Deferred """ - try: - yield self._simple_insert( + def alias_txn(txn): + self._simple_insert_txn( + txn, "room_aliases", { "room_alias": room_alias.to_string(), "room_id": room_id, "creator": creator, }, - desc="create_room_alias_association", - ) - except self.database_engine.module.IntegrityError: - raise SynapseError( - 409, "Room alias %s already exists" % room_alias.to_string() ) - for server in servers: - # TODO(erikj): Fix this to bulk insert - yield self._simple_insert( - "room_alias_servers", - { + self._simple_insert_many_txn( + txn, + table="room_alias_servers", + values=[{ "room_alias": room_alias.to_string(), "server": server, - }, - desc="create_room_alias_association", + } for server in servers], ) - self.get_aliases_for_room.invalidate((room_id,)) + + self._invalidate_cache_and_stream( + txn, self.get_aliases_for_room, (room_id,) + ) + + try: + ret = yield self.runInteraction( + "create_room_alias_association", alias_txn + ) + except self.database_engine.module.IntegrityError: + raise SynapseError( + 409, "Room alias %s already exists" % room_alias.to_string() + ) + defer.returnValue(ret) def get_room_alias_creator(self, room_alias): return self._simple_select_one_onecol( diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d2feee8dbb..ad026b5e0b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -600,7 +600,8 @@ class EventsStore(SQLBaseStore): "rejections", "redactions", "room_memberships", - "state_events" + "state_events", + "topics" ): txn.executemany( "DELETE FROM %s WHERE event_id = ?" % (table,), diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 8801669a6b..b94ce7bea1 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 33 +SCHEMA_VERSION = 34 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index d03f7c541e..21d0696640 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -189,18 +189,30 @@ class PresenceStore(SQLBaseStore): desc="add_presence_list_pending", ) - @defer.inlineCallbacks def set_presence_list_accepted(self, observer_localpart, observed_userid): - result = yield self._simple_update_one( - table="presence_list", - keyvalues={"user_id": observer_localpart, - "observed_user_id": observed_userid}, - updatevalues={"accepted": True}, - desc="set_presence_list_accepted", + def update_presence_list_txn(txn): + result = self._simple_update_one_txn( + txn, + table="presence_list", + keyvalues={ + "user_id": observer_localpart, + "observed_user_id": observed_userid + }, + updatevalues={"accepted": True}, + ) + + self._invalidate_cache_and_stream( + txn, self.get_presence_list_accepted, (observer_localpart,) + ) + self._invalidate_cache_and_stream( + txn, self.get_presence_list_observers_accepted, (observed_userid,) + ) + + return result + + return self.runInteraction( + "set_presence_list_accepted", update_presence_list_txn, ) - self.get_presence_list_accepted.invalidate((observer_localpart,)) - self.get_presence_list_observers_accepted.invalidate((observed_userid,)) - defer.returnValue(result) def get_presence_list(self, observer_localpart, accepted=None): if accepted: diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 7e7d32eb66..e404fa72de 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -93,7 +93,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): desc="add_refresh_token_to_user", ) - @defer.inlineCallbacks def register(self, user_id, token=None, password_hash=None, was_guest=False, make_guest=False, appservice_id=None, create_profile_with_localpart=None, admin=False): @@ -115,7 +114,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Raises: StoreError if the user_id could not be registered. """ - yield self.runInteraction( + return self.runInteraction( "register", self._register, user_id, @@ -127,8 +126,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): create_profile_with_localpart, admin ) - self.get_user_by_id.invalidate((user_id,)) - self.is_guest.invalidate((user_id,)) def _register( self, @@ -210,6 +207,11 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): (create_profile_with_localpart,) ) + self._invalidate_cache_and_stream( + txn, self.get_user_by_id, (user_id,) + ) + txn.call_after(self.is_guest.invalidate, (user_id,)) + @cached() def get_user_by_id(self, user_id): return self._simple_select_one( @@ -236,22 +238,31 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): return self.runInteraction("get_users_by_id_case_insensitive", f) - @defer.inlineCallbacks def user_set_password_hash(self, user_id, password_hash): """ NB. This does *not* evict any cache because the one use for this removes most of the entries subsequently anyway so it would be pointless. Use flush_user separately. """ - yield self._simple_update_one('users', { - 'name': user_id - }, { - 'password_hash': password_hash - }) - self.get_user_by_id.invalidate((user_id,)) + def user_set_password_hash_txn(txn): + self._simple_update_one_txn( + txn, + 'users', { + 'name': user_id + }, + { + 'password_hash': password_hash + } + ) + self._invalidate_cache_and_stream( + txn, self.get_user_by_id, (user_id,) + ) + return self.runInteraction( + "user_set_password_hash", user_set_password_hash_txn + ) @defer.inlineCallbacks - def user_delete_access_tokens(self, user_id, except_token_ids=[], + def user_delete_access_tokens(self, user_id, except_token_id=None, device_id=None, delete_refresh_tokens=False): """ @@ -259,7 +270,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Args: user_id (str): ID of user the tokens belong to - except_token_ids (list[str]): list of access_tokens which should + except_token_id (str): list of access_tokens IDs which should *not* be deleted device_id (str|None): ID of device the tokens are associated with. If None, tokens associated with any device (or no device) will @@ -269,53 +280,45 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Returns: defer.Deferred: """ - def f(txn, table, except_tokens, call_after_delete): - sql = "SELECT token FROM %s WHERE user_id = ?" % table - clauses = [user_id] - + def f(txn): + keyvalues = { + "user_id": user_id, + } if device_id is not None: - sql += " AND device_id = ?" - clauses.append(device_id) + keyvalues["device_id"] = device_id - if except_tokens: - sql += " AND id NOT IN (%s)" % ( - ",".join(["?" for _ in except_tokens]), + if delete_refresh_tokens: + self._simple_delete_txn( + txn, + table="refresh_tokens", + keyvalues=keyvalues, ) - clauses += except_tokens - - txn.execute(sql, clauses) - rows = txn.fetchall() + items = keyvalues.items() + where_clause = " AND ".join(k + " = ?" for k, _ in items) + values = [v for _, v in items] + if except_token_id: + where_clause += " AND id != ?" + values.append(except_token_id) - n = 100 - chunks = [rows[i:i + n] for i in xrange(0, len(rows), n)] - for chunk in chunks: - if call_after_delete: - for row in chunk: - txn.call_after(call_after_delete, (row[0],)) + txn.execute( + "SELECT token FROM access_tokens WHERE %s" % where_clause, + values + ) + rows = self.cursor_to_dict(txn) - txn.execute( - "DELETE FROM %s WHERE token in (%s)" % ( - table, - ",".join(["?" for _ in chunk]), - ), [r[0] for r in chunk] + for row in rows: + self._invalidate_cache_and_stream( + txn, self.get_user_by_access_token, (row["token"],) ) - # delete refresh tokens first, to stop new access tokens being - # allocated while our backs are turned - if delete_refresh_tokens: - yield self.runInteraction( - "user_delete_access_tokens", f, - table="refresh_tokens", - except_tokens=[], - call_after_delete=None, + txn.execute( + "DELETE FROM access_tokens WHERE %s" % where_clause, + values ) yield self.runInteraction( "user_delete_access_tokens", f, - table="access_tokens", - except_tokens=except_token_ids, - call_after_delete=self.get_user_by_access_token.invalidate, ) def delete_access_token(self, access_token): @@ -328,7 +331,9 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): }, ) - txn.call_after(self.get_user_by_access_token.invalidate, (access_token,)) + self._invalidate_cache_and_stream( + txn, self.get_user_by_access_token, (access_token,) + ) return self.runInteraction("delete_access_token", f) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 8bd693be72..a422ddf633 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -277,7 +277,6 @@ class RoomMemberStore(SQLBaseStore): user_id, membership_list=[Membership.JOIN], ) - @defer.inlineCallbacks def forget(self, user_id, room_id): """Indicate that user_id wishes to discard history for room_id.""" def f(txn): @@ -292,10 +291,13 @@ class RoomMemberStore(SQLBaseStore): " room_id = ?" ) txn.execute(sql, (user_id, room_id)) - yield self.runInteraction("forget_membership", f) - self.was_forgotten_at.invalidate_all() - self.who_forgot_in_room.invalidate_all() - self.did_forget.invalidate((user_id, room_id)) + + txn.call_after(self.was_forgotten_at.invalidate_all) + txn.call_after(self.did_forget.invalidate, (user_id, room_id)) + self._invalidate_cache_and_stream( + txn, self.who_forgot_in_room, (room_id,) + ) + return self.runInteraction("forget_membership", f) @cachedInlineCallbacks(num_args=2) def did_forget(self, user_id, room_id): diff --git a/synapse/storage/schema/delta/34/appservice_stream.sql b/synapse/storage/schema/delta/34/appservice_stream.sql new file mode 100644 index 0000000000..69e16eda0f --- /dev/null +++ b/synapse/storage/schema/delta/34/appservice_stream.sql @@ -0,0 +1,23 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS appservice_stream_position( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_ordering BIGINT, + CHECK (Lock='X') +); + +INSERT INTO appservice_stream_position (stream_ordering) + SELECT COALESCE(MAX(stream_ordering), 0) FROM events; diff --git a/synapse/storage/schema/delta/34/cache_stream.py b/synapse/storage/schema/delta/34/cache_stream.py new file mode 100644 index 0000000000..3b63a1562d --- /dev/null +++ b/synapse/storage/schema/delta/34/cache_stream.py @@ -0,0 +1,46 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage.prepare_database import get_statements +from synapse.storage.engines import PostgresEngine + +import logging + +logger = logging.getLogger(__name__) + + +# This stream is used to notify replication slaves that some caches have +# been invalidated that they cannot infer from the other streams. +CREATE_TABLE = """ +CREATE TABLE cache_invalidation_stream ( + stream_id BIGINT, + cache_func TEXT, + keys TEXT[], + invalidation_ts BIGINT +); + +CREATE INDEX cache_invalidation_stream_id ON cache_invalidation_stream(stream_id); +""" + + +def run_create(cur, database_engine, *args, **kwargs): + if not isinstance(database_engine, PostgresEngine): + return + + for statement in get_statements(CREATE_TABLE.splitlines()): + cur.execute(statement) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass diff --git a/synapse/storage/schema/delta/34/push_display_name_rename.sql b/synapse/storage/schema/delta/34/push_display_name_rename.sql new file mode 100644 index 0000000000..0d9fe1a99a --- /dev/null +++ b/synapse/storage/schema/delta/34/push_display_name_rename.sql @@ -0,0 +1,20 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +DELETE FROM push_rules WHERE rule_id = 'global/override/.m.rule.contains_display_name'; +UPDATE push_rules SET rule_id = 'global/override/.m.rule.contains_display_name' WHERE rule_id = 'global/underride/.m.rule.contains_display_name'; + +DELETE FROM push_rules_enable WHERE rule_id = 'global/override/.m.rule.contains_display_name'; +UPDATE push_rules_enable SET rule_id = 'global/override/.m.rule.contains_display_name' WHERE rule_id = 'global/underride/.m.rule.contains_display_name'; |