diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 73fb334dd6..7efc5bfeef 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -50,6 +50,7 @@ from .openid import OpenIdStore
from .client_ips import ClientIpStore
from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
+from .engines import PostgresEngine
from synapse.api.constants import PresenceState
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -123,6 +124,13 @@ class DataStore(RoomMemberStore, RoomStore,
extra_tables=[("deleted_pushers", "stream_id")],
)
+ if isinstance(self.database_engine, PostgresEngine):
+ self._cache_id_gen = StreamIdGenerator(
+ db_conn, "cache_invalidation_stream", "stream_id",
+ )
+ else:
+ self._cache_id_gen = None
+
events_max = self._stream_id_gen.get_current_token()
event_cache_prefill, min_event_val = self._get_cache_dict(
db_conn, "events",
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 0117fdc639..b0923a9cad 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -19,6 +19,7 @@ from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.caches.descriptors import Cache
from synapse.util.caches import intern_dict
+from synapse.storage.engines import PostgresEngine
import synapse.metrics
@@ -305,13 +306,14 @@ class SQLBaseStore(object):
func, *args, **kwargs
)
- with PreserveLoggingContext():
- result = yield self._db_pool.runWithConnection(
- inner_func, *args, **kwargs
- )
-
- for after_callback, after_args in after_callbacks:
- after_callback(*after_args)
+ try:
+ with PreserveLoggingContext():
+ result = yield self._db_pool.runWithConnection(
+ inner_func, *args, **kwargs
+ )
+ finally:
+ for after_callback, after_args in after_callbacks:
+ after_callback(*after_args)
defer.returnValue(result)
@defer.inlineCallbacks
@@ -860,6 +862,58 @@ class SQLBaseStore(object):
return cache, min_val
+ def _invalidate_cache_and_stream(self, txn, cache_func, keys):
+ """Invalidates the cache and adds it to the cache stream so slaves
+ will know to invalidate their caches.
+
+ This should only be used to invalidate caches where slaves won't
+ otherwise know from other replication streams that the cache should
+ be invalidated.
+ """
+ txn.call_after(cache_func.invalidate, keys)
+
+ if isinstance(self.database_engine, PostgresEngine):
+ # get_next() returns a context manager which is designed to wrap
+ # the transaction. However, we want to only get an ID when we want
+ # to use it, here, so we need to call __enter__ manually, and have
+ # __exit__ called after the transaction finishes.
+ ctx = self._cache_id_gen.get_next()
+ stream_id = ctx.__enter__()
+ txn.call_after(ctx.__exit__, None, None, None)
+
+ self._simple_insert_txn(
+ txn,
+ table="cache_invalidation_stream",
+ values={
+ "stream_id": stream_id,
+ "cache_func": cache_func.__name__,
+ "keys": list(keys),
+ "invalidation_ts": self.clock.time_msec(),
+ }
+ )
+
+ def get_all_updated_caches(self, last_id, current_id, limit):
+ def get_all_updated_caches_txn(txn):
+ # We purposefully don't bound by the current token, as we want to
+ # send across cache invalidations as quickly as possible. Cache
+ # invalidations are idempotent, so duplicates are fine.
+ sql = (
+ "SELECT stream_id, cache_func, keys, invalidation_ts"
+ " FROM cache_invalidation_stream"
+ " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?"
+ )
+ txn.execute(sql, (last_id, limit,))
+ return txn.fetchall()
+ return self.runInteraction(
+ "get_all_updated_caches", get_all_updated_caches_txn
+ )
+
+ def get_cache_stream_token(self):
+ if self._cache_id_gen:
+ return self._cache_id_gen.get_current_token()
+ else:
+ return 0
+
class _RollbackButIsFineException(Exception):
""" This exception is used to rollback a transaction without implying
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index ef231a04dc..9caaf81f2c 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -82,32 +82,39 @@ class DirectoryStore(SQLBaseStore):
Returns:
Deferred
"""
- try:
- yield self._simple_insert(
+ def alias_txn(txn):
+ self._simple_insert_txn(
+ txn,
"room_aliases",
{
"room_alias": room_alias.to_string(),
"room_id": room_id,
"creator": creator,
},
- desc="create_room_alias_association",
- )
- except self.database_engine.module.IntegrityError:
- raise SynapseError(
- 409, "Room alias %s already exists" % room_alias.to_string()
)
- for server in servers:
- # TODO(erikj): Fix this to bulk insert
- yield self._simple_insert(
- "room_alias_servers",
- {
+ self._simple_insert_many_txn(
+ txn,
+ table="room_alias_servers",
+ values=[{
"room_alias": room_alias.to_string(),
"server": server,
- },
- desc="create_room_alias_association",
+ } for server in servers],
)
- self.get_aliases_for_room.invalidate((room_id,))
+
+ self._invalidate_cache_and_stream(
+ txn, self.get_aliases_for_room, (room_id,)
+ )
+
+ try:
+ ret = yield self.runInteraction(
+ "create_room_alias_association", alias_txn
+ )
+ except self.database_engine.module.IntegrityError:
+ raise SynapseError(
+ 409, "Room alias %s already exists" % room_alias.to_string()
+ )
+ defer.returnValue(ret)
def get_room_alias_creator(self, room_alias):
return self._simple_select_one_onecol(
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 8801669a6b..b94ce7bea1 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 33
+SCHEMA_VERSION = 34
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index d03f7c541e..21d0696640 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -189,18 +189,30 @@ class PresenceStore(SQLBaseStore):
desc="add_presence_list_pending",
)
- @defer.inlineCallbacks
def set_presence_list_accepted(self, observer_localpart, observed_userid):
- result = yield self._simple_update_one(
- table="presence_list",
- keyvalues={"user_id": observer_localpart,
- "observed_user_id": observed_userid},
- updatevalues={"accepted": True},
- desc="set_presence_list_accepted",
+ def update_presence_list_txn(txn):
+ result = self._simple_update_one_txn(
+ txn,
+ table="presence_list",
+ keyvalues={
+ "user_id": observer_localpart,
+ "observed_user_id": observed_userid
+ },
+ updatevalues={"accepted": True},
+ )
+
+ self._invalidate_cache_and_stream(
+ txn, self.get_presence_list_accepted, (observer_localpart,)
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_presence_list_observers_accepted, (observed_userid,)
+ )
+
+ return result
+
+ return self.runInteraction(
+ "set_presence_list_accepted", update_presence_list_txn,
)
- self.get_presence_list_accepted.invalidate((observer_localpart,))
- self.get_presence_list_observers_accepted.invalidate((observed_userid,))
- defer.returnValue(result)
def get_presence_list(self, observer_localpart, accepted=None):
if accepted:
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 8bd693be72..a422ddf633 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -277,7 +277,6 @@ class RoomMemberStore(SQLBaseStore):
user_id, membership_list=[Membership.JOIN],
)
- @defer.inlineCallbacks
def forget(self, user_id, room_id):
"""Indicate that user_id wishes to discard history for room_id."""
def f(txn):
@@ -292,10 +291,13 @@ class RoomMemberStore(SQLBaseStore):
" room_id = ?"
)
txn.execute(sql, (user_id, room_id))
- yield self.runInteraction("forget_membership", f)
- self.was_forgotten_at.invalidate_all()
- self.who_forgot_in_room.invalidate_all()
- self.did_forget.invalidate((user_id, room_id))
+
+ txn.call_after(self.was_forgotten_at.invalidate_all)
+ txn.call_after(self.did_forget.invalidate, (user_id, room_id))
+ self._invalidate_cache_and_stream(
+ txn, self.who_forgot_in_room, (room_id,)
+ )
+ return self.runInteraction("forget_membership", f)
@cachedInlineCallbacks(num_args=2)
def did_forget(self, user_id, room_id):
diff --git a/synapse/storage/schema/delta/34/cache_stream.py b/synapse/storage/schema/delta/34/cache_stream.py
new file mode 100644
index 0000000000..3b63a1562d
--- /dev/null
+++ b/synapse/storage/schema/delta/34/cache_stream.py
@@ -0,0 +1,46 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage.prepare_database import get_statements
+from synapse.storage.engines import PostgresEngine
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+# This stream is used to notify replication slaves that some caches have
+# been invalidated that they cannot infer from the other streams.
+CREATE_TABLE = """
+CREATE TABLE cache_invalidation_stream (
+ stream_id BIGINT,
+ cache_func TEXT,
+ keys TEXT[],
+ invalidation_ts BIGINT
+);
+
+CREATE INDEX cache_invalidation_stream_id ON cache_invalidation_stream(stream_id);
+"""
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+ if not isinstance(database_engine, PostgresEngine):
+ return
+
+ for statement in get_statements(CREATE_TABLE.splitlines()):
+ cur.execute(statement)
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+ pass
|