diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index cb1082e864..4ff0fdc4ab 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -14,36 +14,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from collections import OrderedDict, deque, namedtuple
-from functools import wraps
import itertools
import logging
+from collections import OrderedDict, deque, namedtuple
+from functools import wraps
+
+from six import iteritems, itervalues
+from six.moves import range
+
+from canonicaljson import json
+from prometheus_client import Counter
-import simplejson as json
from twisted.internet import defer
+import synapse.metrics
+from synapse.api.constants import EventTypes
+from synapse.api.errors import SynapseError
+# these are only included to make the type annotations work
+from synapse.events import EventBase # noqa: F401
+from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.events_worker import EventsWorkerStore
+from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.logcontext import (
- PreserveLoggingContext, make_deferred_yieldable,
-)
+from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
-from synapse.api.constants import EventTypes
-from synapse.api.errors import SynapseError
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from synapse.types import get_domain_from_id, RoomStreamToken
-import synapse.metrics
-
-# these are only included to make the type annotations work
-from synapse.events import EventBase # noqa: F401
-from synapse.events.snapshot import EventContext # noqa: F401
-
-from six.moves import range
-from six import itervalues, iteritems
-
-from prometheus_client import Counter
logger = logging.getLogger(__name__)
@@ -158,11 +156,8 @@ class _EventPeristenceQueue(object):
self._event_persist_queues[room_id] = queue
self._currently_persisting_rooms.discard(room_id)
- # set handle_queue_loop off on the background. We don't want to
- # attribute work done in it to the current request, so we drop the
- # logcontext altogether.
- with PreserveLoggingContext():
- handle_queue_loop()
+ # set handle_queue_loop off in the background
+ run_as_background_process("persist_events", handle_queue_loop)
def _get_drainining_queue(self, room_id):
queue = self._event_persist_queues.setdefault(room_id, deque())
@@ -800,7 +795,8 @@ class EventsStore(EventsWorkerStore):
]
)
- self._curr_state_delta_stream_cache.entity_has_changed(
+ txn.call_after(
+ self._curr_state_delta_stream_cache.entity_has_changed,
room_id, max_stream_order,
)
@@ -1044,7 +1040,6 @@ class EventsStore(EventsWorkerStore):
"event_edge_hashes",
"event_edges",
"event_forward_extremities",
- "event_push_actions",
"event_reference_hashes",
"event_search",
"event_signatures",
@@ -1064,6 +1059,14 @@ class EventsStore(EventsWorkerStore):
[(ev.event_id,) for ev, _ in events_and_contexts]
)
+ for table in (
+ "event_push_actions",
+ ):
+ txn.executemany(
+ "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
+ [(ev.event_id,) for ev, _ in events_and_contexts]
+ )
+
def _store_event_txn(self, txn, events_and_contexts):
"""Insert new events into the event and event_json tables
|