summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
authorMark Haines <mjark@negativecurvature.net>2016-01-04 14:02:50 +0000
committerMark Haines <mjark@negativecurvature.net>2016-01-04 14:02:50 +0000
commitf35f8d06ea58e2d0cdccd82924c7a44fd93f4c38 (patch)
treedc5312558565f8ac01264be21d388e563a5c8c58 /synapse/storage/events.py
parentAdded info abou Martin Giess' auto-deployment process with vagrant/ansible (diff)
parentBump changelog and version for v0.12.0 (diff)
downloadsynapse-f35f8d06ea58e2d0cdccd82924c7a44fd93f4c38.tar.xz
Merge remote-tracking branch 'origin/release-v0.12.0' v0.12.0
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py79
1 files changed, 78 insertions, 1 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5d35ca90b9..fc5725097c 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -51,6 +51,14 @@ EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events
 
 
 class EventsStore(SQLBaseStore):
+    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
+
+    def __init__(self, hs):
+        super(EventsStore, self).__init__(hs)
+        self.register_background_update_handler(
+            self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
+        )
+
     @defer.inlineCallbacks
     def persist_events(self, events_and_contexts, backfilled=False,
                        is_new_state=True):
@@ -365,6 +373,7 @@ class EventsStore(SQLBaseStore):
                     "processed": True,
                     "outlier": event.internal_metadata.is_outlier(),
                     "content": encode_json(event.content).decode("UTF-8"),
+                    "origin_server_ts": int(event.origin_server_ts),
                 }
                 for event, _ in events_and_contexts
             ],
@@ -640,7 +649,7 @@ class EventsStore(SQLBaseStore):
                 ]
 
                 rows = self._new_transaction(
-                    conn, "do_fetch", [], self._fetch_event_rows, event_ids
+                    conn, "do_fetch", [], None, self._fetch_event_rows, event_ids
                 )
 
                 row_dict = {
@@ -964,3 +973,71 @@ class EventsStore(SQLBaseStore):
 
         ret = yield self.runInteraction("count_messages", _count_messages)
         defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def _background_reindex_origin_server_ts(self, progress, batch_size):
+        target_min_stream_id = progress["target_min_stream_id_inclusive"]
+        max_stream_id = progress["max_stream_id_exclusive"]
+        rows_inserted = progress.get("rows_inserted", 0)
+
+        INSERT_CLUMP_SIZE = 1000
+
+        def reindex_search_txn(txn):
+            sql = (
+                "SELECT stream_ordering, event_id FROM events"
+                " WHERE ? <= stream_ordering AND stream_ordering < ?"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT ?"
+            )
+
+            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            min_stream_id = rows[-1][0]
+            event_ids = [row[1] for row in rows]
+
+            events = self._get_events_txn(txn, event_ids)
+
+            rows = []
+            for event in events:
+                try:
+                    event_id = event.event_id
+                    origin_server_ts = event.origin_server_ts
+                except (KeyError, AttributeError):
+                    # If the event is missing a necessary field then
+                    # skip over it.
+                    continue
+
+                rows.append((origin_server_ts, event_id))
+
+            sql = (
+                "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
+            )
+
+            for index in range(0, len(rows), INSERT_CLUMP_SIZE):
+                clump = rows[index:index + INSERT_CLUMP_SIZE]
+                txn.executemany(sql, clump)
+
+            progress = {
+                "target_min_stream_id_inclusive": target_min_stream_id,
+                "max_stream_id_exclusive": min_stream_id,
+                "rows_inserted": rows_inserted + len(rows)
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
+            )
+
+            return len(rows)
+
+        result = yield self.runInteraction(
+            self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
+        )
+
+        if not result:
+            yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
+
+        defer.returnValue(result)