summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/storage/stats.py43
1 files changed, 32 insertions, 11 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 4cb10dc9fb..5a7dfde926 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -17,8 +17,6 @@
 import logging
 from itertools import chain
 
-from twisted.internet import defer
-
 from synapse.storage.state_deltas import StateDeltasStore
 
 logger = logging.getLogger(__name__)
@@ -197,22 +195,41 @@ class StatsStore(StateDeltasStore):
                 self._simple_update_one_txn(txn, table, keyvalues, current_row)
 
     def _upsert_copy_from_table_with_additive_relatives_txn(
-        self, txn, into_table, keyvalues, additive_relatives, src_table, copy_columns
+        self,
+        txn,
+        into_table,
+        keyvalues,
+        extra_dst_keyvalues,
+        additive_relatives,
+        src_table,
+        copy_columns,
+        additional_where="",
     ):
         """
         Args:
              txn: Transaction
              into_table (str): The destination table to UPSERT the row into
              keyvalues (dict[str, any]): Row-identifying key values
+             extra_dst_keyvalues (dict[str, any]): Additional keyvalues
+                for `into_table`.
              additive_relatives (dict[str, any]): Fields that will be added onto
                 if existing row present. (Must be disjoint from copy_columns.)
              src_table (str): The source table to copy from
              copy_columns (iterable[str]): The list of columns to copy
+             additional_where (str): Additional SQL for where (prefix with AND
+                if using).
         """
         if self.database_engine.can_native_upsert:
-            ins_columns = chain(keyvalues, copy_columns, additive_relatives.keys())
+            ins_columns = chain(
+                keyvalues,
+                copy_columns,
+                additive_relatives.keys(),
+                extra_dst_keyvalues.keys(),
+            )
             sel_exprs = chain(
-                keyvalues, copy_columns, ("?" for _ in additive_relatives)
+                keyvalues,
+                copy_columns,
+                ("?" for _ in chain(additive_relatives, extra_dst_keyvalues)),
             )
             keyvalues_where = ("%s = ?" % f for f in keyvalues)
 
@@ -225,17 +242,20 @@ class StatsStore(StateDeltasStore):
                 INSERT INTO %(into_table)s (%(ins_columns)s)
                 SELECT %(sel_exprs)s
                 FROM %(src_table)s
-                WHERE %(keyvalues_where)s
+                WHERE %(keyvalues_where)s %(additional_where)s
                 ON CONFLICT (%(keyvalues)s)
                 DO UPDATE SET %(sets)s
             """ % {
                 "into_table": into_table,
                 "ins_columns": ", ".join(ins_columns),
                 "sel_exprs": ", ".join(sel_exprs),
-                "keyvalues_where": ", ".join(keyvalues_where),
+                "keyvalues_where": " AND ".join(keyvalues_where),
                 "src_table": src_table,
-                "keyvalues": ", ".join(keyvalues.keys()),
+                "keyvalues": ", ".join(
+                    chain(keyvalues.keys(), extra_dst_keyvalues.keys())
+                ),
                 "sets": ", ".join(chain(sets_cc, sets_ar)),
+                "additional_where": additional_where,
             }
 
             qargs = chain(additive_relatives.values(), keyvalues.values())
@@ -320,14 +340,15 @@ class StatsStore(StateDeltasStore):
             # we don't support absolute_fields for slice_field_names as it makes
             # no sense.
             per_slice_additive_relatives = {
-                key: fields.get(key, 0)
-                for key in slice_field_names
+                key: fields.get(key, 0) for key in slice_field_names
             }
             self._upsert_copy_from_table_with_additive_relatives_txn(
                 txn,
                 table + "_historical",
                 {id_col: stats_id},
+                {"end_ts": end_ts, "bucket_size": self.stats_bucket_size},
                 per_slice_additive_relatives,
                 table + "_current",
-                abs_field_names
+                abs_field_names,
+                additional_where=" AND completed_delta_stream_id IS NOT NULL",
             )