From 476899295f5fd6cff64799bcbc84cd4bf9005e33 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 8 May 2015 16:32:18 +0100 Subject: Change the way we do logging contexts so that they survive divergences --- synapse/util/__init__.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'synapse/util/__init__.py') diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 79109d0b19..364b927851 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from twisted.internet import defer, reactor, task @@ -50,8 +50,10 @@ class Clock(object): current_context = LoggingContext.current_context() def wrapped_callback(): - LoggingContext.thread_local.current_context = current_context - callback() + with PreserveLoggingContext(): + LoggingContext.thread_local.current_context = current_context + callback() + return reactor.callLater(delay, wrapped_callback) def cancel_call_later(self, timer): -- cgit 1.4.1 From 2236ef6c92b7964665f5c43b941754d70aa506d8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 8 May 2015 19:53:34 +0100 Subject: Fix up leak. Add warnings. --- synapse/handlers/_base.py | 11 +++++++---- synapse/handlers/federation.py | 29 ++++++++++++++++----------- synapse/handlers/presence.py | 10 ++++++---- synapse/handlers/typing.py | 4 +++- synapse/http/server.py | 6 ++++-- synapse/util/__init__.py | 3 ++- synapse/util/distributor.py | 45 ++++++++++++++++++++---------------------- synapse/util/logcontext.py | 11 ++++++++++- 8 files changed, 70 insertions(+), 49 deletions(-) (limited to 'synapse/util/__init__.py') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 4b3f4eadab..ddc5c21e7d 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -20,6 +20,8 @@ from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.api.constants import Membership, EventTypes from synapse.types import UserID +from synapse.util.logcontext import PreserveLoggingContext + import logging @@ -137,10 +139,11 @@ class BaseHandler(object): "Failed to get destination from event %s", s.event_id ) - # Don't block waiting on waking up all the listeners. - notify_d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + # Don't block waiting on waking up all the listeners. + notify_d = self.notifier.on_new_room_event( + event, extra_users=extra_users + ) def log_failure(f): logger.warn( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 85e2757227..77c315c47c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -21,6 +21,7 @@ from synapse.api.errors import ( AuthError, FederationError, StoreError, ) from synapse.api.constants import EventTypes, Membership, RejectedReason +from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.util.frozenutils import unfreeze @@ -197,9 +198,10 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, extra_users=extra_users + ) def log_failure(f): logger.warn( @@ -431,9 +433,10 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) - d = self.notifier.on_new_room_event( - new_event, extra_users=[joinee] - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + new_event, extra_users=[joinee] + ) def log_failure(f): logger.warn( @@ -512,9 +515,10 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, extra_users=extra_users + ) def log_failure(f): logger.warn( @@ -594,9 +598,10 @@ class FederationHandler(BaseHandler): ) target_user = UserID.from_string(event.state_key) - d = self.notifier.on_new_room_event( - event, extra_users=[target_user], - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, extra_users=[target_user], + ) def log_failure(f): logger.warn( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6ae39a1d37..1edab05492 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError from synapse.api.constants import PresenceState +from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.types import UserID import synapse.metrics @@ -808,10 +809,11 @@ class PresenceHandler(BaseHandler): def push_update_to_clients(self, observed_user, users_to_push=[], room_ids=[], statuscache=None): - self.notifier.on_new_user_event( - users_to_push, - room_ids, - ) + with PreserveLoggingContext(): + self.notifier.on_new_user_event( + users_to_push, + room_ids, + ) class PresenceEventSource(object): diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index c0b2bd7db0..64fe51aa3e 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -18,6 +18,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.errors import SynapseError, AuthError +from synapse.util.logcontext import PreserveLoggingContext from synapse.types import UserID import logging @@ -216,7 +217,8 @@ class TypingNotificationHandler(BaseHandler): self._latest_room_serial += 1 self._room_serials[room_id] = self._latest_room_serial - self.notifier.on_new_user_event(rooms=[room_id]) + with PreserveLoggingContext(): + self.notifier.on_new_user_event(rooms=[room_id]) class TypingNotificationEventSource(object): diff --git a/synapse/http/server.py b/synapse/http/server.py index 93ecbd7589..73efbff4f2 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -17,7 +17,7 @@ from synapse.api.errors import ( cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError ) -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext import synapse.metrics from syutil.jsonutil import ( @@ -85,7 +85,9 @@ def request_handler(request_handler): "Received request: %s %s", request.method, request.path ) - yield request_handler(self, request) + d = request_handler(self, request) + with PreserveLoggingContext(): + yield d code = request.code except CodeMessageException as e: code = e.code diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 364b927851..fd3eb1f574 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -54,7 +54,8 @@ class Clock(object): LoggingContext.thread_local.current_context = current_context callback() - return reactor.callLater(delay, wrapped_callback) + with PreserveLoggingContext(): + return reactor.callLater(delay, wrapped_callback) def cancel_call_later(self, timer): timer.cancel() diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 9d9c350397..5b150cb0e5 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.logcontext import PreserveLoggingContext - from twisted.internet import defer import logging @@ -93,7 +91,6 @@ class Signal(object): Each observer callable may return a Deferred.""" self.observers.append(observer) - @defer.inlineCallbacks def fire(self, *args, **kwargs): """Invokes every callable in the observer list, passing in the args and kwargs. Exceptions thrown by observers are logged but ignored. It is @@ -101,24 +98,24 @@ class Signal(object): Returns a Deferred that will complete when all the observers have completed.""" - with PreserveLoggingContext(): - deferreds = [] - for observer in self.observers: - d = defer.maybeDeferred(observer, *args, **kwargs) - - def eb(failure): - logger.warning( - "%s signal observer %s failed: %r", - self.name, observer, failure, - exc_info=( - failure.type, - failure.value, - failure.getTracebackObject())) - if not self.suppress_failures: - failure.raiseException() - deferreds.append(d.addErrback(eb)) - results = [] - for deferred in deferreds: - result = yield deferred - results.append(result) - defer.returnValue(results) + + def eb(failure): + logger.warning( + "%s signal observer %s failed: %r", + self.name, observer, failure, + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject())) + if not self.suppress_failures: + failure.raiseException() + + deferreds = [ + defer.maybeDeferred(observer, *args, **kwargs) + for observer in self.observers + ] + + d = defer.gatherResults(deferreds, consumeErrors=True) + d.addErrback(eb) + + return d diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 3dce8d2bf3..a92d518b43 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -132,6 +132,13 @@ class PreserveLoggingContext(object): """Restores the current logging context""" LoggingContext.thread_local.current_context = self.current_context + if self.current_context is not LoggingContext.sentinel: + if self.current_context.parent_context is None: + logger.warn( + "Restoring dead context: %s", + self.current_context, + ) + def preserve_context_over_fn(fn, *args, **kwargs): """Takes a function and invokes it with the given arguments, but removes @@ -169,6 +176,8 @@ def preserve_context_over_deferred(deferred): res = d.errback(failure) return res - deferred.addCallbacks(cb, eb) + if deferred.called: + return deferred + deferred.addCallbacks(cb, eb) return d -- cgit 1.4.1 From 95dedb866f04ee4ae034c35130f2a8dc86243fbb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 13:14:29 +0100 Subject: Unwrap defer.gatherResults failures --- synapse/federation/federation_base.py | 4 +++- synapse/handlers/federation.py | 3 ++- synapse/handlers/message.py | 5 +++-- synapse/handlers/profile.py | 3 ++- synapse/handlers/room.py | 4 ++-- synapse/util/__init__.py | 6 ++++++ 6 files changed, 18 insertions(+), 7 deletions(-) (limited to 'synapse/util/__init__.py') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 21a763214b..5217d91aab 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -24,6 +24,8 @@ from synapse.crypto.event_signing import check_event_content_hash from synapse.api.errors import SynapseError +from synapse.util import unwrapFirstError + import logging @@ -94,7 +96,7 @@ class FederationBase(object): yield defer.gatherResults( [do(pdu) for pdu in pdus], consumeErrors=True - ) + ).addErrback(unwrapFirstError) defer.returnValue(signed_pdus) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 77c315c47c..cd85001578 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -21,6 +21,7 @@ from synapse.api.errors import ( AuthError, FederationError, StoreError, ) from synapse.api.constants import EventTypes, Membership, RejectedReason +from synapse.util import unwrapFirstError from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor @@ -926,7 +927,7 @@ class FederationHandler(BaseHandler): if d in have_events and not have_events[d] ], consumeErrors=True - ) + ).addErrback(unwrapFirstError) if different_events: local_view = dict(auth_events) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 22e19af17f..b7d52647d7 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -20,6 +20,7 @@ from synapse.api.errors import RoomError, SynapseError from synapse.streams.config import PaginationConfig from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator +from synapse.util import unwrapFirstError from synapse.util.logcontext import PreserveLoggingContext from synapse.types import UserID @@ -303,7 +304,7 @@ class MessageHandler(BaseHandler): event.room_id ), ] - ) + ).addErrback(unwrapFirstError) start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) @@ -328,7 +329,7 @@ class MessageHandler(BaseHandler): yield defer.gatherResults( [handle_room(e) for e in room_list], consumeErrors=True - ) + ).addErrback(unwrapFirstError) ret = { "rooms": rooms_ret, diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index ffb449d457..71ff78ab23 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.api.constants import EventTypes, Membership from synapse.types import UserID +from synapse.util import unwrapFirstError from ._base import BaseHandler @@ -159,7 +160,7 @@ class ProfileHandler(BaseHandler): self.store.get_profile_avatar_url(user.localpart), ], consumeErrors=True - ) + ).addErrback(unwrapFirstError) state["displayname"] = displayname state["avatar_url"] = avatar_url diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index cfa2e38ed2..ea5abba6ab 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -21,7 +21,7 @@ from ._base import BaseHandler from synapse.types import UserID, RoomAlias, RoomID from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import StoreError, SynapseError -from synapse.util import stringutils +from synapse.util import stringutils, unwrapFirstError from synapse.util.async import run_on_reactor from synapse.events.utils import serialize_event @@ -537,7 +537,7 @@ class RoomListHandler(BaseHandler): for room in chunk ], consumeErrors=True, - ) + ).addErrback(unwrapFirstError) for i, room in enumerate(chunk): room["num_joined_members"] = len(results[i]) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index fd3eb1f574..c1a16b639a 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -23,6 +23,12 @@ import logging logger = logging.getLogger(__name__) +def unwrapFirstError(failure): + # defer.gatherResults and DeferredLists wrap failures. + failure.trap(defer.FirstError) + return failure.value.subFailure + + class Clock(object): """A small utility that obtains current time-of-day so that time may be mocked during unit-tests. -- cgit 1.4.1 From cdb3757942fefdcdc3d33b9c6d7c9e44decefd6f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 13:31:55 +0100 Subject: Refactor _get_events --- synapse/storage/_base.py | 346 +++++++++++++---------------------------------- synapse/storage/state.py | 2 +- synapse/util/__init__.py | 28 ++++ 3 files changed, 123 insertions(+), 253 deletions(-) (limited to 'synapse/util/__init__.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 46a1c07460..a6c6676f8d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -17,6 +17,7 @@ import logging from synapse.api.errors import StoreError from synapse.events import FrozenEvent from synapse.events.utils import prune_event +from synapse.util import unwrap_deferred from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn, LoggingContext from synapse.util.lrucache import LruCache @@ -28,7 +29,6 @@ from twisted.internet import defer from collections import namedtuple, OrderedDict import functools -import itertools import simplejson as json import sys import time @@ -870,35 +870,43 @@ class SQLBaseStore(object): @defer.inlineCallbacks def _get_events(self, event_ids, check_redacted=True, - get_prev_content=False, desc="_get_events"): - N = 50 # Only fetch 100 events at a time. + get_prev_content=False, allow_rejected=False, txn=None): + if not event_ids: + defer.returnValue([]) - ds = [ - self._fetch_events( - event_ids[i*N:(i+1)*N], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - ) - for i in range(1 + len(event_ids) / N) - ] + event_map = self._get_events_from_cache( + event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) - res = yield defer.gatherResults(ds, consumeErrors=True) + missing_events = [e for e in event_ids if e not in event_map] - defer.returnValue( - list(itertools.chain(*res)) + missing_events = yield self._fetch_events( + txn, + missing_events, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, ) + event_map.update(missing_events) + + defer.returnValue([ + event_map[e_id] for e_id in event_ids + if e_id in event_map and event_map[e_id] + ]) + def _get_events_txn(self, txn, event_ids, check_redacted=True, - get_prev_content=False): - N = 50 # Only fetch 100 events at a time. - return list(itertools.chain(*[ - self._fetch_events_txn( - txn, event_ids[i*N:(i+1)*N], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - ) - for i in range(1 + len(event_ids) / N) - ])) + get_prev_content=False, allow_rejected=False): + return unwrap_deferred(self._get_events( + event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + txn=txn, + )) def _invalidate_get_event_cache(self, event_id): for check_redacted in (False, True): @@ -909,68 +917,24 @@ class SQLBaseStore(object): def _get_event_txn(self, txn, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False): - start_time = time.time() * 1000 - - def update_counter(desc, last_time): - curr_time = self._get_event_counters.update(desc, last_time) - sql_getevents_timer.inc_by(curr_time - last_time, desc) - return curr_time - - try: - ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) - - if allow_rejected or not ret.rejected_reason: - return ret - else: - return None - except KeyError: - pass - finally: - start_time = update_counter("event_cache", start_time) - - sql = ( - "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " - "FROM event_json as e " - "LEFT JOIN rejections as rej USING (event_id) " - "LEFT JOIN redactions as r ON e.event_id = r.redacts " - "WHERE e.event_id = ? " - "LIMIT 1 " - ) - - txn.execute(sql, (event_id,)) - - res = txn.fetchone() - - if not res: - return None - - internal_metadata, js, redacted, rejected_reason = res - - start_time = update_counter("select_event", start_time) - - result = self._get_event_from_row_txn( - txn, internal_metadata, js, redacted, + events = self._get_events_txn( + txn, [event_id], check_redacted=check_redacted, get_prev_content=get_prev_content, - rejected_reason=rejected_reason, + allow_rejected=allow_rejected, ) - self._get_event_cache.prefill(event_id, check_redacted, get_prev_content, result) - if allow_rejected or not rejected_reason: - return result - else: - return None - - def _fetch_events_txn(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not events: - return [] + return events[0] if events else None + def _get_events_from_cache(self, events, check_redacted, get_prev_content, + allow_rejected): event_map = {} for event_id in events: try: - ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) + ret = self._get_event_cache.get( + event_id, check_redacted, get_prev_content + ) if allow_rejected or not ret.rejected_reason: event_map[event_id] = ret @@ -979,200 +943,81 @@ class SQLBaseStore(object): except KeyError: pass - missing_events = [ - e for e in events - if e not in event_map - ] - - if missing_events: - sql = ( - "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " - " FROM event_json as e" - " LEFT JOIN rejections as rej USING (event_id)" - " LEFT JOIN redactions as r ON e.event_id = r.redacts" - " WHERE e.event_id IN (%s)" - ) % (",".join(["?"]*len(missing_events)),) - - txn.execute(sql, missing_events) - rows = txn.fetchall() - - res = [ - self._get_event_from_row_txn( - txn, row[0], row[1], row[2], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=row[3], - ) - for row in rows - ] - - event_map.update({ - e.event_id: e - for e in res if e - }) - - for e in res: - self._get_event_cache.prefill( - e.event_id, check_redacted, get_prev_content, e - ) - - return [ - event_map[e_id] for e_id in events - if e_id in event_map and event_map[e_id] - ] + return event_map @defer.inlineCallbacks - def _fetch_events(self, events, check_redacted=True, + def _fetch_events(self, txn, events, check_redacted=True, get_prev_content=False, allow_rejected=False): if not events: - defer.returnValue([]) - - event_map = {} + defer.returnValue({}) - for event_id in events: - try: - ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) - - if allow_rejected or not ret.rejected_reason: - event_map[event_id] = ret - else: - event_map[event_id] = None - except KeyError: - pass + rows = [] + N = 2 + for i in range(1 + len(events) / N): + evs = events[i*N:(i + 1)*N] + if not evs: + break - missing_events = [ - e for e in events - if e not in event_map - ] - - if missing_events: sql = ( "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " " FROM event_json as e" " LEFT JOIN rejections as rej USING (event_id)" " LEFT JOIN redactions as r ON e.event_id = r.redacts" " WHERE e.event_id IN (%s)" - ) % (",".join(["?"]*len(missing_events)),) - - rows = yield self._execute( - "_fetch_events", - None, - sql, - *missing_events - ) - - res_ds = [ - self._get_event_from_row( - row[0], row[1], row[2], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=row[3], - ) - for row in rows - ] + ) % (",".join(["?"]*len(evs)),) - res = yield defer.gatherResults(res_ds, consumeErrors=True) + if txn: + txn.execute(sql, evs) + rows.extend(txn.fetchall()) + else: + res = yield self._execute("_fetch_events", None, sql, *evs) + rows.extend(res) - event_map.update({ - e.event_id: e - for e in res if e - }) + res = [] + for row in rows: + e = yield self._get_event_from_row( + txn, + row[0], row[1], row[2], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=row[3], + ) + res.append(e) - for e in res: - self._get_event_cache.prefill( - e.event_id, check_redacted, get_prev_content, e - ) + for e in res: + self._get_event_cache.prefill( + e.event_id, check_redacted, get_prev_content, e + ) - defer.returnValue([ - event_map[e_id] for e_id in events - if e_id in event_map and event_map[e_id] - ]) + defer.returnValue({ + e.event_id: e + for e in res if e + }) @defer.inlineCallbacks - def _get_event_from_row(self, internal_metadata, js, redacted, + def _get_event_from_row(self, txn, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False, rejected_reason=None): - - start_time = time.time() * 1000 - - def update_counter(desc, last_time): - curr_time = self._get_event_counters.update(desc, last_time) - sql_getevents_timer.inc_by(curr_time - last_time, desc) - return curr_time - d = json.loads(js) - start_time = update_counter("decode_json", start_time) - internal_metadata = json.loads(internal_metadata) - start_time = update_counter("decode_internal", start_time) - - if rejected_reason: - rejected_reason = yield self._simple_select_one_onecol( - desc="_get_event_from_row", - table="rejections", - keyvalues={"event_id": rejected_reason}, - retcol="reason", - ) - - ev = FrozenEvent( - d, - internal_metadata_dict=internal_metadata, - rejected_reason=rejected_reason, - ) - start_time = update_counter("build_frozen_event", start_time) - - if check_redacted and redacted: - ev = prune_event(ev) - - redaction_id = yield self._simple_select_one_onecol( - desc="_get_event_from_row", - table="redactions", - keyvalues={"redacts": ev.event_id}, - retcol="event_id", - ) - - ev.unsigned["redacted_by"] = redaction_id - # Get the redaction event. - because = yield self.get_event_txn( - redaction_id, - check_redacted=False - ) - - if because: - ev.unsigned["redacted_because"] = because - start_time = update_counter("redact_event", start_time) - - if get_prev_content and "replaces_state" in ev.unsigned: - prev = yield self.get_event( - ev.unsigned["replaces_state"], - get_prev_content=False, - ) - if prev: - ev.unsigned["prev_content"] = prev.get_dict()["content"] - start_time = update_counter("get_prev_content", start_time) - - defer.returnValue(ev) - - def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False, - rejected_reason=None): - - start_time = time.time() * 1000 - - def update_counter(desc, last_time): - curr_time = self._get_event_counters.update(desc, last_time) - sql_getevents_timer.inc_by(curr_time - last_time, desc) - return curr_time - - d = json.loads(js) - start_time = update_counter("decode_json", start_time) + def select(txn, *args, **kwargs): + if txn: + return self._simple_select_one_onecol_txn(txn, *args, **kwargs) + else: + return self._simple_select_one_onecol( + *args, + desc="_get_event_from_row", **kwargs + ) - internal_metadata = json.loads(internal_metadata) - start_time = update_counter("decode_internal", start_time) + def get_event(txn, *args, **kwargs): + if txn: + return self._get_event_txn(txn, *args, **kwargs) + else: + return self.get_event(*args, **kwargs) if rejected_reason: - rejected_reason = self._simple_select_one_onecol_txn( + rejected_reason = yield select( txn, table="rejections", keyvalues={"event_id": rejected_reason}, @@ -1184,12 +1029,11 @@ class SQLBaseStore(object): internal_metadata_dict=internal_metadata, rejected_reason=rejected_reason, ) - start_time = update_counter("build_frozen_event", start_time) if check_redacted and redacted: ev = prune_event(ev) - redaction_id = self._simple_select_one_onecol_txn( + redaction_id = yield select( txn, table="redactions", keyvalues={"redacts": ev.event_id}, @@ -1199,7 +1043,7 @@ class SQLBaseStore(object): ev.unsigned["redacted_by"] = redaction_id # Get the redaction event. - because = self._get_event_txn( + because = yield get_event( txn, redaction_id, check_redacted=False @@ -1207,19 +1051,17 @@ class SQLBaseStore(object): if because: ev.unsigned["redacted_because"] = because - start_time = update_counter("redact_event", start_time) if get_prev_content and "replaces_state" in ev.unsigned: - prev = self._get_event_txn( + prev = yield get_event( txn, ev.unsigned["replaces_state"], get_prev_content=False, ) if prev: ev.unsigned["prev_content"] = prev.get_dict()["content"] - start_time = update_counter("get_prev_content", start_time) - return ev + defer.returnValue(ev) def _parse_events(self, rows): return self.runInteraction( diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 483b316e9f..26fd3b3e67 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -85,7 +85,7 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def c(vals): - vals[:] = yield self._fetch_events(vals, get_prev_content=False) + vals[:] = yield self._get_events(vals, get_prev_content=False) yield defer.gatherResults( [ diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index c1a16b639a..b9afb3364d 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -29,6 +29,34 @@ def unwrapFirstError(failure): return failure.value.subFailure +def unwrap_deferred(d): + """Given a deferred that we know has completed, return its value or raise + the failure as an exception + """ + if not d.called: + raise RuntimeError("deferred has not finished") + + res = [] + + def f(r): + res.append(r) + return r + d.addCallback(f) + + if res: + return res[0] + + def f(r): + res.append(r) + return r + d.addErrback(f) + + if res: + res[0].raiseException() + else: + raise RuntimeError("deferred did not call callbacks") + + class Clock(object): """A small utility that obtains current time-of-day so that time may be mocked during unit-tests. -- cgit 1.4.1 From 7cd6a6f6cf03e5bfde99b839bdc67f8f5513cdc5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 15:34:02 +0100 Subject: Awful idea for speeding up fetching of events --- synapse/storage/_base.py | 4 ++ synapse/storage/events.py | 167 ++++++++++++++++++++++++++++++++++++---------- synapse/util/__init__.py | 8 +-- 3 files changed, 139 insertions(+), 40 deletions(-) (limited to 'synapse/util/__init__.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a8f8989e38..c20ff3a572 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -299,6 +299,10 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) + self._event_fetch_lock = threading.Lock() + self._event_fetch_list = [] + self._event_fetch_ongoing = False + self.database_engine = hs.database_engine self._stream_id_gen = StreamIdGenerator() diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0aa4e0d445..be88328ce5 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -15,7 +15,7 @@ from _base import SQLBaseStore, _RollbackButIsFineException -from twisted.internet import defer +from twisted.internet import defer, reactor from synapse.events import FrozenEvent from synapse.events.utils import prune_event @@ -89,18 +89,17 @@ class EventsStore(SQLBaseStore): Returns: Deferred : A FrozenEvent. """ - event = yield self.runInteraction( - "get_event", self._get_event_txn, - event_id, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, + events = yield self._get_events( + [event_id], + check_redacted=True, + get_prev_content=False, + allow_rejected=False, ) - if not event and not allow_none: + if not events and not allow_none: raise RuntimeError("Could not find event %s" % (event_id,)) - defer.returnValue(event) + defer.returnValue(events[0] if events else None) @log_function def _persist_event_txn(self, txn, event, context, backfilled, @@ -420,13 +419,21 @@ class EventsStore(SQLBaseStore): if e_id in event_map and event_map[e_id] ]) - missing_events = yield self._fetch_events( - txn, - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) + if not txn: + missing_events = yield self._enqueue_events( + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + else: + missing_events = self._fetch_events_txn( + txn, + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) event_map.update(missing_events) @@ -492,11 +499,82 @@ class EventsStore(SQLBaseStore): )) @defer.inlineCallbacks - def _fetch_events(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): + def _enqueue_events(self, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): if not events: defer.returnValue({}) + def do_fetch(txn): + event_list = [] + try: + with self._event_fetch_lock: + event_list = self._event_fetch_list + self._event_fetch_list = [] + + if not event_list: + return + + event_id_lists = zip(*event_list)[0] + event_ids = [ + item for sublist in event_id_lists for item in sublist + ] + rows = self._fetch_event_rows(txn, event_ids) + + row_dict = { + r["event_id"]: r + for r in rows + } + + for ids, d in event_list: + d.callback( + [ + row_dict[i] for i in ids + if i in row_dict + ] + ) + except Exception as e: + for _, d in event_list: + try: + reactor.callFromThread(d.errback, e) + except: + pass + finally: + with self._event_fetch_lock: + self._event_fetch_ongoing = False + + def cb(rows): + return defer.gatherResults([ + self._get_event_from_row( + None, + row["internal_metadata"], row["json"], row["redacts"], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=row["rejects"], + ) + for row in rows + ]) + + d = defer.Deferred() + d.addCallback(cb) + with self._event_fetch_lock: + self._event_fetch_list.append( + (events, d) + ) + + if not self._event_fetch_ongoing: + self.runInteraction( + "do_fetch", + do_fetch + ) + + res = yield d + + defer.returnValue({ + e.event_id: e + for e in res if e + }) + + def _fetch_event_rows(self, txn, events): rows = [] N = 200 for i in range(1 + len(events) / N): @@ -505,43 +583,56 @@ class EventsStore(SQLBaseStore): break sql = ( - "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " + "SELECT " + " e.event_id as event_id, " + " e.internal_metadata," + " e.json," + " r.redacts as redacts," + " rej.event_id as rejects " " FROM event_json as e" " LEFT JOIN rejections as rej USING (event_id)" " LEFT JOIN redactions as r ON e.event_id = r.redacts" " WHERE e.event_id IN (%s)" ) % (",".join(["?"]*len(evs)),) - if txn: - txn.execute(sql, evs) - rows.extend(txn.fetchall()) - else: - res = yield self._execute("_fetch_events", None, sql, *evs) - rows.extend(res) + txn.execute(sql, evs) + rows.extend(self.cursor_to_dict(txn)) + + return rows + + @defer.inlineCallbacks + def _fetch_events(self, txn, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): + if not events: + defer.returnValue({}) + + if txn: + rows = self._fetch_event_rows( + txn, events, + ) + else: + rows = yield self.runInteraction( + self._fetch_event_rows, + events, + ) res = yield defer.gatherResults( [ defer.maybeDeferred( self._get_event_from_row, txn, - row[0], row[1], row[2], + row["internal_metadata"], row["json"], row["redacts"], check_redacted=check_redacted, get_prev_content=get_prev_content, - rejected_reason=row[3], + rejected_reason=row["rejects"], ) for row in rows - ], - consumeErrors=True, + ] ) - for e in res: - self._get_event_cache.prefill( - e.event_id, check_redacted, get_prev_content, e - ) - defer.returnValue({ - e.event_id: e - for e in res if e + r.event_id: r + for r in res }) @defer.inlineCallbacks @@ -611,6 +702,10 @@ class EventsStore(SQLBaseStore): if prev: ev.unsigned["prev_content"] = prev.get_dict()["content"] + self._get_event_cache.prefill( + ev.event_id, check_redacted, get_prev_content, ev + ) + defer.returnValue(ev) def _parse_events(self, rows): diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index b9afb3364d..260714ccc2 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -80,16 +80,16 @@ class Clock(object): def stop_looping_call(self, loop): loop.stop() - def call_later(self, delay, callback): + def call_later(self, delay, callback, *args, **kwargs): current_context = LoggingContext.current_context() - def wrapped_callback(): + def wrapped_callback(*args, **kwargs): with PreserveLoggingContext(): LoggingContext.thread_local.current_context = current_context - callback() + callback(*args, **kwargs) with PreserveLoggingContext(): - return reactor.callLater(delay, wrapped_callback) + return reactor.callLater(delay, wrapped_callback, *args, **kwargs) def cancel_call_later(self, timer): timer.cancel() -- cgit 1.4.1