diff options
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 24 |
1 files changed, 18 insertions, 6 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d0668e39c4..dfda39bbe0 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -79,7 +79,7 @@ def encode_json(json_object): """ out = frozendict_json_encoder.encode(json_object) if isinstance(out, bytes): - out = out.decode('utf8') + out = out.decode("utf8") return out @@ -813,9 +813,10 @@ class EventsStore( """ all_events_and_contexts = events_and_contexts + min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering - self._update_current_state_txn(txn, state_delta_for_room, max_stream_order) + self._update_current_state_txn(txn, state_delta_for_room, min_stream_order) self._update_forward_extremities_txn( txn, @@ -890,7 +891,7 @@ class EventsStore( backfilled=backfilled, ) - def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): + def _update_current_state_txn(self, txn, state_delta_by_room, stream_id): for room_id, current_state_tuple in iteritems(state_delta_by_room): to_delete, to_insert = current_state_tuple @@ -899,6 +900,12 @@ class EventsStore( # that we can use it to calculate the `prev_event_id`. (This # allows us to not have to pull out the existing state # unnecessarily). + # + # The stream_id for the update is chosen to be the minimum of the stream_ids + # for the batch of the events that we are persisting; that means we do not + # end up in a situation where workers see events before the + # current_state_delta updates. + # sql = """ INSERT INTO current_state_delta_stream (stream_id, room_id, type, state_key, event_id, prev_event_id) @@ -911,7 +918,7 @@ class EventsStore( sql, ( ( - max_stream_order, + stream_id, room_id, etype, state_key, @@ -929,7 +936,7 @@ class EventsStore( sql, ( ( - max_stream_order, + stream_id, room_id, etype, state_key, @@ -970,7 +977,7 @@ class EventsStore( txn.call_after( self._curr_state_delta_stream_cache.entity_has_changed, room_id, - max_stream_order, + stream_id, ) # Invalidate the various caches @@ -986,6 +993,11 @@ class EventsStore( if ev_type == EventTypes.Member ) + for member in members_changed: + txn.call_after( + self.get_rooms_for_user_with_stream_ordering.invalidate, (member,) + ) + self._invalidate_state_caches_and_stream(txn, room_id, members_changed) def _update_forward_extremities_txn( |