summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py24
1 files changed, 18 insertions, 6 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index d0668e39c4..dfda39bbe0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -79,7 +79,7 @@ def encode_json(json_object):
     """
     out = frozendict_json_encoder.encode(json_object)
     if isinstance(out, bytes):
-        out = out.decode('utf8')
+        out = out.decode("utf8")
     return out
 
 
@@ -813,9 +813,10 @@ class EventsStore(
         """
         all_events_and_contexts = events_and_contexts
 
+        min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
 
-        self._update_current_state_txn(txn, state_delta_for_room, max_stream_order)
+        self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
 
         self._update_forward_extremities_txn(
             txn,
@@ -890,7 +891,7 @@ class EventsStore(
             backfilled=backfilled,
         )
 
-    def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
+    def _update_current_state_txn(self, txn, state_delta_by_room, stream_id):
         for room_id, current_state_tuple in iteritems(state_delta_by_room):
             to_delete, to_insert = current_state_tuple
 
@@ -899,6 +900,12 @@ class EventsStore(
             # that we can use it to calculate the `prev_event_id`. (This
             # allows us to not have to pull out the existing state
             # unnecessarily).
+            #
+            # The stream_id for the update is chosen to be the minimum of the stream_ids
+            # for the batch of the events that we are persisting; that means we do not
+            # end up in a situation where workers see events before the
+            # current_state_delta updates.
+            #
             sql = """
                 INSERT INTO current_state_delta_stream
                 (stream_id, room_id, type, state_key, event_id, prev_event_id)
@@ -911,7 +918,7 @@ class EventsStore(
                 sql,
                 (
                     (
-                        max_stream_order,
+                        stream_id,
                         room_id,
                         etype,
                         state_key,
@@ -929,7 +936,7 @@ class EventsStore(
                 sql,
                 (
                     (
-                        max_stream_order,
+                        stream_id,
                         room_id,
                         etype,
                         state_key,
@@ -970,7 +977,7 @@ class EventsStore(
             txn.call_after(
                 self._curr_state_delta_stream_cache.entity_has_changed,
                 room_id,
-                max_stream_order,
+                stream_id,
             )
 
             # Invalidate the various caches
@@ -986,6 +993,11 @@ class EventsStore(
                 if ev_type == EventTypes.Member
             )
 
+            for member in members_changed:
+                txn.call_after(
+                    self.get_rooms_for_user_with_stream_ordering.invalidate, (member,)
+                )
+
             self._invalidate_state_caches_and_stream(txn, room_id, members_changed)
 
     def _update_forward_extremities_txn(