summary refs log tree commit diff
path: root/synapse/storage/databases/main/state_deltas.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/state_deltas.py')
-rw-r--r--synapse/storage/databases/main/state_deltas.py121
1 files changed, 121 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
new file mode 100644

index 0000000000..0d963c98ff --- /dev/null +++ b/synapse/storage/databases/main/state_deltas.py
@@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.storage._base import SQLBaseStore + +logger = logging.getLogger(__name__) + + +class StateDeltasStore(SQLBaseStore): + def get_current_state_deltas(self, prev_stream_id: int, max_stream_id: int): + """Fetch a list of room state changes since the given stream id + + Each entry in the result contains the following fields: + - stream_id (int) + - room_id (str) + - type (str): event type + - state_key (str): + - event_id (str|None): new event_id for this state key. None if the + state has been deleted. + - prev_event_id (str|None): previous event_id for this state key. None + if it's new state. + + Args: + prev_stream_id (int): point to get changes since (exclusive) + max_stream_id (int): the point that we know has been correctly persisted + - ie, an upper limit to return changes from. + + Returns: + Deferred[tuple[int, list[dict]]: A tuple consisting of: + - the stream id which these results go up to + - list of current_state_delta_stream rows. If it is empty, we are + up to date. + """ + prev_stream_id = int(prev_stream_id) + + # check we're not going backwards + assert prev_stream_id <= max_stream_id + + if not self._curr_state_delta_stream_cache.has_any_entity_changed( + prev_stream_id + ): + # if the CSDs haven't changed between prev_stream_id and now, we + # know for certain that they haven't changed between prev_stream_id and + # max_stream_id. + return defer.succeed((max_stream_id, [])) + + def get_current_state_deltas_txn(txn): + # First we calculate the max stream id that will give us less than + # N results. + # We arbitarily limit to 100 stream_id entries to ensure we don't + # select toooo many. + sql = """ + SELECT stream_id, count(*) + FROM current_state_delta_stream + WHERE stream_id > ? AND stream_id <= ? + GROUP BY stream_id + ORDER BY stream_id ASC + LIMIT 100 + """ + txn.execute(sql, (prev_stream_id, max_stream_id)) + + total = 0 + + for stream_id, count in txn: + total += count + if total > 100: + # We arbitarily limit to 100 entries to ensure we don't + # select toooo many. + logger.debug( + "Clipping current_state_delta_stream rows to stream_id %i", + stream_id, + ) + clipped_stream_id = stream_id + break + else: + # if there's no problem, we may as well go right up to the max_stream_id + clipped_stream_id = max_stream_id + + # Now actually get the deltas + sql = """ + SELECT stream_id, room_id, type, state_key, event_id, prev_event_id + FROM current_state_delta_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + """ + txn.execute(sql, (prev_stream_id, clipped_stream_id)) + return clipped_stream_id, self.db_pool.cursor_to_dict(txn) + + return self.db_pool.runInteraction( + "get_current_state_deltas", get_current_state_deltas_txn + ) + + def _get_max_stream_id_in_current_state_deltas_txn(self, txn): + return self.db_pool.simple_select_one_onecol_txn( + txn, + table="current_state_delta_stream", + keyvalues={}, + retcol="COALESCE(MAX(stream_id), -1)", + ) + + def get_max_stream_id_in_current_state_deltas(self): + return self.db_pool.runInteraction( + "get_max_stream_id_in_current_state_deltas", + self._get_max_stream_id_in_current_state_deltas_txn, + )