From a24bc5b2dc3a5d81cdfbe7be367dbb461d85b999 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 23 May 2016 18:33:51 +0100 Subject: Add GET /notifications API --- synapse/storage/event_push_actions.py | 28 ++++++++++++++++++++++++++++ synapse/storage/receipts.py | 25 +++++++++++++++++++++++++ 2 files changed, 53 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 4dae51a172..a9cb042b5a 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -191,6 +191,34 @@ class EventPushActionsStore(SQLBaseStore): } for row in after_read_receipt + no_read_receipt ]) + @defer.inlineCallbacks + def get_push_actions_for_user(self, user_id, before=None, limit=50): + def f(txn): + before_clause = "" + if before: + before_clause = "AND stream_ordering < ?" + args = [user_id, before, limit] + else: + args = [user_id, limit] + sql = ( + "SELECT event_id, room_id, stream_ordering, topological_ordering," + " actions, profile_tag" + " FROM event_push_actions" + " WHERE user_id = ? %s" + " ORDER BY stream_ordering DESC" + " LIMIT ?" + % (before_clause,) + ) + txn.execute(sql, args) + return self.cursor_to_dict(txn) + + push_actions = yield self.runInteraction( + "get_push_actions_for_user", f + ) + for pa in push_actions: + pa["actions"] = json.loads(pa["actions"]) + defer.returnValue(push_actions) + @defer.inlineCallbacks def get_time_of_last_push_action_before(self, stream_ordering): def f(txn): diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index f1774f0e44..d147a60602 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -74,6 +74,31 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue({row["room_id"]: row["event_id"] for row in rows}) + @defer.inlineCallbacks + def get_receipts_for_user_with_orderings(self, user_id, receipt_type): + def f(txn): + sql = ( + "SELECT rl.room_id, rl.event_id," + " e.topological_ordering, e.stream_ordering" + " FROM receipts_linearized rl," + " events e" + " WHERE rl.room_id = e.room_id" + " AND rl.event_id = e.event_id" + " AND user_id = ?" + ) + txn.execute(sql, (user_id,)) + return txn.fetchall() + rows = yield self.runInteraction( + "get_receipts_for_user_with_orderings", f + ) + defer.returnValue({ + row[0]: { + "event_id": row[1], + "topological_ordering": row[2], + "stream_ordering": row[3], + } for row in rows + }) + @defer.inlineCallbacks def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): """Get receipts for multiple rooms for sending to clients. -- cgit 1.5.1 From 37b7e846200f00a36c6084d426ab73ee5d0e0218 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 24 May 2016 11:33:32 +0100 Subject: Include the ts the notif was received at --- synapse/rest/client/v2_alpha/notifications.py | 1 + synapse/storage/event_push_actions.py | 12 +++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/rest/client/v2_alpha/notifications.py b/synapse/rest/client/v2_alpha/notifications.py index 9600256962..4d84230e68 100644 --- a/synapse/rest/client/v2_alpha/notifications.py +++ b/synapse/rest/client/v2_alpha/notifications.py @@ -68,6 +68,7 @@ class NotificationsServlet(RestServlet): "room_id": pa["room_id"], "profile_tag": pa["profile_tag"], "actions": pa["actions"], + "ts": pa["received_ts"], "event": serialize_event( notif_events[pa["event_id"]], self.clock.time_msec(), diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index a9cb042b5a..5123072c44 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -201,11 +201,13 @@ class EventPushActionsStore(SQLBaseStore): else: args = [user_id, limit] sql = ( - "SELECT event_id, room_id, stream_ordering, topological_ordering," - " actions, profile_tag" - " FROM event_push_actions" - " WHERE user_id = ? %s" - " ORDER BY stream_ordering DESC" + "SELECT epa.event_id, epa.room_id," + " epa.stream_ordering, epa.topological_ordering," + " epa.actions, epa.profile_tag, e.received_ts" + " FROM event_push_actions epa, events e" + " WHERE epa.room_id = e.room_id AND epa.event_id = e.event_id" + " AND epa.user_id = ? %s" + " ORDER BY epa.stream_ordering DESC" " LIMIT ?" % (before_clause,) ) -- cgit 1.5.1 From 8a57cc3123d36606fa2cf014b541e39c68cff72c Mon Sep 17 00:00:00 2001 From: Benjamin Saunders Date: Sun, 14 Aug 2016 11:50:22 -0700 Subject: Add missing database corruption recovery case Signed-off-by: Benjamin Saunders --- synapse/storage/events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d2feee8dbb..ad026b5e0b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -600,7 +600,8 @@ class EventsStore(SQLBaseStore): "rejections", "redactions", "room_memberships", - "state_events" + "state_events", + "topics" ): txn.executemany( "DELETE FROM %s WHERE event_id = ?" % (table,), -- cgit 1.5.1 From 99bbd90b0da01690bd1193898138c64008ed71ad Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Aug 2016 09:45:44 +0100 Subject: Always run txn.after_callbacks --- synapse/storage/_base.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 0117fdc639..e516b6de3e 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -305,13 +305,14 @@ class SQLBaseStore(object): func, *args, **kwargs ) - with PreserveLoggingContext(): - result = yield self._db_pool.runWithConnection( - inner_func, *args, **kwargs - ) - - for after_callback, after_args in after_callbacks: - after_callback(*after_args) + try: + with PreserveLoggingContext(): + result = yield self._db_pool.runWithConnection( + inner_func, *args, **kwargs + ) + finally: + for after_callback, after_args in after_callbacks: + after_callback(*after_args) defer.returnValue(result) @defer.inlineCallbacks -- cgit 1.5.1 From 4d70d1f80ea688304abdcbbf3ee01f6ab932abc7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Aug 2016 10:21:25 +0100 Subject: Add some invalidations to a cache_stream --- synapse/storage/__init__.py | 3 ++ synapse/storage/_base.py | 18 ++++++++++ synapse/storage/directory.py | 37 ++++++++++++--------- synapse/storage/prepare_database.py | 2 +- synapse/storage/presence.py | 32 ++++++++++++------ synapse/storage/roommember.py | 12 ++++--- synapse/storage/schema/delta/34/cache_stream.py | 44 +++++++++++++++++++++++++ 7 files changed, 117 insertions(+), 31 deletions(-) create mode 100644 synapse/storage/schema/delta/34/cache_stream.py (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 73fb334dd6..a0c029a2fc 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -122,6 +122,9 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")], ) + self._cache_id_gen = StreamIdGenerator( + db_conn, "cache_stream", "stream_id", + ) events_max = self._stream_id_gen.get_current_token() event_cache_prefill, min_event_val = self._get_cache_dict( diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e516b6de3e..02d9098ddd 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -861,6 +861,24 @@ class SQLBaseStore(object): return cache, min_val + def _invalidate_cache_and_stream(self, txn, cache_func, keys): + txn.call_after(cache_func.invalidate, keys) + + ctx = self._cache_id_gen.get_next() + stream_id = ctx.__enter__() + txn.call_after(ctx.__exit__, None, None, None) + + self._simple_insert_txn( + txn, + table="cache_stream", + values={ + "stream_id": stream_id, + "cache_func": cache_func.__name__, + "keys": list(keys), + "invalidation_ts": self.clock.time_msec(), + } + ) + class _RollbackButIsFineException(Exception): """ This exception is used to rollback a transaction without implying diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index ef231a04dc..9caaf81f2c 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -82,32 +82,39 @@ class DirectoryStore(SQLBaseStore): Returns: Deferred """ - try: - yield self._simple_insert( + def alias_txn(txn): + self._simple_insert_txn( + txn, "room_aliases", { "room_alias": room_alias.to_string(), "room_id": room_id, "creator": creator, }, - desc="create_room_alias_association", - ) - except self.database_engine.module.IntegrityError: - raise SynapseError( - 409, "Room alias %s already exists" % room_alias.to_string() ) - for server in servers: - # TODO(erikj): Fix this to bulk insert - yield self._simple_insert( - "room_alias_servers", - { + self._simple_insert_many_txn( + txn, + table="room_alias_servers", + values=[{ "room_alias": room_alias.to_string(), "server": server, - }, - desc="create_room_alias_association", + } for server in servers], ) - self.get_aliases_for_room.invalidate((room_id,)) + + self._invalidate_cache_and_stream( + txn, self.get_aliases_for_room, (room_id,) + ) + + try: + ret = yield self.runInteraction( + "create_room_alias_association", alias_txn + ) + except self.database_engine.module.IntegrityError: + raise SynapseError( + 409, "Room alias %s already exists" % room_alias.to_string() + ) + defer.returnValue(ret) def get_room_alias_creator(self, room_alias): return self._simple_select_one_onecol( diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 8801669a6b..b94ce7bea1 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 33 +SCHEMA_VERSION = 34 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index d03f7c541e..21d0696640 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -189,18 +189,30 @@ class PresenceStore(SQLBaseStore): desc="add_presence_list_pending", ) - @defer.inlineCallbacks def set_presence_list_accepted(self, observer_localpart, observed_userid): - result = yield self._simple_update_one( - table="presence_list", - keyvalues={"user_id": observer_localpart, - "observed_user_id": observed_userid}, - updatevalues={"accepted": True}, - desc="set_presence_list_accepted", + def update_presence_list_txn(txn): + result = self._simple_update_one_txn( + txn, + table="presence_list", + keyvalues={ + "user_id": observer_localpart, + "observed_user_id": observed_userid + }, + updatevalues={"accepted": True}, + ) + + self._invalidate_cache_and_stream( + txn, self.get_presence_list_accepted, (observer_localpart,) + ) + self._invalidate_cache_and_stream( + txn, self.get_presence_list_observers_accepted, (observed_userid,) + ) + + return result + + return self.runInteraction( + "set_presence_list_accepted", update_presence_list_txn, ) - self.get_presence_list_accepted.invalidate((observer_localpart,)) - self.get_presence_list_observers_accepted.invalidate((observed_userid,)) - defer.returnValue(result) def get_presence_list(self, observer_localpart, accepted=None): if accepted: diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 8bd693be72..a422ddf633 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -277,7 +277,6 @@ class RoomMemberStore(SQLBaseStore): user_id, membership_list=[Membership.JOIN], ) - @defer.inlineCallbacks def forget(self, user_id, room_id): """Indicate that user_id wishes to discard history for room_id.""" def f(txn): @@ -292,10 +291,13 @@ class RoomMemberStore(SQLBaseStore): " room_id = ?" ) txn.execute(sql, (user_id, room_id)) - yield self.runInteraction("forget_membership", f) - self.was_forgotten_at.invalidate_all() - self.who_forgot_in_room.invalidate_all() - self.did_forget.invalidate((user_id, room_id)) + + txn.call_after(self.was_forgotten_at.invalidate_all) + txn.call_after(self.did_forget.invalidate, (user_id, room_id)) + self._invalidate_cache_and_stream( + txn, self.who_forgot_in_room, (room_id,) + ) + return self.runInteraction("forget_membership", f) @cachedInlineCallbacks(num_args=2) def did_forget(self, user_id, room_id): diff --git a/synapse/storage/schema/delta/34/cache_stream.py b/synapse/storage/schema/delta/34/cache_stream.py new file mode 100644 index 0000000000..4c350bfb11 --- /dev/null +++ b/synapse/storage/schema/delta/34/cache_stream.py @@ -0,0 +1,44 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage.prepare_database import get_statements +from synapse.storage.engines import PostgresEngine + +import logging + +logger = logging.getLogger(__name__) + + +CREATE_TABLE = """ +CREATE TABLE cache_stream ( + stream_id BIGINT, + cache_func TEXT, + keys TEXT[], + invalidation_ts BIGINT +); + +CREATE INDEX cache_stream_id ON cache_stream(stream_id); +""" + + +def run_create(cur, database_engine, *args, **kwargs): + if not isinstance(database_engine, PostgresEngine): + return + + for statement in get_statements(CREATE_TABLE.splitlines()): + cur.execute(statement) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass -- cgit 1.5.1 From 64e7e1185392972fd85718bfa55248b041f56b82 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Aug 2016 11:16:45 +0100 Subject: Implement cache replication stream --- synapse/app/synchrotron.py | 13 --------- synapse/replication/resource.py | 21 ++++++++++++- synapse/replication/slave/storage/_base.py | 30 ++++++++++++++++++- synapse/storage/__init__.py | 11 +++++-- synapse/storage/_base.py | 47 ++++++++++++++++++++++-------- 5 files changed, 92 insertions(+), 30 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 3dca1c37a0..207a75d89e 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -338,16 +338,10 @@ class SynchrotronServer(HomeServer): http_client = self.get_simple_http_client() store = self.get_datastore() replication_url = self.config.worker_replication_url - clock = self.get_clock() notifier = self.get_notifier() presence_handler = self.get_presence_handler() typing_handler = self.get_typing_handler() - def expire_broken_caches(): - store.who_forgot_in_room.invalidate_all() - store.get_presence_list_accepted.invalidate_all() - store.get_presence_list_observers_accepted.invalidate_all() - def notify_from_stream( result, stream_name, stream_key, room=None, user=None ): @@ -409,19 +403,12 @@ class SynchrotronServer(HomeServer): result, "typing", "typing_key", room="room_id" ) - next_expire_broken_caches_ms = 0 while True: try: args = store.stream_positions() args.update(typing_handler.stream_positions()) args["timeout"] = 30000 result = yield http_client.get_json(replication_url, args=args) - now_ms = clock.time_msec() - if now_ms > next_expire_broken_caches_ms: - expire_broken_caches() - next_expire_broken_caches_ms = ( - now_ms + store.BROKEN_CACHE_EXPIRY_MS - ) yield store.process_replication(result) typing_handler.process_replication(result) yield presence_handler.process_replication(result) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 8c2d487ff4..84993b33b3 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -41,6 +41,7 @@ STREAM_NAMES = ( ("push_rules",), ("pushers",), ("state",), + ("caches",), ) @@ -70,6 +71,7 @@ class ReplicationResource(Resource): * "backfill": Old events that have been backfilled from other servers. * "push_rules": Per user changes to push rules. * "pushers": Per user changes to their pushers. + * "caches": Cache invalidations. The API takes two additional query parameters: @@ -129,6 +131,7 @@ class ReplicationResource(Resource): push_rules_token, room_stream_token = self.store.get_push_rules_stream_token() pushers_token = self.store.get_pushers_stream_token() state_token = self.store.get_state_stream_token() + caches_token = self.store.get_cache_stream_token() defer.returnValue(_ReplicationToken( room_stream_token, @@ -140,6 +143,7 @@ class ReplicationResource(Resource): push_rules_token, pushers_token, state_token, + caches_token, )) @request_handler() @@ -188,6 +192,7 @@ class ReplicationResource(Resource): yield self.push_rules(writer, current_token, limit, request_streams) yield self.pushers(writer, current_token, limit, request_streams) yield self.state(writer, current_token, limit, request_streams) + yield self.caches(writer, current_token, limit, request_streams) self.streams(writer, current_token, request_streams) logger.info("Replicated %d rows", writer.total) @@ -379,6 +384,20 @@ class ReplicationResource(Resource): "position", "type", "state_key", "event_id" )) + @defer.inlineCallbacks + def caches(self, writer, current_token, limit, request_streams): + current_position = current_token.caches + + caches = request_streams.get("caches") + + if caches is not None: + updated_caches = yield self.store.get_all_updated_caches( + caches, current_position, limit + ) + writer.write_header_and_rows("caches", updated_caches, ( + "position", "cache_func", "keys", "invalidation_ts" + )) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" @@ -407,7 +426,7 @@ class _Writer(object): class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", - "push_rules", "pushers", "state" + "push_rules", "pushers", "state", "caches", ))): __slots__ = [] diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index 46e43ce1c7..24c9946d6a 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -14,15 +14,43 @@ # limitations under the License. from synapse.storage._base import SQLBaseStore +from synapse.storage.engines import PostgresEngine from twisted.internet import defer +from ._slaved_id_tracker import SlavedIdTracker + +import logging + +logger = logging.getLogger(__name__) + class BaseSlavedStore(SQLBaseStore): def __init__(self, db_conn, hs): super(BaseSlavedStore, self).__init__(hs) + if isinstance(self.database_engine, PostgresEngine): + self._cache_id_gen = SlavedIdTracker( + db_conn, "cache_stream", "stream_id", + ) + else: + self._cache_id_gen = None def stream_positions(self): - return {} + pos = {} + if self._cache_id_gen: + pos["caches"] = self._cache_id_gen.get_current_token() + return pos def process_replication(self, result): + stream = result.get("caches") + if stream: + for row in stream["rows"]: + ( + position, cache_func, keys, invalidation_ts, + ) = row + + try: + getattr(self, cache_func).invalidate(tuple(keys)) + except AttributeError: + logger.warn("Got unexpected cache_func: %r", cache_func) + self._cache_id_gen.advance(int(stream["position"])) return defer.succeed(None) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a0c029a2fc..8af492b69f 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -50,6 +50,7 @@ from .openid import OpenIdStore from .client_ips import ClientIpStore from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator +from .engines import PostgresEngine from synapse.api.constants import PresenceState from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -122,9 +123,13 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")], ) - self._cache_id_gen = StreamIdGenerator( - db_conn, "cache_stream", "stream_id", - ) + + if isinstance(self.database_engine, PostgresEngine): + self._cache_id_gen = StreamIdGenerator( + db_conn, "cache_stream", "stream_id", + ) + else: + self._cache_id_gen = None events_max = self._stream_id_gen.get_current_token() event_cache_prefill, min_event_val = self._get_cache_dict( diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 02d9098ddd..e3edc2cde6 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -19,6 +19,7 @@ from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.descriptors import Cache from synapse.util.caches import intern_dict +from synapse.storage.engines import PostgresEngine import synapse.metrics @@ -864,21 +865,43 @@ class SQLBaseStore(object): def _invalidate_cache_and_stream(self, txn, cache_func, keys): txn.call_after(cache_func.invalidate, keys) - ctx = self._cache_id_gen.get_next() - stream_id = ctx.__enter__() - txn.call_after(ctx.__exit__, None, None, None) + if isinstance(self.database_engine, PostgresEngine): + ctx = self._cache_id_gen.get_next() + stream_id = ctx.__enter__() + txn.call_after(ctx.__exit__, None, None, None) + + self._simple_insert_txn( + txn, + table="cache_stream", + values={ + "stream_id": stream_id, + "cache_func": cache_func.__name__, + "keys": list(keys), + "invalidation_ts": self.clock.time_msec(), + } + ) - self._simple_insert_txn( - txn, - table="cache_stream", - values={ - "stream_id": stream_id, - "cache_func": cache_func.__name__, - "keys": list(keys), - "invalidation_ts": self.clock.time_msec(), - } + def get_all_updated_caches(self, last_id, current_id, limit): + def get_all_updated_caches_txn(txn): + # We purposefully don't bound by the current token, as we want to + # send across cache invalidations as quickly as possible. Cache + # invalidations are idempotent, so duplicates are fine. + sql = ( + "SELECT stream_id, cache_func, keys, invalidation_ts FROM cache_stream" + " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, limit,)) + return txn.fetchall() + return self.runInteraction( + "get_all_updated_caches", get_all_updated_caches_txn ) + def get_cache_stream_token(self): + if self._cache_id_gen: + return self._cache_id_gen.get_current_token() + else: + return 0 + class _RollbackButIsFineException(Exception): """ This exception is used to rollback a transaction without implying -- cgit 1.5.1 From d9664344ecf481a73b2189f5bc65f4f784222969 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Aug 2016 11:45:57 +0100 Subject: Rename table. Add docs. --- synapse/replication/slave/storage/_base.py | 2 +- synapse/storage/__init__.py | 2 +- synapse/storage/_base.py | 12 ++++++++++-- synapse/storage/schema/delta/34/cache_stream.py | 6 ++++-- 4 files changed, 16 insertions(+), 6 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index 24c9946d6a..d839d169ab 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -29,7 +29,7 @@ class BaseSlavedStore(SQLBaseStore): super(BaseSlavedStore, self).__init__(hs) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = SlavedIdTracker( - db_conn, "cache_stream", "stream_id", + db_conn, "cache_invalidation_stream", "stream_id", ) else: self._cache_id_gen = None diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 8af492b69f..7efc5bfeef 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -126,7 +126,7 @@ class DataStore(RoomMemberStore, RoomStore, if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = StreamIdGenerator( - db_conn, "cache_stream", "stream_id", + db_conn, "cache_invalidation_stream", "stream_id", ) else: self._cache_id_gen = None diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e3edc2cde6..c55776994d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -863,6 +863,13 @@ class SQLBaseStore(object): return cache, min_val def _invalidate_cache_and_stream(self, txn, cache_func, keys): + """Invalidates the cache and adds it to the cache stream so slaves + will know to invalidate their caches. + + This should only be used to invalidate caches where slaves won't + otherwise know from other replication streams that the cache should + be invalidated. + """ txn.call_after(cache_func.invalidate, keys) if isinstance(self.database_engine, PostgresEngine): @@ -872,7 +879,7 @@ class SQLBaseStore(object): self._simple_insert_txn( txn, - table="cache_stream", + table="cache_invalidation_stream", values={ "stream_id": stream_id, "cache_func": cache_func.__name__, @@ -887,7 +894,8 @@ class SQLBaseStore(object): # send across cache invalidations as quickly as possible. Cache # invalidations are idempotent, so duplicates are fine. sql = ( - "SELECT stream_id, cache_func, keys, invalidation_ts FROM cache_stream" + "SELECT stream_id, cache_func, keys, invalidation_ts" + " FROM cache_invalidation_stream" " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?" ) txn.execute(sql, (last_id, limit,)) diff --git a/synapse/storage/schema/delta/34/cache_stream.py b/synapse/storage/schema/delta/34/cache_stream.py index 4c350bfb11..3b63a1562d 100644 --- a/synapse/storage/schema/delta/34/cache_stream.py +++ b/synapse/storage/schema/delta/34/cache_stream.py @@ -20,15 +20,17 @@ import logging logger = logging.getLogger(__name__) +# This stream is used to notify replication slaves that some caches have +# been invalidated that they cannot infer from the other streams. CREATE_TABLE = """ -CREATE TABLE cache_stream ( +CREATE TABLE cache_invalidation_stream ( stream_id BIGINT, cache_func TEXT, keys TEXT[], invalidation_ts BIGINT ); -CREATE INDEX cache_stream_id ON cache_stream(stream_id); +CREATE INDEX cache_invalidation_stream_id ON cache_invalidation_stream(stream_id); """ -- cgit 1.5.1 From 89e786bd85331b73204294e6066dc538f1c9869c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Aug 2016 13:45:26 +0100 Subject: Doc get_next() context manager usage --- synapse/storage/_base.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c55776994d..b0923a9cad 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -873,6 +873,10 @@ class SQLBaseStore(object): txn.call_after(cache_func.invalidate, keys) if isinstance(self.database_engine, PostgresEngine): + # get_next() returns a context manager which is designed to wrap + # the transaction. However, we want to only get an ID when we want + # to use it, here, so we need to call __enter__ manually, and have + # __exit__ called after the transaction finishes. ctx = self._cache_id_gen.get_next() stream_id = ctx.__enter__() txn.call_after(ctx.__exit__, None, None, None) -- cgit 1.5.1 From dc3a00f24f301ab08750fdd8ca6ae040ba290e1e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Aug 2016 17:04:39 +0100 Subject: Refactor user_delete_access_tokens. Invalidate get_user_by_access_token to slaves. --- synapse/handlers/auth.py | 6 ++-- synapse/push/pusherpool.py | 8 ++--- synapse/storage/registration.py | 70 +++++++++++++++++++---------------------- 3 files changed, 39 insertions(+), 45 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index a582d6334b..6986930c0d 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -741,7 +741,7 @@ class AuthHandler(BaseHandler): def set_password(self, user_id, newpassword, requester=None): password_hash = self.hash(newpassword) - except_access_token_ids = [requester.access_token_id] if requester else [] + except_access_token_id = requester.access_token_id if requester else None try: yield self.store.user_set_password_hash(user_id, password_hash) @@ -750,10 +750,10 @@ class AuthHandler(BaseHandler): raise SynapseError(404, "Unknown user", Codes.NOT_FOUND) raise e yield self.store.user_delete_access_tokens( - user_id, except_access_token_ids + user_id, except_access_token_id ) yield self.hs.get_pusherpool().remove_pushers_by_user( - user_id, except_access_token_ids + user_id, except_access_token_id ) @defer.inlineCallbacks diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 5853ec36a9..54c0f1b849 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -102,14 +102,14 @@ class PusherPool: yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) @defer.inlineCallbacks - def remove_pushers_by_user(self, user_id, except_token_ids=[]): + def remove_pushers_by_user(self, user_id, except_access_token_id=None): all = yield self.store.get_all_pushers() logger.info( - "Removing all pushers for user %s except access tokens ids %r", - user_id, except_token_ids + "Removing all pushers for user %s except access tokens id %r", + user_id, except_access_token_id ) for p in all: - if p['user_name'] == user_id and p['access_token'] not in except_token_ids: + if p['user_name'] == user_id and p['access_token'] != except_access_token_id: logger.info( "Removing pusher for app id %s, pushkey %s, user %s", p['app_id'], p['pushkey'], p['user_name'] diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 7e7d32eb66..19cb3b31c6 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -251,7 +251,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): self.get_user_by_id.invalidate((user_id,)) @defer.inlineCallbacks - def user_delete_access_tokens(self, user_id, except_token_ids=[], + def user_delete_access_tokens(self, user_id, except_token_id=None, device_id=None, delete_refresh_tokens=False): """ @@ -259,7 +259,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Args: user_id (str): ID of user the tokens belong to - except_token_ids (list[str]): list of access_tokens which should + except_token_id (str): list of access_tokens IDs which should *not* be deleted device_id (str|None): ID of device the tokens are associated with. If None, tokens associated with any device (or no device) will @@ -269,53 +269,45 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Returns: defer.Deferred: """ - def f(txn, table, except_tokens, call_after_delete): - sql = "SELECT token FROM %s WHERE user_id = ?" % table - clauses = [user_id] - + def f(txn): + keyvalues = { + "user_id": user_id, + } if device_id is not None: - sql += " AND device_id = ?" - clauses.append(device_id) + keyvalues["device_id"] = device_id - if except_tokens: - sql += " AND id NOT IN (%s)" % ( - ",".join(["?" for _ in except_tokens]), + if delete_refresh_tokens: + self._simple_delete_txn( + txn, + table="refresh_tokens", + keyvalues=keyvalues, ) - clauses += except_tokens - - txn.execute(sql, clauses) - rows = txn.fetchall() + items = keyvalues.items() + where_clause = " AND ".join(k + " = ?" for k, _ in items) + values = [v for _, v in items] + if except_token_id: + where_clause += " AND id != ?" + values.append(except_token_id) - n = 100 - chunks = [rows[i:i + n] for i in xrange(0, len(rows), n)] - for chunk in chunks: - if call_after_delete: - for row in chunk: - txn.call_after(call_after_delete, (row[0],)) + txn.execute( + "SELECT token FROM access_tokens WHERE %s" % where_clause, + values + ) + rows = self.cursor_to_dict(txn) - txn.execute( - "DELETE FROM %s WHERE token in (%s)" % ( - table, - ",".join(["?" for _ in chunk]), - ), [r[0] for r in chunk] + for row in rows: + self._invalidate_cache_and_stream( + txn, self.get_user_by_access_token, (row["token"],) ) - # delete refresh tokens first, to stop new access tokens being - # allocated while our backs are turned - if delete_refresh_tokens: - yield self.runInteraction( - "user_delete_access_tokens", f, - table="refresh_tokens", - except_tokens=[], - call_after_delete=None, + txn.execute( + "DELETE FROM access_tokens WHERE %s" % where_clause, + values ) yield self.runInteraction( "user_delete_access_tokens", f, - table="access_tokens", - except_tokens=except_token_ids, - call_after_delete=self.get_user_by_access_token.invalidate, ) def delete_access_token(self, access_token): @@ -328,7 +320,9 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): }, ) - txn.call_after(self.get_user_by_access_token.invalidate, (access_token,)) + self._invalidate_cache_and_stream( + txn, self.get_user_by_access_token, (access_token,) + ) return self.runInteraction("delete_access_token", f) -- cgit 1.5.1 From a2427981b798eb157d48b4e695a0d025357a2b01 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Aug 2016 18:10:54 +0100 Subject: Use cached get_user_by_access_token in slaves --- synapse/replication/slave/storage/_base.py | 2 +- synapse/replication/slave/storage/registration.py | 2 +- synapse/storage/_base.py | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index d839d169ab..f19540d6bb 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -51,6 +51,6 @@ class BaseSlavedStore(SQLBaseStore): try: getattr(self, cache_func).invalidate(tuple(keys)) except AttributeError: - logger.warn("Got unexpected cache_func: %r", cache_func) + logger.info("Got unexpected cache_func: %r", cache_func) self._cache_id_gen.advance(int(stream["position"])) return defer.succeed(None) diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py index 307833f9e1..38b78b97fc 100644 --- a/synapse/replication/slave/storage/registration.py +++ b/synapse/replication/slave/storage/registration.py @@ -25,6 +25,6 @@ class SlavedRegistrationStore(BaseSlavedStore): # TODO: use the cached version and invalidate deleted tokens get_user_by_access_token = RegistrationStore.__dict__[ "get_user_by_access_token" - ].orig + ] _query_for_auth = DataStore._query_for_auth.__func__ diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b0923a9cad..0a2e78fd81 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -880,6 +880,7 @@ class SQLBaseStore(object): ctx = self._cache_id_gen.get_next() stream_id = ctx.__enter__() txn.call_after(ctx.__exit__, None, None, None) + txn.call_after(self.hs.get_notifier().on_new_replication_data) self._simple_insert_txn( txn, -- cgit 1.5.1 From ad423222572c9e32a87b39c6ee87afd94664895c Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 16 Aug 2016 16:56:30 +0100 Subject: Add migration script To port existing rule actions & enable entries to the new name --- .../schema/delta/34/push_display_name_rename.sql | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 synapse/storage/schema/delta/34/push_display_name_rename.sql (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/34/push_display_name_rename.sql b/synapse/storage/schema/delta/34/push_display_name_rename.sql new file mode 100644 index 0000000000..51d9f82393 --- /dev/null +++ b/synapse/storage/schema/delta/34/push_display_name_rename.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +UPDATE push_rules SET rule_id = 'global/override/.m.rule.contains_display_name' WHERE rule_id = 'global/underride/.m.rule.contains_display_name'; +UPDATE push_rules_enable SET rule_id = 'global/override/.m.rule.contains_display_name' WHERE rule_id = 'global/underride/.m.rule.contains_display_name'; -- cgit 1.5.1 From 949629291cbb71d55a549ef35287a75c1d5bb691 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 16 Aug 2016 17:05:34 +0100 Subject: Do it in storage function --- synapse/replication/resource.py | 2 +- synapse/storage/_base.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 0996054f6b..84993b33b3 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -390,7 +390,7 @@ class ReplicationResource(Resource): caches = request_streams.get("caches") - if caches is not None and current_position != caches: + if caches is not None: updated_caches = yield self.store.get_all_updated_caches( caches, current_position, limit ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 0a2e78fd81..029f6612e6 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -894,6 +894,9 @@ class SQLBaseStore(object): ) def get_all_updated_caches(self, last_id, current_id, limit): + if last_id == current_id: + return defer.succeed([]) + def get_all_updated_caches_txn(txn): # We purposefully don't bound by the current token, as we want to # send across cache invalidations as quickly as possible. Cache -- cgit 1.5.1 From 732cf72b861c99492b240a21e4539a3c40474596 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 17 Aug 2016 18:12:21 +0100 Subject: Fix push_display_name_rename schema update --- synapse/storage/schema/delta/34/push_display_name_rename.sql | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/34/push_display_name_rename.sql b/synapse/storage/schema/delta/34/push_display_name_rename.sql index 51d9f82393..0d9fe1a99a 100644 --- a/synapse/storage/schema/delta/34/push_display_name_rename.sql +++ b/synapse/storage/schema/delta/34/push_display_name_rename.sql @@ -13,5 +13,8 @@ * limitations under the License. */ +DELETE FROM push_rules WHERE rule_id = 'global/override/.m.rule.contains_display_name'; UPDATE push_rules SET rule_id = 'global/override/.m.rule.contains_display_name' WHERE rule_id = 'global/underride/.m.rule.contains_display_name'; + +DELETE FROM push_rules_enable WHERE rule_id = 'global/override/.m.rule.contains_display_name'; UPDATE push_rules_enable SET rule_id = 'global/override/.m.rule.contains_display_name' WHERE rule_id = 'global/underride/.m.rule.contains_display_name'; -- cgit 1.5.1 From 9da84a9a1ef0b88d2e6170d706425ab36431abda Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Aug 2016 11:54:41 +0100 Subject: Make AppserviceHandler stream events from database This is for two reasons: 1. Suppresses duplicates correctly, as the notifier doesn't do any duplicate suppression. 2. Makes it easier to connect the AppserviceHandler to the replication stream. --- synapse/handlers/appservice.py | 65 +++++++++++++++------- synapse/notifier.py | 2 +- synapse/storage/appservice.py | 39 +++++++++++++ .../storage/schema/delta/34/appservice_stream.sql | 23 ++++++++ tests/handlers/test_appservice.py | 9 ++- 5 files changed, 113 insertions(+), 25 deletions(-) create mode 100644 synapse/storage/schema/delta/34/appservice_stream.sql (limited to 'synapse/storage') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 79805cdc2e..84341b0d20 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.util.metrics import Measure +from synapse.util.logcontext import preserve_fn import logging @@ -45,35 +46,57 @@ class ApplicationServicesHandler(object): self.clock = hs.get_clock() @defer.inlineCallbacks - def notify_interested_services(self, event): + def notify_interested_services(self, current_id): """Notifies (pushes) all application services interested in this event. Pushing is done asynchronously, so this method won't block for any prolonged length of time. Args: - event(Event): The event to push out to interested services. + current_id(int): The current maximum ID. """ + services = yield self.store.get_app_services() + if not services: + return + with Measure(self.clock, "notify_interested_services"): - # Gather interested services - services = yield self._get_services_for_event(event) - if len(services) == 0: - return # no services need notifying - - # Do we know this user exists? If not, poke the user query API for - # all services which match that user regex. This needs to block as these - # user queries need to be made BEFORE pushing the event. - yield self._check_user_exists(event.sender) - if event.type == EventTypes.Member: - yield self._check_user_exists(event.state_key) - - if not self.started_scheduler: - self.scheduler.start().addErrback(log_failure) - self.started_scheduler = True - - # Fork off pushes to these services - for service in services: - self.scheduler.submit_event_for_as(service, event) + upper_bound = current_id + limit = 100 + while True: + upper_bound, events = yield self.store.get_new_events_for_appservice( + upper_bound, limit + ) + + logger.info("Current_id: %r, upper_bound: %r", current_id, upper_bound) + + if not events: + break + + for event in events: + # Gather interested services + services = yield self._get_services_for_event(event) + if len(services) == 0: + continue # no services need notifying + + # Do we know this user exists? If not, poke the user query API for + # all services which match that user regex. This needs to block as + # these user queries need to be made BEFORE pushing the event. + yield self._check_user_exists(event.sender) + if event.type == EventTypes.Member: + yield self._check_user_exists(event.state_key) + + if not self.started_scheduler: + self.scheduler.start().addErrback(log_failure) + self.started_scheduler = True + + # Fork off pushes to these services + for service in services: + preserve_fn(self.scheduler.submit_event_for_as)(service, event) + + yield self.store.set_appservice_last_pos(upper_bound) + + if len(events) < limit: + break @defer.inlineCallbacks def query_user_exists(self, user_id): diff --git a/synapse/notifier.py b/synapse/notifier.py index e4a25f2411..40a148994f 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -214,7 +214,7 @@ class Notifier(object): def _on_new_room_event(self, event, room_stream_id, extra_users=[]): """Notify any user streams that are interested in this room event""" # poke any interested application service. - self.appservice_handler.notify_interested_services(event) + self.appservice_handler.notify_interested_services(room_stream_id) if event.type == EventTypes.Member and event.membership == Membership.JOIN: self._user_joined_room(event.state_key, event.room_id) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index d1ee533fac..f0c88e05cd 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -352,3 +352,42 @@ class ApplicationServiceTransactionStore(SQLBaseStore): return 0 else: return int(last_txn_id[0]) # select 'last_txn' col + + def set_appservice_last_pos(self, pos): + def set_appservice_last_pos_txn(txn): + txn.execute( + "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,) + ) + return self.runInteraction( + "set_appservice_last_pos", set_appservice_last_pos_txn + ) + + @defer.inlineCallbacks + def get_new_events_for_appservice(self, current_id, limit): + """Get all new evnets""" + + def get_new_events_for_appservice_txn(txn): + sql = ( + "SELECT e.stream_ordering, e.event_id" + " FROM events AS e, appservice_stream_position AS a" + " WHERE a.stream_ordering < e.stream_ordering AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + " LIMIT ?" + ) + + txn.execute(sql, (current_id, limit)) + rows = txn.fetchall() + + upper_bound = current_id + if len(rows) == limit: + upper_bound = rows[-1][0] + + return upper_bound, [row[1] for row in rows] + + upper_bound, event_ids = yield self.runInteraction( + "get_new_events_for_appservice", get_new_events_for_appservice_txn, + ) + + events = yield self._get_events(event_ids) + + defer.returnValue((upper_bound, events)) diff --git a/synapse/storage/schema/delta/34/appservice_stream.sql b/synapse/storage/schema/delta/34/appservice_stream.sql new file mode 100644 index 0000000000..69e16eda0f --- /dev/null +++ b/synapse/storage/schema/delta/34/appservice_stream.sql @@ -0,0 +1,23 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS appservice_stream_position( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_ordering BIGINT, + CHECK (Lock='X') +); + +INSERT INTO appservice_stream_position (stream_ordering) + SELECT COALESCE(MAX(stream_ordering), 0) FROM events; diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 9c1e5cc67c..7fe88172c0 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -53,8 +53,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): type="m.room.message", room_id="!foo:bar" ) + self.mock_store.get_new_events_for_appservice.return_value = (0, [event]) self.mock_as_api.push = Mock() - yield self.handler.notify_interested_services(event) + yield self.handler.notify_interested_services(0) self.mock_scheduler.submit_event_for_as.assert_called_once_with( interested_service, event ) @@ -74,7 +75,8 @@ class AppServiceHandlerTestCase(unittest.TestCase): ) self.mock_as_api.push = Mock() self.mock_as_api.query_user = Mock() - yield self.handler.notify_interested_services(event) + self.mock_store.get_new_events_for_appservice.return_value = (0, [event]) + yield self.handler.notify_interested_services(0) self.mock_as_api.query_user.assert_called_once_with( services[0], user_id ) @@ -96,7 +98,8 @@ class AppServiceHandlerTestCase(unittest.TestCase): ) self.mock_as_api.push = Mock() self.mock_as_api.query_user = Mock() - yield self.handler.notify_interested_services(event) + self.mock_store.get_new_events_for_appservice.return_value = (0, [event]) + yield self.handler.notify_interested_services(0) self.assertFalse( self.mock_as_api.query_user.called, "query_user called when it shouldn't have been." -- cgit 1.5.1 From 07229bbdae6081f0c91a60e76de8fa848903b5bd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Aug 2016 14:59:55 +0100 Subject: Add appservice worker --- synapse/app/appservice.py | 211 ++++++++++++++++++++++ synapse/config/appservice.py | 1 + synapse/handlers/appservice.py | 89 +++++---- synapse/replication/slave/storage/appservice.py | 10 + synapse/replication/slave/storage/registration.py | 3 + synapse/storage/appservice.py | 145 ++++++++------- synapse/storage/registration.py | 33 ++-- 7 files changed, 369 insertions(+), 123 deletions(-) create mode 100644 synapse/app/appservice.py (limited to 'synapse/storage') diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py new file mode 100644 index 0000000000..afc3709409 --- /dev/null +++ b/synapse/app/appservice.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.server import HomeServer +from synapse.config._base import ConfigError +from synapse.config.logger import setup_logging +from synapse.config.homeserver import HomeServerConfig +from synapse.http.site import SynapseSite +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.replication.slave.storage.directory import DirectoryStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.storage.engines import create_engine +from synapse.util.async import sleep +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import gc + +logger = logging.getLogger("synapse.app.appservice") + + +class AppserviceSlaveStore( + DirectoryStore, SlavedEventStore, SlavedApplicationServiceStore, + SlavedRegistrationStore, +): + pass + + +class AppserviceServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = AppserviceSlaveStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse appservice now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.worker_replication_url + appservice_handler = self.get_application_service_handler() + + @defer.inlineCallbacks + def replicate(results): + stream = results.get("events") + if stream: + max_stream_id = stream["position"] + yield appservice_handler.notify_interested_services(max_stream_id) + + while True: + try: + logger.info("Hitting replication") + args = store.stream_positions() + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + logger.info("Got replication response") + yield store.process_replication(result) + replicate(result) + except: + logger.exception("Error replicating from %r", replication_url) + yield sleep(30) + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse appservice", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.appservice" + + setup_logging(config.worker_log_config, config.worker_log_file) + + database_engine = create_engine(config.database_config) + + if config.notify_appservices: + sys.stderr.write( + "\nThe appservices must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``notify_appservices: false`` to the main config" + "\n" + ) + sys.exit(1) + + # Force the pushers to start since they will be disabled in the main config + config.notify_appservices = True + + ps = AppserviceServer( + config.server_name, + db_config=config.database_config, + config=config, + version_string="Synapse/" + get_version_string(synapse), + database_engine=database_engine, + ) + + ps.setup() + ps.start_listening(config.worker_listeners) + + def run(): + with LoggingContext("run"): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() + + def start(): + ps.replicate() + ps.get_datastore().start_profiling() + + reactor.callWhenRunning(start) + + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-appservice", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index eade803909..70d28892c6 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -28,6 +28,7 @@ class AppServiceConfig(Config): def read_config(self, config): self.app_service_config_files = config.get("app_service_config_files", []) + self.notify_appservices = config.get("notify_appservices", True) def default_config(cls, **kwargs): return """\ diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 84341b0d20..6556dd1ae8 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -44,6 +44,10 @@ class ApplicationServicesHandler(object): self.scheduler = hs.get_application_service_scheduler() self.started_scheduler = False self.clock = hs.get_clock() + self.notify_appservices = hs.config.notify_appservices + + self.current_max = 0 + self.is_processing = False @defer.inlineCallbacks def notify_interested_services(self, current_id): @@ -56,47 +60,56 @@ class ApplicationServicesHandler(object): current_id(int): The current maximum ID. """ services = yield self.store.get_app_services() - if not services: + if not services or not self.notify_appservices: return - with Measure(self.clock, "notify_interested_services"): - upper_bound = current_id - limit = 100 - while True: - upper_bound, events = yield self.store.get_new_events_for_appservice( - upper_bound, limit - ) - - logger.info("Current_id: %r, upper_bound: %r", current_id, upper_bound) - - if not events: - break - - for event in events: - # Gather interested services - services = yield self._get_services_for_event(event) - if len(services) == 0: - continue # no services need notifying - - # Do we know this user exists? If not, poke the user query API for - # all services which match that user regex. This needs to block as - # these user queries need to be made BEFORE pushing the event. - yield self._check_user_exists(event.sender) - if event.type == EventTypes.Member: - yield self._check_user_exists(event.state_key) - - if not self.started_scheduler: - self.scheduler.start().addErrback(log_failure) - self.started_scheduler = True - - # Fork off pushes to these services - for service in services: - preserve_fn(self.scheduler.submit_event_for_as)(service, event) - - yield self.store.set_appservice_last_pos(upper_bound) + self.current_max = max(self.current_max, current_id) + if self.is_processing: + return - if len(events) < limit: - break + with Measure(self.clock, "notify_interested_services"): + self.is_processing = True + try: + upper_bound = self.current_max + limit = 100 + while True: + upper_bound, events = yield self.store.get_new_events_for_appservice( + upper_bound, limit + ) + + if not events: + break + + for event in events: + # Gather interested services + services = yield self._get_services_for_event(event) + if len(services) == 0: + continue # no services need notifying + + # Do we know this user exists? If not, poke the user + # query API for all services which match that user regex. + # This needs to block as these user queries need to be + # made BEFORE pushing the event. + yield self._check_user_exists(event.sender) + if event.type == EventTypes.Member: + yield self._check_user_exists(event.state_key) + + if not self.started_scheduler: + self.scheduler.start().addErrback(log_failure) + self.started_scheduler = True + + # Fork off pushes to these services + for service in services: + preserve_fn(self.scheduler.submit_event_for_as)( + service, event + ) + + yield self.store.set_appservice_last_pos(upper_bound) + + if len(events) < limit: + break + finally: + self.is_processing = False @defer.inlineCallbacks def query_user_exists(self, user_id): diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py index 25792d9429..a374f2f1a2 100644 --- a/synapse/replication/slave/storage/appservice.py +++ b/synapse/replication/slave/storage/appservice.py @@ -28,3 +28,13 @@ class SlavedApplicationServiceStore(BaseSlavedStore): get_app_service_by_token = DataStore.get_app_service_by_token.__func__ get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__ + get_app_services = DataStore.get_app_services.__func__ + get_new_events_for_appservice = DataStore.get_new_events_for_appservice.__func__ + create_appservice_txn = DataStore.create_appservice_txn.__func__ + get_appservices_by_state = DataStore.get_appservices_by_state.__func__ + get_oldest_unsent_txn = DataStore.get_oldest_unsent_txn.__func__ + _get_last_txn = DataStore._get_last_txn.__func__ + complete_appservice_txn = DataStore.complete_appservice_txn.__func__ + get_appservice_state = DataStore.get_appservice_state.__func__ + set_appservice_last_pos = DataStore.set_appservice_last_pos.__func__ + set_appservice_state = DataStore.set_appservice_state.__func__ diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py index 38b78b97fc..e27c7332d2 100644 --- a/synapse/replication/slave/storage/registration.py +++ b/synapse/replication/slave/storage/registration.py @@ -28,3 +28,6 @@ class SlavedRegistrationStore(BaseSlavedStore): ] _query_for_auth = DataStore._query_for_auth.__func__ + get_user_by_id = RegistrationStore.__dict__[ + "get_user_by_id" + ] diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index f0c88e05cd..b496b918b7 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -218,38 +218,37 @@ class ApplicationServiceTransactionStore(SQLBaseStore): Returns: AppServiceTransaction: A new transaction. """ - return self.runInteraction( - "create_appservice_txn", - self._create_appservice_txn, - service, events - ) + def _create_appservice_txn(txn): + # work out new txn id (highest txn id for this service += 1) + # The highest id may be the last one sent (in which case it is last_txn) + # or it may be the highest in the txns list (which are waiting to be/are + # being sent) + last_txn_id = self._get_last_txn(txn, service.id) - def _create_appservice_txn(self, txn, service, events): - # work out new txn id (highest txn id for this service += 1) - # The highest id may be the last one sent (in which case it is last_txn) - # or it may be the highest in the txns list (which are waiting to be/are - # being sent) - last_txn_id = self._get_last_txn(txn, service.id) + txn.execute( + "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", + (service.id,) + ) + highest_txn_id = txn.fetchone()[0] + if highest_txn_id is None: + highest_txn_id = 0 - txn.execute( - "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", - (service.id,) - ) - highest_txn_id = txn.fetchone()[0] - if highest_txn_id is None: - highest_txn_id = 0 + new_txn_id = max(highest_txn_id, last_txn_id) + 1 - new_txn_id = max(highest_txn_id, last_txn_id) + 1 + # Insert new txn into txn table + event_ids = json.dumps([e.event_id for e in events]) + txn.execute( + "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " + "VALUES(?,?,?)", + (service.id, new_txn_id, event_ids) + ) + return AppServiceTransaction( + service=service, id=new_txn_id, events=events + ) - # Insert new txn into txn table - event_ids = json.dumps([e.event_id for e in events]) - txn.execute( - "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " - "VALUES(?,?,?)", - (service.id, new_txn_id, event_ids) - ) - return AppServiceTransaction( - service=service, id=new_txn_id, events=events + return self.runInteraction( + "create_appservice_txn", + _create_appservice_txn, ) def complete_appservice_txn(self, txn_id, service): @@ -263,39 +262,38 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves if this transaction was stored successfully. """ - return self.runInteraction( - "complete_appservice_txn", - self._complete_appservice_txn, - txn_id, service - ) - - def _complete_appservice_txn(self, txn, txn_id, service): txn_id = int(txn_id) - # Debugging query: Make sure the txn being completed is EXACTLY +1 from - # what was there before. If it isn't, we've got problems (e.g. the AS - # has probably missed some events), so whine loudly but still continue, - # since it shouldn't fail completion of the transaction. - last_txn_id = self._get_last_txn(txn, service.id) - if (last_txn_id + 1) != txn_id: - logger.error( - "appservice: Completing a transaction which has an ID > 1 from " - "the last ID sent to this AS. We've either dropped events or " - "sent it to the AS out of order. FIX ME. last_txn=%s " - "completing_txn=%s service_id=%s", last_txn_id, txn_id, - service.id + def _complete_appservice_txn(txn): + # Debugging query: Make sure the txn being completed is EXACTLY +1 from + # what was there before. If it isn't, we've got problems (e.g. the AS + # has probably missed some events), so whine loudly but still continue, + # since it shouldn't fail completion of the transaction. + last_txn_id = self._get_last_txn(txn, service.id) + if (last_txn_id + 1) != txn_id: + logger.error( + "appservice: Completing a transaction which has an ID > 1 from " + "the last ID sent to this AS. We've either dropped events or " + "sent it to the AS out of order. FIX ME. last_txn=%s " + "completing_txn=%s service_id=%s", last_txn_id, txn_id, + service.id + ) + + # Set current txn_id for AS to 'txn_id' + self._simple_upsert_txn( + txn, "application_services_state", dict(as_id=service.id), + dict(last_txn=txn_id) ) - # Set current txn_id for AS to 'txn_id' - self._simple_upsert_txn( - txn, "application_services_state", dict(as_id=service.id), - dict(last_txn=txn_id) - ) + # Delete txn + self._simple_delete_txn( + txn, "application_services_txns", + dict(txn_id=txn_id, as_id=service.id) + ) - # Delete txn - self._simple_delete_txn( - txn, "application_services_txns", - dict(txn_id=txn_id, as_id=service.id) + return self.runInteraction( + "complete_appservice_txn", + _complete_appservice_txn, ) @defer.inlineCallbacks @@ -309,10 +307,25 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to an AppServiceTransaction or None. """ + def _get_oldest_unsent_txn(txn): + # Monotonically increasing txn ids, so just select the smallest + # one in the txns table (we delete them when they are sent) + txn.execute( + "SELECT * FROM application_services_txns WHERE as_id=?" + " ORDER BY txn_id ASC LIMIT 1", + (service.id,) + ) + rows = self.cursor_to_dict(txn) + if not rows: + return None + + entry = rows[0] + + return entry + entry = yield self.runInteraction( "get_oldest_unsent_appservice_txn", - self._get_oldest_unsent_txn, - service + _get_oldest_unsent_txn, ) if not entry: @@ -326,22 +339,6 @@ class ApplicationServiceTransactionStore(SQLBaseStore): service=service, id=entry["txn_id"], events=events )) - def _get_oldest_unsent_txn(self, txn, service): - # Monotonically increasing txn ids, so just select the smallest - # one in the txns table (we delete them when they are sent) - txn.execute( - "SELECT * FROM application_services_txns WHERE as_id=?" - " ORDER BY txn_id ASC LIMIT 1", - (service.id,) - ) - rows = self.cursor_to_dict(txn) - if not rows: - return None - - entry = rows[0] - - return entry - def _get_last_txn(self, txn, service_id): txn.execute( "SELECT last_txn FROM application_services_state WHERE as_id=?", diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 19cb3b31c6..e404fa72de 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -93,7 +93,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): desc="add_refresh_token_to_user", ) - @defer.inlineCallbacks def register(self, user_id, token=None, password_hash=None, was_guest=False, make_guest=False, appservice_id=None, create_profile_with_localpart=None, admin=False): @@ -115,7 +114,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Raises: StoreError if the user_id could not be registered. """ - yield self.runInteraction( + return self.runInteraction( "register", self._register, user_id, @@ -127,8 +126,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): create_profile_with_localpart, admin ) - self.get_user_by_id.invalidate((user_id,)) - self.is_guest.invalidate((user_id,)) def _register( self, @@ -210,6 +207,11 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): (create_profile_with_localpart,) ) + self._invalidate_cache_and_stream( + txn, self.get_user_by_id, (user_id,) + ) + txn.call_after(self.is_guest.invalidate, (user_id,)) + @cached() def get_user_by_id(self, user_id): return self._simple_select_one( @@ -236,19 +238,28 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): return self.runInteraction("get_users_by_id_case_insensitive", f) - @defer.inlineCallbacks def user_set_password_hash(self, user_id, password_hash): """ NB. This does *not* evict any cache because the one use for this removes most of the entries subsequently anyway so it would be pointless. Use flush_user separately. """ - yield self._simple_update_one('users', { - 'name': user_id - }, { - 'password_hash': password_hash - }) - self.get_user_by_id.invalidate((user_id,)) + def user_set_password_hash_txn(txn): + self._simple_update_one_txn( + txn, + 'users', { + 'name': user_id + }, + { + 'password_hash': password_hash + } + ) + self._invalidate_cache_and_stream( + txn, self.get_user_by_id, (user_id,) + ) + return self.runInteraction( + "user_set_password_hash", user_set_password_hash_txn + ) @defer.inlineCallbacks def user_delete_access_tokens(self, user_id, except_token_id=None, -- cgit 1.5.1 From 1e4217c90c730e8dc18e2f8ce11c15f498ad7909 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 18 Aug 2016 17:53:44 +0100 Subject: Explicit join --- synapse/storage/receipts.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index cb4e04a679..2c1df0e2b9 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -100,8 +100,8 @@ class ReceiptsStore(SQLBaseStore): sql = ( "SELECT rl.room_id, rl.event_id," " e.topological_ordering, e.stream_ordering" - " FROM receipts_linearized rl," - " events e" + " FROM receipts_linearized AS rl" + " INNER JOIN events AS e USING (room_id, event_id)" " WHERE rl.room_id = e.room_id" " AND rl.event_id = e.event_id" " AND user_id = ?" -- cgit 1.5.1 From b770435389a9c827582884912b0a2761d0eed812 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Aug 2016 10:19:29 +0100 Subject: Make get_new_events_for_appservice use indices --- synapse/storage/appservice.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index b496b918b7..a854a87eab 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -366,8 +366,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore): def get_new_events_for_appservice_txn(txn): sql = ( "SELECT e.stream_ordering, e.event_id" - " FROM events AS e, appservice_stream_position AS a" - " WHERE a.stream_ordering < e.stream_ordering AND e.stream_ordering <= ?" + " FROM events AS e" + " WHERE" + " (SELECT stream_ordering FROM appservice_stream_position)" + " < e.stream_ordering" + " AND e.stream_ordering <= ?" " ORDER BY e.stream_ordering ASC" " LIMIT ?" ) -- cgit 1.5.1 From ba214a5e325adbf8ab430cb15f55d2c7544eba8b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Aug 2016 11:59:29 +0100 Subject: Remove lru option --- synapse/storage/_base.py | 2 +- synapse/storage/event_push_actions.py | 2 +- synapse/storage/push_rule.py | 4 ++-- synapse/storage/pusher.py | 2 +- synapse/storage/receipts.py | 2 +- synapse/storage/signatures.py | 2 +- synapse/storage/state.py | 4 ++-- synapse/util/caches/descriptors.py | 31 ++++++++----------------------- tests/storage/test__base.py | 2 +- 9 files changed, 18 insertions(+), 33 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 029f6612e6..49fa8614f2 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -166,7 +166,7 @@ class SQLBaseStore(object): self._txn_perf_counters = PerformanceCounters() self._get_event_counters = PerformanceCounters() - self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, + self._get_event_cache = Cache("*getEvent*", keylen=3, max_entries=hs.config.event_cache_size) self._state_group_cache = DictionaryCache( diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index df4000d0da..c65c9c9c47 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -56,7 +56,7 @@ class EventPushActionsStore(SQLBaseStore): ) self._simple_insert_many_txn(txn, "event_push_actions", values) - @cachedInlineCallbacks(num_args=3, lru=True, tree=True, max_entries=5000) + @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000) def get_unread_event_push_actions_by_room_for_user( self, room_id, user_id, last_read_event_id ): diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 8183b7f1b0..86e4a3a81d 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -48,7 +48,7 @@ def _load_rules(rawrules, enabled_map): class PushRuleStore(SQLBaseStore): - @cachedInlineCallbacks(lru=True) + @cachedInlineCallbacks() def get_push_rules_for_user(self, user_id): rows = yield self._simple_select_list( table="push_rules", @@ -72,7 +72,7 @@ class PushRuleStore(SQLBaseStore): defer.returnValue(rules) - @cachedInlineCallbacks(lru=True) + @cachedInlineCallbacks() def get_push_rules_enabled_for_user(self, user_id): results = yield self._simple_select_list( table="push_rules_enable", diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index a7d7c54d7e..8f5f8f24a9 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -135,7 +135,7 @@ class PusherStore(SQLBaseStore): "get_all_updated_pushers", get_all_updated_pushers_txn ) - @cachedInlineCallbacks(lru=True, num_args=1, max_entries=15000) + @cachedInlineCallbacks(num_args=1, max_entries=15000) def get_if_user_has_pusher(self, user_id): result = yield self._simple_select_many_batch( table='pushers', diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 8c26f39fbb..3ad916103f 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -120,7 +120,7 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue([ev for res in results.values() for ev in res]) - @cachedInlineCallbacks(num_args=3, max_entries=5000, lru=True, tree=True) + @cachedInlineCallbacks(num_args=3, max_entries=5000, tree=True) def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): """Get receipts for a single room for sending to clients. diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index ea6823f18d..e1dca927d7 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -25,7 +25,7 @@ from synapse.util.caches.descriptors import cached, cachedList class SignatureStore(SQLBaseStore): """Persistence for event signatures and hashes""" - @cached(lru=True) + @cached() def get_event_reference_hash(self, event_id): return self._get_event_reference_hashes_txn(event_id) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 5b743db67a..0e8fa93e1f 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -174,7 +174,7 @@ class StateStore(SQLBaseStore): return [r[0] for r in results] return self.runInteraction("get_current_state_for_key", f) - @cached(num_args=2, lru=True, max_entries=1000) + @cached(num_args=2, max_entries=1000) def _get_state_group_from_group(self, group, types): raise NotImplementedError() @@ -272,7 +272,7 @@ class StateStore(SQLBaseStore): state_map = yield self.get_state_for_events([event_id], types) defer.returnValue(state_map[event_id]) - @cached(num_args=2, lru=True, max_entries=10000) + @cached(num_args=2, max_entries=10000) def _get_state_group_for_event(self, room_id, event_id): return self._simple_select_one_onecol( table="event_to_state_groups", diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 5cd277f2f2..c38f01ead0 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -26,8 +26,6 @@ from . import DEBUG_CACHES, register_cache from twisted.internet import defer -from collections import OrderedDict - import os import functools import inspect @@ -54,16 +52,11 @@ class Cache(object): "metrics", ) - def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False): - if True: - cache_type = TreeCache if tree else dict - self.cache = LruCache( - max_size=max_entries, keylen=keylen, cache_type=cache_type - ) - self.max_entries = None - else: - self.cache = OrderedDict() - self.max_entries = max_entries + def __init__(self, name, max_entries=1000, keylen=1, tree=False): + cache_type = TreeCache if tree else dict + self.cache = LruCache( + max_size=max_entries, keylen=keylen, cache_type=cache_type + ) self.name = name self.keylen = keylen @@ -102,10 +95,6 @@ class Cache(object): self.prefill(key, value, callback=callback) def prefill(self, key, value, callback=None): - if self.max_entries is not None: - while len(self.cache) >= self.max_entries: - self.cache.popitem(last=False, callback=None) - self.cache.set(key, value, callback=callback) def invalidate(self, key): @@ -164,7 +153,7 @@ class CacheDescriptor(object): defer.returnValue(r1 + r2) """ - def __init__(self, orig, max_entries=1000, num_args=1, lru=True, tree=False, + def __init__(self, orig, max_entries=1000, num_args=1, tree=False, inlineCallbacks=False): max_entries = int(max_entries * CACHE_SIZE_FACTOR) @@ -177,7 +166,6 @@ class CacheDescriptor(object): self.max_entries = max_entries self.num_args = num_args - self.lru = lru self.tree = tree all_args = inspect.getargspec(orig) @@ -200,7 +188,6 @@ class CacheDescriptor(object): name=self.orig.__name__, max_entries=self.max_entries, keylen=self.num_args, - lru=self.lru, tree=self.tree, ) @@ -427,22 +414,20 @@ class _CacheContext(object): self.cache.invalidate(self.key) -def cached(max_entries=1000, num_args=1, lru=True, tree=False): +def cached(max_entries=1000, num_args=1, tree=False): return lambda orig: CacheDescriptor( orig, max_entries=max_entries, num_args=num_args, - lru=lru, tree=tree, ) -def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False, tree=False): +def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False): return lambda orig: CacheDescriptor( orig, max_entries=max_entries, num_args=num_args, - lru=lru, tree=tree, inlineCallbacks=True, ) diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index 9d99eea8d0..ed074ce9ec 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -72,7 +72,7 @@ class CacheTestCase(unittest.TestCase): cache.get(3) def test_eviction_lru(self): - cache = Cache("test", max_entries=2, lru=True) + cache = Cache("test", max_entries=2) cache.prefill(1, "one") cache.prefill(2, "two") -- cgit 1.5.1 From f164fd922024308e702269a881328f7de980e9eb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Aug 2016 14:07:27 +0100 Subject: Move _bulk_get_push_rules_for_room to storage layer --- synapse/push/action_generator.py | 2 +- synapse/push/bulk_push_rule_evaluator.py | 41 +++++------------------ synapse/storage/push_rule.py | 56 ++++++++++++++++++++++++++++++++ 3 files changed, 65 insertions(+), 34 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index b2c94bfaac..ed2ccc4dfb 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -40,7 +40,7 @@ class ActionGenerator: def handle_push_actions_for_event(self, event, context): with Measure(self.clock, "evaluator_for_event"): bulk_evaluator = yield evaluator_for_event( - event, self.hs, self.store, context.current_state + event, self.hs, self.store, context.state_group, context.current_state ) with Measure(self.clock, "action_for_event_by_user"): diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 756e5da513..004eded61f 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -36,35 +36,11 @@ def _get_rules(room_id, user_ids, store): @defer.inlineCallbacks -def evaluator_for_event(event, hs, store, current_state): - room_id = event.room_id - # We also will want to generate notifs for other people in the room so - # their unread countss are correct in the event stream, but to avoid - # generating them for bot / AS users etc, we only do so for people who've - # sent a read receipt into the room. - - local_users_in_room = set( - e.state_key for e in current_state.values() - if e.type == EventTypes.Member and e.membership == Membership.JOIN - and hs.is_mine_id(e.state_key) +def evaluator_for_event(event, hs, store, state_group, current_state): + rules_by_user = yield store.bulk_get_push_rules_for_room( + event.room_id, state_group, current_state ) - # users in the room who have pushers need to get push rules run because - # that's how their pushers work - if_users_with_pushers = yield store.get_if_users_have_pushers( - local_users_in_room - ) - user_ids = set( - uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher - ) - - users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id) - - # any users with pushers must be ours: they have pushers - for uid in users_with_receipts: - if uid in local_users_in_room: - user_ids.add(uid) - # if this event is an invite event, we may need to run rules for the user # who's been invited, otherwise they won't get told they've been invited if event.type == 'm.room.member' and event.content['membership'] == 'invite': @@ -72,12 +48,12 @@ def evaluator_for_event(event, hs, store, current_state): if invited_user and hs.is_mine_id(invited_user): has_pusher = yield store.user_has_pusher(invited_user) if has_pusher: - user_ids.add(invited_user) - - rules_by_user = yield _get_rules(room_id, user_ids, store) + rules_by_user[invited_user] = yield store.get_push_rules_for_user( + invited_user + ) defer.returnValue(BulkPushRuleEvaluator( - room_id, rules_by_user, user_ids, store + event.room_id, rules_by_user, store )) @@ -90,10 +66,9 @@ class BulkPushRuleEvaluator: the same logic to run the actual rules, but could be optimised further (see https://matrix.org/jira/browse/SYN-562) """ - def __init__(self, room_id, rules_by_user, users_in_room, store): + def __init__(self, room_id, rules_by_user, store): self.room_id = room_id self.rules_by_user = rules_by_user - self.users_in_room = users_in_room self.store = store @defer.inlineCallbacks diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 86e4a3a81d..ca929bc239 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -16,6 +16,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList from synapse.push.baserules import list_with_base_rules +from synapse.api.constants import EventTypes, Membership from twisted.internet import defer import logging @@ -123,6 +124,61 @@ class PushRuleStore(SQLBaseStore): defer.returnValue(results) + def bulk_get_push_rules_for_room(self, room_id, state_group, current_state): + if not state_group: + # If state_group is None it means it has yet to be assigned a + # state group, i.e. we need to make sure that calls with a state_group + # of None don't hit previous cached calls with a None state_group. + # To do this we set the state_group to a new object as object() != object() + state_group = object() + + return self._bulk_get_push_rules_for_room(room_id, state_group, current_state) + + @cachedInlineCallbacks(num_args=2) + def _bulk_get_push_rules_for_room(self, room_id, state_group, current_state, + cache_context): + # We don't use `state_group`, its there so that we can cache based + # on it. However, its important that its never None, since two current_state's + # with a state_group of None are likely to be different. + # See bulk_get_push_rules_for_room for how we work around this. + assert state_group is not None + + # We also will want to generate notifs for other people in the room so + # their unread countss are correct in the event stream, but to avoid + # generating them for bot / AS users etc, we only do so for people who've + # sent a read receipt into the room. + local_users_in_room = set( + e.state_key for e in current_state.values() + if e.type == EventTypes.Member and e.membership == Membership.JOIN + and self.hs.is_mine_id(e.state_key) + ) + + # users in the room who have pushers need to get push rules run because + # that's how their pushers work + if_users_with_pushers = yield self.get_if_users_have_pushers( + local_users_in_room, cache_context=cache_context, + ) + user_ids = set( + uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher + ) + + users_with_receipts = yield self.get_users_with_read_receipts_in_room( + room_id, cache_context=cache_context, + ) + + # any users with pushers must be ours: they have pushers + for uid in users_with_receipts: + if uid in local_users_in_room: + user_ids.add(uid) + + rules_by_user = yield self.bulk_get_push_rules( + user_ids, cache_context=cache_context + ) + + rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None} + + defer.returnValue(rules_by_user) + @cachedList(cached_method_name="get_push_rules_enabled_for_user", list_name="user_ids", num_args=1, inlineCallbacks=True) def bulk_get_push_rules_enabled(self, user_ids): -- cgit 1.5.1 From dc76a3e909535d99f0b6b4a76279a14685324dc4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Aug 2016 15:02:38 +0100 Subject: Make cache_context an explicit option --- synapse/storage/push_rule.py | 2 +- synapse/util/caches/descriptors.py | 35 +++++++++++++++++++++++++++-------- tests/storage/test__base.py | 4 ++-- 3 files changed, 30 insertions(+), 11 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index ca929bc239..247dd15694 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -134,7 +134,7 @@ class PushRuleStore(SQLBaseStore): return self._bulk_get_push_rules_for_room(room_id, state_group, current_state) - @cachedInlineCallbacks(num_args=2) + @cachedInlineCallbacks(num_args=2, cache_context=True) def _bulk_get_push_rules_for_room(self, room_id, state_group, current_state, cache_context): # We don't use `state_group`, its there so that we can cache based diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index c38f01ead0..e7a74d3da8 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -146,7 +146,7 @@ class CacheDescriptor(object): invalidated) by adding a special "cache_context" argument to the function and passing that as a kwarg to all caches called. For example:: - @cachedInlineCallbacks() + @cachedInlineCallbacks(cache_context=True) def foo(self, key, cache_context): r1 = yield self.bar1(key, cache_context=cache_context) r2 = yield self.bar2(key, cache_context=cache_context) @@ -154,7 +154,7 @@ class CacheDescriptor(object): """ def __init__(self, orig, max_entries=1000, num_args=1, tree=False, - inlineCallbacks=False): + inlineCallbacks=False, cache_context=False): max_entries = int(max_entries * CACHE_SIZE_FACTOR) self.orig = orig @@ -171,15 +171,28 @@ class CacheDescriptor(object): all_args = inspect.getargspec(orig) self.arg_names = all_args.args[1:num_args + 1] - if "cache_context" in self.arg_names: - self.arg_names.remove("cache_context") + if "cache_context" in all_args.args: + if not cache_context: + raise ValueError( + "Cannot have a 'cache_context' arg without setting" + " cache_context=True" + ) + try: + self.arg_names.remove("cache_context") + except ValueError: + pass + elif cache_context: + raise ValueError( + "Cannot have cache_context=True without having an arg" + " named `cache_context`" + ) - self.add_cache_context = "cache_context" in all_args.args + self.add_cache_context = cache_context if len(self.arg_names) < self.num_args: raise Exception( "Not enough explicit positional arguments to key off of for %r." - " (@cached cannot key off of *args or **kwars)" + " (@cached cannot key off of *args or **kwargs)" % (orig.__name__,) ) @@ -193,12 +206,16 @@ class CacheDescriptor(object): @functools.wraps(self.orig) def wrapped(*args, **kwargs): + # If we're passed a cache_context then we'll want to call its invalidate() + # whenever we are invalidated cache_context = kwargs.pop("cache_context", None) if cache_context: context_callback = cache_context.invalidate else: context_callback = None + # Add our own `cache_context` to argument list if the wrapped function + # has asked for one self_context = _CacheContext(cache, None) if self.add_cache_context: kwargs["cache_context"] = self_context @@ -414,22 +431,24 @@ class _CacheContext(object): self.cache.invalidate(self.key) -def cached(max_entries=1000, num_args=1, tree=False): +def cached(max_entries=1000, num_args=1, tree=False, cache_context=False): return lambda orig: CacheDescriptor( orig, max_entries=max_entries, num_args=num_args, tree=tree, + cache_context=cache_context, ) -def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False): +def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_context=False): return lambda orig: CacheDescriptor( orig, max_entries=max_entries, num_args=num_args, tree=tree, inlineCallbacks=True, + cache_context=cache_context, ) diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index ed074ce9ec..eab0c8d219 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -211,7 +211,7 @@ class CacheDecoratorTestCase(unittest.TestCase): callcount[0] += 1 return key - @cached() + @cached(cache_context=True) def func2(self, key, cache_context): callcount2[0] += 1 return self.func(key, cache_context=cache_context) @@ -244,7 +244,7 @@ class CacheDecoratorTestCase(unittest.TestCase): callcount[0] += 1 return key - @cached() + @cached(cache_context=True) def func2(self, key, cache_context): callcount2[0] += 1 return self.func(key, cache_context=cache_context) -- cgit 1.5.1 From c0d7d9d6429584f51a8174a331e72a894009f3c8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Aug 2016 15:13:58 +0100 Subject: Rename to on_invalidate --- synapse/storage/push_rule.py | 6 +++--- synapse/util/caches/descriptors.py | 26 ++++++++++---------------- tests/storage/test__base.py | 4 ++-- 3 files changed, 15 insertions(+), 21 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 247dd15694..78334a98cf 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -156,14 +156,14 @@ class PushRuleStore(SQLBaseStore): # users in the room who have pushers need to get push rules run because # that's how their pushers work if_users_with_pushers = yield self.get_if_users_have_pushers( - local_users_in_room, cache_context=cache_context, + local_users_in_room, on_invalidate=cache_context.invalidate, ) user_ids = set( uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher ) users_with_receipts = yield self.get_users_with_read_receipts_in_room( - room_id, cache_context=cache_context, + room_id, on_invalidate=cache_context.invalidate, ) # any users with pushers must be ours: they have pushers @@ -172,7 +172,7 @@ class PushRuleStore(SQLBaseStore): user_ids.add(uid) rules_by_user = yield self.bulk_get_push_rules( - user_ids, cache_context=cache_context + user_ids, on_invalidate=cache_context.invalidate, ) rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None} diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index e7a74d3da8..e93ff40dc0 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -148,8 +148,8 @@ class CacheDescriptor(object): @cachedInlineCallbacks(cache_context=True) def foo(self, key, cache_context): - r1 = yield self.bar1(key, cache_context=cache_context) - r2 = yield self.bar2(key, cache_context=cache_context) + r1 = yield self.bar1(key, on_invalidate=cache_context.invalidate) + r2 = yield self.bar2(key, on_invalidate=cache_context.invalidate) defer.returnValue(r1 + r2) """ @@ -208,11 +208,7 @@ class CacheDescriptor(object): def wrapped(*args, **kwargs): # If we're passed a cache_context then we'll want to call its invalidate() # whenever we are invalidated - cache_context = kwargs.pop("cache_context", None) - if cache_context: - context_callback = cache_context.invalidate - else: - context_callback = None + invalidate_callback = kwargs.pop("on_invalidate", None) # Add our own `cache_context` to argument list if the wrapped function # has asked for one @@ -226,7 +222,7 @@ class CacheDescriptor(object): self_context.key = cache_key try: - cached_result_d = cache.get(cache_key, callback=context_callback) + cached_result_d = cache.get(cache_key, callback=invalidate_callback) observer = cached_result_d.observe() if DEBUG_CACHES: @@ -263,7 +259,7 @@ class CacheDescriptor(object): ret.addErrback(onErr) ret = ObservableDeferred(ret, consumeErrors=True) - cache.update(sequence, cache_key, ret, callback=context_callback) + cache.update(sequence, cache_key, ret, callback=invalidate_callback) return preserve_context_over_deferred(ret.observe()) @@ -332,11 +328,9 @@ class CacheListDescriptor(object): @functools.wraps(self.orig) def wrapped(*args, **kwargs): - cache_context = kwargs.pop("cache_context", None) - if cache_context: - context_callback = cache_context.invalidate - else: - context_callback = None + # If we're passed a cache_context then we'll want to call its invalidate() + # whenever we are invalidated + invalidate_callback = kwargs.pop("on_invalidate", None) arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs) keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names] @@ -352,7 +346,7 @@ class CacheListDescriptor(object): key[self.list_pos] = arg try: - res = cache.get(tuple(key), callback=context_callback) + res = cache.get(tuple(key), callback=invalidate_callback) if not res.has_succeeded(): res = res.observe() res.addCallback(lambda r, arg: (arg, r), arg) @@ -388,7 +382,7 @@ class CacheListDescriptor(object): key[self.list_pos] = arg cache.update( sequence, tuple(key), observer, - callback=context_callback + callback=invalidate_callback ) def invalidate(f, key): diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index eab0c8d219..4fc3639de0 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -214,7 +214,7 @@ class CacheDecoratorTestCase(unittest.TestCase): @cached(cache_context=True) def func2(self, key, cache_context): callcount2[0] += 1 - return self.func(key, cache_context=cache_context) + return self.func(key, on_invalidate=cache_context.invalidate) a = A() yield a.func2("foo") @@ -247,7 +247,7 @@ class CacheDecoratorTestCase(unittest.TestCase): @cached(cache_context=True) def func2(self, key, cache_context): callcount2[0] += 1 - return self.func(key, cache_context=cache_context) + return self.func(key, on_invalidate=cache_context.invalidate) a = A() yield a.func2("foo") -- cgit 1.5.1 From 47dd8f02a16ea3f29f4233925d0616eab5e37d2a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Aug 2016 17:40:23 +0100 Subject: Measure _get_event_from_row --- synapse/storage/events.py | 84 ++++++++++++++++++++++++----------------------- 1 file changed, 43 insertions(+), 41 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ad026b5e0b..aa23fc00f5 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -22,6 +22,7 @@ from synapse.events.utils import prune_event from synapse.util.async import ObservableDeferred from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logutils import log_function +from synapse.util.metrics import Measure from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError @@ -1132,56 +1133,57 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _get_event_from_row(self, internal_metadata, js, redacted, rejected_reason=None): - d = json.loads(js) - internal_metadata = json.loads(internal_metadata) - - if rejected_reason: - rejected_reason = yield self._simple_select_one_onecol( - table="rejections", - keyvalues={"event_id": rejected_reason}, - retcol="reason", - desc="_get_event_from_row_rejected_reason", - ) + with Measure(self._clock, "_get_event_from_row"): + d = json.loads(js) + internal_metadata = json.loads(internal_metadata) + + if rejected_reason: + rejected_reason = yield self._simple_select_one_onecol( + table="rejections", + keyvalues={"event_id": rejected_reason}, + retcol="reason", + desc="_get_event_from_row_rejected_reason", + ) - original_ev = FrozenEvent( - d, - internal_metadata_dict=internal_metadata, - rejected_reason=rejected_reason, - ) + original_ev = FrozenEvent( + d, + internal_metadata_dict=internal_metadata, + rejected_reason=rejected_reason, + ) - redacted_event = None - if redacted: - redacted_event = prune_event(original_ev) + redacted_event = None + if redacted: + redacted_event = prune_event(original_ev) - redaction_id = yield self._simple_select_one_onecol( - table="redactions", - keyvalues={"redacts": redacted_event.event_id}, - retcol="event_id", - desc="_get_event_from_row_redactions", - ) + redaction_id = yield self._simple_select_one_onecol( + table="redactions", + keyvalues={"redacts": redacted_event.event_id}, + retcol="event_id", + desc="_get_event_from_row_redactions", + ) - redacted_event.unsigned["redacted_by"] = redaction_id - # Get the redaction event. + redacted_event.unsigned["redacted_by"] = redaction_id + # Get the redaction event. - because = yield self.get_event( - redaction_id, - check_redacted=False, - allow_none=True, - ) + because = yield self.get_event( + redaction_id, + check_redacted=False, + allow_none=True, + ) - if because: - # It's fine to do add the event directly, since get_pdu_json - # will serialise this field correctly - redacted_event.unsigned["redacted_because"] = because + if because: + # It's fine to do add the event directly, since get_pdu_json + # will serialise this field correctly + redacted_event.unsigned["redacted_because"] = because - cache_entry = _EventCacheEntry( - event=original_ev, - redacted_event=redacted_event, - ) + cache_entry = _EventCacheEntry( + event=original_ev, + redacted_event=redacted_event, + ) - self._get_event_cache.prefill((original_ev.event_id,), cache_entry) + self._get_event_cache.prefill((original_ev.event_id,), cache_entry) - defer.returnValue(cache_entry) + defer.returnValue(cache_entry) @defer.inlineCallbacks def count_daily_messages(self): -- cgit 1.5.1 From 37adde32dce5b51d2d273525f89a6a494dd00bb3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Aug 2016 18:02:19 +0100 Subject: Move defer.returnValue out of Measure --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index aa23fc00f5..97aef25321 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1183,7 +1183,7 @@ class EventsStore(SQLBaseStore): self._get_event_cache.prefill((original_ev.event_id,), cache_entry) - defer.returnValue(cache_entry) + defer.returnValue(cache_entry) @defer.inlineCallbacks def count_daily_messages(self): -- cgit 1.5.1 From aad8a1a82517f6f7522ae99989d36fd3951f44a6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 Aug 2016 16:29:46 +0100 Subject: Delete old received_transactions --- .../storage/schema/delta/34/received_txn_purge.py | 32 ++++++++++++++++++++++ synapse/storage/transactions.py | 17 +++++++++--- 2 files changed, 45 insertions(+), 4 deletions(-) create mode 100644 synapse/storage/schema/delta/34/received_txn_purge.py (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/34/received_txn_purge.py b/synapse/storage/schema/delta/34/received_txn_purge.py new file mode 100644 index 0000000000..033144341c --- /dev/null +++ b/synapse/storage/schema/delta/34/received_txn_purge.py @@ -0,0 +1,32 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage.engines import PostgresEngine + +import logging + +logger = logging.getLogger(__name__) + + +def run_create(cur, database_engine, *args, **kwargs): + if isinstance(database_engine, PostgresEngine): + cur.execute("TRUNCATE received_transactions") + else: + cur.execute("DELETE FROM received_transactions") + + cur.execute("CREATE INDEX received_transactions_ts ON received_transactions(ts)") + + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 6258ff1725..58d4de4f1d 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -62,10 +62,9 @@ class TransactionStore(SQLBaseStore): self.last_transaction = {} reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) - hs.get_clock().looping_call( - self._persist_in_mem_txns, - 1000, - ) + self._clock.looping_call(self._persist_in_mem_txns, 1000) + + self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have @@ -127,6 +126,7 @@ class TransactionStore(SQLBaseStore): "origin": origin, "response_code": code, "response_json": buffer(encode_canonical_json(response_dict)), + "ts": self._clock.time_msec(), }, or_ignore=True, desc="set_received_txn_response", @@ -383,3 +383,12 @@ class TransactionStore(SQLBaseStore): yield self.runInteraction("_persist_in_mem_txns", f) except: logger.exception("Failed to persist transactions!") + + def _cleanup_transactions(self): + now = self._clock.time_msec() + month_ago = now - 30 * 24 * 60 * 60 * 1000 + + def _cleanup_transactions_txn(txn): + txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,)) + + return self.runInteraction("_persist_in_mem_txns", _cleanup_transactions_txn) -- cgit 1.5.1 From 921913935176f5bd3df5e5b960d87c94a2adb304 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Aug 2016 15:23:39 +0100 Subject: Preserve some logcontexts --- synapse/app/homeserver.py | 3 ++- synapse/appservice/scheduler.py | 6 +++--- synapse/crypto/keyring.py | 36 ++++++++++++++++---------------- synapse/federation/federation_base.py | 7 ++++--- synapse/federation/federation_client.py | 17 +++++++++------ synapse/handlers/appservice.py | 8 +++---- synapse/handlers/federation.py | 28 +++++++++++++------------ synapse/handlers/message.py | 35 ++++++++++++++++++------------- synapse/handlers/typing.py | 12 +++++++---- synapse/notifier.py | 7 ++++++- synapse/push/push_tools.py | 9 ++++---- synapse/push/pusherpool.py | 12 ++++++----- synapse/rest/client/v2_alpha/register.py | 3 +-- synapse/storage/events.py | 16 ++++++++------ synapse/storage/stream.py | 6 +++--- synapse/util/async.py | 9 ++++---- synapse/util/logcontext.py | 15 +++++++++---- synapse/visibility.py | 6 +++--- 18 files changed, 136 insertions(+), 99 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 54f35900f8..4493d3b847 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -50,7 +50,7 @@ from synapse.api.urls import ( ) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, logcontext_tracer from synapse.metrics import register_memory_metrics, get_metrics_for from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX @@ -449,6 +449,7 @@ def run(hs): # Uncomment to enable tracing of log context changes. # sys.settrace(logcontext_tracer) with LoggingContext("run"): + sys.settrace(logcontext_tracer) change_resource_limit(hs.config.soft_file_limit) if hs.config.gc_thresholds: gc.set_threshold(*hs.config.gc_thresholds) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 6450a12890..68a9de17b8 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -150,12 +150,12 @@ class _TransactionController(object): if service_is_up: sent = yield txn.send(self.as_api) if sent: - txn.complete(self.store) + yield txn.complete(self.store) else: - self._start_recoverer(service) + preserve_fn(self._start_recoverer)(service) except Exception as e: logger.exception(e) - self._start_recoverer(service) + preserve_fn(self._start_recoverer)(service) @defer.inlineCallbacks def on_recovered(self, recoverer): diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 1735ca9345..d7211ee9b3 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -308,15 +308,15 @@ class Keyring(object): @defer.inlineCallbacks def get_keys_from_store(self, server_name_and_key_ids): - res = yield defer.gatherResults( + res = yield preserve_context_over_deferred(defer.gatherResults( [ - self.store.get_server_verify_keys( + preserve_fn(self.store.get_server_verify_keys)( server_name, key_ids ).addCallback(lambda ks, server: (server, ks), server_name) for server_name, key_ids in server_name_and_key_ids ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) defer.returnValue(dict(res)) @@ -337,13 +337,13 @@ class Keyring(object): ) defer.returnValue({}) - results = yield defer.gatherResults( + results = yield preserve_context_over_deferred(defer.gatherResults( [ - get_key(p_name, p_keys) + preserve_fn(get_key)(p_name, p_keys) for p_name, p_keys in self.perspective_servers.items() ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) union_of_keys = {} for result in results: @@ -383,13 +383,13 @@ class Keyring(object): defer.returnValue(keys) - results = yield defer.gatherResults( + results = yield preserve_context_over_deferred(defer.gatherResults( [ - get_key(server_name, key_ids) + preserve_fn(get_key)(server_name, key_ids) for server_name, key_ids in server_name_and_key_ids ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) merged = {} for result in results: @@ -466,9 +466,9 @@ class Keyring(object): for server_name, response_keys in processed_response.items(): keys.setdefault(server_name, {}).update(response_keys) - yield defer.gatherResults( + yield preserve_context_over_deferred(defer.gatherResults( [ - self.store_keys( + preserve_fn(self.store_keys)( server_name=server_name, from_server=perspective_name, verify_keys=response_keys, @@ -476,7 +476,7 @@ class Keyring(object): for server_name, response_keys in keys.items() ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) defer.returnValue(keys) @@ -524,7 +524,7 @@ class Keyring(object): keys.update(response_keys) - yield defer.gatherResults( + yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self.store_keys)( server_name=key_server_name, @@ -534,7 +534,7 @@ class Keyring(object): for key_server_name, verify_keys in keys.items() ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) defer.returnValue(keys) @@ -600,7 +600,7 @@ class Keyring(object): response_keys.update(verify_keys) response_keys.update(old_verify_keys) - yield defer.gatherResults( + yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self.store.store_server_keys_json)( server_name=server_name, @@ -613,7 +613,7 @@ class Keyring(object): for key_id in updated_key_ids ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) results[server_name] = response_keys @@ -702,7 +702,7 @@ class Keyring(object): A deferred that completes when the keys are stored. """ # TODO(markjh): Store whether the keys have expired. - yield defer.gatherResults( + yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self.store.store_server_verify_key)( server_name, server_name, key.time_added, key @@ -710,4 +710,4 @@ class Keyring(object): for key_id, key in verify_keys.items() ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index da2f5e8cfd..2339cc9034 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -23,6 +23,7 @@ from synapse.crypto.event_signing import check_event_content_hash from synapse.api.errors import SynapseError from synapse.util import unwrapFirstError +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred import logging @@ -102,10 +103,10 @@ class FederationBase(object): warn, pdu ) - valid_pdus = yield defer.gatherResults( + valid_pdus = yield preserve_context_over_deferred(defer.gatherResults( deferreds, consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) if include_none: defer.returnValue(valid_pdus) @@ -129,7 +130,7 @@ class FederationBase(object): for pdu in pdus ] - deferreds = self.keyring.verify_json_objects_for_server([ + deferreds = preserve_fn(self.keyring.verify_json_objects_for_server)([ (p.origin, p.get_pdu_json()) for p in redacted_pdus ]) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 9ba3151713..f2b3aceb49 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -27,6 +27,7 @@ from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.events import FrozenEvent import synapse.metrics @@ -225,10 +226,10 @@ class FederationClient(FederationBase): ] # FIXME: We should handle signature failures more gracefully. - pdus[:] = yield defer.gatherResults( + pdus[:] = yield preserve_context_over_deferred(defer.gatherResults( self._check_sigs_and_hashes(pdus), consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) defer.returnValue(pdus) @@ -457,14 +458,16 @@ class FederationClient(FederationBase): batch = set(missing_events[i:i + batch_size]) deferreds = [ - self.get_pdu( + preserve_fn(self.get_pdu)( destinations=random_server_list(), event_id=e_id, ) for e_id in batch ] - res = yield defer.DeferredList(deferreds, consumeErrors=True) + res = yield preserve_context_over_deferred( + defer.DeferredList(deferreds, consumeErrors=True) + ) for success, result in res: if success: signed_events.append(result) @@ -853,14 +856,16 @@ class FederationClient(FederationBase): return srvs deferreds = [ - self.get_pdu( + preserve_fn(self.get_pdu)( destinations=random_server_list(), event_id=e_id, ) for e_id, depth in ordered_missing[:limit - len(signed_events)] ] - res = yield defer.DeferredList(deferreds, consumeErrors=True) + res = yield preserve_context_over_deferred( + defer.DeferredList(deferreds, consumeErrors=True) + ) for (result, val), (e_id, _) in zip(res, ordered_missing): if result and val: signed_events.append(val) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index dd285452cd..306686a384 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.util.metrics import Measure -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred import logging @@ -163,10 +163,10 @@ class ApplicationServicesHandler(object): def query_3pe(self, kind, protocol, fields): services = yield self._get_services_for_3pn(protocol) - results = yield defer.DeferredList([ - self.appservice_api.query_3pe(service, kind, protocol, fields) + results = yield preserve_context_over_deferred(defer.DeferredList([ + preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields) for service in services - ], consumeErrors=True) + ], consumeErrors=True)) ret = [] for (success, result) in results: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 328f8f4842..fe3092b14b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -26,7 +26,9 @@ from synapse.api.errors import ( from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError -from synapse.util.logcontext import PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import ( + PreserveLoggingContext, preserve_fn, preserve_context_over_deferred +) from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.util.frozenutils import unfreeze @@ -361,9 +363,9 @@ class FederationHandler(BaseHandler): missing_auth - failed_to_fetch ) - results = yield defer.gatherResults( + results = yield preserve_context_over_deferred(defer.gatherResults( [ - self.replication_layer.get_pdu( + preserve_fn(self.replication_layer.get_pdu)( [dest], event_id, outlier=True, @@ -372,7 +374,7 @@ class FederationHandler(BaseHandler): for event_id in missing_auth - failed_to_fetch ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) auth_events.update({a.event_id: a for a in results}) required_auth.update( a_id for event in results for a_id, _ in event.auth_events @@ -552,10 +554,10 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) - states = yield defer.gatherResults([ - self.state_handler.resolve_state_groups(room_id, [e]) + states = yield preserve_context_over_deferred(defer.gatherResults([ + preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e]) for e in event_ids - ]) + ])) states = dict(zip(event_ids, [s[1] for s in states])) for e_id, _ in sorted_extremeties_tuple: @@ -1166,9 +1168,9 @@ class FederationHandler(BaseHandler): a bunch of outliers, but not a chunk of individual events that depend on each other for state calculations. """ - contexts = yield defer.gatherResults( + contexts = yield preserve_context_over_deferred(defer.gatherResults( [ - self._prep_event( + preserve_fn(self._prep_event)( origin, ev_info["event"], state=ev_info.get("state"), @@ -1176,7 +1178,7 @@ class FederationHandler(BaseHandler): ) for ev_info in event_infos ] - ) + )) yield self.store.persist_events( [ @@ -1460,9 +1462,9 @@ class FederationHandler(BaseHandler): # Do auth conflict res. logger.info("Different auth: %s", different_auth) - different_events = yield defer.gatherResults( + different_events = yield preserve_context_over_deferred(defer.gatherResults( [ - self.store.get_event( + preserve_fn(self.store.get_event)( d, allow_none=True, allow_rejected=False, @@ -1471,7 +1473,7 @@ class FederationHandler(BaseHandler): if d in have_events and not have_events[d] ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) if different_events: local_view = dict(auth_events) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index dc76d34a52..4c3cd9d12e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -28,7 +28,8 @@ from synapse.types import ( from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -502,15 +503,17 @@ class MessageHandler(BaseHandler): lambda states: states[event.event_id] ) - (messages, token), current_state = yield defer.gatherResults( - [ - self.store.get_recent_events_for_room( - event.room_id, - limit=limit, - end_token=room_end_token, - ), - deferred_room_state, - ] + (messages, token), current_state = yield preserve_context_over_deferred( + defer.gatherResults( + [ + preserve_fn(self.store.get_recent_events_for_room)( + event.room_id, + limit=limit, + end_token=room_end_token, + ), + deferred_room_state, + ] + ) ).addErrback(unwrapFirstError) messages = yield filter_events_for_client( @@ -719,9 +722,9 @@ class MessageHandler(BaseHandler): presence, receipts, (messages, token) = yield defer.gatherResults( [ - get_presence(), - get_receipts(), - self.store.get_recent_events_for_room( + preserve_fn(get_presence)(), + preserve_fn(get_receipts)(), + preserve_fn(self.store.get_recent_events_for_room)( room_id, limit=limit, end_token=now_token.room_key, @@ -755,6 +758,7 @@ class MessageHandler(BaseHandler): defer.returnValue(ret) + @measure_func("_create_new_client_event") @defer.inlineCallbacks def _create_new_client_event(self, builder, prev_event_ids=None): if prev_event_ids: @@ -806,6 +810,7 @@ class MessageHandler(BaseHandler): (event, context,) ) + @measure_func("handle_new_client_event") @defer.inlineCallbacks def handle_new_client_event( self, @@ -934,7 +939,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _notify(): yield run_on_reactor() - self.notifier.on_new_room_event( + yield self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) @@ -944,6 +949,6 @@ class MessageHandler(BaseHandler): # If invite, remove room_state from unsigned before sending. event.unsigned.pop("invite_room_state", None) - federation_handler.handle_new_event( + preserve_fn(federation_handler.handle_new_event)( event, destinations=destinations, ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 5589296c09..46181984c0 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -16,7 +16,9 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import ( + PreserveLoggingContext, preserve_fn, preserve_context_over_deferred, +) from synapse.util.metrics import Measure from synapse.types import UserID @@ -169,13 +171,13 @@ class TypingHandler(object): deferreds = [] for domain in domains: if domain == self.server_name: - self._push_update_local( + preserve_fn(self._push_update_local)( room_id=room_id, user_id=user_id, typing=typing ) else: - deferreds.append(self.federation.send_edu( + deferreds.append(preserve_fn(self.federation.send_edu)( destination=domain, edu_type="m.typing", content={ @@ -185,7 +187,9 @@ class TypingHandler(object): }, )) - yield defer.DeferredList(deferreds, consumeErrors=True) + yield preserve_context_over_deferred( + defer.DeferredList(deferreds, consumeErrors=True) + ) @defer.inlineCallbacks def _recv_edu(self, origin, content): diff --git a/synapse/notifier.py b/synapse/notifier.py index c48024096d..b86648f5e4 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -19,7 +19,7 @@ from synapse.api.errors import AuthError from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import PreserveLoggingContext, preserve_fn from synapse.util.metrics import Measure from synapse.types import StreamToken from synapse.visibility import filter_events_for_client @@ -174,6 +174,7 @@ class Notifier(object): lambda: len(self.user_to_user_stream), ) + @preserve_fn def on_new_room_event(self, event, room_stream_id, max_room_stream_id, extra_users=[]): """ Used by handlers to inform the notifier something has happened @@ -195,6 +196,7 @@ class Notifier(object): self.notify_replication() + @preserve_fn def _notify_pending_new_room_events(self, max_room_stream_id): """Notify for the room events that were queued waiting for a previous event to be persisted. @@ -212,6 +214,7 @@ class Notifier(object): else: self._on_new_room_event(event, room_stream_id, extra_users) + @preserve_fn def _on_new_room_event(self, event, room_stream_id, extra_users=[]): """Notify any user streams that are interested in this room event""" # poke any interested application service. @@ -226,6 +229,7 @@ class Notifier(object): rooms=[event.room_id], ) + @preserve_fn def on_new_event(self, stream_key, new_token, users=[], rooms=[]): """ Used to inform listeners that something has happend event wise. @@ -252,6 +256,7 @@ class Notifier(object): self.notify_replication() + @preserve_fn def on_new_replication_data(self): """Used to inform replication listeners that something has happend without waking up any of the normal user event streams""" diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index d555a33e9a..becb8ef1ae 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -17,14 +17,15 @@ from twisted.internet import defer from synapse.util.presentable_names import ( calculate_room_name, name_from_member_event ) +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred @defer.inlineCallbacks def get_badge_count(store, user_id): - invites, joins = yield defer.gatherResults([ - store.get_invited_rooms_for_user(user_id), - store.get_rooms_for_user(user_id), - ], consumeErrors=True) + invites, joins = yield preserve_context_over_deferred(defer.gatherResults([ + preserve_fn(store.get_invited_rooms_for_user)(user_id), + preserve_fn(store.get_rooms_for_user)(user_id), + ], consumeErrors=True)) my_receipts_by_room = yield store.get_receipts_for_user( user_id, "m.read", diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 54c0f1b849..3837be523d 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -17,7 +17,7 @@ from twisted.internet import defer import pusher -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.util.async import run_on_reactor import logging @@ -130,10 +130,12 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - p.on_new_notifications(min_stream_id, max_stream_id) + preserve_fn(p.on_new_notifications)( + min_stream_id, max_stream_id + ) ) - yield defer.gatherResults(deferreds) + yield preserve_context_over_deferred(defer.gatherResults(deferreds)) except: logger.exception("Exception in pusher on_new_notifications") @@ -155,10 +157,10 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - p.on_new_receipts(min_stream_id, max_stream_id) + preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id) ) - yield defer.gatherResults(deferreds) + yield preserve_context_over_deferred(defer.gatherResults(deferreds)) except: logger.exception("Exception in pusher on_new_receipts") diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 943f5676a3..2121bd75ea 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -403,10 +403,9 @@ class RegisterRestServlet(RestServlet): # register the user's device device_id = params.get("device_id") initial_display_name = params.get("initial_device_display_name") - device_id = self.device_handler.check_device_registered( + return self.device_handler.check_device_registered( user_id, device_id, initial_display_name ) - return device_id @defer.inlineCallbacks def _do_guest_registration(self): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 97aef25321..57e5005285 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -20,7 +20,9 @@ from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event from synapse.util.async import ObservableDeferred -from synapse.util.logcontext import preserve_fn, PreserveLoggingContext +from synapse.util.logcontext import ( + preserve_fn, PreserveLoggingContext, preserve_context_over_deferred +) from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.api.constants import EventTypes @@ -202,7 +204,7 @@ class EventsStore(SQLBaseStore): deferreds = [] for room_id, evs_ctxs in partitioned.items(): - d = self._event_persist_queue.add_to_queue( + d = preserve_fn(self._event_persist_queue.add_to_queue)( room_id, evs_ctxs, backfilled=backfilled, current_state=None, @@ -212,7 +214,9 @@ class EventsStore(SQLBaseStore): for room_id in partitioned.keys(): self._maybe_start_persisting(room_id) - return defer.gatherResults(deferreds, consumeErrors=True) + return preserve_context_over_deferred( + defer.gatherResults(deferreds, consumeErrors=True) + ) @defer.inlineCallbacks @log_function @@ -225,7 +229,7 @@ class EventsStore(SQLBaseStore): self._maybe_start_persisting(event.room_id) - yield deferred + yield preserve_context_over_deferred(deferred) max_persisted_id = yield self._stream_id_gen.get_current_token() defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) @@ -1088,7 +1092,7 @@ class EventsStore(SQLBaseStore): if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] - res = yield defer.gatherResults( + res = yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], @@ -1097,7 +1101,7 @@ class EventsStore(SQLBaseStore): for row in rows ], consumeErrors=True - ) + )) defer.returnValue({ e.event.event_id: e diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 862c5c3ea1..0577a0525b 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -39,7 +39,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging @@ -234,12 +234,12 @@ class StreamStore(SQLBaseStore): results = {} room_ids = list(room_ids) for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): - res = yield defer.gatherResults([ + res = yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(self.get_room_events_stream_for_room)( room_id, from_key, to_key, limit, order=order, ) for room_id in rm_ids - ]) + ])) results.update(dict(zip(rm_ids, res))) defer.returnValue(results) diff --git a/synapse/util/async.py b/synapse/util/async.py index c84b23ff46..347fb1e380 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -146,10 +146,10 @@ def concurrently_execute(func, args, limit): except StopIteration: pass - return defer.gatherResults([ + return preserve_context_over_deferred(defer.gatherResults([ preserve_fn(_concurrently_execute_inner)() for _ in xrange(limit) - ], consumeErrors=True).addErrback(unwrapFirstError) + ], consumeErrors=True)).addErrback(unwrapFirstError) class Linearizer(object): @@ -181,7 +181,8 @@ class Linearizer(object): self.key_to_defer[key] = new_defer if current_defer: - yield preserve_context_over_deferred(current_defer) + with PreserveLoggingContext(): + yield current_defer @contextmanager def _ctx_manager(): @@ -264,7 +265,7 @@ class ReadWriteLock(object): curr_readers.clear() self.key_to_current_writer[key] = new_defer - yield defer.gatherResults(to_wait_on) + yield preserve_context_over_deferred(defer.gatherResults(to_wait_on)) @contextmanager def _ctx_manager(): diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 7a87045f87..6c83eb213d 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -297,12 +297,13 @@ def preserve_context_over_fn(fn, *args, **kwargs): return res -def preserve_context_over_deferred(deferred): +def preserve_context_over_deferred(deferred, context=None): """Given a deferred wrap it such that any callbacks added later to it will be invoked with the current context. """ - current_context = LoggingContext.current_context() - d = _PreservingContextDeferred(current_context) + if context is None: + context = LoggingContext.current_context() + d = _PreservingContextDeferred(context) deferred.chainDeferred(d) return d @@ -316,7 +317,13 @@ def preserve_fn(f): def g(*args, **kwargs): with PreserveLoggingContext(current): - return f(*args, **kwargs) + res = f(*args, **kwargs) + if isinstance(res, defer.Deferred): + return preserve_context_over_deferred( + res, context=LoggingContext.sentinel + ) + else: + return res return g diff --git a/synapse/visibility.py b/synapse/visibility.py index 948ad51772..cc12c0a23d 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import Membership, EventTypes -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred import logging @@ -55,12 +55,12 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): given events events ([synapse.events.EventBase]): list of events to filter """ - forgotten = yield defer.gatherResults([ + forgotten = yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(store.who_forgot_in_room)( room_id, ) for room_id in frozenset(e.room_id for e in events) - ], consumeErrors=True) + ], consumeErrors=True)) # Set of membership event_ids that have been forgotten event_id_forgotten = frozenset( -- cgit 1.5.1