diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 73fb334dd6..7efc5bfeef 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -50,6 +50,7 @@ from .openid import OpenIdStore
from .client_ips import ClientIpStore
from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
+from .engines import PostgresEngine
from synapse.api.constants import PresenceState
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -123,6 +124,13 @@ class DataStore(RoomMemberStore, RoomStore,
extra_tables=[("deleted_pushers", "stream_id")],
)
+ if isinstance(self.database_engine, PostgresEngine):
+ self._cache_id_gen = StreamIdGenerator(
+ db_conn, "cache_invalidation_stream", "stream_id",
+ )
+ else:
+ self._cache_id_gen = None
+
events_max = self._stream_id_gen.get_current_token()
event_cache_prefill, min_event_val = self._get_cache_dict(
db_conn, "events",
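
The hunk above only creates a cache-invalidation ID generator on Postgres, since SQLite deployments have no replication slaves to notify. As a rough sketch of the contract StreamIdGenerator provides — get_next() hands out a fresh stream ID via a context manager, get_current_token() reports the latest one — here is a minimal, thread-locked stand-in (SimpleIdGenerator is an illustrative name; the real class also tracks in-flight IDs so the token only advances once earlier allocations complete):

    import contextlib
    import threading

    class SimpleIdGenerator(object):
        """Illustrative stand-in for StreamIdGenerator."""

        def __init__(self, current):
            self._lock = threading.Lock()
            self._current = current

        @contextlib.contextmanager
        def get_next(self):
            # Allocate a fresh ID; the caller persists a row with it
            # while the context manager is open.
            with self._lock:
                self._current += 1
                next_id = self._current
            yield next_id

        def get_current_token(self):
            with self._lock:
                return self._current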
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 0117fdc639..029f6612e6 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -19,6 +19,7 @@ from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.caches.descriptors import Cache
from synapse.util.caches import intern_dict
+from synapse.storage.engines import PostgresEngine
import synapse.metrics
@@ -305,13 +306,14 @@ class SQLBaseStore(object):
func, *args, **kwargs
)
- with PreserveLoggingContext():
- result = yield self._db_pool.runWithConnection(
- inner_func, *args, **kwargs
- )
-
- for after_callback, after_args in after_callbacks:
- after_callback(*after_args)
+ try:
+ with PreserveLoggingContext():
+ result = yield self._db_pool.runWithConnection(
+ inner_func, *args, **kwargs
+ )
+ finally:
+ for after_callback, after_args in after_callbacks:
+ after_callback(*after_args)
defer.returnValue(result)
@defer.inlineCallbacks
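
The try/finally above means after_callbacks now run even when the transaction fails. That matters later in this diff: _invalidate_cache_and_stream parks a stream-ID context manager's __exit__ in call_after, which would otherwise leak on error. A minimal sketch of the pattern, with illustrative names:

    def run_interaction(func, after_callbacks):
        # Callbacks registered while `func` runs fire whether or not
        # the transaction succeeds.
        try:
            return func()
        finally:
            for callback, args in after_callbacks:
                callback(*args)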
@@ -860,6 +862,62 @@ class SQLBaseStore(object):
return cache, min_val
+ def _invalidate_cache_and_stream(self, txn, cache_func, keys):
+        """Invalidates the cache and adds the invalidation to the cache
+        stream so that slaves know to invalidate their caches.
+
+ This should only be used to invalidate caches where slaves won't
+ otherwise know from other replication streams that the cache should
+ be invalidated.
+ """
+ txn.call_after(cache_func.invalidate, keys)
+
+ if isinstance(self.database_engine, PostgresEngine):
+            # get_next() returns a context manager which is designed to wrap
+            # the transaction. Here, however, we want an ID only at the point
+            # we actually use it, so we call __enter__ manually and arrange
+            # for __exit__ to be called once the transaction finishes.
+ ctx = self._cache_id_gen.get_next()
+ stream_id = ctx.__enter__()
+ txn.call_after(ctx.__exit__, None, None, None)
+ txn.call_after(self.hs.get_notifier().on_new_replication_data)
+
+ self._simple_insert_txn(
+ txn,
+ table="cache_invalidation_stream",
+ values={
+ "stream_id": stream_id,
+ "cache_func": cache_func.__name__,
+ "keys": list(keys),
+ "invalidation_ts": self.clock.time_msec(),
+ }
+ )
+
+ def get_all_updated_caches(self, last_id, current_id, limit):
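+        """Fetches rows from the cache invalidation stream with a
+        stream_id greater than last_id, up to `limit` rows, so that
+        replication slaves can replay the invalidations."""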
+ if last_id == current_id:
+ return defer.succeed([])
+
+ def get_all_updated_caches_txn(txn):
+ # We purposefully don't bound by the current token, as we want to
+ # send across cache invalidations as quickly as possible. Cache
+ # invalidations are idempotent, so duplicates are fine.
+ sql = (
+ "SELECT stream_id, cache_func, keys, invalidation_ts"
+ " FROM cache_invalidation_stream"
+ " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?"
+ )
+ txn.execute(sql, (last_id, limit,))
+ return txn.fetchall()
+ return self.runInteraction(
+ "get_all_updated_caches", get_all_updated_caches_txn
+ )
+
+ def get_cache_stream_token(self):
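+        """Returns the current position of the cache invalidation stream,
+        or 0 if this database has no such stream (i.e. on SQLite)."""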
+ if self._cache_id_gen:
+ return self._cache_id_gen.get_current_token()
+ else:
+ return 0
+
class _RollbackButIsFineException(Exception):
""" This exception is used to rollback a transaction without implying
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index d1ee533fac..f0c88e05cd 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -352,3 +352,42 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
return 0
else:
return int(last_txn_id[0]) # select 'last_txn' col
+
+ def set_appservice_last_pos(self, pos):
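+        """Record the stream ordering up to which events have been
+        processed for application services."""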
+ def set_appservice_last_pos_txn(txn):
+ txn.execute(
+ "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
+ )
+ return self.runInteraction(
+ "set_appservice_last_pos", set_appservice_last_pos_txn
+ )
+
+ @defer.inlineCallbacks
+ def get_new_events_for_appservice(self, current_id, limit):
+        """Get all new events with a stream ordering between the stored
+        appservice stream position and `current_id`, up to `limit` rows."""
+
+ def get_new_events_for_appservice_txn(txn):
+ sql = (
+ "SELECT e.stream_ordering, e.event_id"
+ " FROM events AS e, appservice_stream_position AS a"
+ " WHERE a.stream_ordering < e.stream_ordering AND e.stream_ordering <= ?"
+ " ORDER BY e.stream_ordering ASC"
+ " LIMIT ?"
+ )
+
+ txn.execute(sql, (current_id, limit))
+ rows = txn.fetchall()
+
+ upper_bound = current_id
+ if len(rows) == limit:
+ upper_bound = rows[-1][0]
+
+ return upper_bound, [row[1] for row in rows]
+
+ upper_bound, event_ids = yield self.runInteraction(
+ "get_new_events_for_appservice", get_new_events_for_appservice_txn,
+ )
+
+ events = yield self._get_events(event_ids)
+
+ defer.returnValue((upper_bound, events))
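
The upper_bound logic above is the usual capped-pagination trick: a full page means there may be more matching rows below current_id, so the caller must resume from the last row it actually saw. The decision in isolation (illustrative helper name):

    def next_position(rows, limit, current_id):
        # rows are (stream_ordering, event_id) tuples in ascending order.
        if len(rows) == limit:
            return rows[-1][0]  # full page: resume from the last row seen
        return current_id       # short page: caught up to current_id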
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index ef231a04dc..9caaf81f2c 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -82,32 +82,39 @@ class DirectoryStore(SQLBaseStore):
Returns:
Deferred
"""
- try:
- yield self._simple_insert(
+ def alias_txn(txn):
+ self._simple_insert_txn(
+ txn,
"room_aliases",
{
"room_alias": room_alias.to_string(),
"room_id": room_id,
"creator": creator,
},
- desc="create_room_alias_association",
- )
- except self.database_engine.module.IntegrityError:
- raise SynapseError(
- 409, "Room alias %s already exists" % room_alias.to_string()
)
- for server in servers:
- # TODO(erikj): Fix this to bulk insert
- yield self._simple_insert(
- "room_alias_servers",
- {
+ self._simple_insert_many_txn(
+ txn,
+ table="room_alias_servers",
+ values=[{
"room_alias": room_alias.to_string(),
"server": server,
- },
- desc="create_room_alias_association",
+ } for server in servers],
)
- self.get_aliases_for_room.invalidate((room_id,))
+
+ self._invalidate_cache_and_stream(
+ txn, self.get_aliases_for_room, (room_id,)
+ )
+
+ try:
+ ret = yield self.runInteraction(
+ "create_room_alias_association", alias_txn
+ )
+ except self.database_engine.module.IntegrityError:
+ raise SynapseError(
+ 409, "Room alias %s already exists" % room_alias.to_string()
+ )
+ defer.returnValue(ret)
def get_room_alias_creator(self, room_alias):
return self._simple_select_one_onecol(
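
Moving the alias creation into a single interaction makes the room_aliases insert, the room_alias_servers bulk insert and the cache invalidation atomic, while the IntegrityError handler preserves the old 409 response. A standalone sqlite3 demonstration of the constraint-violation path this relies on (table shape simplified for the example):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE room_aliases (room_alias TEXT PRIMARY KEY, room_id TEXT)"
    )
    conn.execute("INSERT INTO room_aliases VALUES ('#foo:example.com', '!abc')")
    try:
        conn.execute("INSERT INTO room_aliases VALUES ('#foo:example.com', '!xyz')")
    except sqlite3.IntegrityError:
        # Surfaced to clients as SynapseError(409, ...) in the code above.
        print("room alias already exists")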
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index d2feee8dbb..ad026b5e0b 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -600,7 +600,8 @@ class EventsStore(SQLBaseStore):
"rejections",
"redactions",
"room_memberships",
- "state_events"
+ "state_events",
+ "topics"
):
txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 8801669a6b..b94ce7bea1 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 33
+SCHEMA_VERSION = 34
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index d03f7c541e..21d0696640 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -189,18 +189,30 @@ class PresenceStore(SQLBaseStore):
desc="add_presence_list_pending",
)
- @defer.inlineCallbacks
def set_presence_list_accepted(self, observer_localpart, observed_userid):
- result = yield self._simple_update_one(
- table="presence_list",
- keyvalues={"user_id": observer_localpart,
- "observed_user_id": observed_userid},
- updatevalues={"accepted": True},
- desc="set_presence_list_accepted",
+ def update_presence_list_txn(txn):
+ result = self._simple_update_one_txn(
+ txn,
+ table="presence_list",
+ keyvalues={
+ "user_id": observer_localpart,
+ "observed_user_id": observed_userid
+ },
+ updatevalues={"accepted": True},
+ )
+
+ self._invalidate_cache_and_stream(
+ txn, self.get_presence_list_accepted, (observer_localpart,)
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_presence_list_observers_accepted, (observed_userid,)
+ )
+
+ return result
+
+ return self.runInteraction(
+ "set_presence_list_accepted", update_presence_list_txn,
)
- self.get_presence_list_accepted.invalidate((observer_localpart,))
- self.get_presence_list_observers_accepted.invalidate((observed_userid,))
- defer.returnValue(result)
def get_presence_list(self, observer_localpart, accepted=None):
if accepted:
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 7e7d32eb66..19cb3b31c6 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -251,7 +251,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
self.get_user_by_id.invalidate((user_id,))
@defer.inlineCallbacks
- def user_delete_access_tokens(self, user_id, except_token_ids=[],
+ def user_delete_access_tokens(self, user_id, except_token_id=None,
device_id=None,
delete_refresh_tokens=False):
"""
@@ -259,7 +259,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
Args:
user_id (str): ID of user the tokens belong to
- except_token_ids (list[str]): list of access_tokens which should
+            except_token_id (str): ID of the access token which should
*not* be deleted
device_id (str|None): ID of device the tokens are associated with.
If None, tokens associated with any device (or no device) will
@@ -269,53 +269,45 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
Returns:
defer.Deferred:
"""
- def f(txn, table, except_tokens, call_after_delete):
- sql = "SELECT token FROM %s WHERE user_id = ?" % table
- clauses = [user_id]
-
+ def f(txn):
+ keyvalues = {
+ "user_id": user_id,
+ }
if device_id is not None:
- sql += " AND device_id = ?"
- clauses.append(device_id)
+ keyvalues["device_id"] = device_id
- if except_tokens:
- sql += " AND id NOT IN (%s)" % (
- ",".join(["?" for _ in except_tokens]),
+ if delete_refresh_tokens:
+ self._simple_delete_txn(
+ txn,
+ table="refresh_tokens",
+ keyvalues=keyvalues,
)
- clauses += except_tokens
-
- txn.execute(sql, clauses)
- rows = txn.fetchall()
+ items = keyvalues.items()
+ where_clause = " AND ".join(k + " = ?" for k, _ in items)
+ values = [v for _, v in items]
+ if except_token_id:
+ where_clause += " AND id != ?"
+ values.append(except_token_id)
- n = 100
- chunks = [rows[i:i + n] for i in xrange(0, len(rows), n)]
- for chunk in chunks:
- if call_after_delete:
- for row in chunk:
- txn.call_after(call_after_delete, (row[0],))
+ txn.execute(
+ "SELECT token FROM access_tokens WHERE %s" % where_clause,
+ values
+ )
+ rows = self.cursor_to_dict(txn)
- txn.execute(
- "DELETE FROM %s WHERE token in (%s)" % (
- table,
- ",".join(["?" for _ in chunk]),
- ), [r[0] for r in chunk]
+ for row in rows:
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_by_access_token, (row["token"],)
)
- # delete refresh tokens first, to stop new access tokens being
- # allocated while our backs are turned
- if delete_refresh_tokens:
- yield self.runInteraction(
- "user_delete_access_tokens", f,
- table="refresh_tokens",
- except_tokens=[],
- call_after_delete=None,
+ txn.execute(
+ "DELETE FROM access_tokens WHERE %s" % where_clause,
+ values
)
yield self.runInteraction(
"user_delete_access_tokens", f,
- table="access_tokens",
- except_tokens=except_token_ids,
- call_after_delete=self.get_user_by_access_token.invalidate,
)
def delete_access_token(self, access_token):
@@ -328,7 +320,9 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
},
)
- txn.call_after(self.get_user_by_access_token.invalidate, (access_token,))
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_by_access_token, (access_token,)
+ )
return self.runInteraction("delete_access_token", f)
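
The rewritten f() builds one WHERE clause from keyvalues and reuses it for both the SELECT (to find the tokens whose cache entries need invalidating) and the DELETE. The clause construction, extracted as a standalone helper with hypothetical names:

    def build_where(keyvalues, except_token_id=None):
        items = sorted(keyvalues.items())  # sorted only to keep the example deterministic
        where_clause = " AND ".join(k + " = ?" for k, _ in items)
        values = [v for _, v in items]
        if except_token_id:
            where_clause += " AND id != ?"
            values.append(except_token_id)
        return where_clause, values

    # build_where({"user_id": "@bob:hs", "device_id": "dev1"}, except_token_id=7)
    # -> ("device_id = ? AND user_id = ? AND id != ?", ["dev1", "@bob:hs", 7])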
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 8bd693be72..a422ddf633 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -277,7 +277,6 @@ class RoomMemberStore(SQLBaseStore):
user_id, membership_list=[Membership.JOIN],
)
- @defer.inlineCallbacks
def forget(self, user_id, room_id):
"""Indicate that user_id wishes to discard history for room_id."""
def f(txn):
@@ -292,10 +291,13 @@ class RoomMemberStore(SQLBaseStore):
" room_id = ?"
)
txn.execute(sql, (user_id, room_id))
- yield self.runInteraction("forget_membership", f)
- self.was_forgotten_at.invalidate_all()
- self.who_forgot_in_room.invalidate_all()
- self.did_forget.invalidate((user_id, room_id))
+
+ txn.call_after(self.was_forgotten_at.invalidate_all)
+ txn.call_after(self.did_forget.invalidate, (user_id, room_id))
+ self._invalidate_cache_and_stream(
+ txn, self.who_forgot_in_room, (room_id,)
+ )
+ return self.runInteraction("forget_membership", f)
@cachedInlineCallbacks(num_args=2)
def did_forget(self, user_id, room_id):
diff --git a/synapse/storage/schema/delta/34/appservice_stream.sql b/synapse/storage/schema/delta/34/appservice_stream.sql
new file mode 100644
index 0000000000..69e16eda0f
--- /dev/null
+++ b/synapse/storage/schema/delta/34/appservice_stream.sql
@@ -0,0 +1,23 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS appservice_stream_position(
+ Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
+ stream_ordering BIGINT,
+ CHECK (Lock='X')
+);
+
+INSERT INTO appservice_stream_position (stream_ordering)
+ SELECT COALESCE(MAX(stream_ordering), 0) FROM events;
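
The Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE column plus CHECK (Lock='X') guarantees the table can never hold more than one row, which is why set_appservice_last_pos issues an UPDATE with no WHERE clause. A standalone sqlite3 demo of the constraint at work:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE appservice_stream_position("
        " Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,"
        " stream_ordering BIGINT,"
        " CHECK (Lock='X'))"
    )
    conn.execute("INSERT INTO appservice_stream_position (stream_ordering) VALUES (0)")
    try:
        # A second row must also use Lock='X', which UNIQUE rejects.
        conn.execute("INSERT INTO appservice_stream_position (stream_ordering) VALUES (1)")
    except sqlite3.IntegrityError:
        pass

    # Exactly one row exists, so no WHERE clause is needed.
    conn.execute("UPDATE appservice_stream_position SET stream_ordering = ?", (42,))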
diff --git a/synapse/storage/schema/delta/34/cache_stream.py b/synapse/storage/schema/delta/34/cache_stream.py
new file mode 100644
index 0000000000..3b63a1562d
--- /dev/null
+++ b/synapse/storage/schema/delta/34/cache_stream.py
@@ -0,0 +1,46 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage.prepare_database import get_statements
+from synapse.storage.engines import PostgresEngine
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+# This stream is used to notify replication slaves of cache invalidations
+# that they cannot otherwise infer from the other replication streams.
+CREATE_TABLE = """
+CREATE TABLE cache_invalidation_stream (
+ stream_id BIGINT,
+ cache_func TEXT,
+ keys TEXT[],
+ invalidation_ts BIGINT
+);
+
+CREATE INDEX cache_invalidation_stream_id ON cache_invalidation_stream(stream_id);
+"""
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+ if not isinstance(database_engine, PostgresEngine):
+ return
+
+ for statement in get_statements(CREATE_TABLE.splitlines()):
+ cur.execute(statement)
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+ pass
diff --git a/synapse/storage/schema/delta/34/push_display_name_rename.sql b/synapse/storage/schema/delta/34/push_display_name_rename.sql
new file mode 100644
index 0000000000..0d9fe1a99a
--- /dev/null
+++ b/synapse/storage/schema/delta/34/push_display_name_rename.sql
@@ -0,0 +1,20 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
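+-- Remove any rows already using the new rule_id first, so the renames
+-- below cannot collide with an existing row.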
+DELETE FROM push_rules WHERE rule_id = 'global/override/.m.rule.contains_display_name';
+UPDATE push_rules SET rule_id = 'global/override/.m.rule.contains_display_name' WHERE rule_id = 'global/underride/.m.rule.contains_display_name';
+
+DELETE FROM push_rules_enable WHERE rule_id = 'global/override/.m.rule.contains_display_name';
+UPDATE push_rules_enable SET rule_id = 'global/override/.m.rule.contains_display_name' WHERE rule_id = 'global/underride/.m.rule.contains_display_name';
|