diff options
-rw-r--r-- | synapse/app/appservice.py | 8 | ||||
-rw-r--r-- | synapse/app/client_reader.py | 8 | ||||
-rw-r--r-- | synapse/app/federation_reader.py | 8 | ||||
-rw-r--r-- | synapse/app/federation_sender.py | 8 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 9 | ||||
-rw-r--r-- | synapse/app/media_repository.py | 8 | ||||
-rw-r--r-- | synapse/app/pusher.py | 9 | ||||
-rw-r--r-- | synapse/app/synchrotron.py | 9 | ||||
-rw-r--r-- | synapse/events/snapshot.py | 26 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 15 | ||||
-rw-r--r-- | synapse/state.py | 11 | ||||
-rw-r--r-- | synapse/storage/background_updates.py | 15 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 24 | ||||
-rw-r--r-- | synapse/storage/events.py | 281 | ||||
-rw-r--r-- | synapse/storage/state.py | 10 | ||||
-rw-r--r-- | synapse/util/logcontext.py | 61 | ||||
-rw-r--r-- | tests/util/test_log_context.py | 61 |
17 files changed, 425 insertions, 146 deletions
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 83ee3e3ce3..a6f1e7594e 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -29,7 +29,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.storage.engines import create_engine from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -187,7 +187,11 @@ def start(config_options): ps.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 7ed0de4117..a821a6ce62 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -35,7 +35,7 @@ from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -193,7 +193,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index ca742de6b2..e52b0f240d 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -31,7 +31,7 @@ from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -184,7 +184,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 0cf5b196e6..76c4cc54d1 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -35,7 +35,7 @@ from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -193,7 +193,11 @@ def start(config_options): ps.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 0b9d78c13c..2cdd2d39ff 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -52,7 +52,7 @@ from synapse.api.urls import ( ) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.metrics import register_memory_metrics, get_metrics_for from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX @@ -456,7 +456,12 @@ def run(hs): def in_thread(): # Uncomment to enable tracing of log context changes. # sys.settrace(logcontext_tracer) - with LoggingContext("run"): + + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): change_resource_limit(hs.config.soft_file_limit) if hs.config.gc_thresholds: gc.set_threshold(*hs.config.gc_thresholds) diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index c5579d9e38..3c7984a237 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -32,7 +32,7 @@ from synapse.storage.engines import create_engine from synapse.storage.media_repository import MediaRepositoryStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -190,7 +190,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index b025db54d4..ab682e52ec 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -31,7 +31,8 @@ from synapse.storage.engines import create_engine from synapse.storage import DataStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn, \ + PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -275,7 +276,11 @@ def start(config_options): ps.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 449fac771b..a68bb873fe 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -48,7 +48,8 @@ from synapse.storage.presence import PresenceStore, UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn, \ + PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.stringutils import random_string @@ -496,7 +497,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 11605b34a3..6be18880b9 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -15,6 +15,32 @@ class EventContext(object): + """ + Attributes: + current_state_ids (dict[(str, str), str]): + The current state map including the current event. + (type, state_key) -> event_id + + prev_state_ids (dict[(str, str), str]): + The current state map excluding the current event. + (type, state_key) -> event_id + + state_group (int): state group id + rejected (bool|str): A rejection reason if the event was rejected, else + False + + push_actions (list[(str, list[object])]): list of (user_id, actions) + tuples + + prev_group (int): Previously persisted state group. ``None`` for an + outlier. + delta_ids (dict[(str, str), str]): Delta from ``prev_group``. + (type, state_key) -> event_id. ``None`` for an outlier. + + prev_state_events (?): XXX: is this ever set to anything other than + the empty list? + """ + __slots__ = [ "current_state_ids", "prev_state_ids", diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0cd5501b05..888dd01240 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -933,8 +933,9 @@ class FederationHandler(BaseHandler): # lots of requests for missing prev_events which we do actually # have. Hence we fire off the deferred, but don't wait for it. - synapse.util.logcontext.reset_context_after_deferred( - self._handle_queued_pdus(room_queue)) + synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)( + room_queue + ) defer.returnValue(True) @@ -1537,7 +1538,17 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _prep_event(self, origin, event, state=None, auth_events=None): + """ + + Args: + origin: + event: + state: + auth_events: + Returns: + Deferred, which resolves to synapse.events.snapshot.EventContext + """ context = yield self.state_handler.compute_event_context( event, old_state=state, ) diff --git a/synapse/state.py b/synapse/state.py index 383d32b163..9a523a1b89 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -177,17 +177,12 @@ class StateHandler(object): @defer.inlineCallbacks def compute_event_context(self, event, old_state=None): - """ Fills out the context with the `current state` of the graph. The - `current state` here is defined to be the state of the event graph - just before the event - i.e. it never includes `event` - - If `event` has `auth_events` then this will also fill out the - `auth_events` field on `context` from the `current_state`. + """Build an EventContext structure for the event. Args: - event (EventBase) + event (synapse.events.EventBase): Returns: - an EventContext + synapse.events.snapshot.EventContext: """ context = EventContext() diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 94b2bcc54a..813ad59e56 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import synapse.util.async from ._base import SQLBaseStore from . import engines @@ -84,24 +85,14 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_performance = {} self._background_update_queue = [] self._background_update_handlers = {} - self._background_update_timer = None @defer.inlineCallbacks def start_doing_background_updates(self): - assert self._background_update_timer is None, \ - "background updates already running" - logger.info("Starting background schema updates") while True: - sleep = defer.Deferred() - self._background_update_timer = self._clock.call_later( - self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None - ) - try: - yield sleep - finally: - self._background_update_timer = None + yield synapse.util.async.sleep( + self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.) try: result = yield self.do_next_background_update( diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 256e50dc20..0d97de2fe7 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -201,19 +201,19 @@ class EventFederationStore(SQLBaseStore): def _update_min_depth_for_room_txn(self, txn, room_id, depth): min_depth = self._get_min_depth_interaction(txn, room_id) - do_insert = depth < min_depth if min_depth else True + if min_depth and depth >= min_depth: + return - if do_insert: - self._simple_upsert_txn( - txn, - table="room_depth", - keyvalues={ - "room_id": room_id, - }, - values={ - "min_depth": depth, - }, - ) + self._simple_upsert_txn( + txn, + table="room_depth", + keyvalues={ + "room_id": room_id, + }, + values={ + "min_depth": depth, + }, + ) def _handle_mult_prev_events(self, txn, events): """ diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 72319c35ae..4d3cfc336f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -34,14 +34,16 @@ from canonicaljson import encode_canonical_json from collections import deque, namedtuple, OrderedDict from functools import wraps -import synapse import synapse.metrics - import logging import math import ujson as json +# these are only included to make the type annotations work +from synapse.events import EventBase # noqa: F401 +from synapse.events.snapshot import EventContext # noqa: F401 + logger = logging.getLogger(__name__) @@ -82,6 +84,11 @@ class _EventPeristenceQueue(object): def add_to_queue(self, room_id, events_and_contexts, backfilled): """Add events to the queue, with the given persist_event options. + + Args: + room_id (str): + events_and_contexts (list[(EventBase, EventContext)]): + backfilled (bool): """ queue = self._event_persist_queues.setdefault(room_id, deque()) if queue: @@ -227,6 +234,17 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks @log_function def persist_event(self, event, context, backfilled=False): + """ + + Args: + event (EventBase): + context (EventContext): + backfilled (bool): + + Returns: + Deferred: resolves to (int, int): the stream ordering of ``event``, + and the stream ordering of the latest persisted event + """ deferred = self._event_persist_queue.add_to_queue( event.room_id, [(event, context)], backfilled=backfilled, @@ -253,6 +271,16 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _persist_events(self, events_and_contexts, backfilled=False, delete_existing=False): + """Persist events to db + + Args: + events_and_contexts (list[(EventBase, EventContext)]): + backfilled (bool): + delete_existing (bool): + + Returns: + Deferred: resolves when the events have been persisted + """ if not events_and_contexts: return @@ -554,11 +582,91 @@ class EventsStore(SQLBaseStore): and the rejections table. Things reading from those table will need to check whether the event was rejected. - If delete_existing is True then existing events will be purged from the - database before insertion. This is useful when retrying due to IntegrityError. + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): + events to persist + backfilled (bool): True if the events were backfilled + delete_existing (bool): True to purge existing table rows for the + events from the database. This is useful when retrying due to + IntegrityError. + current_state_for_room (dict[str, (list[str], list[str])]): + The current-state delta for each room. For each room, a tuple + (to_delete, to_insert), being a list of event ids to be removed + from the current state, and a list of event ids to be added to + the current state. + new_forward_extremeties (dict[str, list[str]]): + The new forward extremities for each room. For each room, a + list of the event ids which are the forward extremities. + """ + self._update_current_state_txn(txn, current_state_for_room) + max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering - for room_id, current_state_tuple in current_state_for_room.iteritems(): + self._update_forward_extremities_txn( + txn, + new_forward_extremities=new_forward_extremeties, + max_stream_order=max_stream_order, + ) + + # Ensure that we don't have the same event twice. + events_and_contexts = self._filter_events_and_contexts_for_duplicates( + events_and_contexts, + ) + + self._update_room_depths_txn( + txn, + events_and_contexts=events_and_contexts, + backfilled=backfilled, + ) + + # _update_outliers_txn filters out any events which have already been + # persisted, and returns the filtered list. + events_and_contexts = self._update_outliers_txn( + txn, + events_and_contexts=events_and_contexts, + ) + + # From this point onwards the events are only events that we haven't + # seen before. + + if delete_existing: + # For paranoia reasons, we go and delete all the existing entries + # for these events so we can reinsert them. + # This gets around any problems with some tables already having + # entries. + self._delete_existing_rows_txn( + txn, + events_and_contexts=events_and_contexts, + ) + + self._store_event_txn( + txn, + events_and_contexts=events_and_contexts, + ) + + # Insert into the state_groups, state_groups_state, and + # event_to_state_groups tables. + self._store_mult_state_groups_txn(txn, events_and_contexts) + + # _store_rejected_events_txn filters out any events which were + # rejected, and returns the filtered list. + events_and_contexts = self._store_rejected_events_txn( + txn, + events_and_contexts=events_and_contexts, + ) + + # From this point onwards the events are only ones that weren't + # rejected. + + self._update_metadata_tables_txn( + txn, + events_and_contexts=events_and_contexts, + backfilled=backfilled, + ) + + def _update_current_state_txn(self, txn, state_delta_by_room): + for room_id, current_state_tuple in state_delta_by_room.iteritems(): to_delete, to_insert = current_state_tuple txn.executemany( "DELETE FROM current_state_events WHERE event_id = ?", @@ -608,7 +716,9 @@ class EventsStore(SQLBaseStore): txn, self.get_current_state_ids, (room_id,) ) - for room_id, new_extrem in new_forward_extremeties.items(): + def _update_forward_extremities_txn(self, txn, new_forward_extremities, + max_stream_order): + for room_id, new_extrem in new_forward_extremities.items(): self._simple_delete_txn( txn, table="event_forward_extremities", @@ -626,7 +736,7 @@ class EventsStore(SQLBaseStore): "event_id": ev_id, "room_id": room_id, } - for room_id, new_extrem in new_forward_extremeties.items() + for room_id, new_extrem in new_forward_extremities.items() for ev_id in new_extrem ], ) @@ -643,13 +753,22 @@ class EventsStore(SQLBaseStore): "event_id": event_id, "stream_ordering": max_stream_order, } - for room_id, new_extrem in new_forward_extremeties.items() + for room_id, new_extrem in new_forward_extremities.items() for event_id in new_extrem ] ) - # Ensure that we don't have the same event twice. - # Pick the earliest non-outlier if there is one, else the earliest one. + @classmethod + def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts): + """Ensure that we don't have the same event twice. + + Pick the earliest non-outlier if there is one, else the earliest one. + + Args: + events_and_contexts (list[(EventBase, EventContext)]): + Returns: + list[(EventBase, EventContext)]: filtered list + """ new_events_and_contexts = OrderedDict() for event, context in events_and_contexts: prev_event_context = new_events_and_contexts.get(event.event_id) @@ -662,9 +781,17 @@ class EventsStore(SQLBaseStore): new_events_and_contexts[event.event_id] = (event, context) else: new_events_and_contexts[event.event_id] = (event, context) + return new_events_and_contexts.values() - events_and_contexts = new_events_and_contexts.values() + def _update_room_depths_txn(self, txn, events_and_contexts, backfilled): + """Update min_depth for each room + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + backfilled (bool): True if the events were backfilled + """ depth_updates = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids @@ -683,6 +810,21 @@ class EventsStore(SQLBaseStore): for room_id, depth in depth_updates.items(): self._update_min_depth_for_room_txn(txn, room_id, depth) + def _update_outliers_txn(self, txn, events_and_contexts): + """Update any outliers with new event info. + + This turns outliers into ex-outliers (unless the new event was + rejected). + + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + + Returns: + list[(EventBase, EventContext)] new list, without events which + are already in the events table. + """ txn.execute( "SELECT event_id, outlier FROM events WHERE event_id in (%s)" % ( ",".join(["?"] * len(events_and_contexts)), @@ -697,19 +839,16 @@ class EventsStore(SQLBaseStore): to_remove = set() for event, context in events_and_contexts: - if context.rejected: - # If the event is rejected then we don't care if the event - # was an outlier or not. - if event.event_id in have_persisted: - # If we have already seen the event then ignore it. - to_remove.add(event) - continue - if event.event_id not in have_persisted: continue to_remove.add(event) + if context.rejected: + # If the event is rejected then we don't care if the event + # was an outlier or not. + continue + outlier_persisted = have_persisted[event.event_id] if not event.internal_metadata.is_outlier() and outlier_persisted: # We received a copy of an event that we had already stored as @@ -764,37 +903,19 @@ class EventsStore(SQLBaseStore): # event isn't an outlier any more. self._update_backward_extremeties(txn, [event]) - events_and_contexts = [ + return [ ec for ec in events_and_contexts if ec[0] not in to_remove ] + @classmethod + def _delete_existing_rows_txn(cls, txn, events_and_contexts): if not events_and_contexts: - # Make sure we don't pass an empty list to functions that expect to - # be storing at least one element. + # nothing to do here return - # From this point onwards the events are only events that we haven't - # seen before. + logger.info("Deleting existing") - def event_dict(event): - return { - k: v - for k, v in event.get_dict().items() - if k not in [ - "redacted", - "redacted_because", - ] - } - - if delete_existing: - # For paranoia reasons, we go and delete all the existing entries - # for these events so we can reinsert them. - # This gets around any problems with some tables already having - # entries. - - logger.info("Deleting existing") - - for table in ( + for table in ( "events", "event_auth", "event_json", @@ -817,11 +938,34 @@ class EventsStore(SQLBaseStore): "redactions", "room_memberships", "topics" - ): - txn.executemany( - "DELETE FROM %s WHERE event_id = ?" % (table,), - [(ev.event_id,) for ev, _ in events_and_contexts] - ) + ): + txn.executemany( + "DELETE FROM %s WHERE event_id = ?" % (table,), + [(ev.event_id,) for ev, _ in events_and_contexts] + ) + + def _store_event_txn(self, txn, events_and_contexts): + """Insert new events into the event and event_json tables + + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + """ + + if not events_and_contexts: + # nothing to do here + return + + def event_dict(event): + return { + k: v + for k, v in event.get_dict().items() + if k not in [ + "redacted", + "redacted_because", + ] + } self._simple_insert_many_txn( txn, @@ -865,6 +1009,19 @@ class EventsStore(SQLBaseStore): ], ) + def _store_rejected_events_txn(self, txn, events_and_contexts): + """Add rows to the 'rejections' table for received events which were + rejected + + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + + Returns: + list[(EventBase, EventContext)] new list, without the rejected + events. + """ # Remove the rejected events from the list now that we've added them # to the events table and the events_json table. to_remove = set() @@ -876,17 +1033,24 @@ class EventsStore(SQLBaseStore): ) to_remove.add(event) - events_and_contexts = [ + return [ ec for ec in events_and_contexts if ec[0] not in to_remove ] + def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled): + """Update all the miscellaneous tables for new events + + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + backfilled (bool): True if the events were backfilled + """ + if not events_and_contexts: - # Make sure we don't pass an empty list to functions that expect to - # be storing at least one element. + # nothing to do here return - # From this point onwards the events are only ones that weren't rejected. - for event, context in events_and_contexts: # Insert all the push actions into the event_push_actions table. if context.push_actions: @@ -915,10 +1079,6 @@ class EventsStore(SQLBaseStore): ], ) - # Insert into the state_groups, state_groups_state, and - # event_to_state_groups tables. - self._store_mult_state_groups_txn(txn, events_and_contexts) - # Update the event_forward_extremities, event_backward_extremities and # event_edges tables. self._handle_mult_prev_events( @@ -1005,13 +1165,6 @@ class EventsStore(SQLBaseStore): # Prefill the event cache self._add_to_cache(txn, events_and_contexts) - if backfilled: - # Backfilled events come before the current state so we don't need - # to update the current state table - return - - return - def _add_to_cache(self, txn, events_and_contexts): to_prefill = [] diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 27f1ec89ec..1b42bea07a 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -136,6 +136,16 @@ class StateStore(SQLBaseStore): continue if context.current_state_ids is None: + # AFAIK, this can never happen + logger.error( + "Non-outlier event %s had current_state_ids==None", + event.event_id) + continue + + # if the event was rejected, just give it the same state as its + # predecessor. + if context.rejected: + state_groups[event.event_id] = context.prev_group continue state_groups[event.event_id] = context.state_group diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 1135a683af..ff67b1d794 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -318,47 +318,44 @@ def preserve_context_over_deferred(deferred, context=None): return d -def reset_context_after_deferred(deferred): - """If the deferred is incomplete, add a callback which will reset the - context. - - This is useful when you want to fire off a deferred, but don't want to - wait for it to complete. (The deferred will restore the current log context - when it completes, so if you don't do anything, it will leak log context.) - - (If this feels asymmetric, consider it this way: we are effectively forking - a new thread of execution. We are probably currently within a - ``with LoggingContext()`` block, which is supposed to have a single entry - and exit point. But by spawning off another deferred, we are effectively - adding a new exit point.) +def preserve_fn(f): + """Wraps a function, to ensure that the current context is restored after + return from the function, and that the sentinel context is set once the + deferred returned by the funtion completes. - Args: - deferred (defer.Deferred): deferred + Useful for wrapping functions that return a deferred which you don't yield + on. """ def reset_context(result): LoggingContext.set_current_context(LoggingContext.sentinel) return result - if not deferred.called: - deferred.addBoth(reset_context) - - -def preserve_fn(f): - """Ensures that function is called with correct context and that context is - restored after return. Useful for wrapping functions that return a deferred - which you don't yield on. - """ + # XXX: why is this here rather than inside g? surely we want to preserve + # the context from the time the function was called, not when it was + # wrapped? current = LoggingContext.current_context() def g(*args, **kwargs): - with PreserveLoggingContext(current): - res = f(*args, **kwargs) - if isinstance(res, defer.Deferred): - return preserve_context_over_deferred( - res, context=LoggingContext.sentinel - ) - else: - return res + res = f(*args, **kwargs) + if isinstance(res, defer.Deferred) and not res.called: + # The function will have reset the context before returning, so + # we need to restore it now. + LoggingContext.set_current_context(current) + + # The original context will be restored when the deferred + # completes, but there is nothing waiting for it, so it will + # get leaked into the reactor or some other function which + # wasn't expecting it. We therefore need to reset the context + # here. + # + # (If this feels asymmetric, consider it this way: we are + # effectively forking a new thread of execution. We are + # probably currently within a ``with LoggingContext()`` block, + # which is supposed to have a single entry and exit point. But + # by spawning off another deferred, we are effectively + # adding a new exit point.) + res.addBoth(reset_context) + return res return g diff --git a/tests/util/test_log_context.py b/tests/util/test_log_context.py index 65a330a0e9..9ffe209c4d 100644 --- a/tests/util/test_log_context.py +++ b/tests/util/test_log_context.py @@ -1,8 +1,10 @@ +import twisted.python.failure from twisted.internet import defer from twisted.internet import reactor from .. import unittest from synapse.util.async import sleep +from synapse.util import logcontext from synapse.util.logcontext import LoggingContext @@ -33,3 +35,62 @@ class LoggingContextTestCase(unittest.TestCase): context_one.test_key = "one" yield sleep(0) self._check_test_key("one") + + def _test_preserve_fn(self, function): + sentinel_context = LoggingContext.current_context() + + callback_completed = [False] + + @defer.inlineCallbacks + def cb(): + context_one.test_key = "one" + yield function() + self._check_test_key("one") + + callback_completed[0] = True + + with LoggingContext() as context_one: + context_one.test_key = "one" + + # fire off function, but don't wait on it. + logcontext.preserve_fn(cb)() + + self._check_test_key("one") + + # now wait for the function under test to have run, and check that + # the logcontext is left in a sane state. + d2 = defer.Deferred() + + def check_logcontext(): + if not callback_completed[0]: + reactor.callLater(0.01, check_logcontext) + return + + # make sure that the context was reset before it got thrown back + # into the reactor + try: + self.assertIs(LoggingContext.current_context(), + sentinel_context) + d2.callback(None) + except BaseException: + d2.errback(twisted.python.failure.Failure()) + + reactor.callLater(0.01, check_logcontext) + + # test is done once d2 finishes + return d2 + + def test_preserve_fn_with_blocking_fn(self): + @defer.inlineCallbacks + def blocking_function(): + yield sleep(0) + + return self._test_preserve_fn(blocking_function) + + def test_preserve_fn_with_non_blocking_fn(self): + @defer.inlineCallbacks + def nonblocking_function(): + with logcontext.PreserveLoggingContext(): + yield defer.succeed(None) + + return self._test_preserve_fn(nonblocking_function) |