summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py57
1 files changed, 30 insertions, 27 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index cb1082e864..4ff0fdc4ab 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -14,36 +14,34 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from collections import OrderedDict, deque, namedtuple
-from functools import wraps
 import itertools
 import logging
+from collections import OrderedDict, deque, namedtuple
+from functools import wraps
+
+from six import iteritems, itervalues
+from six.moves import range
+
+from canonicaljson import json
+from prometheus_client import Counter
 
-import simplejson as json
 from twisted.internet import defer
 
+import synapse.metrics
+from synapse.api.constants import EventTypes
+from synapse.api.errors import SynapseError
+# these are only included to make the type annotations work
+from synapse.events import EventBase  # noqa: F401
+from synapse.events.snapshot import EventContext  # noqa: F401
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.events_worker import EventsWorkerStore
+from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util.async import ObservableDeferred
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.logcontext import (
-    PreserveLoggingContext, make_deferred_yieldable,
-)
+from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
-from synapse.api.constants import EventTypes
-from synapse.api.errors import SynapseError
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from synapse.types import get_domain_from_id, RoomStreamToken
-import synapse.metrics
-
-# these are only included to make the type annotations work
-from synapse.events import EventBase    # noqa: F401
-from synapse.events.snapshot import EventContext   # noqa: F401
-
-from six.moves import range
-from six import itervalues, iteritems
-
-from prometheus_client import Counter
 
 logger = logging.getLogger(__name__)
 
@@ -158,11 +156,8 @@ class _EventPeristenceQueue(object):
                     self._event_persist_queues[room_id] = queue
                 self._currently_persisting_rooms.discard(room_id)
 
-        # set handle_queue_loop off on the background. We don't want to
-        # attribute work done in it to the current request, so we drop the
-        # logcontext altogether.
-        with PreserveLoggingContext():
-            handle_queue_loop()
+        # set handle_queue_loop off in the background
+        run_as_background_process("persist_events", handle_queue_loop)
 
     def _get_drainining_queue(self, room_id):
         queue = self._event_persist_queues.setdefault(room_id, deque())
@@ -800,7 +795,8 @@ class EventsStore(EventsWorkerStore):
                     ]
                 )
 
-                self._curr_state_delta_stream_cache.entity_has_changed(
+                txn.call_after(
+                    self._curr_state_delta_stream_cache.entity_has_changed,
                     room_id, max_stream_order,
                 )
 
@@ -1044,7 +1040,6 @@ class EventsStore(EventsWorkerStore):
                 "event_edge_hashes",
                 "event_edges",
                 "event_forward_extremities",
-                "event_push_actions",
                 "event_reference_hashes",
                 "event_search",
                 "event_signatures",
@@ -1064,6 +1059,14 @@ class EventsStore(EventsWorkerStore):
                 [(ev.event_id,) for ev, _ in events_and_contexts]
             )
 
+        for table in (
+            "event_push_actions",
+        ):
+            txn.executemany(
+                "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
+                [(ev.event_id,) for ev, _ in events_and_contexts]
+            )
+
     def _store_event_txn(self, txn, events_and_contexts):
         """Insert new events into the event and event_json tables