summary refs log tree commit diff
path: root/synapse/storage/events_worker.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-06-05 15:45:46 +0100
committerAmber Brown <hawkowl@atleastfornow.net>2019-06-06 00:45:46 +1000
commit75538813fcd0403ec8915484a813b99e6eb256c6 (patch)
tree89735486b44fb3c5572f6bc38524dd231eed6233 /synapse/storage/events_worker.py
parentFix notes about well-known and acme (#5357) (diff)
downloadsynapse-75538813fcd0403ec8915484a813b99e6eb256c6.tar.xz
Fix background updates to handle redactions/rejections (#5352)
* Fix background updates to handle redactions/rejections

In background updates based on current state delta stream we need to
handle that we may not have all the events (or at least that
`get_events` may raise an exception).
Diffstat (limited to 'synapse/storage/events_worker.py')
-rw-r--r--synapse/storage/events_worker.py37
1 files changed, 37 insertions, 0 deletions
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 1782428048..cc7df5cf14 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -78,6 +78,43 @@ class EventsWorkerStore(SQLBaseStore):
             desc="get_received_ts",
         )
 
+    def get_received_ts_by_stream_pos(self, stream_ordering):
+        """Given a stream ordering get an approximate timestamp of when it
+        happened.
+
+        This is done by simply taking the received ts of the first event that
+        has a stream ordering greater than or equal to the given stream pos.
+        If none exists returns the current time, on the assumption that it must
+        have happened recently.
+
+        Args:
+            stream_ordering (int)
+
+        Returns:
+            Deferred[int]
+        """
+
+        def _get_approximate_received_ts_txn(txn):
+            sql = """
+                SELECT received_ts FROM events
+                WHERE stream_ordering >= ?
+                LIMIT 1
+            """
+
+            txn.execute(sql, (stream_ordering,))
+            row = txn.fetchone()
+            if row and row[0]:
+                ts = row[0]
+            else:
+                ts = self.clock.time_msec()
+
+            return ts
+
+        return self.runInteraction(
+            "get_approximate_received_ts",
+            _get_approximate_received_ts_txn,
+        )
+
     @defer.inlineCallbacks
     def get_event(
         self,