summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py77
1 files changed, 77 insertions, 0 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5d35ca90b9..7088f2709b 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -51,6 +51,14 @@ EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events
 
 
 class EventsStore(SQLBaseStore):
+    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
+
+    def __init__(self, hs):
+        super(EventsStore, self).__init__(hs)
+        self.register_background_update_handler(
+            self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
+        )
+
     @defer.inlineCallbacks
     def persist_events(self, events_and_contexts, backfilled=False,
                        is_new_state=True):
@@ -365,6 +373,7 @@ class EventsStore(SQLBaseStore):
                     "processed": True,
                     "outlier": event.internal_metadata.is_outlier(),
                     "content": encode_json(event.content).decode("UTF-8"),
+                    "origin_server_ts": int(event.origin_server_ts),
                 }
                 for event, _ in events_and_contexts
             ],
@@ -964,3 +973,71 @@ class EventsStore(SQLBaseStore):
 
         ret = yield self.runInteraction("count_messages", _count_messages)
         defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def _background_reindex_origin_server_ts(self, progress, batch_size):
+        target_min_stream_id = progress["target_min_stream_id_inclusive"]
+        max_stream_id = progress["max_stream_id_exclusive"]
+        rows_inserted = progress.get("rows_inserted", 0)
+
+        INSERT_CLUMP_SIZE = 1000
+
+        def reindex_search_txn(txn):
+            sql = (
+                "SELECT stream_ordering, event_id FROM events"
+                " WHERE ? <= stream_ordering AND stream_ordering < ?"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT ?"
+            )
+
+            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            min_stream_id = rows[-1][0]
+            event_ids = [row[1] for row in rows]
+
+            events = self._get_events_txn(txn, event_ids)
+
+            rows = []
+            for event in events:
+                try:
+                    event_id = event.event_id
+                    origin_server_ts = event.origin_server_ts
+                except (KeyError, AttributeError):
+                    # If the event is missing a necessary field then
+                    # skip over it.
+                    continue
+
+                rows.append((origin_server_ts, event_id))
+
+            sql = (
+                "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
+            )
+
+            for index in range(0, len(rows), INSERT_CLUMP_SIZE):
+                clump = rows[index:index + INSERT_CLUMP_SIZE]
+                txn.executemany(sql, clump)
+
+            progress = {
+                "target_min_stream_id_inclusive": target_min_stream_id,
+                "max_stream_id_exclusive": min_stream_id,
+                "rows_inserted": rows_inserted + len(rows)
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
+            )
+
+            return len(rows)
+
+        result = yield self.runInteraction(
+            self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
+        )
+
+        if not result:
+            yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
+
+        defer.returnValue(result)