diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 68f39bd684..3cd3fbdc9b 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -241,7 +241,7 @@ class EventFederationStore(SQLBaseStore):
return int(min_depth) if min_depth is not None else None
- def _update_min_depth_for_room_txn(self, txn, room_id, depth):
+ def _update_min_depth_for_room_txn(self, txn, invalidates, room_id, depth):
min_depth = self._get_min_depth_interaction(txn, room_id)
do_insert = depth < min_depth if min_depth else True
@@ -256,8 +256,8 @@ class EventFederationStore(SQLBaseStore):
},
)
- def _handle_prev_events(self, txn, outlier, event_id, prev_events,
- room_id):
+ def _handle_prev_events(self, txn, invalidates, outlier, event_id,
+ prev_events, room_id):
"""
For the given event, update the event edges table and forward and
backward extremities tables.
@@ -330,7 +330,9 @@ class EventFederationStore(SQLBaseStore):
)
txn.execute(query)
- self.get_latest_event_ids_in_room.invalidate(room_id)
+ invalidates.append((
+ self.get_latest_event_ids_in_room.invalidate, room_id
+ ))
def get_backfill_events(self, room_id, event_list, limit):
"""Get a list of Events for a given topic that occurred before (and
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index a3c260ddc4..b2ab4b02f3 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -42,7 +42,7 @@ class EventsStore(SQLBaseStore):
stream_ordering = self.min_token
try:
- yield self.runInteraction(
+ invalidates = yield self.runInteraction(
"persist_event",
self._persist_event_txn,
event=event,
@@ -52,6 +52,11 @@ class EventsStore(SQLBaseStore):
is_new_state=is_new_state,
current_state=current_state,
)
+ for invalidated in invalidates:
+ invalidated_callback = invalidated[0]
+ invalidated_args = invalidated[1:]
+ invalidated_callback(*invalidated_args)
+
except _RollbackButIsFineException:
pass
@@ -91,9 +96,10 @@ class EventsStore(SQLBaseStore):
def _persist_event_txn(self, txn, event, context, backfilled,
stream_ordering=None, is_new_state=True,
current_state=None):
+ invalidates = []
# Remove the any existing cache entries for the event_id
- self._invalidate_get_event_cache(event.event_id)
+ invalidates.append((self._invalidate_get_event_cache, event.event_id))
if stream_ordering is None:
with self._stream_id_gen.get_next_txn(txn) as stream_ordering:
@@ -150,10 +156,11 @@ class EventsStore(SQLBaseStore):
outlier = event.internal_metadata.is_outlier()
if not outlier:
- self._store_state_groups_txn(txn, event, context)
+ self._store_state_groups_txn(txn, invalidates, event, context)
self._update_min_depth_for_room_txn(
txn,
+ invalidates,
event.room_id,
event.depth
)
@@ -199,6 +206,7 @@ class EventsStore(SQLBaseStore):
self._handle_prev_events(
txn,
+ invalidates,
outlier=outlier,
event_id=event.event_id,
prev_events=event.prev_events,
@@ -206,13 +214,13 @@ class EventsStore(SQLBaseStore):
)
if event.type == EventTypes.Member:
- self._store_room_member_txn(txn, event)
+ self._store_room_member_txn(txn, invalidates, event)
elif event.type == EventTypes.Name:
- self._store_room_name_txn(txn, event)
+ self._store_room_name_txn(txn, invalidates, event)
elif event.type == EventTypes.Topic:
- self._store_room_topic_txn(txn, event)
+ self._store_room_topic_txn(txn, invalidates, event)
elif event.type == EventTypes.Redaction:
- self._store_redaction(txn, event)
+ self._store_redaction(txn, invalidates, event)
event_dict = {
k: v
@@ -281,19 +289,22 @@ class EventsStore(SQLBaseStore):
)
if context.rejected:
- self._store_rejections_txn(txn, event.event_id, context.rejected)
+ self._store_rejections_txn(
+ txn, invalidates, event.event_id, context.rejected
+ )
for hash_alg, hash_base64 in event.hashes.items():
hash_bytes = decode_base64(hash_base64)
self._store_event_content_hash_txn(
- txn, event.event_id, hash_alg, hash_bytes,
+ txn, invalidates, event.event_id, hash_alg, hash_bytes,
)
for prev_event_id, prev_hashes in event.prev_events:
for alg, hash_base64 in prev_hashes.items():
hash_bytes = decode_base64(hash_base64)
self._store_prev_event_hash_txn(
- txn, event.event_id, prev_event_id, alg, hash_bytes
+ txn, invalidates, event.event_id, prev_event_id, alg,
+ hash_bytes
)
for auth_id, _ in event.auth_events:
@@ -309,7 +320,7 @@ class EventsStore(SQLBaseStore):
(ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
self._store_event_reference_hash_txn(
- txn, event.event_id, ref_alg, ref_hash_bytes
+ txn, invalidates, event.event_id, ref_alg, ref_hash_bytes
)
if event.is_state():
@@ -356,9 +367,11 @@ class EventsStore(SQLBaseStore):
}
)
- def _store_redaction(self, txn, event):
+ return invalidates
+
+ def _store_redaction(self, txn, invalidates, event):
# invalidate the cache for the redacted event
- self._invalidate_get_event_cache(event.redacts)
+ invalidates.append((self._invalidate_get_event_cache, event.redacts))
txn.execute(
"INSERT INTO redactions (event_id, redacts) VALUES (?,?)",
(event.event_id, event.redacts)
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index f956377632..d42d7ff0e3 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -162,7 +162,7 @@ class RoomStore(SQLBaseStore):
defer.returnValue(ret)
- def _store_room_topic_txn(self, txn, event):
+ def _store_room_topic_txn(self, txn, invalidates, event):
if hasattr(event, "content") and "topic" in event.content:
self._simple_insert_txn(
txn,
@@ -174,7 +174,7 @@ class RoomStore(SQLBaseStore):
},
)
- def _store_room_name_txn(self, txn, event):
+ def _store_room_name_txn(self, txn, invalidates, event):
if hasattr(event, "content") and "name" in event.content:
self._simple_insert_txn(
txn,
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 09fb77a194..117da817ba 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -35,7 +35,7 @@ RoomsForUser = namedtuple(
class RoomMemberStore(SQLBaseStore):
- def _store_room_member_txn(self, txn, event):
+ def _store_room_member_txn(self, txn, invalidates, event):
"""Store a room member in the database.
"""
try:
@@ -64,8 +64,10 @@ class RoomMemberStore(SQLBaseStore):
}
)
- self.get_rooms_for_user.invalidate(target_user_id)
- self.get_joined_hosts_for_room.invalidate(event.room_id)
+ invalidates.extend([
+ (self.get_rooms_for_user.invalidate, target_user_id),
+ (self.get_joined_hosts_for_room.invalidate, event.room_id),
+ ])
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index f051828630..e3979846e7 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -39,8 +39,8 @@ class SignatureStore(SQLBaseStore):
txn.execute(query, (event_id, ))
return dict(txn.fetchall())
- def _store_event_content_hash_txn(self, txn, event_id, algorithm,
- hash_bytes):
+ def _store_event_content_hash_txn(self, txn, invalidates, event_id,
+ algorithm, hash_bytes):
"""Store a hash for a Event
Args:
txn (cursor):
@@ -101,8 +101,8 @@ class SignatureStore(SQLBaseStore):
txn.execute(query, (event_id, ))
return {k: v for k, v in txn.fetchall()}
- def _store_event_reference_hash_txn(self, txn, event_id, algorithm,
- hash_bytes):
+ def _store_event_reference_hash_txn(self, txn, invalidates, event_id,
+ algorithm, hash_bytes):
"""Store a hash for a PDU
Args:
txn (cursor):
@@ -184,8 +184,8 @@ class SignatureStore(SQLBaseStore):
hashes[algorithm] = hash_bytes
return results
- def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id,
- algorithm, hash_bytes):
+ def _store_prev_event_hash_txn(self, txn, invalidates, event_id,
+ prev_event_id, algorithm, hash_bytes):
self._simple_insert_txn(
txn,
"event_edge_hashes",
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 7e55e8bed6..35d11c27cc 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -82,7 +82,7 @@ class StateStore(SQLBaseStore):
f,
)
- def _store_state_groups_txn(self, txn, event, context):
+ def _store_state_groups_txn(self, txn, invalidates, event, context):
if context.current_state is None:
return
|