summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/event_federation.py9
-rw-r--r--synapse/storage/events.py44
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/schema/delta/36/readd_public_rooms.sql26
-rw-r--r--synapse/storage/state.py52
5 files changed, 89 insertions, 44 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 3d62451de9..53feaa1960 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -398,12 +398,11 @@ class EventFederationStore(SQLBaseStore):
             sql = ("""
                 DELETE FROM stream_ordering_to_exterm
                 WHERE
-                (
-                    SELECT max(stream_ordering) AS stream_ordering
+                room_id IN (
+                    SELECT room_id
                     FROM stream_ordering_to_exterm
-                    WHERE room_id = stream_ordering_to_exterm.room_id
-                ) > ?
-                AND stream_ordering < ?
+                    WHERE stream_ordering > ?
+                ) AND stream_ordering < ?
             """)
             txn.execute(
                 sql,
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 6dc46fa50f..6cf9d1176d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1355,39 +1355,53 @@ class EventsStore(SQLBaseStore):
             min_stream_id = rows[-1][0]
             event_ids = [row[1] for row in rows]
 
-            events = self._get_events_txn(txn, event_ids)
+            rows_to_update = []
 
-            rows = []
-            for event in events:
-                try:
-                    event_id = event.event_id
-                    origin_server_ts = event.origin_server_ts
-                except (KeyError, AttributeError):
-                    # If the event is missing a necessary field then
-                    # skip over it.
-                    continue
+            chunks = [
+                event_ids[i:i + 100]
+                for i in xrange(0, len(event_ids), 100)
+            ]
+            for chunk in chunks:
+                ev_rows = self._simple_select_many_txn(
+                    txn,
+                    table="event_json",
+                    column="event_id",
+                    iterable=chunk,
+                    retcols=["event_id", "json"],
+                    keyvalues={},
+                )
 
-                rows.append((origin_server_ts, event_id))
+                for row in ev_rows:
+                    event_id = row["event_id"]
+                    event_json = json.loads(row["json"])
+                    try:
+                        origin_server_ts = event_json["origin_server_ts"]
+                    except (KeyError, AttributeError):
+                        # If the event is missing a necessary field then
+                        # skip over it.
+                        continue
+
+                    rows_to_update.append((origin_server_ts, event_id))
 
             sql = (
                 "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
             )
 
-            for index in range(0, len(rows), INSERT_CLUMP_SIZE):
-                clump = rows[index:index + INSERT_CLUMP_SIZE]
+            for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
+                clump = rows_to_update[index:index + INSERT_CLUMP_SIZE]
                 txn.executemany(sql, clump)
 
             progress = {
                 "target_min_stream_id_inclusive": target_min_stream_id,
                 "max_stream_id_exclusive": min_stream_id,
-                "rows_inserted": rows_inserted + len(rows)
+                "rows_inserted": rows_inserted + len(rows_to_update)
             }
 
             self._background_update_progress_txn(
                 txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
             )
 
-            return len(rows)
+            return len(rows_to_update)
 
         result = yield self.runInteraction(
             self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 7efbe51cda..08de3cc4c1 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 35
+SCHEMA_VERSION = 36
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/schema/delta/36/readd_public_rooms.sql b/synapse/storage/schema/delta/36/readd_public_rooms.sql
new file mode 100644
index 0000000000..90d8fd18f9
--- /dev/null
+++ b/synapse/storage/schema/delta/36/readd_public_rooms.sql
@@ -0,0 +1,26 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Re-add some entries to stream_ordering_to_exterm that were incorrectly deleted
+INSERT INTO stream_ordering_to_exterm (stream_ordering, room_id, event_id)
+    SELECT
+        (SELECT stream_ordering FROM events where event_id = e.event_id) AS stream_ordering,
+        room_id,
+        event_id
+    FROM event_forward_extremities AS e
+    WHERE NOT EXISTS (
+        SELECT room_id FROM stream_ordering_to_exterm AS s
+        WHERE s.room_id = e.room_id
+    );
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 7eb342674c..49abf0ac74 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -307,6 +307,9 @@ class StateStore(SQLBaseStore):
 
     def _get_state_groups_from_groups_txn(self, txn, groups, types=None):
         results = {group: {} for group in groups}
+        if types is not None:
+            types = list(set(types))  # deduplicate types list
+
         if isinstance(self.database_engine, PostgresEngine):
             # Temporarily disable sequential scans in this transaction. This is
             # a temporary hack until we can add the right indices in
@@ -375,10 +378,35 @@ class StateStore(SQLBaseStore):
             # We don't use WITH RECURSIVE on sqlite3 as there are distributions
             # that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
             for group in groups:
-                group_tree = [group]
                 next_group = group
 
                 while next_group:
+                    # We did this before by getting the list of group ids, and
+                    # then passing that list to sqlite to get latest event for
+                    # each (type, state_key). However, that was terribly slow
+                    # without the right indicies (which we can't add until
+                    # after we finish deduping state, which requires this func)
+                    args = [next_group]
+                    if types:
+                        args.extend(i for typ in types for i in typ)
+
+                    txn.execute(
+                        "SELECT type, state_key, event_id FROM state_groups_state"
+                        " WHERE state_group = ? %s" % (where_clause,),
+                        args
+                    )
+                    rows = txn.fetchall()
+                    results[group].update({
+                        (typ, state_key): event_id
+                        for typ, state_key, event_id in rows
+                        if (typ, state_key) not in results[group]
+                    })
+
+                    # If the lengths match then we must have all the types,
+                    # so no need to go walk further down the tree.
+                    if types is not None and len(results[group]) == len(types):
+                        break
+
                     next_group = self._simple_select_one_onecol_txn(
                         txn,
                         table="state_group_edges",
@@ -386,28 +414,6 @@ class StateStore(SQLBaseStore):
                         retcol="prev_state_group",
                         allow_none=True,
                     )
-                    if next_group:
-                        group_tree.append(next_group)
-
-                sql = ("""
-                    SELECT type, state_key, event_id FROM state_groups_state
-                    INNER JOIN (
-                        SELECT type, state_key, max(state_group) as state_group
-                        FROM state_groups_state
-                        WHERE state_group IN (%s) %s
-                        GROUP BY type, state_key
-                    ) USING (type, state_key, state_group);
-                """) % (",".join("?" for _ in group_tree), where_clause,)
-
-                args = list(group_tree)
-                if types is not None:
-                    args.extend([i for typ in types for i in typ])
-
-                txn.execute(sql, args)
-                rows = self.cursor_to_dict(txn)
-                for row in rows:
-                    key = (row["type"], row["state_key"])
-                    results[group][key] = row["event_id"]
 
         return results