diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index c2910094d0..19c05dc8d6 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -39,7 +39,7 @@ from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.logcontext import make_deferred_yieldable
+from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
@@ -147,7 +147,8 @@ class _EventPeristenceQueue(object):
# callbacks on the deferred.
try:
ret = yield per_item_callback(item)
- item.deferred.callback(ret)
+ with PreserveLoggingContext():
+ item.deferred.callback(ret)
except Exception:
item.deferred.errback()
finally:
@@ -417,18 +418,27 @@ class EventsStore(EventsWorkerStore):
logger.info(
"Calculating state delta for room %s", room_id,
)
- current_state = yield self._get_new_state_after_events(
- room_id,
- ev_ctx_rm,
- latest_event_ids,
- new_latest_event_ids,
- )
+
+ with Measure(
+ self._clock,
+ "persist_events.get_new_state_after_events",
+ ):
+ current_state = yield self._get_new_state_after_events(
+ room_id,
+ ev_ctx_rm,
+ latest_event_ids,
+ new_latest_event_ids,
+ )
+
if current_state is not None:
current_state_for_room[room_id] = current_state
- delta = yield self._calculate_state_delta(
- room_id, current_state,
- )
- if delta is not None:
+ with Measure(
+ self._clock,
+ "persist_events.calculate_state_delta",
+ ):
+ delta = yield self._calculate_state_delta(
+ room_id, current_state,
+ )
state_delta_for_room[room_id] = delta
yield self.runInteraction(
@@ -644,21 +654,14 @@ class EventsStore(EventsWorkerStore):
"""
existing_state = yield self.get_current_state_ids(room_id)
- existing_events = set(itervalues(existing_state))
- new_events = set(ev_id for ev_id in itervalues(current_state))
- changed_events = existing_events ^ new_events
-
- if not changed_events:
- return
-
to_delete = {
key: ev_id for key, ev_id in iteritems(existing_state)
- if ev_id in changed_events
+ if ev_id != current_state.get(key)
}
- events_to_insert = (new_events - existing_events)
+
to_insert = {
key: ev_id for key, ev_id in iteritems(current_state)
- if ev_id in events_to_insert
+ if ev_id != existing_state.get(key)
}
defer.returnValue((to_delete, to_insert))
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index af564b1b4e..6a5028961d 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -21,7 +21,6 @@ from canonicaljson import json
from twisted.internet import defer
-from synapse.api.constants import EventTypes
from synapse.push.baserules import list_with_base_rules
from synapse.storage.appservice import ApplicationServiceWorkerStore
from synapse.storage.pusher import PusherWorkerStore
@@ -250,18 +249,6 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
if uid in local_users_in_room:
user_ids.add(uid)
- forgotten = yield self.who_forgot_in_room(
- event.room_id, on_invalidate=cache_context.invalidate,
- )
-
- for row in forgotten:
- user_id = row["user_id"]
- event_id = row["event_id"]
-
- mem_id = current_state_ids.get((EventTypes.Member, user_id), None)
- if event_id == mem_id:
- user_ids.discard(user_id)
-
rules_by_user = yield self.bulk_get_push_rules(
user_ids, on_invalidate=cache_context.invalidate,
)
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index cc273a57b2..8443bd4c1b 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -233,7 +233,7 @@ class PusherStore(PusherWorkerStore):
)
if newly_inserted:
- self.runInteraction(
+ yield self.runInteraction(
"add_pusher",
self._invalidate_cache_and_stream,
self.get_if_user_has_pusher, (user_id,)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index a27702a7a0..01697ab2c9 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -461,18 +461,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
def _get_joined_hosts_cache(self, room_id):
return _JoinedHostsCache(self, room_id)
- @cached()
- def who_forgot_in_room(self, room_id):
- return self._simple_select_list(
- table="room_memberships",
- retcols=("user_id", "event_id"),
- keyvalues={
- "room_id": room_id,
- "forgotten": 1,
- },
- desc="who_forgot"
- )
-
class RoomMemberStore(RoomMemberWorkerStore):
def __init__(self, db_conn, hs):
@@ -581,9 +569,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
txn.execute(sql, (user_id, room_id))
txn.call_after(self.did_forget.invalidate, (user_id, room_id))
- self._invalidate_cache_and_stream(
- txn, self.who_forgot_in_room, (room_id,)
- )
return self.runInteraction("forget_membership", f)
@cachedInlineCallbacks(num_args=2)
|