diff options
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 57 |
1 files changed, 30 insertions, 27 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index cb1082e864..4ff0fdc4ab 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -14,36 +14,34 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections import OrderedDict, deque, namedtuple -from functools import wraps import itertools import logging +from collections import OrderedDict, deque, namedtuple +from functools import wraps + +from six import iteritems, itervalues +from six.moves import range + +from canonicaljson import json +from prometheus_client import Counter -import simplejson as json from twisted.internet import defer +import synapse.metrics +from synapse.api.constants import EventTypes +from synapse.api.errors import SynapseError +# these are only included to make the type annotations work +from synapse.events import EventBase # noqa: F401 +from synapse.events.snapshot import EventContext # noqa: F401 +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.events_worker import EventsWorkerStore +from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util.async import ObservableDeferred +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.frozenutils import frozendict_json_encoder -from synapse.util.logcontext import ( - PreserveLoggingContext, make_deferred_yieldable, -) +from synapse.util.logcontext import make_deferred_yieldable from synapse.util.logutils import log_function from synapse.util.metrics import Measure -from synapse.api.constants import EventTypes -from synapse.api.errors import SynapseError -from synapse.util.caches.descriptors import cached, cachedInlineCallbacks -from synapse.types import get_domain_from_id, RoomStreamToken -import synapse.metrics - -# these are only included to make the type annotations work -from synapse.events import EventBase # noqa: F401 -from synapse.events.snapshot import EventContext # noqa: F401 - -from six.moves import range -from six import itervalues, iteritems - -from prometheus_client import Counter logger = logging.getLogger(__name__) @@ -158,11 +156,8 @@ class _EventPeristenceQueue(object): self._event_persist_queues[room_id] = queue self._currently_persisting_rooms.discard(room_id) - # set handle_queue_loop off on the background. We don't want to - # attribute work done in it to the current request, so we drop the - # logcontext altogether. - with PreserveLoggingContext(): - handle_queue_loop() + # set handle_queue_loop off in the background + run_as_background_process("persist_events", handle_queue_loop) def _get_drainining_queue(self, room_id): queue = self._event_persist_queues.setdefault(room_id, deque()) @@ -800,7 +795,8 @@ class EventsStore(EventsWorkerStore): ] ) - self._curr_state_delta_stream_cache.entity_has_changed( + txn.call_after( + self._curr_state_delta_stream_cache.entity_has_changed, room_id, max_stream_order, ) @@ -1044,7 +1040,6 @@ class EventsStore(EventsWorkerStore): "event_edge_hashes", "event_edges", "event_forward_extremities", - "event_push_actions", "event_reference_hashes", "event_search", "event_signatures", @@ -1064,6 +1059,14 @@ class EventsStore(EventsWorkerStore): [(ev.event_id,) for ev, _ in events_and_contexts] ) + for table in ( + "event_push_actions", + ): + txn.executemany( + "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,), + [(ev.event_id,) for ev, _ in events_and_contexts] + ) + def _store_event_txn(self, txn, events_and_contexts): """Insert new events into the event and event_json tables |