diff options
author | Erik Johnston <erik@matrix.org> | 2016-10-05 14:41:07 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-10-05 14:41:07 +0100 |
commit | e779ee0ee21584b8907b866734376a737a8b6d2e (patch) | |
tree | cc27f059a354379346f9a21ff6b8b661a9407b65 /synapse/storage/events.py | |
parent | Merge branch 'release-v0.18.0' of github.com:matrix-org/synapse (diff) | |
parent | Bump changelog and version (diff) | |
download | synapse-e779ee0ee21584b8907b866734376a737a8b6d2e.tar.xz |
Merge branch 'release-v0.18.1' of github.com:matrix-org/synapse v0.18.1
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 44 |
1 files changed, 29 insertions, 15 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 6dc46fa50f..6cf9d1176d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1355,39 +1355,53 @@ class EventsStore(SQLBaseStore): min_stream_id = rows[-1][0] event_ids = [row[1] for row in rows] - events = self._get_events_txn(txn, event_ids) + rows_to_update = [] - rows = [] - for event in events: - try: - event_id = event.event_id - origin_server_ts = event.origin_server_ts - except (KeyError, AttributeError): - # If the event is missing a necessary field then - # skip over it. - continue + chunks = [ + event_ids[i:i + 100] + for i in xrange(0, len(event_ids), 100) + ] + for chunk in chunks: + ev_rows = self._simple_select_many_txn( + txn, + table="event_json", + column="event_id", + iterable=chunk, + retcols=["event_id", "json"], + keyvalues={}, + ) - rows.append((origin_server_ts, event_id)) + for row in ev_rows: + event_id = row["event_id"] + event_json = json.loads(row["json"]) + try: + origin_server_ts = event_json["origin_server_ts"] + except (KeyError, AttributeError): + # If the event is missing a necessary field then + # skip over it. + continue + + rows_to_update.append((origin_server_ts, event_id)) sql = ( "UPDATE events SET origin_server_ts = ? WHERE event_id = ?" ) - for index in range(0, len(rows), INSERT_CLUMP_SIZE): - clump = rows[index:index + INSERT_CLUMP_SIZE] + for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE): + clump = rows_to_update[index:index + INSERT_CLUMP_SIZE] txn.executemany(sql, clump) progress = { "target_min_stream_id_inclusive": target_min_stream_id, "max_stream_id_exclusive": min_stream_id, - "rows_inserted": rows_inserted + len(rows) + "rows_inserted": rows_inserted + len(rows_to_update) } self._background_update_progress_txn( txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress ) - return len(rows) + return len(rows_to_update) result = yield self.runInteraction( self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn |