diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ed182c8d11..49aeb953bd 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -189,6 +189,14 @@ class EventsStore(SQLBaseStore):
self._background_reindex_fields_sender,
)
+ self.register_background_index_update(
+ "event_contains_url_index",
+ index_name="event_contains_url_index",
+ table="events",
+ columns=["room_id", "topological_ordering", "stream_ordering"],
+ where_clause="contains_url = true AND outlier = false",
+ )
+
self._event_persist_queue = _EventPeristenceQueue()
def persist_events(self, events_and_contexts, backfilled=False):
@@ -595,7 +603,6 @@ class EventsStore(SQLBaseStore):
"rejections",
"redactions",
"room_memberships",
- "state_events",
"topics"
):
txn.executemany(
@@ -1347,39 +1354,53 @@ class EventsStore(SQLBaseStore):
min_stream_id = rows[-1][0]
event_ids = [row[1] for row in rows]
- events = self._get_events_txn(txn, event_ids)
+ rows_to_update = []
- rows = []
- for event in events:
- try:
- event_id = event.event_id
- origin_server_ts = event.origin_server_ts
- except (KeyError, AttributeError):
- # If the event is missing a necessary field then
- # skip over it.
- continue
+ chunks = [
+ event_ids[i:i + 100]
+ for i in xrange(0, len(event_ids), 100)
+ ]
+ for chunk in chunks:
+ ev_rows = self._simple_select_many_txn(
+ txn,
+ table="event_json",
+ column="event_id",
+ iterable=chunk,
+ retcols=["event_id", "json"],
+ keyvalues={},
+ )
- rows.append((origin_server_ts, event_id))
+ for row in ev_rows:
+ event_id = row["event_id"]
+ event_json = json.loads(row["json"])
+ try:
+ origin_server_ts = event_json["origin_server_ts"]
+ except (KeyError, AttributeError):
+ # If the event is missing a necessary field then
+ # skip over it.
+ continue
+
+ rows_to_update.append((origin_server_ts, event_id))
sql = (
"UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
)
- for index in range(0, len(rows), INSERT_CLUMP_SIZE):
- clump = rows[index:index + INSERT_CLUMP_SIZE]
+ for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
+ clump = rows_to_update[index:index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)
progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
"max_stream_id_exclusive": min_stream_id,
- "rows_inserted": rows_inserted + len(rows)
+ "rows_inserted": rows_inserted + len(rows_to_update)
}
self._background_update_progress_txn(
txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
)
- return len(rows)
+ return len(rows_to_update)
result = yield self.runInteraction(
self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
|