From 4b91c313a94a4a89998e097e79a96a4423cf1b9f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Mar 2019 16:15:59 +0000 Subject: Combine the CurrentStateDeltaStream into the EventStream --- synapse/storage/events.py | 3 --- 1 file changed, 3 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 428300ea0a..0b0a4dcdd3 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2287,9 +2287,6 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"]))) - def get_max_current_state_delta_stream_id(self): - return self._stream_id_gen.get_current_token() - def get_all_updated_current_state_deltas(self, from_token, to_token, limit): def get_all_updated_current_state_deltas_txn(txn): sql = """ -- cgit 1.5.1 From 2e060774adae2055bf9f3ce3e4ac6018b1034382 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 29 Mar 2019 00:37:16 +1100 Subject: Run `black` on some storage modules that the stats branch touches (#4959) --- changelog.d/4959.misc | 1 + synapse/storage/events.py | 542 +++++++++++++++++++----------------------- synapse/storage/roommember.py | 173 +++++++------- 3 files changed, 336 insertions(+), 380 deletions(-) create mode 100644 changelog.d/4959.misc (limited to 'synapse/storage/events.py') diff --git a/changelog.d/4959.misc b/changelog.d/4959.misc new file mode 100644 index 0000000000..dd4275501f --- /dev/null +++ b/changelog.d/4959.misc @@ -0,0 +1 @@ +Run `black` to clean up formatting on `synapse/storage/roommember.py` and `synapse/storage/events.py`. \ No newline at end of file diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 428300ea0a..b1d5f469c8 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -30,7 +30,6 @@ from twisted.internet import defer import synapse.metrics from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError -# these are only included to make the type annotations work from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 from synapse.metrics.background_process_metrics import run_as_background_process @@ -51,8 +50,11 @@ from synapse.util.metrics import Measure logger = logging.getLogger(__name__) persist_event_counter = Counter("synapse_storage_events_persisted_events", "") -event_counter = Counter("synapse_storage_events_persisted_events_sep", "", - ["type", "origin_type", "origin_entity"]) +event_counter = Counter( + "synapse_storage_events_persisted_events_sep", + "", + ["type", "origin_type", "origin_entity"], +) # The number of times we are recalculating the current state state_delta_counter = Counter("synapse_storage_events_state_delta", "") @@ -60,13 +62,15 @@ state_delta_counter = Counter("synapse_storage_events_state_delta", "") # The number of times we are recalculating state when there is only a # single forward extremity state_delta_single_event_counter = Counter( - "synapse_storage_events_state_delta_single_event", "") + "synapse_storage_events_state_delta_single_event", "" +) # The number of times we are reculating state when we could have resonably # calculated the delta when we calculated the state for an event we were # persisting. state_delta_reuse_delta_counter = Counter( - "synapse_storage_events_state_delta_reuse_delta", "") + "synapse_storage_events_state_delta_reuse_delta", "" +) def encode_json(json_object): @@ -84,9 +88,9 @@ class _EventPeristenceQueue(object): concurrent transaction per room. """ - _EventPersistQueueItem = namedtuple("_EventPersistQueueItem", ( - "events_and_contexts", "backfilled", "deferred", - )) + _EventPersistQueueItem = namedtuple( + "_EventPersistQueueItem", ("events_and_contexts", "backfilled", "deferred") + ) def __init__(self): self._event_persist_queues = {} @@ -119,11 +123,13 @@ class _EventPeristenceQueue(object): deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True) - queue.append(self._EventPersistQueueItem( - events_and_contexts=events_and_contexts, - backfilled=backfilled, - deferred=deferred, - )) + queue.append( + self._EventPersistQueueItem( + events_and_contexts=events_and_contexts, + backfilled=backfilled, + deferred=deferred, + ) + ) return deferred.observe() @@ -191,6 +197,7 @@ def _retry_on_integrity_error(func): Args: func: function that returns a Deferred and accepts a `delete_existing` arg """ + @wraps(func) @defer.inlineCallbacks def f(self, *args, **kwargs): @@ -206,8 +213,12 @@ def _retry_on_integrity_error(func): # inherits from EventFederationStore so that we can call _update_backward_extremities # and _handle_mult_prev_events (though arguably those could both be moved in here) -class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore, - BackgroundUpdateStore): +class EventsStore( + StateGroupWorkerStore, + EventFederationStore, + EventsWorkerStore, + BackgroundUpdateStore, +): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" @@ -265,8 +276,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore deferreds = [] for room_id, evs_ctxs in iteritems(partitioned): d = self._event_persist_queue.add_to_queue( - room_id, evs_ctxs, - backfilled=backfilled, + room_id, evs_ctxs, backfilled=backfilled ) deferreds.append(d) @@ -296,8 +306,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore and the stream ordering of the latest persisted event """ deferred = self._event_persist_queue.add_to_queue( - event.room_id, [(event, context)], - backfilled=backfilled, + event.room_id, [(event, context)], backfilled=backfilled ) self._maybe_start_persisting(event.room_id) @@ -312,16 +321,16 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore def persisting_queue(item): with Measure(self._clock, "persist_events"): yield self._persist_events( - item.events_and_contexts, - backfilled=item.backfilled, + item.events_and_contexts, backfilled=item.backfilled ) self._event_persist_queue.handle_queue(room_id, persisting_queue) @_retry_on_integrity_error @defer.inlineCallbacks - def _persist_events(self, events_and_contexts, backfilled=False, - delete_existing=False): + def _persist_events( + self, events_and_contexts, backfilled=False, delete_existing=False + ): """Persist events to db Args: @@ -345,13 +354,11 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore ) with stream_ordering_manager as stream_orderings: - for (event, context), stream, in zip( - events_and_contexts, stream_orderings - ): + for (event, context), stream in zip(events_and_contexts, stream_orderings): event.internal_metadata.stream_ordering = stream chunks = [ - events_and_contexts[x:x + 100] + events_and_contexts[x : x + 100] for x in range(0, len(events_and_contexts), 100) ] @@ -445,12 +452,9 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore state_delta_reuse_delta_counter.inc() break - logger.info( - "Calculating state delta for room %s", room_id, - ) + logger.info("Calculating state delta for room %s", room_id) with Measure( - self._clock, - "persist_events.get_new_state_after_events", + self._clock, "persist_events.get_new_state_after_events" ): res = yield self._get_new_state_after_events( room_id, @@ -470,11 +474,10 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore state_delta_for_room[room_id] = ([], delta_ids) elif current_state is not None: with Measure( - self._clock, - "persist_events.calculate_state_delta", + self._clock, "persist_events.calculate_state_delta" ): delta = yield self._calculate_state_delta( - room_id, current_state, + room_id, current_state ) state_delta_for_room[room_id] = delta @@ -498,7 +501,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # backfilled events have negative stream orderings, so we don't # want to set the event_persisted_position to that. synapse.metrics.event_persisted_position.set( - chunk[-1][0].internal_metadata.stream_ordering, + chunk[-1][0].internal_metadata.stream_ordering ) for event, context in chunk: @@ -515,9 +518,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore event_counter.labels(event.type, origin_type, origin_entity).inc() for room_id, new_state in iteritems(current_state_for_room): - self.get_current_state_ids.prefill( - (room_id, ), new_state - ) + self.get_current_state_ids.prefill((room_id,), new_state) for room_id, latest_event_ids in iteritems(new_forward_extremeties): self.get_latest_event_ids_in_room.prefill( @@ -535,8 +536,10 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # we're only interested in new events which aren't outliers and which aren't # being rejected. new_events = [ - event for event, ctx in event_contexts - if not event.internal_metadata.is_outlier() and not ctx.rejected + event + for event, ctx in event_contexts + if not event.internal_metadata.is_outlier() + and not ctx.rejected and not event.internal_metadata.is_soft_failed() ] @@ -544,15 +547,11 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore result = set(latest_event_ids) # add all the new events to the list - result.update( - event.event_id for event in new_events - ) + result.update(event.event_id for event in new_events) # Now remove all events which are prev_events of any of the new events result.difference_update( - e_id - for event in new_events - for e_id in event.prev_event_ids() + e_id for event in new_events for e_id in event.prev_event_ids() ) # Finally, remove any events which are prev_events of any existing events. @@ -592,17 +591,14 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore results.extend(r[0] for r in txn) for chunk in batch_iter(event_ids, 100): - yield self.runInteraction( - "_get_events_which_are_prevs", - _get_events, - chunk, - ) + yield self.runInteraction("_get_events_which_are_prevs", _get_events, chunk) defer.returnValue(results) @defer.inlineCallbacks - def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids, - new_latest_event_ids): + def _get_new_state_after_events( + self, room_id, events_context, old_latest_event_ids, new_latest_event_ids + ): """Calculate the current state dict after adding some new events to a room @@ -642,7 +638,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore if not ev.internal_metadata.is_outlier(): raise Exception( "Context for new event %s has no state " - "group" % (ev.event_id, ), + "group" % (ev.event_id,) ) continue @@ -682,9 +678,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore if missing_event_ids: # Now pull out the state groups for any missing events from DB - event_to_groups = yield self._get_state_group_for_events( - missing_event_ids, - ) + event_to_groups = yield self._get_state_group_for_events(missing_event_ids) event_id_to_state_group.update(event_to_groups) # State groups of old_latest_event_ids @@ -710,9 +704,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore new_state_group = next(iter(new_state_groups)) old_state_group = next(iter(old_state_groups)) - delta_ids = state_group_deltas.get( - (old_state_group, new_state_group,), None - ) + delta_ids = state_group_deltas.get((old_state_group, new_state_group), None) if delta_ids is not None: # We have a delta from the existing to new current state, # so lets just return that. If we happen to already have @@ -735,9 +727,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # Ok, we need to defer to the state handler to resolve our state sets. - state_groups = { - sg: state_groups_map[sg] for sg in new_state_groups - } + state_groups = {sg: state_groups_map[sg] for sg in new_state_groups} events_map = {ev.event_id: ev for ev, _ in events_context} @@ -755,8 +745,11 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore logger.debug("calling resolve_state_groups from preserve_events") res = yield self._state_resolution_handler.resolve_state_groups( - room_id, room_version, state_groups, events_map, - state_res_store=StateResolutionStore(self) + room_id, + room_version, + state_groups, + events_map, + state_res_store=StateResolutionStore(self), ) defer.returnValue((res.state, None)) @@ -774,22 +767,26 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore """ existing_state = yield self.get_current_state_ids(room_id) - to_delete = [ - key for key in existing_state - if key not in current_state - ] + to_delete = [key for key in existing_state if key not in current_state] to_insert = { - key: ev_id for key, ev_id in iteritems(current_state) + key: ev_id + for key, ev_id in iteritems(current_state) if ev_id != existing_state.get(key) } defer.returnValue((to_delete, to_insert)) @log_function - def _persist_events_txn(self, txn, events_and_contexts, backfilled, - delete_existing=False, state_delta_for_room={}, - new_forward_extremeties={}): + def _persist_events_txn( + self, + txn, + events_and_contexts, + backfilled, + delete_existing=False, + state_delta_for_room={}, + new_forward_extremeties={}, + ): """Insert some number of room events into the necessary database tables. Rejected events are only inserted into the events table, the events_json table, @@ -828,20 +825,17 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # Ensure that we don't have the same event twice. events_and_contexts = self._filter_events_and_contexts_for_duplicates( - events_and_contexts, + events_and_contexts ) self._update_room_depths_txn( - txn, - events_and_contexts=events_and_contexts, - backfilled=backfilled, + txn, events_and_contexts=events_and_contexts, backfilled=backfilled ) # _update_outliers_txn filters out any events which have already been # persisted, and returns the filtered list. events_and_contexts = self._update_outliers_txn( - txn, - events_and_contexts=events_and_contexts, + txn, events_and_contexts=events_and_contexts ) # From this point onwards the events are only events that we haven't @@ -852,15 +846,9 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # for these events so we can reinsert them. # This gets around any problems with some tables already having # entries. - self._delete_existing_rows_txn( - txn, - events_and_contexts=events_and_contexts, - ) + self._delete_existing_rows_txn(txn, events_and_contexts=events_and_contexts) - self._store_event_txn( - txn, - events_and_contexts=events_and_contexts, - ) + self._store_event_txn(txn, events_and_contexts=events_and_contexts) # Insert into event_to_state_groups. self._store_event_state_mappings_txn(txn, events_and_contexts) @@ -889,8 +877,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # _store_rejected_events_txn filters out any events which were # rejected, and returns the filtered list. events_and_contexts = self._store_rejected_events_txn( - txn, - events_and_contexts=events_and_contexts, + txn, events_and_contexts=events_and_contexts ) # From this point onwards the events are only ones that weren't @@ -920,22 +907,40 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore WHERE room_id = ? AND type = ? AND state_key = ? ) """ - txn.executemany(sql, ( + txn.executemany( + sql, ( - max_stream_order, room_id, etype, state_key, None, - room_id, etype, state_key, - ) - for etype, state_key in to_delete - # We sanity check that we're deleting rather than updating - if (etype, state_key) not in to_insert - )) - txn.executemany(sql, ( + ( + max_stream_order, + room_id, + etype, + state_key, + None, + room_id, + etype, + state_key, + ) + for etype, state_key in to_delete + # We sanity check that we're deleting rather than updating + if (etype, state_key) not in to_insert + ), + ) + txn.executemany( + sql, ( - max_stream_order, room_id, etype, state_key, ev_id, - room_id, etype, state_key, - ) - for (etype, state_key), ev_id in iteritems(to_insert) - )) + ( + max_stream_order, + room_id, + etype, + state_key, + ev_id, + room_id, + etype, + state_key, + ) + for (etype, state_key), ev_id in iteritems(to_insert) + ), + ) # Now we actually update the current_state_events table @@ -964,7 +969,8 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore txn.call_after( self._curr_state_delta_stream_cache.entity_has_changed, - room_id, max_stream_order, + room_id, + max_stream_order, ) # Invalidate the various caches @@ -982,26 +988,20 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore self._invalidate_state_caches_and_stream(txn, room_id, members_changed) - def _update_forward_extremities_txn(self, txn, new_forward_extremities, - max_stream_order): + def _update_forward_extremities_txn( + self, txn, new_forward_extremities, max_stream_order + ): for room_id, new_extrem in iteritems(new_forward_extremities): self._simple_delete_txn( - txn, - table="event_forward_extremities", - keyvalues={"room_id": room_id}, - ) - txn.call_after( - self.get_latest_event_ids_in_room.invalidate, (room_id,) + txn, table="event_forward_extremities", keyvalues={"room_id": room_id} ) + txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,)) self._simple_insert_many_txn( txn, table="event_forward_extremities", values=[ - { - "event_id": ev_id, - "room_id": room_id, - } + {"event_id": ev_id, "room_id": room_id} for room_id, new_extrem in iteritems(new_forward_extremities) for ev_id in new_extrem ], @@ -1021,7 +1021,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore } for room_id, new_extrem in iteritems(new_forward_extremities) for event_id in new_extrem - ] + ], ) @classmethod @@ -1065,7 +1065,8 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore if not backfilled: txn.call_after( self._events_stream_cache.entity_has_changed, - event.room_id, event.internal_metadata.stream_ordering, + event.room_id, + event.internal_metadata.stream_ordering, ) if not event.internal_metadata.is_outlier() and not context.rejected: @@ -1092,16 +1093,12 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore are already in the events table. """ txn.execute( - "SELECT event_id, outlier FROM events WHERE event_id in (%s)" % ( - ",".join(["?"] * len(events_and_contexts)), - ), - [event.event_id for event, _ in events_and_contexts] + "SELECT event_id, outlier FROM events WHERE event_id in (%s)" + % (",".join(["?"] * len(events_and_contexts)),), + [event.event_id for event, _ in events_and_contexts], ) - have_persisted = { - event_id: outlier - for event_id, outlier in txn - } + have_persisted = {event_id: outlier for event_id, outlier in txn} to_remove = set() for event, context in events_and_contexts: @@ -1128,18 +1125,12 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore logger.exception("") raise - metadata_json = encode_json( - event.internal_metadata.get_dict() - ) + metadata_json = encode_json(event.internal_metadata.get_dict()) sql = ( - "UPDATE event_json SET internal_metadata = ?" - " WHERE event_id = ?" - ) - txn.execute( - sql, - (metadata_json, event.event_id,) + "UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?" ) + txn.execute(sql, (metadata_json, event.event_id)) # Add an entry to the ex_outlier_stream table to replicate the # change in outlier status to our workers. @@ -1152,25 +1143,17 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore "event_stream_ordering": stream_order, "event_id": event.event_id, "state_group": state_group_id, - } + }, ) - sql = ( - "UPDATE events SET outlier = ?" - " WHERE event_id = ?" - ) - txn.execute( - sql, - (False, event.event_id,) - ) + sql = "UPDATE events SET outlier = ?" " WHERE event_id = ?" + txn.execute(sql, (False, event.event_id)) # Update the event_backward_extremities table now that this # event isn't an outlier any more. self._update_backward_extremeties(txn, [event]) - return [ - ec for ec in events_and_contexts if ec[0] not in to_remove - ] + return [ec for ec in events_and_contexts if ec[0] not in to_remove] @classmethod def _delete_existing_rows_txn(cls, txn, events_and_contexts): @@ -1181,39 +1164,37 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore logger.info("Deleting existing") for table in ( - "events", - "event_auth", - "event_json", - "event_content_hashes", - "event_destinations", - "event_edge_hashes", - "event_edges", - "event_forward_extremities", - "event_reference_hashes", - "event_search", - "event_signatures", - "event_to_state_groups", - "guest_access", - "history_visibility", - "local_invites", - "room_names", - "state_events", - "rejections", - "redactions", - "room_memberships", - "topics" + "events", + "event_auth", + "event_json", + "event_content_hashes", + "event_destinations", + "event_edge_hashes", + "event_edges", + "event_forward_extremities", + "event_reference_hashes", + "event_search", + "event_signatures", + "event_to_state_groups", + "guest_access", + "history_visibility", + "local_invites", + "room_names", + "state_events", + "rejections", + "redactions", + "room_memberships", + "topics", ): txn.executemany( "DELETE FROM %s WHERE event_id = ?" % (table,), - [(ev.event_id,) for ev, _ in events_and_contexts] + [(ev.event_id,) for ev, _ in events_and_contexts], ) - for table in ( - "event_push_actions", - ): + for table in ("event_push_actions",): txn.executemany( "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,), - [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts] + [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts], ) def _store_event_txn(self, txn, events_and_contexts): @@ -1296,17 +1277,14 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore for event, context in events_and_contexts: if context.rejected: # Insert the event_id into the rejections table - self._store_rejections_txn( - txn, event.event_id, context.rejected - ) + self._store_rejections_txn(txn, event.event_id, context.rejected) to_remove.add(event) - return [ - ec for ec in events_and_contexts if ec[0] not in to_remove - ] + return [ec for ec in events_and_contexts if ec[0] not in to_remove] - def _update_metadata_tables_txn(self, txn, events_and_contexts, - all_events_and_contexts, backfilled): + def _update_metadata_tables_txn( + self, txn, events_and_contexts, all_events_and_contexts, backfilled + ): """Update all the miscellaneous tables for new events Args: @@ -1342,8 +1320,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # Update the event_forward_extremities, event_backward_extremities and # event_edges tables. self._handle_mult_prev_events( - txn, - events=[event for event, _ in events_and_contexts], + txn, events=[event for event, _ in events_and_contexts] ) for event, _ in events_and_contexts: @@ -1401,11 +1378,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore state_values.append(vals) - self._simple_insert_many_txn( - txn, - table="state_events", - values=state_values, - ) + self._simple_insert_many_txn(txn, table="state_events", values=state_values) # Prefill the event cache self._add_to_cache(txn, events_and_contexts) @@ -1416,10 +1389,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore rows = [] N = 200 for i in range(0, len(events_and_contexts), N): - ev_map = { - e[0].event_id: e[0] - for e in events_and_contexts[i:i + N] - } + ev_map = {e[0].event_id: e[0] for e in events_and_contexts[i : i + N]} if not ev_map: break @@ -1439,14 +1409,14 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore for row in rows: event = ev_map[row["event_id"]] if not row["rejects"] and not row["redacts"]: - to_prefill.append(_EventCacheEntry( - event=event, - redacted_event=None, - )) + to_prefill.append( + _EventCacheEntry(event=event, redacted_event=None) + ) def prefill(): for cache_entry in to_prefill: self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry) + txn.call_after(prefill) def _store_redaction(self, txn, event): @@ -1454,7 +1424,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore txn.call_after(self._invalidate_get_event_cache, event.redacts) txn.execute( "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", - (event.event_id, event.redacts) + (event.event_id, event.redacts), ) @defer.inlineCallbacks @@ -1465,6 +1435,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore If it has been significantly less or more than one day since the last call to this function, it will return None. """ + def _count_messages(txn): sql = """ SELECT COALESCE(COUNT(*), 0) FROM events @@ -1492,7 +1463,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore AND stream_ordering > ? """ - txn.execute(sql, (like_clause, self.stream_ordering_day_ago,)) + txn.execute(sql, (like_clause, self.stream_ordering_day_ago)) count, = txn.fetchone() return count @@ -1557,18 +1528,16 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore update_rows.append((sender, contains_url, event_id)) - sql = ( - "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?" - ) + sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?" for index in range(0, len(update_rows), INSERT_CLUMP_SIZE): - clump = update_rows[index:index + INSERT_CLUMP_SIZE] + clump = update_rows[index : index + INSERT_CLUMP_SIZE] txn.executemany(sql, clump) progress = { "target_min_stream_id_inclusive": target_min_stream_id, "max_stream_id_exclusive": min_stream_id, - "rows_inserted": rows_inserted + len(rows) + "rows_inserted": rows_inserted + len(rows), } self._background_update_progress_txn( @@ -1613,10 +1582,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore rows_to_update = [] - chunks = [ - event_ids[i:i + 100] - for i in range(0, len(event_ids), 100) - ] + chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)] for chunk in chunks: ev_rows = self._simple_select_many_txn( txn, @@ -1639,18 +1605,16 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore rows_to_update.append((origin_server_ts, event_id)) - sql = ( - "UPDATE events SET origin_server_ts = ? WHERE event_id = ?" - ) + sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?" for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE): - clump = rows_to_update[index:index + INSERT_CLUMP_SIZE] + clump = rows_to_update[index : index + INSERT_CLUMP_SIZE] txn.executemany(sql, clump) progress = { "target_min_stream_id_inclusive": target_min_stream_id, "max_stream_id_exclusive": min_stream_id, - "rows_inserted": rows_inserted + len(rows_to_update) + "rows_inserted": rows_inserted + len(rows_to_update), } self._background_update_progress_txn( @@ -1714,6 +1678,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore new_event_updates.extend(txn) return new_event_updates + return self.runInteraction( "get_all_new_forward_event_rows", get_all_new_forward_event_rows ) @@ -1756,13 +1721,20 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore new_event_updates.extend(txn.fetchall()) return new_event_updates + return self.runInteraction( "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows ) @cached(num_args=5, max_entries=10) - def get_all_new_events(self, last_backfill_id, last_forward_id, - current_backfill_id, current_forward_id, limit): + def get_all_new_events( + self, + last_backfill_id, + last_forward_id, + current_backfill_id, + current_forward_id, + limit, + ): """Get all the new events that have arrived at the server either as new events or as backfilled events""" have_backfill_events = last_backfill_id != current_backfill_id @@ -1837,14 +1809,15 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore backward_ex_outliers = [] return AllNewEventsResult( - new_forward_events, new_backfill_events, - forward_ex_outliers, backward_ex_outliers, + new_forward_events, + new_backfill_events, + forward_ex_outliers, + backward_ex_outliers, ) + return self.runInteraction("get_all_new_events", get_all_new_events_txn) - def purge_history( - self, room_id, token, delete_local_events, - ): + def purge_history(self, room_id, token, delete_local_events): """Deletes room history before a certain point Args: @@ -1860,13 +1833,13 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore return self.runInteraction( "purge_history", - self._purge_history_txn, room_id, token, + self._purge_history_txn, + room_id, + token, delete_local_events, ) - def _purge_history_txn( - self, txn, room_id, token_str, delete_local_events, - ): + def _purge_history_txn(self, txn, room_id, token_str, delete_local_events): token = RoomStreamToken.parse(token_str) # Tables that should be pruned: @@ -1913,7 +1886,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore "ON e.event_id = f.event_id " "AND e.room_id = f.room_id " "WHERE f.room_id = ?", - (room_id,) + (room_id,), ) rows = txn.fetchall() max_depth = max(row[1] for row in rows) @@ -1934,10 +1907,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore should_delete_expr += " AND event_id NOT LIKE ?" # We include the parameter twice since we use the expression twice - should_delete_params += ( - "%:" + self.hs.hostname, - "%:" + self.hs.hostname, - ) + should_delete_params += ("%:" + self.hs.hostname, "%:" + self.hs.hostname) should_delete_params += (room_id, token.topological) @@ -1948,10 +1918,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore " SELECT event_id, %s" " FROM events AS e LEFT JOIN state_events USING (event_id)" " WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?" - % ( - should_delete_expr, - should_delete_expr, - ), + % (should_delete_expr, should_delete_expr), should_delete_params, ) @@ -1961,23 +1928,19 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # the should_delete / shouldn't_delete subsets txn.execute( "CREATE INDEX events_to_purge_should_delete" - " ON events_to_purge(should_delete)", + " ON events_to_purge(should_delete)" ) # We do joins against events_to_purge for e.g. calculating state # groups to purge, etc., so lets make an index. - txn.execute( - "CREATE INDEX events_to_purge_id" - " ON events_to_purge(event_id)", - ) + txn.execute("CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)") - txn.execute( - "SELECT event_id, should_delete FROM events_to_purge" - ) + txn.execute("SELECT event_id, should_delete FROM events_to_purge") event_rows = txn.fetchall() logger.info( "[purge] found %i events before cutoff, of which %i can be deleted", - len(event_rows), sum(1 for e in event_rows if e[1]), + len(event_rows), + sum(1 for e in event_rows if e[1]), ) logger.info("[purge] Finding new backward extremities") @@ -1989,24 +1952,21 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore "SELECT DISTINCT e.event_id FROM events_to_purge AS e" " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id" " LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id" - " WHERE ep2.event_id IS NULL", + " WHERE ep2.event_id IS NULL" ) new_backwards_extrems = txn.fetchall() logger.info("[purge] replacing backward extremities: %r", new_backwards_extrems) txn.execute( - "DELETE FROM event_backward_extremities WHERE room_id = ?", - (room_id,) + "DELETE FROM event_backward_extremities WHERE room_id = ?", (room_id,) ) # Update backward extremeties txn.executemany( "INSERT INTO event_backward_extremities (room_id, event_id)" " VALUES (?, ?)", - [ - (room_id, event_id) for event_id, in new_backwards_extrems - ] + [(room_id, event_id) for event_id, in new_backwards_extrems], ) logger.info("[purge] finding redundant state groups") @@ -2014,28 +1974,25 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # Get all state groups that are referenced by events that are to be # deleted. We then go and check if they are referenced by other events # or state groups, and if not we delete them. - txn.execute(""" + txn.execute( + """ SELECT DISTINCT state_group FROM events_to_purge INNER JOIN event_to_state_groups USING (event_id) - """) + """ + ) referenced_state_groups = set(sg for sg, in txn) logger.info( - "[purge] found %i referenced state groups", - len(referenced_state_groups), + "[purge] found %i referenced state groups", len(referenced_state_groups) ) logger.info("[purge] finding state groups that can be deleted") - state_groups_to_delete, remaining_state_groups = ( - self._find_unreferenced_groups_during_purge( - txn, referenced_state_groups, - ) - ) + _ = self._find_unreferenced_groups_during_purge(txn, referenced_state_groups) + state_groups_to_delete, remaining_state_groups = _ logger.info( - "[purge] found %i state groups to delete", - len(state_groups_to_delete), + "[purge] found %i state groups to delete", len(state_groups_to_delete) ) logger.info( @@ -2047,25 +2004,15 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # groups to non delta versions. for sg in remaining_state_groups: logger.info("[purge] de-delta-ing remaining state group %s", sg) - curr_state = self._get_state_groups_from_groups_txn( - txn, [sg], - ) + curr_state = self._get_state_groups_from_groups_txn(txn, [sg]) curr_state = curr_state[sg] self._simple_delete_txn( - txn, - table="state_groups_state", - keyvalues={ - "state_group": sg, - } + txn, table="state_groups_state", keyvalues={"state_group": sg} ) self._simple_delete_txn( - txn, - table="state_group_edges", - keyvalues={ - "state_group": sg, - } + txn, table="state_group_edges", keyvalues={"state_group": sg} ) self._simple_insert_many_txn( @@ -2099,9 +2046,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore "WHERE event_id IN (SELECT event_id from events_to_purge)" ) for event_id, _ in event_rows: - txn.call_after(self._get_state_group_for_event.invalidate, ( - event_id, - )) + txn.call_after(self._get_state_group_for_event.invalidate, (event_id,)) # Delete all remote non-state events for table in ( @@ -2123,21 +2068,19 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore txn.execute( "DELETE FROM %s WHERE event_id IN (" " SELECT event_id FROM events_to_purge WHERE should_delete" - ")" % (table,), + ")" % (table,) ) # event_push_actions lacks an index on event_id, and has one on # (room_id, event_id) instead. - for table in ( - "event_push_actions", - ): + for table in ("event_push_actions",): logger.info("[purge] removing events from %s", table) txn.execute( "DELETE FROM %s WHERE room_id = ? AND event_id IN (" " SELECT event_id FROM events_to_purge WHERE should_delete" ")" % (table,), - (room_id, ) + (room_id,), ) # Mark all state and own events as outliers @@ -2162,27 +2105,28 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # extremities. However, the events in event_backward_extremities # are ones we don't have yet so we need to look at the events that # point to it via event_edges table. - txn.execute(""" + txn.execute( + """ SELECT COALESCE(MIN(depth), 0) FROM event_backward_extremities AS eb INNER JOIN event_edges AS eg ON eg.prev_event_id = eb.event_id INNER JOIN events AS e ON e.event_id = eg.event_id WHERE eb.room_id = ? - """, (room_id,)) + """, + (room_id,), + ) min_depth, = txn.fetchone() logger.info("[purge] updating room_depth to %d", min_depth) txn.execute( "UPDATE room_depth SET min_depth = ? WHERE room_id = ?", - (min_depth, room_id,) + (min_depth, room_id), ) # finally, drop the temp table. this will commit the txn in sqlite, # so make sure to keep this actually last. - txn.execute( - "DROP TABLE events_to_purge" - ) + txn.execute("DROP TABLE events_to_purge") logger.info("[purge] done") @@ -2226,7 +2170,9 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore SELECT DISTINCT state_group FROM event_to_state_groups LEFT JOIN events_to_purge AS ep USING (event_id) WHERE state_group IN (%s) AND ep.event_id IS NULL - """ % (",".join("?" for _ in current_search),) + """ % ( + ",".join("?" for _ in current_search), + ) txn.execute(sql, list(current_search)) referenced = set(sg for sg, in txn) @@ -2242,7 +2188,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore column="prev_state_group", iterable=current_search, keyvalues={}, - retcols=("prev_state_group", "state_group",), + retcols=("prev_state_group", "state_group"), ) prevs = set(row["state_group"] for row in rows) @@ -2279,13 +2225,15 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore table="events", retcols=["topological_ordering", "stream_ordering"], keyvalues={"event_id": event_id}, - allow_none=True + allow_none=True, ) if not res: raise SynapseError(404, "Could not find event %s" % (event_id,)) - defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"]))) + defer.returnValue( + (int(res["topological_ordering"]), int(res["stream_ordering"])) + ) def get_max_current_state_delta_stream_id(self): return self._stream_id_gen.get_current_token() @@ -2300,13 +2248,19 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore """ txn.execute(sql, (from_token, to_token, limit)) return txn.fetchall() + return self.runInteraction( "get_all_updated_current_state_deltas", get_all_updated_current_state_deltas_txn, ) -AllNewEventsResult = namedtuple("AllNewEventsResult", [ - "new_forward_events", "new_backfill_events", - "forward_ex_outliers", "backward_ex_outliers", -]) +AllNewEventsResult = namedtuple( + "AllNewEventsResult", + [ + "new_forward_events", + "new_backfill_events", + "forward_ex_outliers", + "backward_ex_outliers", + ], +) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 592c1bcd33..57df17bcc2 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -35,28 +35,22 @@ logger = logging.getLogger(__name__) RoomsForUser = namedtuple( - "RoomsForUser", - ("room_id", "sender", "membership", "event_id", "stream_ordering") + "RoomsForUser", ("room_id", "sender", "membership", "event_id", "stream_ordering") ) GetRoomsForUserWithStreamOrdering = namedtuple( - "_GetRoomsForUserWithStreamOrdering", - ("room_id", "stream_ordering",) + "_GetRoomsForUserWithStreamOrdering", ("room_id", "stream_ordering") ) # We store this using a namedtuple so that we save about 3x space over using a # dict. -ProfileInfo = namedtuple( - "ProfileInfo", ("avatar_url", "display_name") -) +ProfileInfo = namedtuple("ProfileInfo", ("avatar_url", "display_name")) # "members" points to a truncated list of (user_id, event_id) tuples for users of # a given membership type, suitable for use in calculating heroes for a room. # "count" points to the total numberr of users of a given membership type. -MemberSummary = namedtuple( - "MemberSummary", ("members", "count") -) +MemberSummary = namedtuple("MemberSummary", ("members", "count")) _MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update" @@ -67,7 +61,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): """Returns the set of all hosts currently in the room """ user_ids = yield self.get_users_in_room( - room_id, on_invalidate=cache_context.invalidate, + room_id, on_invalidate=cache_context.invalidate ) hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids) defer.returnValue(hosts) @@ -84,8 +78,9 @@ class RoomMemberWorkerStore(EventsWorkerStore): " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?" ) - txn.execute(sql, (room_id, Membership.JOIN,)) + txn.execute(sql, (room_id, Membership.JOIN)) return [to_ascii(r[0]) for r in txn] + return self.runInteraction("get_users_in_room", f) @cached(max_entries=100000) @@ -156,9 +151,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): A deferred list of RoomsForUser. """ - return self.get_rooms_for_user_where_membership_is( - user_id, [Membership.INVITE] - ) + return self.get_rooms_for_user_where_membership_is(user_id, [Membership.INVITE]) @defer.inlineCallbacks def get_invite_for_user_in_room(self, user_id, room_id): @@ -196,11 +189,13 @@ class RoomMemberWorkerStore(EventsWorkerStore): return self.runInteraction( "get_rooms_for_user_where_membership_is", self._get_rooms_for_user_where_membership_is_txn, - user_id, membership_list + user_id, + membership_list, ) - def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id, - membership_list): + def _get_rooms_for_user_where_membership_is_txn( + self, txn, user_id, membership_list + ): do_invite = Membership.INVITE in membership_list membership_list = [m for m in membership_list if m != Membership.INVITE] @@ -227,9 +222,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): ) % (where_clause,) txn.execute(sql, args) - results = [ - RoomsForUser(**r) for r in self.cursor_to_dict(txn) - ] + results = [RoomsForUser(**r) for r in self.cursor_to_dict(txn)] if do_invite: sql = ( @@ -241,13 +234,16 @@ class RoomMemberWorkerStore(EventsWorkerStore): ) txn.execute(sql, (user_id,)) - results.extend(RoomsForUser( - room_id=r["room_id"], - sender=r["inviter"], - event_id=r["event_id"], - stream_ordering=r["stream_ordering"], - membership=Membership.INVITE, - ) for r in self.cursor_to_dict(txn)) + results.extend( + RoomsForUser( + room_id=r["room_id"], + sender=r["inviter"], + event_id=r["event_id"], + stream_ordering=r["stream_ordering"], + membership=Membership.INVITE, + ) + for r in self.cursor_to_dict(txn) + ) return results @@ -264,19 +260,21 @@ class RoomMemberWorkerStore(EventsWorkerStore): of the most recent join for that user and room. """ rooms = yield self.get_rooms_for_user_where_membership_is( - user_id, membership_list=[Membership.JOIN], + user_id, membership_list=[Membership.JOIN] + ) + defer.returnValue( + frozenset( + GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering) + for r in rooms + ) ) - defer.returnValue(frozenset( - GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering) - for r in rooms - )) @defer.inlineCallbacks def get_rooms_for_user(self, user_id, on_invalidate=None): """Returns a set of room_ids the user is currently joined to """ rooms = yield self.get_rooms_for_user_with_stream_ordering( - user_id, on_invalidate=on_invalidate, + user_id, on_invalidate=on_invalidate ) defer.returnValue(frozenset(r.room_id for r in rooms)) @@ -285,13 +283,13 @@ class RoomMemberWorkerStore(EventsWorkerStore): """Returns the set of users who share a room with `user_id` """ room_ids = yield self.get_rooms_for_user( - user_id, on_invalidate=cache_context.invalidate, + user_id, on_invalidate=cache_context.invalidate ) user_who_share_room = set() for room_id in room_ids: user_ids = yield self.get_users_in_room( - room_id, on_invalidate=cache_context.invalidate, + room_id, on_invalidate=cache_context.invalidate ) user_who_share_room.update(user_ids) @@ -309,9 +307,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): current_state_ids = yield context.get_current_state_ids(self) result = yield self._get_joined_users_from_context( - event.room_id, state_group, current_state_ids, - event=event, - context=context, + event.room_id, state_group, current_state_ids, event=event, context=context ) defer.returnValue(result) @@ -325,13 +321,21 @@ class RoomMemberWorkerStore(EventsWorkerStore): state_group = object() return self._get_joined_users_from_context( - room_id, state_group, state_entry.state, context=state_entry, + room_id, state_group, state_entry.state, context=state_entry ) - @cachedInlineCallbacks(num_args=2, cache_context=True, iterable=True, - max_entries=100000) - def _get_joined_users_from_context(self, room_id, state_group, current_state_ids, - cache_context, event=None, context=None): + @cachedInlineCallbacks( + num_args=2, cache_context=True, iterable=True, max_entries=100000 + ) + def _get_joined_users_from_context( + self, + room_id, + state_group, + current_state_ids, + cache_context, + event=None, + context=None, + ): # We don't use `state_group`, it's there so that we can cache based # on it. However, it's important that it's never None, since two current_states # with a state_group of None are likely to be different. @@ -371,9 +375,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): # the hit ratio counts. After all, we don't populate the cache if we # miss it here event_map = self._get_events_from_cache( - member_event_ids, - allow_rejected=False, - update_metrics=False, + member_event_ids, allow_rejected=False, update_metrics=False ) missing_member_event_ids = [] @@ -397,21 +399,21 @@ class RoomMemberWorkerStore(EventsWorkerStore): table="room_memberships", column="event_id", iterable=missing_member_event_ids, - retcols=('user_id', 'display_name', 'avatar_url',), - keyvalues={ - "membership": Membership.JOIN, - }, + retcols=('user_id', 'display_name', 'avatar_url'), + keyvalues={"membership": Membership.JOIN}, batch_size=500, desc="_get_joined_users_from_context", ) - users_in_room.update({ - to_ascii(row["user_id"]): ProfileInfo( - avatar_url=to_ascii(row["avatar_url"]), - display_name=to_ascii(row["display_name"]), - ) - for row in rows - }) + users_in_room.update( + { + to_ascii(row["user_id"]): ProfileInfo( + avatar_url=to_ascii(row["avatar_url"]), + display_name=to_ascii(row["display_name"]), + ) + for row in rows + } + ) if event is not None and event.type == EventTypes.Member: if event.membership == Membership.JOIN: @@ -505,7 +507,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): state_group = object() return self._get_joined_hosts( - room_id, state_group, state_entry.state, state_entry=state_entry, + room_id, state_group, state_entry.state, state_entry=state_entry ) @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True) @@ -531,6 +533,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): """Returns whether user_id has elected to discard history for room_id. Returns False if they have since re-joined.""" + def f(txn): sql = ( "SELECT" @@ -547,6 +550,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): txn.execute(sql, (user_id, room_id)) rows = txn.fetchall() return rows[0][0] + count = yield self.runInteraction("did_forget_membership", f) defer.returnValue(count == 0) @@ -575,13 +579,14 @@ class RoomMemberStore(RoomMemberWorkerStore): "avatar_url": event.content.get("avatar_url", None), } for event in events - ] + ], ) for event in events: txn.call_after( self._membership_stream_cache.entity_has_changed, - event.state_key, event.internal_metadata.stream_ordering + event.state_key, + event.internal_metadata.stream_ordering, ) txn.call_after( self.get_invited_rooms_for_user.invalidate, (event.state_key,) @@ -607,7 +612,7 @@ class RoomMemberStore(RoomMemberWorkerStore): "inviter": event.sender, "room_id": event.room_id, "stream_id": event.internal_metadata.stream_ordering, - } + }, ) else: sql = ( @@ -616,12 +621,15 @@ class RoomMemberStore(RoomMemberWorkerStore): " AND replaced_by is NULL" ) - txn.execute(sql, ( - event.internal_metadata.stream_ordering, - event.event_id, - event.room_id, - event.state_key, - )) + txn.execute( + sql, + ( + event.internal_metadata.stream_ordering, + event.event_id, + event.room_id, + event.state_key, + ), + ) @defer.inlineCallbacks def locally_reject_invite(self, user_id, room_id): @@ -632,18 +640,14 @@ class RoomMemberStore(RoomMemberWorkerStore): ) def f(txn, stream_ordering): - txn.execute(sql, ( - stream_ordering, - True, - room_id, - user_id, - )) + txn.execute(sql, (stream_ordering, True, room_id, user_id)) with self._stream_id_gen.get_next() as stream_ordering: yield self.runInteraction("locally_reject_invite", f, stream_ordering) def forget(self, user_id, room_id): """Indicate that user_id wishes to discard history for room_id.""" + def f(txn): sql = ( "UPDATE" @@ -657,9 +661,8 @@ class RoomMemberStore(RoomMemberWorkerStore): ) txn.execute(sql, (user_id, room_id)) - self._invalidate_cache_and_stream( - txn, self.did_forget, (user_id, room_id,), - ) + self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id)) + return self.runInteraction("forget_membership", f) @defer.inlineCallbacks @@ -674,7 +677,7 @@ class RoomMemberStore(RoomMemberWorkerStore): INSERT_CLUMP_SIZE = 1000 def add_membership_profile_txn(txn): - sql = (""" + sql = """ SELECT stream_ordering, event_id, events.room_id, event_json.json FROM events INNER JOIN event_json USING (event_id) @@ -683,7 +686,7 @@ class RoomMemberStore(RoomMemberWorkerStore): AND type = 'm.room.member' ORDER BY stream_ordering DESC LIMIT ? - """) + """ txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) @@ -707,16 +710,14 @@ class RoomMemberStore(RoomMemberWorkerStore): avatar_url = content.get("avatar_url", None) if display_name or avatar_url: - to_update.append(( - display_name, avatar_url, event_id, room_id - )) + to_update.append((display_name, avatar_url, event_id, room_id)) - to_update_sql = (""" + to_update_sql = """ UPDATE room_memberships SET display_name = ?, avatar_url = ? WHERE event_id = ? AND room_id = ? - """) + """ for index in range(0, len(to_update), INSERT_CLUMP_SIZE): - clump = to_update[index:index + INSERT_CLUMP_SIZE] + clump = to_update[index : index + INSERT_CLUMP_SIZE] txn.executemany(to_update_sql, clump) progress = { @@ -789,7 +790,7 @@ class _JoinedHostsCache(object): self.hosts_to_joined_users.pop(host, None) else: joined_users = yield self.store.get_joined_users_from_state( - self.room_id, state_entry, + self.room_id, state_entry ) self.hosts_to_joined_users = {} -- cgit 1.5.1 From 297bf2547e6fdfbbed276d439b8b04da06f5551c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 2 Apr 2019 12:42:39 +0100 Subject: Fix sync bug when accepting invites (#4956) Hopefully this time we really will fix #4422. We need to make sure that the cache on `get_rooms_for_user_with_stream_ordering` is invalidated *before* the SyncHandler is notified for the new events, and we can now do so reliably via the `events` stream. --- changelog.d/4956.bugfix | 1 + synapse/replication/slave/storage/events.py | 31 +++-- synapse/storage/_base.py | 5 - synapse/storage/events.py | 24 +++- tests/replication/slave/storage/_base.py | 28 ++++- tests/replication/slave/storage/test_events.py | 161 +++++++++++++++++++++---- tests/server.py | 56 +++++---- 7 files changed, 237 insertions(+), 69 deletions(-) create mode 100644 changelog.d/4956.bugfix (limited to 'synapse/storage/events.py') diff --git a/changelog.d/4956.bugfix b/changelog.d/4956.bugfix new file mode 100644 index 0000000000..e50e67383d --- /dev/null +++ b/changelog.d/4956.bugfix @@ -0,0 +1 @@ +Fix sync bug which made accepting invites unreliable in worker-mode synapses. diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index c57385d92f..b457c5563f 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -16,7 +16,10 @@ import logging from synapse.api.constants import EventTypes -from synapse.replication.tcp.streams.events import EventsStreamEventRow +from synapse.replication.tcp.streams.events import ( + EventsStreamCurrentStateRow, + EventsStreamEventRow, +) from synapse.storage.event_federation import EventFederationWorkerStore from synapse.storage.event_push_actions import EventPushActionsWorkerStore from synapse.storage.events_worker import EventsWorkerStore @@ -80,14 +83,7 @@ class SlavedEventStore(EventFederationWorkerStore, if stream_name == "events": self._stream_id_gen.advance(token) for row in rows: - if row.type != EventsStreamEventRow.TypeId: - continue - data = row.data - self.invalidate_caches_for_event( - token, data.event_id, data.room_id, data.type, data.state_key, - data.redacts, - backfilled=False, - ) + self._process_event_stream_row(token, row) elif stream_name == "backfill": self._backfill_id_gen.advance(-token) for row in rows: @@ -100,6 +96,23 @@ class SlavedEventStore(EventFederationWorkerStore, stream_name, token, rows ) + def _process_event_stream_row(self, token, row): + data = row.data + + if row.type == EventsStreamEventRow.TypeId: + self.invalidate_caches_for_event( + token, data.event_id, data.room_id, data.type, data.state_key, + data.redacts, + backfilled=False, + ) + elif row.type == EventsStreamCurrentStateRow.TypeId: + if data.type == EventTypes.Member: + self.get_rooms_for_user_with_stream_ordering.invalidate( + (data.state_key, ), + ) + else: + raise Exception("Unknown events stream row type %s" % (row.type, )) + def invalidate_caches_for_event(self, stream_ordering, event_id, room_id, etype, state_key, redacts, backfilled): self._invalidate_get_event_cache(event_id) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7e3903859b..653b32fbf5 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -1355,11 +1355,6 @@ class SQLBaseStore(object): members_changed (iterable[str]): The user_ids of members that have changed """ - for member in members_changed: - self._attempt_to_invalidate_cache( - "get_rooms_for_user_with_stream_ordering", (member,), - ) - for host in set(get_domain_from_id(u) for u in members_changed): self._attempt_to_invalidate_cache( "is_host_joined", (room_id, host,), diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d0668e39c4..dfda39bbe0 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -79,7 +79,7 @@ def encode_json(json_object): """ out = frozendict_json_encoder.encode(json_object) if isinstance(out, bytes): - out = out.decode('utf8') + out = out.decode("utf8") return out @@ -813,9 +813,10 @@ class EventsStore( """ all_events_and_contexts = events_and_contexts + min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering - self._update_current_state_txn(txn, state_delta_for_room, max_stream_order) + self._update_current_state_txn(txn, state_delta_for_room, min_stream_order) self._update_forward_extremities_txn( txn, @@ -890,7 +891,7 @@ class EventsStore( backfilled=backfilled, ) - def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): + def _update_current_state_txn(self, txn, state_delta_by_room, stream_id): for room_id, current_state_tuple in iteritems(state_delta_by_room): to_delete, to_insert = current_state_tuple @@ -899,6 +900,12 @@ class EventsStore( # that we can use it to calculate the `prev_event_id`. (This # allows us to not have to pull out the existing state # unnecessarily). + # + # The stream_id for the update is chosen to be the minimum of the stream_ids + # for the batch of the events that we are persisting; that means we do not + # end up in a situation where workers see events before the + # current_state_delta updates. + # sql = """ INSERT INTO current_state_delta_stream (stream_id, room_id, type, state_key, event_id, prev_event_id) @@ -911,7 +918,7 @@ class EventsStore( sql, ( ( - max_stream_order, + stream_id, room_id, etype, state_key, @@ -929,7 +936,7 @@ class EventsStore( sql, ( ( - max_stream_order, + stream_id, room_id, etype, state_key, @@ -970,7 +977,7 @@ class EventsStore( txn.call_after( self._curr_state_delta_stream_cache.entity_has_changed, room_id, - max_stream_order, + stream_id, ) # Invalidate the various caches @@ -986,6 +993,11 @@ class EventsStore( if ev_type == EventTypes.Member ) + for member in members_changed: + txn.call_after( + self.get_rooms_for_user_with_stream_ordering.invalidate, (member,) + ) + self._invalidate_state_caches_and_stream(txn, room_id, members_changed) def _update_forward_extremities_txn( diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index 524af4f8d1..1f72a2a04f 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -56,7 +56,9 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase): client = client_factory.buildProtocol(None) client.makeConnection(FakeTransport(server, reactor)) - server.makeConnection(FakeTransport(client, reactor)) + + self.server_to_client_transport = FakeTransport(client, reactor) + server.makeConnection(self.server_to_client_transport) def replicate(self): """Tell the master side of replication that something has happened, and then @@ -69,6 +71,24 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase): master_result = self.get_success(getattr(self.master_store, method)(*args)) slaved_result = self.get_success(getattr(self.slaved_store, method)(*args)) if expected_result is not None: - self.assertEqual(master_result, expected_result) - self.assertEqual(slaved_result, expected_result) - self.assertEqual(master_result, slaved_result) + self.assertEqual( + master_result, + expected_result, + "Expected master result to be %r but was %r" % ( + expected_result, master_result + ), + ) + self.assertEqual( + slaved_result, + expected_result, + "Expected slave result to be %r but was %r" % ( + expected_result, slaved_result + ), + ) + self.assertEqual( + master_result, + slaved_result, + "Slave result %r does not match master result %r" % ( + slaved_result, master_result + ), + ) diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 1688a741d1..65ecff3bd6 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -11,11 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging from canonicaljson import encode_canonical_json from synapse.events import FrozenEvent, _EventInternalMetadata from synapse.events.snapshot import EventContext +from synapse.handlers.room import RoomEventSource from synapse.replication.slave.storage.events import SlavedEventStore from synapse.storage.roommember import RoomsForUser @@ -26,6 +28,8 @@ USER_ID_2 = "@bright:blue" OUTLIER = {"outlier": True} ROOM_ID = "!room:blue" +logger = logging.getLogger(__name__) + def dict_equals(self, other): me = encode_canonical_json(self.get_pdu_json()) @@ -172,18 +176,142 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): {"highlight_count": 1, "notify_count": 2}, ) + def test_get_rooms_for_user_with_stream_ordering(self): + """Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated + by rows in the events stream + """ + self.persist(type="m.room.create", key="", creator=USER_ID) + self.persist(type="m.room.member", key=USER_ID, membership="join") + self.replicate() + self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set()) + + j2 = self.persist( + type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join" + ) + self.replicate() + self.check( + "get_rooms_for_user_with_stream_ordering", + (USER_ID_2,), + {(ROOM_ID, j2.internal_metadata.stream_ordering)}, + ) + + def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(self): + """Check that current_state invalidation happens correctly with multiple events + in the persistence batch. + + This test attempts to reproduce a race condition between the event persistence + loop and a worker-based Sync handler. + + The problem occurred when the master persisted several events in one batch. It + only updates the current_state at the end of each batch, so the obvious thing + to do is then to issue a current_state_delta stream update corresponding to the + last stream_id in the batch. + + However, that raises the possibility that a worker will see the replication + notification for a join event before the current_state caches are invalidated. + + The test involves: + * creating a join and a message event for a user, and persisting them in the + same batch + + * controlling the replication stream so that updates are sent gradually + + * between each bunch of replication updates, check that we see a consistent + snapshot of the state. + """ + self.persist(type="m.room.create", key="", creator=USER_ID) + self.persist(type="m.room.member", key=USER_ID, membership="join") + self.replicate() + self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set()) + + # limit the replication rate + repl_transport = self.server_to_client_transport + repl_transport.autoflush = False + + # build the join and message events and persist them in the same batch. + logger.info("----- build test events ------") + j2, j2ctx = self.build_event( + type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join" + ) + msg, msgctx = self.build_event() + self.get_success(self.master_store.persist_events([ + (j2, j2ctx), + (msg, msgctx), + ])) + self.replicate() + + event_source = RoomEventSource(self.hs) + event_source.store = self.slaved_store + current_token = self.get_success(event_source.get_current_key()) + + # gradually stream out the replication + while repl_transport.buffer: + logger.info("------ flush ------") + repl_transport.flush(30) + self.pump(0) + + prev_token = current_token + current_token = self.get_success(event_source.get_current_key()) + + # attempt to replicate the behaviour of the sync handler. + # + # First, we get a list of the rooms we are joined to + joined_rooms = self.get_success( + self.slaved_store.get_rooms_for_user_with_stream_ordering( + USER_ID_2, + ), + ) + + # Then, we get a list of the events since the last sync + membership_changes = self.get_success( + self.slaved_store.get_membership_changes_for_user( + USER_ID_2, prev_token, current_token, + ) + ) + + logger.info( + "%s->%s: joined_rooms=%r membership_changes=%r", + prev_token, + current_token, + joined_rooms, + membership_changes, + ) + + # the membership change is only any use to us if the room is in the + # joined_rooms list. + if membership_changes: + self.assertEqual( + joined_rooms, {(ROOM_ID, j2.internal_metadata.stream_ordering)} + ) + event_id = 0 - def persist( + def persist(self, backfill=False, **kwargs): + """ + Returns: + synapse.events.FrozenEvent: The event that was persisted. + """ + event, context = self.build_event(**kwargs) + + if backfill: + self.get_success( + self.master_store.persist_events([(event, context)], backfilled=True) + ) + else: + self.get_success( + self.master_store.persist_event(event, context) + ) + + return event + + def build_event( self, sender=USER_ID, room_id=ROOM_ID, - type={}, + type="m.room.message", key=None, internal={}, state=None, - reset_state=False, - backfill=False, depth=None, prev_events=[], auth_events=[], @@ -192,10 +320,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): push_actions=[], **content ): - """ - Returns: - synapse.events.FrozenEvent: The event that was persisted. - """ + if depth is None: depth = self.event_id @@ -234,23 +359,11 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): ) else: state_handler = self.hs.get_state_handler() - context = self.get_success(state_handler.compute_event_context(event)) + context = self.get_success(state_handler.compute_event_context( + event + )) self.master_store.add_push_actions_to_staging( event.event_id, {user_id: actions for user_id, actions in push_actions} ) - - ordering = None - if backfill: - self.get_success( - self.master_store.persist_events([(event, context)], backfilled=True) - ) - else: - ordering, _ = self.get_success( - self.master_store.persist_event(event, context) - ) - - if ordering: - event.internal_metadata.stream_ordering = ordering - - return event + return event, context diff --git a/tests/server.py b/tests/server.py index ea26dea623..8f89f4a83d 100644 --- a/tests/server.py +++ b/tests/server.py @@ -365,6 +365,7 @@ class FakeTransport(object): disconnected = False buffer = attr.ib(default=b'') producer = attr.ib(default=None) + autoflush = attr.ib(default=True) def getPeer(self): return None @@ -415,31 +416,44 @@ class FakeTransport(object): def write(self, byt): self.buffer = self.buffer + byt - def _write(): - if not self.buffer: - # nothing to do. Don't write empty buffers: it upsets the - # TLSMemoryBIOProtocol - return - - if self.disconnected: - return - logger.info("%s->%s: %s", self._protocol, self.other, self.buffer) - - if getattr(self.other, "transport") is not None: - try: - self.other.dataReceived(self.buffer) - self.buffer = b"" - except Exception as e: - logger.warning("Exception writing to protocol: %s", e) - return - - self._reactor.callLater(0.0, _write) - # always actually do the write asynchronously. Some protocols (notably the # TLSMemoryBIOProtocol) get very confused if a read comes back while they are # still doing a write. Doing a callLater here breaks the cycle. - self._reactor.callLater(0.0, _write) + if self.autoflush: + self._reactor.callLater(0.0, self.flush) def writeSequence(self, seq): for x in seq: self.write(x) + + def flush(self, maxbytes=None): + if not self.buffer: + # nothing to do. Don't write empty buffers: it upsets the + # TLSMemoryBIOProtocol + return + + if self.disconnected: + return + + if getattr(self.other, "transport") is None: + # the other has no transport yet; reschedule + if self.autoflush: + self._reactor.callLater(0.0, self.flush) + return + + if maxbytes is not None: + to_write = self.buffer[:maxbytes] + else: + to_write = self.buffer + + logger.info("%s->%s: %s", self._protocol, self.other, to_write) + + try: + self.other.dataReceived(to_write) + except Exception as e: + logger.warning("Exception writing to protocol: %s", e) + return + + self.buffer = self.buffer[len(to_write):] + if self.buffer and self.autoflush: + self._reactor.callLater(0.0, self.flush) -- cgit 1.5.1