From 53216a500d2e0d815581ada87379ecece05b05a5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 29 Oct 2014 17:02:22 +0000 Subject: Add a run_on_reactor function --- synapse/util/async.py | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'synapse/util') diff --git a/synapse/util/async.py b/synapse/util/async.py index 647ea6142c..bf578f8bfb 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -21,3 +21,10 @@ def sleep(seconds): d = defer.Deferred() reactor.callLater(seconds, d.callback, seconds) return d + + +def run_on_reactor(): + """ This will cause the rest of the function to be invoked upon the next + iteration of the main loop + """ + return sleep(0) \ No newline at end of file -- cgit 1.4.1 From b29517bd013b82302b1a73072da8bfc39564dc1a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 30 Oct 2014 01:21:33 +0000 Subject: Add a request-id to each log line --- setup.py | 2 +- synapse/app/homeserver.py | 12 +++++- synapse/config/logger.py | 23 ++++++++---- synapse/crypto/keyclient.py | 10 +++-- synapse/http/client.py | 26 +++++++------ synapse/http/server.py | 13 ++++++- synapse/storage/_base.py | 16 ++++++-- synapse/util/async.py | 5 ++- synapse/util/logcontext.py | 85 ++++++++++++++++++++++++++++++++++++++++++ synapse/util/logutils.py | 1 + tests/util/test_log_context.py | 43 +++++++++++++++++++++ 11 files changed, 205 insertions(+), 31 deletions(-) create mode 100644 synapse/util/logcontext.py create mode 100644 tests/util/test_log_context.py (limited to 'synapse/util') diff --git a/setup.py b/setup.py index 660efd5b89..74eee31a78 100755 --- a/setup.py +++ b/setup.py @@ -54,6 +54,6 @@ setup( long_description=read("README.rst"), entry_points=""" [console_scripts] - synapse-homeserver=synapse.app.homeserver:run + synapse-homeserver=synapse.app.homeserver:main """ ) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 6394bc27d1..4e74f4d14c 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -33,6 +33,7 @@ from synapse.api.urls import ( ) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory +from synapse.util.logcontext import LoggingContext from daemonize import Daemonize import twisted.manhole.telnet @@ -240,7 +241,7 @@ def setup(): daemon = Daemonize( app="synapse-homeserver", pid=config.pid_file, - action=reactor.run, + action=run, auto_close_fds=False, verbose=True, logger=logger, @@ -250,6 +251,13 @@ def setup(): else: reactor.run() +def run(): + with LoggingContext("run") as context: + reactor.run() + +def main(): + with LoggingContext("main") as context: + setup() if __name__ == '__main__': - setup() + main() diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 56cd095433..2a59bf9d15 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -14,7 +14,7 @@ # limitations under the License. from ._base import Config - +from synapse.util.logcontext import LoggingContextFilter from twisted.python.log import PythonLoggingObserver import logging import logging.config @@ -45,7 +45,8 @@ class LoggingConfig(Config): def setup_logging(self): log_format = ( - '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s' + "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s" + " - %(message)s" ) if self.log_config is None: @@ -54,12 +55,20 @@ class LoggingConfig(Config): level = logging.DEBUG # FIXME: we need a logging.WARN for a -q quiet option + logger = logging.getLogger('') + logger.setLevel(level) + formatter = logging.Formatter(log_format) + if self.log_file: + handler = logging.FileHandler(self.log_file) + else: + handler = logging.StreamHandler() + print handler + handler.setFormatter(formatter) + + handler.addFilter(LoggingContextFilter(request="")) - logging.basicConfig( - level=level, - filename=self.log_file, - format=log_format - ) + logger.addHandler(handler) + logger.info("Test") else: logging.config.fileConfig(self.log_config) diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py index 7cfec5148e..33fa9ca837 100644 --- a/synapse/crypto/keyclient.py +++ b/synapse/crypto/keyclient.py @@ -18,6 +18,7 @@ from twisted.web.http import HTTPClient from twisted.internet.protocol import Factory from twisted.internet import defer, reactor from synapse.http.endpoint import matrix_endpoint +from synapse.util.logcontext import PreserveLoggingContext import json import logging @@ -36,10 +37,11 @@ def fetch_server_key(server_name, ssl_context_factory): for i in range(5): try: - protocol = yield endpoint.connect(factory) - server_response, server_certificate = yield protocol.remote_key - defer.returnValue((server_response, server_certificate)) - return + with PreserveLoggingContext(): + protocol = yield endpoint.connect(factory) + server_response, server_certificate = yield protocol.remote_key + defer.returnValue((server_response, server_certificate)) + return except Exception as e: logger.exception(e) raise IOError("Cannot get key for %s" % server_name) diff --git a/synapse/http/client.py b/synapse/http/client.py index 46c90dbb76..8bda42364b 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -16,11 +16,14 @@ from twisted.internet import defer, reactor from twisted.internet.error import DNSLookupError -from twisted.web.client import _AgentBase, _URI, readBody, FileBodyProducer, PartialDownloadError +from twisted.web.client import ( + _AgentBase, _URI, readBody, FileBodyProducer, PartialDownloadError +) from twisted.web.http_headers import Headers from synapse.http.endpoint import matrix_endpoint from synapse.util.async import sleep +from synapse.util.logcontext import PreserveLoggingContext from syutil.jsonutil import encode_canonical_json @@ -106,16 +109,17 @@ class BaseHttpClient(object): producer = body_callback(method, url_bytes, headers_dict) try: - response = yield self.agent.request( - destination, - endpoint, - method, - path_bytes, - param_bytes, - query_bytes, - Headers(headers_dict), - producer - ) + with PreserveLoggingContext(): + response = yield self.agent.request( + destination, + endpoint, + method, + path_bytes, + param_bytes, + query_bytes, + Headers(headers_dict), + producer + ) logger.debug("Got response to %s", method) break diff --git a/synapse/http/server.py b/synapse/http/server.py index 8d419c02dd..ed1f1170cb 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -20,6 +20,7 @@ from syutil.jsonutil import ( from synapse.api.errors import ( cs_exception, SynapseError, CodeMessageException ) +from synapse.util.logcontext import LoggingContext from twisted.internet import defer, reactor from twisted.web import server, resource @@ -88,9 +89,19 @@ class JsonResource(HttpServer, resource.Resource): def render(self, request): """ This get's called by twisted every time someone sends us a request. """ - self._async_render(request) + self._async_render_with_logging_context(request) return server.NOT_DONE_YET + _request_id = 0 + + @defer.inlineCallbacks + def _async_render_with_logging_context(self, request): + request_id = "%s-%s" % (request.method, JsonResource._request_id) + JsonResource._request_id += 1 + with LoggingContext(request_id) as request_context: + request_context.request = request_id + yield self._async_render(request) + @defer.inlineCallbacks def _async_render(self, request): """ This get's called by twisted every time someone sends us a request. diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 65a86e9056..2faa63904e 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -19,6 +19,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.api.events.utils import prune_event from synapse.util.logutils import log_function +from synapse.util.logcontext import PreserveLoggingContext, LoggingContext import collections import copy @@ -74,12 +75,19 @@ class SQLBaseStore(object): self.event_factory = hs.get_event_factory() self._clock = hs.get_clock() + @defer.inlineCallbacks def runInteraction(self, func, *args, **kwargs): """Wraps the .runInteraction() method on the underlying db_pool.""" + current_context = LoggingContext.current_context() def inner_func(txn, *args, **kwargs): - return func(LoggingTransaction(txn), *args, **kwargs) - - return self._db_pool.runInteraction(inner_func, *args, **kwargs) + with LoggingContext("runInteraction") as context: + current_context.copy_to(context) + return func(LoggingTransaction(txn), *args, **kwargs) + with PreserveLoggingContext(): + result = yield self._db_pool.runInteraction( + inner_func, *args, **kwargs + ) + defer.returnValue(result) def cursor_to_dict(self, cursor): """Converts a SQL cursor into an list of dicts. @@ -146,7 +154,7 @@ class SQLBaseStore(object): ) logger.debug( - "[SQL] %s Args=%s Func=%s", + "[SQL] %s Args=%s", sql, values.values(), ) diff --git a/synapse/util/async.py b/synapse/util/async.py index 647ea6142c..3d3fbe182c 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -16,8 +16,11 @@ from twisted.internet import defer, reactor +from .logcontext import PreserveLoggingContext +@defer.inlineCallbacks def sleep(seconds): d = defer.Deferred() reactor.callLater(seconds, d.callback, seconds) - return d + with PreserveLoggingContext(): + yield d diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py new file mode 100644 index 0000000000..46a2855a15 --- /dev/null +++ b/synapse/util/logcontext.py @@ -0,0 +1,85 @@ +from functools import wraps + +import threading +import logging + +class LoggingContext(object): + __slots__ = ["parent_context", "name", "__dict__"] + + thread_local = threading.local() + + class Sentinel(object): + __slots__ = [] + def copy_to(self, record): + pass + + sentinel = Sentinel() + + def __init__(self, name=None): + self.parent_context = None + self.name = name + + def __str__(self): + return "%s@%x" % (self.name, id(self)) + + @classmethod + def current_context(cls): + return getattr(cls.thread_local, "current_context", cls.sentinel) + + def __enter__(self): + if self.parent_context is not None: + raise Exception("Attempt to enter logging context multiple times") + self.parent_context = self.current_context() + self.thread_local.current_context = self + return self + + def __exit__(self, type, value, traceback): + if self.thread_local.current_context is not self: + logging.error( + "Current logging context %s is not the expected context %s", + self.thread_local.current_context, + self + ) + self.thread_local.current_context = self.parent_context + self.parent_context = None + + def __getattr__(self, name): + return getattr(self.parent_context, name) + + def copy_to(self, record): + if self.parent_context is not None: + self.parent_context.copy_to(record) + for key, value in self.__dict__.items(): + setattr(record, key, value) + + @classmethod + def wrap_callback(cls, callback): + context = cls.current_context() + @wraps(callback) + def wrapped(*args, **kargs): + cls.thread_local.current_context = context + return callback(*args, **kargs) + return wrapped + + +class LoggingContextFilter(logging.Filter): + def __init__(self, **defaults): + self.defaults = defaults + + def filter(self, record): + context = LoggingContext.current_context() + for key, value in self.defaults.items(): + setattr(record, key, value) + context.copy_to(record) + return True + + +class PreserveLoggingContext(object): + __slots__ = ["current_context"] + def __enter__(self): + self.current_context = LoggingContext.current_context() + + def __exit__(self, type, value, traceback): + LoggingContext.thread_local.current_context = self.current_context + + diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py index fadf0bd510..903a6cf1b3 100644 --- a/synapse/util/logutils.py +++ b/synapse/util/logutils.py @@ -75,6 +75,7 @@ def trace_function(f): linenum = f.func_code.co_firstlineno pathname = f.func_code.co_filename + @wraps(f) def wrapped(*args, **kwargs): name = f.__module__ logger = logging.getLogger(name) diff --git a/tests/util/test_log_context.py b/tests/util/test_log_context.py new file mode 100644 index 0000000000..efa0f28bad --- /dev/null +++ b/tests/util/test_log_context.py @@ -0,0 +1,43 @@ +from twisted.internet import defer +from twisted.internet import reactor +from .. import unittest + +from synapse.util.async import sleep +from synapse.util.logcontext import LoggingContext + +class LoggingContextTestCase(unittest.TestCase): + + def _check_test_key(self, value): + self.assertEquals( + LoggingContext.current_context().test_key, value + ) + + def test_with_context(self): + with LoggingContext() as context_one: + context_one.test_key = "test" + self._check_test_key("test") + + def test_chaining(self): + with LoggingContext() as context_one: + context_one.test_key = "one" + with LoggingContext() as context_two: + self._check_test_key("one") + context_two.test_key = "two" + self._check_test_key("two") + self._check_test_key("one") + + @defer.inlineCallbacks + def test_sleep(self): + @defer.inlineCallbacks + def competing_callback(): + with LoggingContext() as competing_context: + competing_context.test_key = "competing" + yield sleep(0) + self._check_test_key("competing") + + reactor.callLater(0, competing_callback) + + with LoggingContext() as context_one: + context_one.test_key = "one" + yield sleep(0) + self._check_test_key("one") -- cgit 1.4.1 From fa955cc2a4666c94c2c23dd2d8f8c89b8dd37e9d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 30 Oct 2014 10:13:46 +0000 Subject: Pep8 and a few doc strings --- synapse/config/logger.py | 6 +++--- synapse/util/logcontext.py | 51 +++++++++++++++++++++++++++++++++------------- 2 files changed, 40 insertions(+), 17 deletions(-) (limited to 'synapse/util') diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 2a59bf9d15..8566296433 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -19,6 +19,7 @@ from twisted.python.log import PythonLoggingObserver import logging import logging.config + class LoggingConfig(Config): def __init__(self, args): super(LoggingConfig, self).__init__(args) @@ -52,9 +53,9 @@ class LoggingConfig(Config): level = logging.INFO if self.verbosity: - level = logging.DEBUG + level = logging.DEBUG - # FIXME: we need a logging.WARN for a -q quiet option + # FIXME: we need a logging.WARN for a -q quiet option logger = logging.getLogger('') logger.setLevel(level) formatter = logging.Formatter(log_format) @@ -62,7 +63,6 @@ class LoggingConfig(Config): handler = logging.FileHandler(self.log_file) else: handler = logging.StreamHandler() - print handler handler.setFormatter(formatter) handler.addFilter(LoggingContextFilter(request="")) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 46a2855a15..13176b05ce 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -1,15 +1,23 @@ -from functools import wraps - import threading import logging + class LoggingContext(object): + """Additional context for log formatting. Contexts are scoped within a + "with" block. Contexts inherit the state of their parent contexts. + Args: + name (str): Name for the context for debugging. + """ + __slots__ = ["parent_context", "name", "__dict__"] thread_local = threading.local() class Sentinel(object): + """Sentinel to represent the root context""" + __slots__ = [] + def copy_to(self, record): pass @@ -20,13 +28,15 @@ class LoggingContext(object): self.name = name def __str__(self): - return "%s@%x" % (self.name, id(self)) + return "%s@%x" % (self.name, id(self)) @classmethod def current_context(cls): + """Get the current logging context from thread local storage""" return getattr(cls.thread_local, "current_context", cls.sentinel) def __enter__(self): + """Enters this logging context into thread local storage""" if self.parent_context is not None: raise Exception("Attempt to enter logging context multiple times") self.parent_context = self.current_context() @@ -34,6 +44,11 @@ class LoggingContext(object): return self def __exit__(self, type, value, traceback): + """Restore the logging context in thread local storage to the state it + was before this context was entered. + Returns: + None to avoid suppressing any exeptions that were thrown. + """ if self.thread_local.current_context is not self: logging.error( "Current logging context %s is not the expected context %s", @@ -44,29 +59,32 @@ class LoggingContext(object): self.parent_context = None def __getattr__(self, name): + """Delegate member lookup to parent context""" return getattr(self.parent_context, name) def copy_to(self, record): + """Copy fields from this context and its parents to the record""" if self.parent_context is not None: self.parent_context.copy_to(record) for key, value in self.__dict__.items(): setattr(record, key, value) - @classmethod - def wrap_callback(cls, callback): - context = cls.current_context() - @wraps(callback) - def wrapped(*args, **kargs): - cls.thread_local.current_context = context - return callback(*args, **kargs) - return wrapped - class LoggingContextFilter(logging.Filter): + """Logging filter that adds values from the current logging context to each + record. + Args: + **defaults: Default values to avoid formatters complaining about + missing fields + """ def __init__(self, **defaults): self.defaults = defaults def filter(self, record): + """Add each fields from the logging contexts to the record. + Returns: + True to include the record in the log output. + """ context = LoggingContext.current_context() for key, value in self.defaults.items(): setattr(record, key, value) @@ -75,11 +93,16 @@ class LoggingContextFilter(logging.Filter): class PreserveLoggingContext(object): + """Captures the current logging context and restores it when the scope is + exited. Used to restore the context after a function using + @defer.inlineCallbacks is resumed by a callback from the reactor.""" + __slots__ = ["current_context"] + def __enter__(self): + """Captures the current logging context""" self.current_context = LoggingContext.current_context() def __exit__(self, type, value, traceback): + """Restores the current logging context""" LoggingContext.thread_local.current_context = self.current_context - - -- cgit 1.4.1 From 4317c8e5835f0c15bf882f737d3e3c2a5b85f73f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Nov 2014 15:10:55 +0000 Subject: Implement new replace_state and changed prev_state `prev_state` is now a list of previous state ids, similiar to prev_events. `replace_state` now points to what we think was replaced. --- synapse/api/events/__init__.py | 1 + synapse/handlers/directory.py | 5 +- synapse/handlers/federation.py | 4 +- synapse/handlers/message.py | 11 ++-- synapse/handlers/profile.py | 6 +-- synapse/handlers/room.py | 16 ++---- synapse/rest/room.py | 2 +- synapse/state.py | 39 ++------------ synapse/storage/__init__.py | 92 +++++++++++++++++++++++++--------- synapse/storage/_base.py | 66 +++++++++++++++++------- synapse/storage/event_federation.py | 64 ++++++++++++++++++++--- synapse/storage/schema/event_edges.sql | 40 ++++++++++----- synapse/util/jsonobject.py | 2 +- 13 files changed, 220 insertions(+), 128 deletions(-) (limited to 'synapse/util') diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index 168b812311..fc3f350570 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -60,6 +60,7 @@ class SynapseEvent(JsonEncodedObject): "age_ts", "prev_content", "prev_state", + "replaces_state", "redacted_because", "origin_server_ts", ] diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 6e897e915d..164363cdc5 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -147,10 +147,7 @@ class DirectoryHandler(BaseHandler): content={"aliases": aliases}, ) - snapshot = yield self.store.snapshot_room( - room_id=room_id, - user_id=user_id, - ) + snapshot = yield self.store.snapshot_room(event) yield self._on_new_room_event( event, snapshot, extra_users=[user_id], suppress_auth=True diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1464a60937..513ec9a5e3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -313,9 +313,7 @@ class FederationHandler(BaseHandler): state_key=user_id, ) - snapshot = yield self.store.snapshot_room( - event.room_id, event.user_id, - ) + snapshot = yield self.store.snapshot_room(event) snapshot.fill_out_prev_events(event) yield self.state_handler.annotate_state_groups(event) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c6f6ab14d1..8394013df3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -81,7 +81,7 @@ class MessageHandler(BaseHandler): user = self.hs.parse_userid(event.user_id) assert user.is_mine, "User must be our own: %s" % (user,) - snapshot = yield self.store.snapshot_room(event.room_id, event.user_id) + snapshot = yield self.store.snapshot_room(event) yield self._on_new_room_event( event, snapshot, suppress_auth=suppress_auth @@ -141,12 +141,7 @@ class MessageHandler(BaseHandler): SynapseError if something went wrong. """ - snapshot = yield self.store.snapshot_room( - event.room_id, - event.user_id, - state_type=event.type, - state_key=event.state_key, - ) + snapshot = yield self.store.snapshot_room(event) yield self._on_new_room_event(event, snapshot) @@ -214,7 +209,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def send_feedback(self, event): - snapshot = yield self.store.snapshot_room(event.room_id, event.user_id) + snapshot = yield self.store.snapshot_room(event) # store message in db yield self._on_new_room_event(event, snapshot) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 4cd0a06093..e47814483a 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.api.constants import Membership -from synapse.api.events.room import RoomMemberEvent from ._base import BaseHandler @@ -196,10 +195,7 @@ class ProfileHandler(BaseHandler): ) for j in joins: - snapshot = yield self.store.snapshot_room( - j.room_id, j.state_key, RoomMemberEvent.TYPE, - j.state_key - ) + snapshot = yield self.store.snapshot_room(j) content = { "membership": j.content["membership"], diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index f176ad39bf..55c893eb58 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -122,10 +122,7 @@ class RoomCreationHandler(BaseHandler): @defer.inlineCallbacks def handle_event(event): - snapshot = yield self.store.snapshot_room( - room_id=room_id, - user_id=user_id, - ) + snapshot = yield self.store.snapshot_room(event) logger.debug("Event: %s", event) @@ -364,10 +361,8 @@ class RoomMemberHandler(BaseHandler): """ target_user_id = event.state_key - snapshot = yield self.store.snapshot_room( - event.room_id, event.user_id, - RoomMemberEvent.TYPE, target_user_id - ) + snapshot = yield self.store.snapshot_room(event) + ## TODO(markjh): get prev state from snapshot. prev_state = yield self.store.get_room_member( target_user_id, event.room_id @@ -442,10 +437,7 @@ class RoomMemberHandler(BaseHandler): content=content, ) - snapshot = yield self.store.snapshot_room( - room_id, joinee.to_string(), RoomMemberEvent.TYPE, - joinee.to_string() - ) + snapshot = yield self.store.snapshot_room(new_event) yield self._do_join(new_event, snapshot, room_host=host, do_auth=True) diff --git a/synapse/rest/room.py b/synapse/rest/room.py index ec0ce78fda..997895dab0 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -138,7 +138,7 @@ class RoomStateEventRestServlet(RestServlet): raise SynapseError( 404, "Event not found.", errcode=Codes.NOT_FOUND ) - defer.returnValue((200, data[0].get_dict()["content"])) + defer.returnValue((200, data.get_dict()["content"])) @defer.inlineCallbacks def on_PUT(self, request, room_id, event_type, state_key): diff --git a/synapse/state.py b/synapse/state.py index 32744e047c..97a8160a33 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -45,40 +45,6 @@ class StateHandler(object): self.server_name = hs.hostname self.hs = hs - @defer.inlineCallbacks - @log_function - def handle_new_event(self, event, snapshot): - """ Given an event this works out if a) we have sufficient power level - to update the state and b) works out what the prev_state should be. - - Returns: - Deferred: Resolved with a boolean indicating if we successfully - updated the state. - - Raised: - AuthError - """ - # This needs to be done in a transaction. - - if not hasattr(event, "state_key"): - return - - # Now I need to fill out the prev state and work out if it has auth - # (w.r.t. to power levels) - - snapshot.fill_out_prev_events(event) - yield self.annotate_state_groups(event) - - if event.old_state_events: - current_state = event.old_state_events.get( - (event.type, event.state_key) - ) - - if current_state: - event.prev_state = current_state.event_id - - defer.returnValue(True) - @defer.inlineCallbacks @log_function def annotate_state_groups(self, event, old_state=None): @@ -111,7 +77,10 @@ class StateHandler(object): event.old_state_events = copy.deepcopy(new_state) if hasattr(event, "state_key"): - new_state[(event.type, event.state_key)] = event + key = (event.type, event.state_key) + if key in new_state: + event.replaces_state = new_state[key].event_id + new_state[key] = event event.state_group = None event.state_events = new_state diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 6b8fed4502..2d62fc2ed0 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -242,8 +242,8 @@ class DataStore(RoomMemberStore, RoomStore, "state_key": event.state_key, } - if hasattr(event, "prev_state"): - vals["prev_state"] = event.prev_state + if hasattr(event, "replaces_state"): + vals["prev_state"] = event.replaces_state self._simple_insert_txn(txn, "state_events", vals) @@ -258,6 +258,40 @@ class DataStore(RoomMemberStore, RoomStore, } ) + for e_id, h in event.prev_state: + self._simple_insert_txn( + txn, + table="event_edges", + values={ + "event_id": event.event_id, + "prev_event_id": e_id, + "room_id": event.room_id, + "is_state": 1, + }, + or_ignore=True, + ) + + if not backfilled: + self._simple_insert_txn( + txn, + table="state_forward_extremities", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + ) + + for prev_state_id, _ in event.prev_state: + self._simple_delete_txn( + txn, + table="state_forward_extremities", + keyvalues={ + "event_id": prev_state_id, + } + ) + for hash_alg, hash_base64 in event.hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_event_content_hash_txn( @@ -357,7 +391,7 @@ class DataStore(RoomMemberStore, RoomStore, ], ) - def snapshot_room(self, room_id, user_id, state_type=None, state_key=None): + def snapshot_room(self, event): """Snapshot the room for an update by a user Args: room_id (synapse.types.RoomId): The room to snapshot. @@ -368,16 +402,29 @@ class DataStore(RoomMemberStore, RoomStore, synapse.storage.Snapshot: A snapshot of the state of the room. """ def _snapshot(txn): - membership_state = self._get_room_member(txn, user_id, room_id) - prev_events = self._get_latest_events_in_room(txn, room_id) + prev_events = self._get_latest_events_in_room( + txn, + event.room_id + ) + + prev_state = None + state_key = None + if hasattr(event, "state_key"): + state_key = event.state_key + prev_state = self._get_latest_state_in_room( + txn, + event.room_id, + type=event.type, + state_key=state_key, + ) return Snapshot( store=self, - room_id=room_id, - user_id=user_id, + room_id=event.room_id, + user_id=event.user_id, prev_events=prev_events, - membership_state=membership_state, - state_type=state_type, + prev_state=prev_state, + state_type=event.type, state_key=state_key, ) @@ -400,30 +447,29 @@ class Snapshot(object): """ def __init__(self, store, room_id, user_id, prev_events, - membership_state, state_type=None, state_key=None, - prev_state_pdu=None): + prev_state, state_type=None, state_key=None): self.store = store self.room_id = room_id self.user_id = user_id self.prev_events = prev_events - self.membership_state = membership_state + self.prev_state = prev_state self.state_type = state_type self.state_key = state_key - self.prev_state_pdu = prev_state_pdu def fill_out_prev_events(self, event): - if hasattr(event, "prev_events"): - return + if not hasattr(event, "prev_events"): + event.prev_events = [ + (event_id, hashes) + for event_id, hashes, _ in self.prev_events + ] - event.prev_events = [ - (event_id, hashes) - for event_id, hashes, _ in self.prev_events - ] + if self.prev_events: + event.depth = max([int(v) for _, _, v in self.prev_events]) + 1 + else: + event.depth = 0 - if self.prev_events: - event.depth = max([int(v) for _, _, v in self.prev_events]) + 1 - else: - event.depth = 0 + if not hasattr(event, "prev_state") and self.prev_state is not None: + event.prev_state = self.prev_state def schema_path(schema): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7d445b4633..7821fc4726 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -245,7 +245,6 @@ class SQLBaseStore(object): return [r[0] for r in txn.fetchall()] - def _simple_select_onecol(self, table, keyvalues, retcol): """Executes a SELECT query on the named table, which returns a list comprising of the values of the named column from the selected rows. @@ -273,17 +272,30 @@ class SQLBaseStore(object): keyvalues : dict of column names and values to select the rows with retcols : list of strings giving the names of the columns to return """ + return self.runInteraction( + "_simple_select_list", + self._simple_select_list_txn, + table, keyvalues, retcols + ) + + def _simple_select_list_txn(self, txn, table, keyvalues, retcols): + """Executes a SELECT query on the named table, which may return zero or + more rows, returning the result as a list of dicts. + + Args: + txn : Transaction object + table : string giving the table name + keyvalues : dict of column names and values to select the rows with + retcols : list of strings giving the names of the columns to return + """ sql = "SELECT %s FROM %s WHERE %s" % ( ", ".join(retcols), table, - " AND ".join("%s = ?" % (k) for k in keyvalues) + " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) - def func(txn): - txn.execute(sql, keyvalues.values()) - return self.cursor_to_dict(txn) - - return self.runInteraction("_simple_select_list", func) + txn.execute(sql, keyvalues.values()) + return self.cursor_to_dict(txn) def _simple_update_one(self, table, keyvalues, updatevalues, retcols=None): @@ -417,6 +429,10 @@ class SQLBaseStore(object): d.pop("topological_ordering", None) d.pop("processed", None) d["origin_server_ts"] = d.pop("ts", 0) + replaces_state = d.pop("prev_state", None) + + if replaces_state: + d["replaces_state"] = replaces_state d.update(json.loads(row_dict["unrecognized_keys"])) d["content"] = json.loads(d["content"]) @@ -450,16 +466,32 @@ class SQLBaseStore(object): k: encode_base64(v) for k, v in signatures.items() } - ev.prev_events = self._get_prev_events(txn, ev.event_id) - - if hasattr(ev, "prev_state"): - # Load previous state_content. - # TODO: Should we be pulling this out above? - cursor = txn.execute(select_event_sql, (ev.prev_state,)) - prevs = self.cursor_to_dict(cursor) - if prevs: - prev = self._parse_event_from_row(prevs[0]) - ev.prev_content = prev.content + prevs = self._get_prev_events_and_state(txn, ev.event_id) + + ev.prev_events = [ + (e_id, h) + for e_id, h, is_state in prevs + if is_state == 0 + ] + + if hasattr(ev, "state_key"): + ev.prev_state = [ + (e_id, h) + for e_id, h, is_state in prevs + if is_state == 1 + ] + + if hasattr(ev, "replaces_state"): + # Load previous state_content. + # FIXME (erikj): Handle multiple prev_states. + cursor = txn.execute( + select_event_sql, + (ev.replaces_state,) + ) + prevs = self.cursor_to_dict(cursor) + if prevs: + prev = self._parse_event_from_row(prevs[0]) + ev.prev_content = prev.content if not hasattr(ev, "redacted"): logger.debug("Doesn't have redacted key: %s", ev) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index f427aba879..180a764134 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -69,19 +69,21 @@ class EventFederationStore(SQLBaseStore): return results - def _get_prev_events(self, txn, event_id): - prev_ids = self._simple_select_onecol_txn( + def _get_latest_state_in_room(self, txn, room_id, type, state_key): + event_ids = self._simple_select_onecol_txn( txn, - table="event_edges", + table="state_forward_extremities", keyvalues={ - "event_id": event_id, + "room_id": room_id, + "type": type, + "state_key": state_key, }, - retcol="prev_event_id", + retcol="event_id", ) results = [] - for prev_event_id in prev_ids: - hashes = self._get_event_reference_hashes_txn(txn, prev_event_id) + for event_id in event_ids: + hashes = self._get_event_reference_hashes_txn(txn, event_id) prev_hashes = { k: encode_base64(v) for k, v in hashes.items() if k == "sha256" @@ -90,6 +92,53 @@ class EventFederationStore(SQLBaseStore): return results + def _get_prev_events(self, txn, event_id): + results = self._get_prev_events_and_state( + txn, + event_id, + is_state=0, + ) + + return [(e_id, h, ) for e_id, h, _ in results] + + def _get_prev_state(self, txn, event_id): + results = self._get_prev_events_and_state( + txn, + event_id, + is_state=1, + ) + + return [(e_id, h, ) for e_id, h, _ in results] + + def _get_prev_events_and_state(self, txn, event_id, is_state=None): + keyvalues = { + "event_id": event_id, + } + + if is_state is not None: + keyvalues["is_state"] = is_state + + res = self._simple_select_list_txn( + txn, + table="event_edges", + keyvalues=keyvalues, + retcols=["prev_event_id", "is_state"], + ) + + results = [] + for d in res: + hashes = self._get_event_reference_hashes_txn( + txn, + d["prev_event_id"] + ) + prev_hashes = { + k: encode_base64(v) for k, v in hashes.items() + if k == "sha256" + } + results.append((d["prev_event_id"], prev_hashes, d["is_state"])) + + return results + def get_min_depth(self, room_id): return self.runInteraction( "get_min_depth", @@ -135,6 +184,7 @@ class EventFederationStore(SQLBaseStore): "event_id": event_id, "prev_event_id": e_id, "room_id": room_id, + "is_state": 0, }, or_ignore=True, ) diff --git a/synapse/storage/schema/event_edges.sql b/synapse/storage/schema/event_edges.sql index e5f768c705..51695826a8 100644 --- a/synapse/storage/schema/event_edges.sql +++ b/synapse/storage/schema/event_edges.sql @@ -1,7 +1,7 @@ CREATE TABLE IF NOT EXISTS event_forward_extremities( - event_id TEXT, - room_id TEXT, + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE ); @@ -10,8 +10,8 @@ CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_backward_extremities( - event_id TEXT, - room_id TEXT, + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE ); @@ -20,10 +20,11 @@ CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id CREATE TABLE IF NOT EXISTS event_edges( - event_id TEXT, - prev_event_id TEXT, - room_id TEXT, - CONSTRAINT uniqueness UNIQUE (event_id, prev_event_id, room_id) + event_id TEXT NOT NULL, + prev_event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + is_state INTEGER NOT NULL, + CONSTRAINT uniqueness UNIQUE (event_id, prev_event_id, room_id, is_state) ); CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id); @@ -31,8 +32,8 @@ CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id); CREATE TABLE IF NOT EXISTS room_depth( - room_id TEXT, - min_depth INTEGER, + room_id TEXT NOT NULL, + min_depth INTEGER NOT NULL, CONSTRAINT uniqueness UNIQUE (room_id) ); @@ -40,10 +41,25 @@ CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id); create TABLE IF NOT EXISTS event_destinations( - event_id TEXT, - destination TEXT, + event_id TEXT NOT NULL, + destination TEXT NOT NULL, delivered_ts INTEGER DEFAULT 0, -- or 0 if not delivered CONSTRAINT uniqueness UNIQUE (event_id, destination) ON CONFLICT REPLACE ); CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id); + + +CREATE TABLE IF NOT EXISTS state_forward_extremities( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + type TEXT NOT NULL, + state_key TEXT NOT NULL, + CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE +); + +CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities( + room_id, type, state_key +); +CREATE INDEX IF NOT EXISTS st_extrem_id ON state_forward_extremities(event_id); + diff --git a/synapse/util/jsonobject.py b/synapse/util/jsonobject.py index c91eb897a8..e79b68f661 100644 --- a/synapse/util/jsonobject.py +++ b/synapse/util/jsonobject.py @@ -80,7 +80,7 @@ class JsonEncodedObject(object): def get_full_dict(self): d = { - k: v for (k, v) in self.__dict__.items() + k: _encode(v) for (k, v) in self.__dict__.items() if k in self.valid_keys or k in self.internal_keys } d.update(self.unrecognized_keys) -- cgit 1.4.1 From 97c7c34f6f71238dcb364374733c1f2200d6b982 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 19 Nov 2014 16:37:43 +0000 Subject: Preserve logging context in a few more places, drop the logging context after it has been stashed to reduce potential for confusion --- synapse/handlers/events.py | 8 +++++--- synapse/notifier.py | 40 ++++++++++++++++++++++------------------ synapse/util/__init__.py | 7 ++++++- synapse/util/logcontext.py | 4 ++++ 4 files changed, 37 insertions(+), 22 deletions(-) (limited to 'synapse/util') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 93dcd40324..4993c92b74 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -15,6 +15,7 @@ from twisted.internet import defer +from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from ._base import BaseHandler @@ -66,9 +67,10 @@ class EventStreamHandler(BaseHandler): rm_handler = self.hs.get_handlers().room_member_handler room_ids = yield rm_handler.get_rooms_for_user(auth_user) - events, tokens = yield self.notifier.get_events_for( - auth_user, room_ids, pagin_config, timeout - ) + with PreserveLoggingContext(): + events, tokens = yield self.notifier.get_events_for( + auth_user, room_ids, pagin_config, timeout + ) chunks = [self.hs.serialize_event(e) for e in events] diff --git a/synapse/notifier.py b/synapse/notifier.py index f38c410e33..022d60a3b5 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -16,6 +16,7 @@ from twisted.internet import defer, reactor from synapse.util.logutils import log_function +from synapse.util.logcontext import PreserveLoggingContext import logging @@ -79,6 +80,8 @@ class Notifier(object): self.event_sources = hs.get_event_sources() + self.clock = hs.get_clock() + hs.get_distributor().observe( "user_joined_room", self._user_joined_room ) @@ -127,9 +130,10 @@ class Notifier(object): def eb(failure): logger.exception("Failed to notify listener", failure) - yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners] - ) + with PreserveLoggingContext(): + yield defer.DeferredList( + [notify(l).addErrback(eb) for l in listeners] + ) @defer.inlineCallbacks @log_function @@ -175,9 +179,10 @@ class Notifier(object): failure.getTracebackObject()) ) - yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners] - ) + with PreserveLoggingContext(): + yield defer.DeferredList( + [notify(l).addErrback(eb) for l in listeners] + ) def get_events_for(self, user, rooms, pagination_config, timeout): """ For the given user and rooms, return any new events for them. If @@ -206,29 +211,28 @@ class Notifier(object): timeout, deferred, ) + def _timeout_listener(): + # TODO (erikj): We should probably set to_token to the current + # max rather than reusing from_token. + listener.notify( + self, + [], + listener.from_token, + listener.from_token, + ) if timeout: - reactor.callLater(timeout/1000.0, self._timeout_listener, listener) + self.clock.call_later(timeout/1000.0, _timeout_listener) self._register_with_keys(listener) yield self._check_for_updates(listener) if not timeout: - self._timeout_listener(listener) + _timeout_listener() return - def _timeout_listener(self, listener): - # TODO (erikj): We should probably set to_token to the current max - # rather than reusing from_token. - listener.notify( - self, - [], - listener.from_token, - listener.from_token, - ) - @log_function def _register_with_keys(self, listener): for room in listener.rooms: diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index c9a73b0413..9ad613b8f1 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.util.logcontext import LoggingContext from twisted.internet import reactor @@ -35,7 +36,11 @@ class Clock(object): return self.time() * 1000 def call_later(self, delay, callback): - return reactor.callLater(delay, callback) + current_context = LoggingContext.current_context() + def wrapped_callback(): + current_context.thread_local.current_context = current_context + callback() + return reactor.callLater(delay, wrapped_callback) def cancel_call_later(self, timer): timer.cancel() diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 13176b05ce..2f430a0f19 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -18,6 +18,9 @@ class LoggingContext(object): __slots__ = [] + def __str__(self): + return "sentinel" + def copy_to(self, record): pass @@ -102,6 +105,7 @@ class PreserveLoggingContext(object): def __enter__(self): """Captures the current logging context""" self.current_context = LoggingContext.current_context() + LoggingContext.thread_local.current_context = LoggingContext.sentinel def __exit__(self, type, value, traceback): """Restores the current logging context""" -- cgit 1.4.1 From ca91bb2f7f22604ba582b149759cc9d7c6ae18c0 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 19 Nov 2014 17:18:55 +0000 Subject: Sometimes there isn't a current logging context --- synapse/util/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 9ad613b8f1..e57fb0e914 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -38,7 +38,7 @@ class Clock(object): def call_later(self, delay, callback): current_context = LoggingContext.current_context() def wrapped_callback(): - current_context.thread_local.current_context = current_context + LoggingContext.thread_local.current_context = current_context callback() return reactor.callLater(delay, wrapped_callback) -- cgit 1.4.1