diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 7e3903859b..653b32fbf5 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -1355,11 +1355,6 @@ class SQLBaseStore(object):
members_changed (iterable[str]): The user_ids of members that have
changed
"""
- for member in members_changed:
- self._attempt_to_invalidate_cache(
- "get_rooms_for_user_with_stream_ordering", (member,),
- )
-
for host in set(get_domain_from_id(u) for u in members_changed):
self._attempt_to_invalidate_cache(
"is_host_joined", (room_id, host,),
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index d0668e39c4..dfda39bbe0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -79,7 +79,7 @@ def encode_json(json_object):
"""
out = frozendict_json_encoder.encode(json_object)
if isinstance(out, bytes):
- out = out.decode('utf8')
+ out = out.decode("utf8")
return out
@@ -813,9 +813,10 @@ class EventsStore(
"""
all_events_and_contexts = events_and_contexts
+ min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
- self._update_current_state_txn(txn, state_delta_for_room, max_stream_order)
+ self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
self._update_forward_extremities_txn(
txn,
@@ -890,7 +891,7 @@ class EventsStore(
backfilled=backfilled,
)
- def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
+ def _update_current_state_txn(self, txn, state_delta_by_room, stream_id):
for room_id, current_state_tuple in iteritems(state_delta_by_room):
to_delete, to_insert = current_state_tuple
@@ -899,6 +900,12 @@ class EventsStore(
# that we can use it to calculate the `prev_event_id`. (This
# allows us to not have to pull out the existing state
# unnecessarily).
+ #
+ # The stream_id for the update is chosen to be the minimum of the stream_ids
+            # for the batch of events that we are persisting; that means we do not
+ # end up in a situation where workers see events before the
+ # current_state_delta updates.
+ #
sql = """
INSERT INTO current_state_delta_stream
(stream_id, room_id, type, state_key, event_id, prev_event_id)
@@ -911,7 +918,7 @@ class EventsStore(
sql,
(
(
- max_stream_order,
+ stream_id,
room_id,
etype,
state_key,
@@ -929,7 +936,7 @@ class EventsStore(
sql,
(
(
- max_stream_order,
+ stream_id,
room_id,
etype,
state_key,
@@ -970,7 +977,7 @@ class EventsStore(
txn.call_after(
self._curr_state_delta_stream_cache.entity_has_changed,
room_id,
- max_stream_order,
+ stream_id,
)
# Invalidate the various caches
@@ -986,6 +993,11 @@ class EventsStore(
if ev_type == EventTypes.Member
)
+ for member in members_changed:
+ txn.call_after(
+ self.get_rooms_for_user_with_stream_ordering.invalidate, (member,)
+ )
+
self._invalidate_state_caches_and_stream(txn, room_id, members_changed)
def _update_forward_extremities_txn(
|