summary refs log tree commit diff
path: root/synapse/storage/state_deltas.py
diff options
context:
space:
mode:
authorAmber Brown <hawkowl@atleastfornow.net>2019-03-25 20:37:08 +1100
committerGitHub <noreply@github.com>2019-03-25 20:37:08 +1100
commitac396a0d325eb7fc1311d5bd31b2693ff10fc53c (patch)
treebe1cfb6c9857f28bbf02e4331a5f223b8ae24ded /synapse/storage/state_deltas.py
parentMerge pull request #4869 from matrix-org/erikj/yaml_load (diff)
downloadsynapse-ac396a0d325eb7fc1311d5bd31b2693ff10fc53c.tar.xz
Refactor out state delta handling into its own class (#4917)
Diffstat (limited to 'synapse/storage/state_deltas.py')
-rw-r--r--synapse/storage/state_deltas.py74
1 files changed, 74 insertions, 0 deletions
diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py
new file mode 100644
index 0000000000..57bc45cdb9
--- /dev/null
+++ b/synapse/storage/state_deltas.py
@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.storage._base import SQLBaseStore
+
+logger = logging.getLogger(__name__)
+
+
+class StateDeltasStore(SQLBaseStore):
+
+    def get_current_state_deltas(self, prev_stream_id):
+        prev_stream_id = int(prev_stream_id)
+        if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id):
+            return []
+
+        def get_current_state_deltas_txn(txn):
+            # First we calculate the max stream id that will give us less than
+            # N results.
+            # We arbitarily limit to 100 stream_id entries to ensure we don't
+            # select toooo many.
+            sql = """
+                SELECT stream_id, count(*)
+                FROM current_state_delta_stream
+                WHERE stream_id > ?
+                GROUP BY stream_id
+                ORDER BY stream_id ASC
+                LIMIT 100
+            """
+            txn.execute(sql, (prev_stream_id,))
+
+            total = 0
+            max_stream_id = prev_stream_id
+            for max_stream_id, count in txn:
+                total += count
+                if total > 100:
+                    # We arbitarily limit to 100 entries to ensure we don't
+                    # select toooo many.
+                    break
+
+            # Now actually get the deltas
+            sql = """
+                SELECT stream_id, room_id, type, state_key, event_id, prev_event_id
+                FROM current_state_delta_stream
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+            """
+            txn.execute(sql, (prev_stream_id, max_stream_id,))
+            return self.cursor_to_dict(txn)
+
+        return self.runInteraction(
+            "get_current_state_deltas", get_current_state_deltas_txn
+        )
+
+    def get_max_stream_id_in_current_state_deltas(self):
+        return self._simple_select_one_onecol(
+            table="current_state_delta_stream",
+            keyvalues={},
+            retcol="COALESCE(MAX(stream_id), -1)",
+            desc="get_max_stream_id_in_current_state_deltas",
+        )