diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5d35ca90b9..7088f2709b 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -51,6 +51,14 @@ EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events
class EventsStore(SQLBaseStore):
+ EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
+
+ def __init__(self, hs):
+ super(EventsStore, self).__init__(hs)
+ self.register_background_update_handler(
+ self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
+ )
+
@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False,
is_new_state=True):
@@ -365,6 +373,7 @@ class EventsStore(SQLBaseStore):
"processed": True,
"outlier": event.internal_metadata.is_outlier(),
"content": encode_json(event.content).decode("UTF-8"),
+ "origin_server_ts": int(event.origin_server_ts),
}
for event, _ in events_and_contexts
],
@@ -964,3 +973,71 @@ class EventsStore(SQLBaseStore):
ret = yield self.runInteraction("count_messages", _count_messages)
defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def _background_reindex_origin_server_ts(self, progress, batch_size):
+ target_min_stream_id = progress["target_min_stream_id_inclusive"]
+ max_stream_id = progress["max_stream_id_exclusive"]
+ rows_inserted = progress.get("rows_inserted", 0)
+
+ INSERT_CLUMP_SIZE = 1000
+
+ def reindex_search_txn(txn):
+ sql = (
+ "SELECT stream_ordering, event_id FROM events"
+ " WHERE ? <= stream_ordering AND stream_ordering < ?"
+ " ORDER BY stream_ordering DESC"
+ " LIMIT ?"
+ )
+
+ txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+ rows = txn.fetchall()
+ if not rows:
+ return 0
+
+ min_stream_id = rows[-1][0]
+ event_ids = [row[1] for row in rows]
+
+ events = self._get_events_txn(txn, event_ids)
+
+ rows = []
+ for event in events:
+ try:
+ event_id = event.event_id
+ origin_server_ts = event.origin_server_ts
+ except (KeyError, AttributeError):
+ # If the event is missing a necessary field then
+ # skip over it.
+ continue
+
+ rows.append((origin_server_ts, event_id))
+
+ sql = (
+ "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
+ )
+
+ for index in range(0, len(rows), INSERT_CLUMP_SIZE):
+ clump = rows[index:index + INSERT_CLUMP_SIZE]
+ txn.executemany(sql, clump)
+
+ progress = {
+ "target_min_stream_id_inclusive": target_min_stream_id,
+ "max_stream_id_exclusive": min_stream_id,
+ "rows_inserted": rows_inserted + len(rows)
+ }
+
+ self._background_update_progress_txn(
+ txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
+ )
+
+ return len(rows)
+
+ result = yield self.runInteraction(
+ self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
+ )
+
+ if not result:
+ yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
+
+ defer.returnValue(result)
|