diff options
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 90 |
1 files changed, 49 insertions, 41 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index b96104ccae..2aaab0d02c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -14,33 +14,33 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections import OrderedDict, deque, namedtuple -from functools import wraps import itertools import logging +from collections import OrderedDict, deque, namedtuple +from functools import wraps + +from six import iteritems, itervalues +from six.moves import range + +from canonicaljson import json +from prometheus_client import Counter -import simplejson as json from twisted.internet import defer +import synapse.metrics +from synapse.api.constants import EventTypes +from synapse.api.errors import SynapseError +# these are only included to make the type annotations work +from synapse.events import EventBase # noqa: F401 +from synapse.events.snapshot import EventContext # noqa: F401 from synapse.storage.events_worker import EventsWorkerStore +from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util.async import ObservableDeferred +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.frozenutils import frozendict_json_encoder -from synapse.util.logcontext import ( - PreserveLoggingContext, make_deferred_yieldable, -) +from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable from synapse.util.logutils import log_function from synapse.util.metrics import Measure -from synapse.api.constants import EventTypes -from synapse.api.errors import SynapseError -from synapse.util.caches.descriptors import cached, cachedInlineCallbacks -from synapse.types import get_domain_from_id, RoomStreamToken -import synapse.metrics - -# these are only included to make the type annotations work -from synapse.events import EventBase # noqa: F401 -from synapse.events.snapshot import EventContext # noqa: F401 - -from prometheus_client import Counter logger = logging.getLogger(__name__) @@ -245,7 +245,7 @@ class EventsStore(EventsWorkerStore): partitioned.setdefault(event.room_id, []).append((event, ctx)) deferreds = [] - for room_id, evs_ctxs in partitioned.iteritems(): + for room_id, evs_ctxs in iteritems(partitioned): d = self._event_persist_queue.add_to_queue( room_id, evs_ctxs, backfilled=backfilled, @@ -330,7 +330,7 @@ class EventsStore(EventsWorkerStore): chunks = [ events_and_contexts[x:x + 100] - for x in xrange(0, len(events_and_contexts), 100) + for x in range(0, len(events_and_contexts), 100) ] for chunk in chunks: @@ -364,7 +364,7 @@ class EventsStore(EventsWorkerStore): (event, context) ) - for room_id, ev_ctx_rm in events_by_room.iteritems(): + for room_id, ev_ctx_rm in iteritems(events_by_room): # Work out new extremities by recursively adding and removing # the new events. latest_event_ids = yield self.get_latest_event_ids_in_room( @@ -459,12 +459,12 @@ class EventsStore(EventsWorkerStore): event_counter.labels(event.type, origin_type, origin_entity).inc() - for room_id, new_state in current_state_for_room.iteritems(): + for room_id, new_state in iteritems(current_state_for_room): self.get_current_state_ids.prefill( (room_id, ), new_state ) - for room_id, latest_event_ids in new_forward_extremeties.iteritems(): + for room_id, latest_event_ids in iteritems(new_forward_extremeties): self.get_latest_event_ids_in_room.prefill( (room_id,), list(latest_event_ids) ) @@ -641,20 +641,20 @@ class EventsStore(EventsWorkerStore): """ existing_state = yield self.get_current_state_ids(room_id) - existing_events = set(existing_state.itervalues()) - new_events = set(ev_id for ev_id in current_state.itervalues()) + existing_events = set(itervalues(existing_state)) + new_events = set(ev_id for ev_id in itervalues(current_state)) changed_events = existing_events ^ new_events if not changed_events: return to_delete = { - key: ev_id for key, ev_id in existing_state.iteritems() + key: ev_id for key, ev_id in iteritems(existing_state) if ev_id in changed_events } events_to_insert = (new_events - existing_events) to_insert = { - key: ev_id for key, ev_id in current_state.iteritems() + key: ev_id for key, ev_id in iteritems(current_state) if ev_id in events_to_insert } @@ -757,11 +757,11 @@ class EventsStore(EventsWorkerStore): ) def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): - for room_id, current_state_tuple in state_delta_by_room.iteritems(): + for room_id, current_state_tuple in iteritems(state_delta_by_room): to_delete, to_insert = current_state_tuple txn.executemany( "DELETE FROM current_state_events WHERE event_id = ?", - [(ev_id,) for ev_id in to_delete.itervalues()], + [(ev_id,) for ev_id in itervalues(to_delete)], ) self._simple_insert_many_txn( @@ -774,7 +774,7 @@ class EventsStore(EventsWorkerStore): "type": key[0], "state_key": key[1], } - for key, ev_id in to_insert.iteritems() + for key, ev_id in iteritems(to_insert) ], ) @@ -793,11 +793,12 @@ class EventsStore(EventsWorkerStore): "event_id": ev_id, "prev_event_id": to_delete.get(key, None), } - for key, ev_id in state_deltas.iteritems() + for key, ev_id in iteritems(state_deltas) ] ) - self._curr_state_delta_stream_cache.entity_has_changed( + txn.call_after( + self._curr_state_delta_stream_cache.entity_has_changed, room_id, max_stream_order, ) @@ -836,7 +837,7 @@ class EventsStore(EventsWorkerStore): def _update_forward_extremities_txn(self, txn, new_forward_extremities, max_stream_order): - for room_id, new_extrem in new_forward_extremities.iteritems(): + for room_id, new_extrem in iteritems(new_forward_extremities): self._simple_delete_txn( txn, table="event_forward_extremities", @@ -854,7 +855,7 @@ class EventsStore(EventsWorkerStore): "event_id": ev_id, "room_id": room_id, } - for room_id, new_extrem in new_forward_extremities.iteritems() + for room_id, new_extrem in iteritems(new_forward_extremities) for ev_id in new_extrem ], ) @@ -871,7 +872,7 @@ class EventsStore(EventsWorkerStore): "event_id": event_id, "stream_ordering": max_stream_order, } - for room_id, new_extrem in new_forward_extremities.iteritems() + for room_id, new_extrem in iteritems(new_forward_extremities) for event_id in new_extrem ] ) @@ -899,7 +900,7 @@ class EventsStore(EventsWorkerStore): new_events_and_contexts[event.event_id] = (event, context) else: new_events_and_contexts[event.event_id] = (event, context) - return new_events_and_contexts.values() + return list(new_events_and_contexts.values()) def _update_room_depths_txn(self, txn, events_and_contexts, backfilled): """Update min_depth for each room @@ -925,7 +926,7 @@ class EventsStore(EventsWorkerStore): event.depth, depth_updates.get(event.room_id, event.depth) ) - for room_id, depth in depth_updates.iteritems(): + for room_id, depth in iteritems(depth_updates): self._update_min_depth_for_room_txn(txn, room_id, depth) def _update_outliers_txn(self, txn, events_and_contexts): @@ -1041,7 +1042,6 @@ class EventsStore(EventsWorkerStore): "event_edge_hashes", "event_edges", "event_forward_extremities", - "event_push_actions", "event_reference_hashes", "event_search", "event_signatures", @@ -1061,6 +1061,14 @@ class EventsStore(EventsWorkerStore): [(ev.event_id,) for ev, _ in events_and_contexts] ) + for table in ( + "event_push_actions", + ): + txn.executemany( + "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,), + [(ev.event_id,) for ev, _ in events_and_contexts] + ) + def _store_event_txn(self, txn, events_and_contexts): """Insert new events into the event and event_json tables @@ -1309,7 +1317,7 @@ class EventsStore(EventsWorkerStore): " WHERE e.event_id IN (%s)" ) % (",".join(["?"] * len(ev_map)),) - txn.execute(sql, ev_map.keys()) + txn.execute(sql, list(ev_map)) rows = self.cursor_to_dict(txn) for row in rows: event = ev_map[row["event_id"]] @@ -1572,7 +1580,7 @@ class EventsStore(EventsWorkerStore): chunks = [ event_ids[i:i + 100] - for i in xrange(0, len(event_ids), 100) + for i in range(0, len(event_ids), 100) ] for chunk in chunks: ev_rows = self._simple_select_many_txn( @@ -1986,7 +1994,7 @@ class EventsStore(EventsWorkerStore): logger.info("[purge] finding state groups which depend on redundant" " state groups") remaining_state_groups = [] - for i in xrange(0, len(state_rows), 100): + for i in range(0, len(state_rows), 100): chunk = [sg for sg, in state_rows[i:i + 100]] # look for state groups whose prev_state_group is one we are about # to delete @@ -2042,7 +2050,7 @@ class EventsStore(EventsWorkerStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in curr_state.iteritems() + for key, state_id in iteritems(curr_state) ], ) |