summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py169
1 files changed, 79 insertions, 90 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2047110b1d..06db9e56e6 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -739,7 +739,18 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
         }
 
         events_map = {ev.event_id: ev for ev, _ in events_context}
-        room_version = yield self.get_room_version(room_id)
+
+        # We need to get the room version, which is in the create event.
+        # Normally that'd be in the database, but its also possible that we're
+        # currently trying to persist it.
+        room_version = None
+        for ev, _ in events_context:
+            if ev.type == EventTypes.Create and ev.state_key == "":
+                room_version = ev.content.get("room_version", "1")
+                break
+
+        if not room_version:
+            room_version = yield self.get_room_version(room_id)
 
         logger.debug("calling resolve_state_groups from preserve_events")
         res = yield self._state_resolution_handler.resolve_state_groups(
@@ -893,105 +904,82 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
 
     def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
         for room_id, current_state_tuple in iteritems(state_delta_by_room):
-                to_delete, to_insert = current_state_tuple
-
-                # First we add entries to the current_state_delta_stream. We
-                # do this before updating the current_state_events table so
-                # that we can use it to calculate the `prev_event_id`. (This
-                # allows us to not have to pull out the existing state
-                # unnecessarily).
-                sql = """
-                    INSERT INTO current_state_delta_stream
-                    (stream_id, room_id, type, state_key, event_id, prev_event_id)
-                    SELECT ?, ?, ?, ?, ?, (
-                        SELECT event_id FROM current_state_events
-                        WHERE room_id = ? AND type = ? AND state_key = ?
-                    )
-                """
-                txn.executemany(sql, (
-                    (
-                        max_stream_order, room_id, etype, state_key, None,
-                        room_id, etype, state_key,
-                    )
-                    for etype, state_key in to_delete
-                    # We sanity check that we're deleting rather than updating
-                    if (etype, state_key) not in to_insert
-                ))
-                txn.executemany(sql, (
-                    (
-                        max_stream_order, room_id, etype, state_key, ev_id,
-                        room_id, etype, state_key,
-                    )
-                    for (etype, state_key), ev_id in iteritems(to_insert)
-                ))
-
-                # Now we actually update the current_state_events table
+            to_delete, to_insert = current_state_tuple
 
-                txn.executemany(
-                    "DELETE FROM current_state_events"
-                    " WHERE room_id = ? AND type = ? AND state_key = ?",
-                    (
-                        (room_id, etype, state_key)
-                        for etype, state_key in itertools.chain(to_delete, to_insert)
-                    ),
+            # First we add entries to the current_state_delta_stream. We
+            # do this before updating the current_state_events table so
+            # that we can use it to calculate the `prev_event_id`. (This
+            # allows us to not have to pull out the existing state
+            # unnecessarily).
+            sql = """
+                INSERT INTO current_state_delta_stream
+                (stream_id, room_id, type, state_key, event_id, prev_event_id)
+                SELECT ?, ?, ?, ?, ?, (
+                    SELECT event_id FROM current_state_events
+                    WHERE room_id = ? AND type = ? AND state_key = ?
                 )
-
-                self._simple_insert_many_txn(
-                    txn,
-                    table="current_state_events",
-                    values=[
-                        {
-                            "event_id": ev_id,
-                            "room_id": room_id,
-                            "type": key[0],
-                            "state_key": key[1],
-                        }
-                        for key, ev_id in iteritems(to_insert)
-                    ],
+            """
+            txn.executemany(sql, (
+                (
+                    max_stream_order, room_id, etype, state_key, None,
+                    room_id, etype, state_key,
                 )
-
-                txn.call_after(
-                    self._curr_state_delta_stream_cache.entity_has_changed,
-                    room_id, max_stream_order,
+                for etype, state_key in to_delete
+                # We sanity check that we're deleting rather than updating
+                if (etype, state_key) not in to_insert
+            ))
+            txn.executemany(sql, (
+                (
+                    max_stream_order, room_id, etype, state_key, ev_id,
+                    room_id, etype, state_key,
                 )
+                for (etype, state_key), ev_id in iteritems(to_insert)
+            ))
 
-                # Invalidate the various caches
-
-                # Figure out the changes of membership to invalidate the
-                # `get_rooms_for_user` cache.
-                # We find out which membership events we may have deleted
-                # and which we have added, then we invlidate the caches for all
-                # those users.
-                members_changed = set(
-                    state_key
-                    for ev_type, state_key in itertools.chain(to_delete, to_insert)
-                    if ev_type == EventTypes.Member
-                )
+            # Now we actually update the current_state_events table
 
-                for member in members_changed:
-                    self._invalidate_cache_and_stream(
-                        txn, self.get_rooms_for_user_with_stream_ordering, (member,)
-                    )
+            txn.executemany(
+                "DELETE FROM current_state_events"
+                " WHERE room_id = ? AND type = ? AND state_key = ?",
+                (
+                    (room_id, etype, state_key)
+                    for etype, state_key in itertools.chain(to_delete, to_insert)
+                ),
+            )
 
-                for host in set(get_domain_from_id(u) for u in members_changed):
-                    self._invalidate_cache_and_stream(
-                        txn, self.is_host_joined, (room_id, host)
-                    )
-                    self._invalidate_cache_and_stream(
-                        txn, self.was_host_joined, (room_id, host)
-                    )
+            self._simple_insert_many_txn(
+                txn,
+                table="current_state_events",
+                values=[
+                    {
+                        "event_id": ev_id,
+                        "room_id": room_id,
+                        "type": key[0],
+                        "state_key": key[1],
+                    }
+                    for key, ev_id in iteritems(to_insert)
+                ],
+            )
 
-                self._invalidate_cache_and_stream(
-                    txn, self.get_users_in_room, (room_id,)
-                )
+            txn.call_after(
+                self._curr_state_delta_stream_cache.entity_has_changed,
+                room_id, max_stream_order,
+            )
 
-                self._invalidate_cache_and_stream(
-                    txn, self.get_room_summary, (room_id,)
-                )
+            # Invalidate the various caches
+
+            # Figure out the changes of membership to invalidate the
+            # `get_rooms_for_user` cache.
+            # We find out which membership events we may have deleted
+            # and which we have added, then we invlidate the caches for all
+            # those users.
+            members_changed = set(
+                state_key
+                for ev_type, state_key in itertools.chain(to_delete, to_insert)
+                if ev_type == EventTypes.Member
+            )
 
-                self._invalidate_cache_and_stream(
-                    txn, self.get_current_state_ids, (room_id,)
-                )
+            self._invalidate_state_caches_and_stream(txn, room_id, members_changed)
 
     def _update_forward_extremities_txn(self, txn, new_forward_extremities,
                                         max_stream_order):
@@ -1257,6 +1245,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
                         event.internal_metadata.get_dict()
                     ),
                     "json": encode_json(event_dict(event)),
+                    "format_version": event.format_version,
                 }
                 for event, _ in events_and_contexts
             ],