diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index c57385d92f..b457c5563f 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -16,7 +16,10 @@
import logging
from synapse.api.constants import EventTypes
-from synapse.replication.tcp.streams.events import EventsStreamEventRow
+from synapse.replication.tcp.streams.events import (
+ EventsStreamCurrentStateRow,
+ EventsStreamEventRow,
+)
from synapse.storage.event_federation import EventFederationWorkerStore
from synapse.storage.event_push_actions import EventPushActionsWorkerStore
from synapse.storage.events_worker import EventsWorkerStore
@@ -80,14 +83,7 @@ class SlavedEventStore(EventFederationWorkerStore,
if stream_name == "events":
self._stream_id_gen.advance(token)
for row in rows:
- if row.type != EventsStreamEventRow.TypeId:
- continue
- data = row.data
- self.invalidate_caches_for_event(
- token, data.event_id, data.room_id, data.type, data.state_key,
- data.redacts,
- backfilled=False,
- )
+ self._process_event_stream_row(token, row)
elif stream_name == "backfill":
self._backfill_id_gen.advance(-token)
for row in rows:
@@ -100,6 +96,23 @@ class SlavedEventStore(EventFederationWorkerStore,
stream_name, token, rows
)
+ def _process_event_stream_row(self, token, row):
+ data = row.data
+
+ if row.type == EventsStreamEventRow.TypeId:
+ self.invalidate_caches_for_event(
+ token, data.event_id, data.room_id, data.type, data.state_key,
+ data.redacts,
+ backfilled=False,
+ )
+ elif row.type == EventsStreamCurrentStateRow.TypeId:
+ if data.type == EventTypes.Member:
+ self.get_rooms_for_user_with_stream_ordering.invalidate(
+ (data.state_key, ),
+ )
+ else:
+ raise Exception("Unknown events stream row type %s" % (row.type, ))
+
def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
etype, state_key, redacts, backfilled):
self._invalidate_get_event_cache(event_id)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 7e3903859b..653b32fbf5 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -1355,11 +1355,6 @@ class SQLBaseStore(object):
members_changed (iterable[str]): The user_ids of members that have
changed
"""
- for member in members_changed:
- self._attempt_to_invalidate_cache(
- "get_rooms_for_user_with_stream_ordering", (member,),
- )
-
for host in set(get_domain_from_id(u) for u in members_changed):
self._attempt_to_invalidate_cache(
"is_host_joined", (room_id, host,),
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index d0668e39c4..dfda39bbe0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -79,7 +79,7 @@ def encode_json(json_object):
"""
out = frozendict_json_encoder.encode(json_object)
if isinstance(out, bytes):
- out = out.decode('utf8')
+ out = out.decode("utf8")
return out
@@ -813,9 +813,10 @@ class EventsStore(
"""
all_events_and_contexts = events_and_contexts
+ min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
- self._update_current_state_txn(txn, state_delta_for_room, max_stream_order)
+ self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
self._update_forward_extremities_txn(
txn,
@@ -890,7 +891,7 @@ class EventsStore(
backfilled=backfilled,
)
- def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
+ def _update_current_state_txn(self, txn, state_delta_by_room, stream_id):
for room_id, current_state_tuple in iteritems(state_delta_by_room):
to_delete, to_insert = current_state_tuple
@@ -899,6 +900,12 @@ class EventsStore(
# that we can use it to calculate the `prev_event_id`. (This
# allows us to not have to pull out the existing state
# unnecessarily).
+ #
+ # The stream_id for the update is chosen to be the minimum of the stream_ids
+ # for the batch of the events that we are persisting; that means we do not
+ # end up in a situation where workers see events before the
+ # current_state_delta updates.
+ #
sql = """
INSERT INTO current_state_delta_stream
(stream_id, room_id, type, state_key, event_id, prev_event_id)
@@ -911,7 +918,7 @@ class EventsStore(
sql,
(
(
- max_stream_order,
+ stream_id,
room_id,
etype,
state_key,
@@ -929,7 +936,7 @@ class EventsStore(
sql,
(
(
- max_stream_order,
+ stream_id,
room_id,
etype,
state_key,
@@ -970,7 +977,7 @@ class EventsStore(
txn.call_after(
self._curr_state_delta_stream_cache.entity_has_changed,
room_id,
- max_stream_order,
+ stream_id,
)
# Invalidate the various caches
@@ -986,6 +993,11 @@ class EventsStore(
if ev_type == EventTypes.Member
)
+ for member in members_changed:
+ txn.call_after(
+ self.get_rooms_for_user_with_stream_ordering.invalidate, (member,)
+ )
+
self._invalidate_state_caches_and_stream(txn, room_id, members_changed)
def _update_forward_extremities_txn(
|