diff options
Diffstat (limited to 'synapse/storage/state_deltas.py')
-rw-r--r-- | synapse/storage/state_deltas.py | 107 |
1 files changed, 107 insertions, 0 deletions
diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py new file mode 100644 index 0000000000..b733e68c45 --- /dev/null +++ b/synapse/storage/state_deltas.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import re + +from six import iteritems + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, JoinRules +from synapse.storage.engines import PostgresEngine, Sqlite3Engine +from synapse.types import get_domain_from_id, get_localpart_from_id +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks + +from ._base import SQLBaseStore + +logger = logging.getLogger(__name__) + + +class StateDeltasStore(SQLBaseStore): + + @defer.inlineCallbacks + def get_all_rooms(self): + """Get all room_ids we've ever known about, in ascending order of "size" + """ + sql = """ + SELECT room_id FROM current_state_events + GROUP BY room_id + ORDER BY count(*) ASC + """ + rows = yield self._execute("get_all_rooms", None, sql) + defer.returnValue([room_id for room_id, in rows]) + + @defer.inlineCallbacks + def get_all_local_users(self): + """Get all local users + """ + sql = """ + SELECT name FROM users + """ + rows = yield self._execute("get_all_local_users", None, sql) + defer.returnValue([name for name, in rows]) + + def get_current_state_deltas(self, prev_stream_id): + prev_stream_id = int(prev_stream_id) + if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id): + return [] + + def get_current_state_deltas_txn(txn): + # First we calculate the max stream id that will give us less than + # N results. + # We arbitarily limit to 100 stream_id entries to ensure we don't + # select toooo many. + sql = """ + SELECT stream_id, count(*) + FROM current_state_delta_stream + WHERE stream_id > ? + GROUP BY stream_id + ORDER BY stream_id ASC + LIMIT 100 + """ + txn.execute(sql, (prev_stream_id,)) + + total = 0 + max_stream_id = prev_stream_id + for max_stream_id, count in txn: + total += count + if total > 100: + # We arbitarily limit to 100 entries to ensure we don't + # select toooo many. + break + + # Now actually get the deltas + sql = """ + SELECT stream_id, room_id, type, state_key, event_id, prev_event_id + FROM current_state_delta_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + """ + txn.execute(sql, (prev_stream_id, max_stream_id,)) + return self.cursor_to_dict(txn) + + return self.runInteraction( + "get_current_state_deltas", get_current_state_deltas_txn + ) + + def get_max_stream_id_in_current_state_deltas(self): + return self._simple_select_one_onecol( + table="current_state_delta_stream", + keyvalues={}, + retcol="COALESCE(MAX(stream_id), -1)", + desc="get_max_stream_id_in_current_state_deltas", + ) + |