summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2016-09-29 13:53:48 +0100
committerGitHub <noreply@github.com>2016-09-29 13:53:48 +0100
commit5875a652530ac06719d793bda3fb396ad698fdf2 (patch)
tree67ce749c63bf804537978ba9984a6492ab0558ac /synapse
parentMerge pull request #1146 from matrix-org/erikj/port_script_fix (diff)
parentFix background reindex of origin_server_ts (diff)
downloadsynapse-5875a652530ac06719d793bda3fb396ad698fdf2.tar.xz
Merge pull request #1145 from matrix-org/erikj/fix_reindex
Fix background reindex of origin_server_ts
Diffstat (limited to '')
-rw-r--r--synapse/storage/events.py44
1 files changed, 29 insertions, 15 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 6dc46fa50f..6cf9d1176d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1355,39 +1355,53 @@ class EventsStore(SQLBaseStore):
             min_stream_id = rows[-1][0]
             event_ids = [row[1] for row in rows]
 
-            events = self._get_events_txn(txn, event_ids)
+            rows_to_update = []
 
-            rows = []
-            for event in events:
-                try:
-                    event_id = event.event_id
-                    origin_server_ts = event.origin_server_ts
-                except (KeyError, AttributeError):
-                    # If the event is missing a necessary field then
-                    # skip over it.
-                    continue
+            chunks = [
+                event_ids[i:i + 100]
+                for i in xrange(0, len(event_ids), 100)
+            ]
+            for chunk in chunks:
+                ev_rows = self._simple_select_many_txn(
+                    txn,
+                    table="event_json",
+                    column="event_id",
+                    iterable=chunk,
+                    retcols=["event_id", "json"],
+                    keyvalues={},
+                )
 
-                rows.append((origin_server_ts, event_id))
+                for row in ev_rows:
+                    event_id = row["event_id"]
+                    event_json = json.loads(row["json"])
+                    try:
+                        origin_server_ts = event_json["origin_server_ts"]
+                    except (KeyError, AttributeError):
+                        # If the event is missing a necessary field then
+                        # skip over it.
+                        continue
+
+                    rows_to_update.append((origin_server_ts, event_id))
 
             sql = (
                 "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
             )
 
-            for index in range(0, len(rows), INSERT_CLUMP_SIZE):
-                clump = rows[index:index + INSERT_CLUMP_SIZE]
+            for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
+                clump = rows_to_update[index:index + INSERT_CLUMP_SIZE]
                 txn.executemany(sql, clump)
 
             progress = {
                 "target_min_stream_id_inclusive": target_min_stream_id,
                 "max_stream_id_exclusive": min_stream_id,
-                "rows_inserted": rows_inserted + len(rows)
+                "rows_inserted": rows_inserted + len(rows_to_update)
             }
 
             self._background_update_progress_txn(
                 txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
             )
 
-            return len(rows)
+            return len(rows_to_update)
 
         result = yield self.runInteraction(
             self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn