From d7a0496f3ec534076121632352f44733253e1e16 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Mar 2015 15:59:48 +0000 Subject: Convert storage layer to be mysql compatible --- synapse/storage/stream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 09bc522210..64adb0c7fa 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -110,7 +110,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): if self.topological is None: return "(%d < %s)" % (self.stream, "stream_ordering") else: - return "(%d < %s OR (%d == %s AND %d < %s))" % ( + return "(%d < %s OR (%d = %s AND %d < %s))" % ( self.topological, "topological_ordering", self.topological, "topological_ordering", self.stream, "stream_ordering", @@ -120,7 +120,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): if self.topological is None: return "(%d >= %s)" % (self.stream, "stream_ordering") else: - return "(%d > %s OR (%d == %s AND %d >= %s))" % ( + return "(%d > %s OR (%d = %s AND %d >= %s))" % ( self.topological, "topological_ordering", self.topological, "topological_ordering", self.stream, "stream_ordering", -- cgit 1.4.1 From cb8162d3d17a97574073d49bd6eef51c93b68157 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Mar 2015 13:52:56 +0000 Subject: Rearrange storage modules --- synapse/storage/__init__.py | 466 +------------------------------------------- synapse/storage/_base.py | 7 + synapse/storage/events.py | 387 ++++++++++++++++++++++++++++++++++++ synapse/storage/feedback.py | 47 ----- synapse/storage/room.py | 37 ++++ synapse/storage/state.py | 32 +++ synapse/storage/stream.py | 19 ++ 7 files changed, 486 insertions(+), 509 deletions(-) create mode 100644 synapse/storage/events.py delete mode 100644 synapse/storage/feedback.py (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 30cba47717..8604746fb9 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -15,12 +15,9 @@ from twisted.internet import defer -from synapse.util.logutils import log_function -from synapse.api.constants import EventTypes - from .appservice import ApplicationServiceStore from .directory import DirectoryStore -from .feedback import FeedbackStore +from .events import EventsStore from .presence import PresenceStore from .profile import ProfileStore from .registration import RegistrationStore @@ -39,11 +36,6 @@ from .state import StateStore from .signatures import SignatureStore from .filtering import FilteringStore -from syutil.base64util import decode_base64 -from syutil.jsonutil import encode_canonical_json - -from synapse.crypto.event_signing import compute_event_reference_hash - import fnmatch import imp @@ -64,15 +56,8 @@ SCHEMA_VERSION = 14 dir_path = os.path.abspath(os.path.dirname(__file__)) -class _RollbackButIsFineException(Exception): - """ This exception is used to rollback a transaction without implying - something went wrong. 
- """ - pass - - class DataStore(RoomMemberStore, RoomStore, - RegistrationStore, StreamStore, ProfileStore, FeedbackStore, + RegistrationStore, StreamStore, ProfileStore, PresenceStore, TransactionStore, DirectoryStore, KeyStore, StateStore, SignatureStore, ApplicationServiceStore, @@ -81,7 +66,8 @@ class DataStore(RoomMemberStore, RoomStore, RejectionsStore, FilteringStore, PusherStore, - PushRuleStore + PushRuleStore, + EventsStore, ): def __init__(self, hs): @@ -94,412 +80,6 @@ class DataStore(RoomMemberStore, RoomStore, self._next_stream_id_lock = threading.Lock() self._next_stream_id = int(hs.get_clock().time_msec()) * 1000 - @defer.inlineCallbacks - @log_function - def persist_event(self, event, context, backfilled=False, - is_new_state=True, current_state=None): - stream_ordering = None - if backfilled: - if not self.min_token_deferred.called: - yield self.min_token_deferred - self.min_token -= 1 - stream_ordering = self.min_token - - try: - yield self.runInteraction( - "persist_event", - self._persist_event_txn, - event=event, - context=context, - backfilled=backfilled, - stream_ordering=stream_ordering, - is_new_state=is_new_state, - current_state=current_state, - ) - except _RollbackButIsFineException: - pass - - @defer.inlineCallbacks - def get_event(self, event_id, check_redacted=True, - get_prev_content=False, allow_rejected=False, - allow_none=False): - """Get an event from the database by event_id. - - Args: - event_id (str): The event_id of the event to fetch - check_redacted (bool): If True, check if event has been redacted - and redact it. - get_prev_content (bool): If True and event is a state event, - include the previous states content in the unsigned field. - allow_rejected (bool): If True return rejected events. - allow_none (bool): If True, return None if no event found, if - False throw an exception. - - Returns: - Deferred : A FrozenEvent. 
- """ - event = yield self.runInteraction( - "get_event", self._get_event_txn, - event_id, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - if not event and not allow_none: - raise RuntimeError("Could not find event %s" % (event_id,)) - - defer.returnValue(event) - - @log_function - def _persist_event_txn(self, txn, event, context, backfilled, - stream_ordering=None, is_new_state=True, - current_state=None): - - # Remove the any existing cache entries for the event_id - self._get_event_cache.pop(event.event_id) - - # We purposefully do this first since if we include a `current_state` - # key, we *want* to update the `current_state_events` table - if current_state: - txn.execute( - "DELETE FROM current_state_events WHERE room_id = ?", - (event.room_id,) - ) - - for s in current_state: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": s.event_id, - "room_id": s.room_id, - "type": s.type, - "state_key": s.state_key, - }, - ) - - if event.is_state() and is_new_state: - if not backfilled and not context.rejected: - self._simple_insert_txn( - txn, - table="state_forward_extremities", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - ) - - for prev_state_id, _ in event.prev_state: - self._simple_delete_txn( - txn, - table="state_forward_extremities", - keyvalues={ - "event_id": prev_state_id, - } - ) - - outlier = event.internal_metadata.is_outlier() - - if not outlier: - self._store_state_groups_txn(txn, event, context) - - self._update_min_depth_for_room_txn( - txn, - event.room_id, - event.depth - ) - - self._handle_prev_events( - txn, - outlier=outlier, - event_id=event.event_id, - prev_events=event.prev_events, - room_id=event.room_id, - ) - - have_persisted = self._simple_select_one_onecol_txn( - txn, - table="event_json", - keyvalues={"event_id": event.event_id}, - retcol="event_id", - allow_none=True, - ) - - metadata_json = encode_canonical_json( - event.internal_metadata.get_dict() - ) - - # If we have already persisted this event, we don't need to do any - # more processing. - # The processing above must be done on every call to persist event, - # since they might not have happened on previous calls. For example, - # if we are persisting an event that we had persisted as an outlier, - # but is no longer one. - if have_persisted: - if not outlier: - sql = ( - "UPDATE event_json SET internal_metadata = ?" - " WHERE event_id = ?" - ) - txn.execute( - sql, - (metadata_json.decode("UTF-8"), event.event_id,) - ) - - sql = ( - "UPDATE events SET outlier = 0" - " WHERE event_id = ?" 
- ) - txn.execute( - sql, - (event.event_id,) - ) - return - - if event.type == EventTypes.Member: - self._store_room_member_txn(txn, event) - elif event.type == EventTypes.Feedback: - self._store_feedback_txn(txn, event) - elif event.type == EventTypes.Name: - self._store_room_name_txn(txn, event) - elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, event) - elif event.type == EventTypes.Redaction: - self._store_redaction(txn, event) - - event_dict = { - k: v - for k, v in event.get_dict().items() - if k not in [ - "redacted", - "redacted_because", - ] - } - - self._simple_insert_txn( - txn, - table="event_json", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "internal_metadata": metadata_json.decode("UTF-8"), - "json": encode_canonical_json(event_dict).decode("UTF-8"), - }, - ) - - content = encode_canonical_json( - event.content - ).decode("UTF-8") - - vals = { - "topological_ordering": event.depth, - "event_id": event.event_id, - "type": event.type, - "room_id": event.room_id, - "content": content, - "processed": True, - "outlier": outlier, - "depth": event.depth, - } - - if stream_ordering is None: - stream_ordering = self.get_next_stream_id() - - - unrec = { - k: v - for k, v in event.get_dict().items() - if k not in vals.keys() and k not in [ - "redacted", - "redacted_because", - "signatures", - "hashes", - "prev_events", - ] - } - - vals["unrecognized_keys"] = encode_canonical_json( - unrec - ).decode("UTF-8") - - sql = ( - "INSERT INTO events" - " (stream_ordering, topological_ordering, event_id, type," - " room_id, content, processed, outlier, depth)" - " VALUES (%s,?,?,?,?,?,?,?,?)" - ) % (stream_ordering,) - - txn.execute( - sql, - (event.depth, event.event_id, event.type, event.room_id, - content, True, outlier, event.depth) - ) - - if context.rejected: - self._store_rejections_txn(txn, event.event_id, context.rejected) - - if event.is_state(): - vals = { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - } - - # TODO: How does this work with backfilling? 
- if hasattr(event, "replaces_state"): - vals["prev_state"] = event.replaces_state - - self._simple_insert_txn( - txn, - "state_events", - vals, - ) - - if is_new_state and not context.rejected: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - ) - - for e_id, h in event.prev_state: - self._simple_insert_txn( - txn, - table="event_edges", - values={ - "event_id": event.event_id, - "prev_event_id": e_id, - "room_id": event.room_id, - "is_state": 1, - }, - ) - - for hash_alg, hash_base64 in event.hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_event_content_hash_txn( - txn, event.event_id, hash_alg, hash_bytes, - ) - - for prev_event_id, prev_hashes in event.prev_events: - for alg, hash_base64 in prev_hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_prev_event_hash_txn( - txn, event.event_id, prev_event_id, alg, hash_bytes - ) - - for auth_id, _ in event.auth_events: - self._simple_insert_txn( - txn, - table="event_auth", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "auth_id": auth_id, - }, - ) - - (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) - self._store_event_reference_hash_txn( - txn, event.event_id, ref_alg, ref_hash_bytes - ) - - def _store_redaction(self, txn, event): - # invalidate the cache for the redacted event - self._get_event_cache.pop(event.redacts) - txn.execute( - "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", - (event.event_id, event.redacts) - ) - - @defer.inlineCallbacks - def get_current_state(self, room_id, event_type=None, state_key=""): - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = e.event_id " - "LIMIT 1" - ) - - sql = ( - "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " - "INNER JOIN current_state_events as c ON e.event_id = c.event_id " - "INNER JOIN state_events as s ON e.event_id = s.event_id " - "WHERE c.room_id = ? " - ) % { - "redacted": del_sql, - } - - if event_type and state_key is not None: - sql += " AND s.type = ? AND s.state_key = ? " - args = (room_id, event_type, state_key) - elif event_type: - sql += " AND s.type = ?" - args = (room_id, event_type) - else: - args = (room_id, ) - - results = yield self._execute_and_decode("get_current_state", sql, *args) - - events = yield self._parse_events(results) - defer.returnValue(events) - - @defer.inlineCallbacks - def get_room_name_and_aliases(self, room_id): - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = e.event_id " - "LIMIT 1" - ) - - sql = ( - "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " - "INNER JOIN current_state_events as c ON e.event_id = c.event_id " - "INNER JOIN state_events as s ON e.event_id = s.event_id " - "WHERE c.room_id = ? 
" - ) % { - "redacted": del_sql, - } - - sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')" - sql += " OR s.type = 'm.room.aliases')" - args = (room_id,) - - results = yield self._execute_and_decode("get_current_state", sql, *args) - - events = yield self._parse_events(results) - - name = None - aliases = [] - - for e in events: - if e.type == 'm.room.name': - if 'name' in e.content: - name = e.content['name'] - elif e.type == 'm.room.aliases': - if 'aliases' in e.content: - aliases.extend(e.content['aliases']) - - defer.returnValue((name, aliases)) - - @defer.inlineCallbacks - def _get_min_token(self): - row = yield self._execute( - "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events" - ) - - self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 - self.min_token = min(self.min_token, -1) - - logger.debug("min_token is: %s", self.min_token) - - defer.returnValue(self.min_token) - def insert_client_ip(self, user, access_token, device_id, ip, user_agent): return self._simple_insert( "user_ips", @@ -523,44 +103,6 @@ class DataStore(RoomMemberStore, RoomStore, ], ) - def have_events(self, event_ids): - """Given a list of event ids, check if we have already processed them. - - Returns: - dict: Has an entry for each event id we already have seen. Maps to - the rejected reason string if we rejected the event, else maps to - None. - """ - if not event_ids: - return defer.succeed({}) - - def f(txn): - sql = ( - "SELECT e.event_id, reason FROM events as e " - "LEFT JOIN rejections as r ON e.event_id = r.event_id " - "WHERE e.event_id = ?" - ) - - res = {} - for event_id in event_ids: - txn.execute(sql, (event_id,)) - row = txn.fetchone() - if row: - _, rejected = row - res[event_id] = rejected - - return res - - return self.runInteraction( - "have_events", f, - ) - - def get_next_stream_id(self): - with self._next_stream_id_lock: - i = self._next_stream_id - self._next_stream_id += 1 - return i - def read_schema(path): """ Read the named database schema. diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 24ff872dad..37bb28e6cf 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -791,6 +791,13 @@ class SQLBaseStore(object): return result[0] if result else None +class _RollbackButIsFineException(Exception): + """ This exception is used to rollback a transaction without implying + something went wrong. + """ + pass + + class Table(object): """ A base class used to store information about a particular table. """ diff --git a/synapse/storage/events.py b/synapse/storage/events.py new file mode 100644 index 0000000000..b222dfb4aa --- /dev/null +++ b/synapse/storage/events.py @@ -0,0 +1,387 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from _base import SQLBaseStore, _RollbackButIsFineException + +from twisted.internet import defer + +from synapse.util.logutils import log_function +from synapse.api.constants import EventTypes +from synapse.crypto.event_signing import compute_event_reference_hash + +from syutil.base64util import decode_base64 +from syutil.jsonutil import encode_canonical_json + +import logging + +logger = logging.getLogger(__name__) + + +class EventsStore(SQLBaseStore): + @defer.inlineCallbacks + @log_function + def persist_event(self, event, context, backfilled=False, + is_new_state=True, current_state=None): + stream_ordering = None + if backfilled: + if not self.min_token_deferred.called: + yield self.min_token_deferred + self.min_token -= 1 + stream_ordering = self.min_token + + try: + yield self.runInteraction( + "persist_event", + self._persist_event_txn, + event=event, + context=context, + backfilled=backfilled, + stream_ordering=stream_ordering, + is_new_state=is_new_state, + current_state=current_state, + ) + except _RollbackButIsFineException: + pass + + @defer.inlineCallbacks + def get_event(self, event_id, check_redacted=True, + get_prev_content=False, allow_rejected=False, + allow_none=False): + """Get an event from the database by event_id. + + Args: + event_id (str): The event_id of the event to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + allow_none (bool): If True, return None if no event found, if + False throw an exception. + + Returns: + Deferred : A FrozenEvent. + """ + event = yield self.runInteraction( + "get_event", self._get_event_txn, + event_id, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + if not event and not allow_none: + raise RuntimeError("Could not find event %s" % (event_id,)) + + defer.returnValue(event) + + @log_function + def _persist_event_txn(self, txn, event, context, backfilled, + stream_ordering=None, is_new_state=True, + current_state=None): + + # Remove the any existing cache entries for the event_id + self._get_event_cache.pop(event.event_id) + + # We purposefully do this first since if we include a `current_state` + # key, we *want* to update the `current_state_events` table + if current_state: + txn.execute( + "DELETE FROM current_state_events WHERE room_id = ?", + (event.room_id,) + ) + + for s in current_state: + self._simple_insert_txn( + txn, + "current_state_events", + { + "event_id": s.event_id, + "room_id": s.room_id, + "type": s.type, + "state_key": s.state_key, + }, + ) + + if event.is_state() and is_new_state: + if not backfilled and not context.rejected: + self._simple_insert_txn( + txn, + table="state_forward_extremities", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + ) + + for prev_state_id, _ in event.prev_state: + self._simple_delete_txn( + txn, + table="state_forward_extremities", + keyvalues={ + "event_id": prev_state_id, + } + ) + + outlier = event.internal_metadata.is_outlier() + + if not outlier: + self._store_state_groups_txn(txn, event, context) + + self._update_min_depth_for_room_txn( + txn, + event.room_id, + event.depth + ) + + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, 
+ ) + + have_persisted = self._simple_select_one_onecol_txn( + txn, + table="event_json", + keyvalues={"event_id": event.event_id}, + retcol="event_id", + allow_none=True, + ) + + metadata_json = encode_canonical_json( + event.internal_metadata.get_dict() + ) + + # If we have already persisted this event, we don't need to do any + # more processing. + # The processing above must be done on every call to persist event, + # since they might not have happened on previous calls. For example, + # if we are persisting an event that we had persisted as an outlier, + # but is no longer one. + if have_persisted: + if not outlier: + sql = ( + "UPDATE event_json SET internal_metadata = ?" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (metadata_json.decode("UTF-8"), event.event_id,) + ) + + sql = ( + "UPDATE events SET outlier = 0" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (event.event_id,) + ) + return + + if event.type == EventTypes.Member: + self._store_room_member_txn(txn, event) + elif event.type == EventTypes.Name: + self._store_room_name_txn(txn, event) + elif event.type == EventTypes.Topic: + self._store_room_topic_txn(txn, event) + elif event.type == EventTypes.Redaction: + self._store_redaction(txn, event) + + event_dict = { + k: v + for k, v in event.get_dict().items() + if k not in [ + "redacted", + "redacted_because", + ] + } + + self._simple_insert_txn( + txn, + table="event_json", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "internal_metadata": metadata_json.decode("UTF-8"), + "json": encode_canonical_json(event_dict).decode("UTF-8"), + }, + ) + + content = encode_canonical_json( + event.content + ).decode("UTF-8") + + vals = { + "topological_ordering": event.depth, + "event_id": event.event_id, + "type": event.type, + "room_id": event.room_id, + "content": content, + "processed": True, + "outlier": outlier, + "depth": event.depth, + } + + if stream_ordering is None: + stream_ordering = self.get_next_stream_id() + + + unrec = { + k: v + for k, v in event.get_dict().items() + if k not in vals.keys() and k not in [ + "redacted", + "redacted_because", + "signatures", + "hashes", + "prev_events", + ] + } + + vals["unrecognized_keys"] = encode_canonical_json( + unrec + ).decode("UTF-8") + + sql = ( + "INSERT INTO events" + " (stream_ordering, topological_ordering, event_id, type," + " room_id, content, processed, outlier, depth)" + " VALUES (%s,?,?,?,?,?,?,?,?)" + ) % (stream_ordering,) + + txn.execute( + sql, + (event.depth, event.event_id, event.type, event.room_id, + content, True, outlier, event.depth) + ) + + if context.rejected: + self._store_rejections_txn(txn, event.event_id, context.rejected) + + if event.is_state(): + vals = { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + + # TODO: How does this work with backfilling? 
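+                # (Assumed behaviour, noted for orientation: `replaces_state`,
+                # where present, holds the event_id of the previous state event
+                # for the same (type, state_key) pair, so the `prev_state`
+                # column records which state entry this event superseded.)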
+ if hasattr(event, "replaces_state"): + vals["prev_state"] = event.replaces_state + + self._simple_insert_txn( + txn, + "state_events", + vals, + ) + + if is_new_state and not context.rejected: + self._simple_insert_txn( + txn, + "current_state_events", + { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + ) + + for e_id, h in event.prev_state: + self._simple_insert_txn( + txn, + table="event_edges", + values={ + "event_id": event.event_id, + "prev_event_id": e_id, + "room_id": event.room_id, + "is_state": 1, + }, + ) + + for hash_alg, hash_base64 in event.hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_event_content_hash_txn( + txn, event.event_id, hash_alg, hash_bytes, + ) + + for prev_event_id, prev_hashes in event.prev_events: + for alg, hash_base64 in prev_hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_prev_event_hash_txn( + txn, event.event_id, prev_event_id, alg, hash_bytes + ) + + for auth_id, _ in event.auth_events: + self._simple_insert_txn( + txn, + table="event_auth", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "auth_id": auth_id, + }, + ) + + (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) + self._store_event_reference_hash_txn( + txn, event.event_id, ref_alg, ref_hash_bytes + ) + + def _store_redaction(self, txn, event): + # invalidate the cache for the redacted event + self._get_event_cache.pop(event.redacts) + txn.execute( + "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", + (event.event_id, event.redacts) + ) + + def have_events(self, event_ids): + """Given a list of event ids, check if we have already processed them. + + Returns: + dict: Has an entry for each event id we already have seen. Maps to + the rejected reason string if we rejected the event, else maps to + None. + """ + if not event_ids: + return defer.succeed({}) + + def f(txn): + sql = ( + "SELECT e.event_id, reason FROM events as e " + "LEFT JOIN rejections as r ON e.event_id = r.event_id " + "WHERE e.event_id = ?" + ) + + res = {} + for event_id in event_ids: + txn.execute(sql, (event_id,)) + row = txn.fetchone() + if row: + _, rejected = row + res[event_id] = rejected + + return res + + return self.runInteraction( + "have_events", f, + ) diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py deleted file mode 100644 index 8eab769b71..0000000000 --- a/synapse/storage/feedback.py +++ /dev/null @@ -1,47 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from twisted.internet import defer - -from ._base import SQLBaseStore - - -class FeedbackStore(SQLBaseStore): - - def _store_feedback_txn(self, txn, event): - self._simple_insert_txn(txn, "feedback", { - "event_id": event.event_id, - "feedback_type": event.content["type"], - "room_id": event.room_id, - "target_event_id": event.content["target_event_id"], - "sender": event.user_id, - }) - - @defer.inlineCallbacks - def get_feedback_for_event(self, event_id): - sql = ( - "SELECT events.* FROM events INNER JOIN feedback " - "ON events.event_id = feedback.event_id " - "WHERE feedback.target_event_id = ? " - ) - - rows = yield self._execute_and_decode("get_feedback_for_event", sql, event_id) - - defer.returnValue( - [ - (yield self._parse_events(r)) - for r in rows - ] - ) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index b5031f5c77..c64f8f53ac 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -161,6 +161,43 @@ class RoomStore(SQLBaseStore): } ) + @defer.inlineCallbacks + def get_room_name_and_aliases(self, room_id): + del_sql = ( + "SELECT event_id FROM redactions WHERE redacts = e.event_id " + "LIMIT 1" + ) + + sql = ( + "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " + "INNER JOIN current_state_events as c ON e.event_id = c.event_id " + "INNER JOIN state_events as s ON e.event_id = s.event_id " + "WHERE c.room_id = ? " + ) % { + "redacted": del_sql, + } + + sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')" + sql += " OR s.type = 'm.room.aliases')" + args = (room_id,) + + results = yield self._execute_and_decode("get_current_state", sql, *args) + + events = yield self._parse_events(results) + + name = None + aliases = [] + + for e in events: + if e.type == 'm.room.name': + if 'name' in e.content: + name = e.content['name'] + elif e.type == 'm.room.aliases': + if 'aliases' in e.content: + aliases.extend(e.content['aliases']) + + defer.returnValue((name, aliases)) + class RoomsTable(Table): table_name = "rooms" diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 888837cd1e..012144302d 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -15,6 +15,8 @@ from ._base import SQLBaseStore +from twisted.internet import defer + from synapse.util.stringutils import random_string import logging @@ -125,6 +127,36 @@ class StateStore(SQLBaseStore): or_replace=True, ) + @defer.inlineCallbacks + def get_current_state(self, room_id, event_type=None, state_key=""): + del_sql = ( + "SELECT event_id FROM redactions WHERE redacts = e.event_id " + "LIMIT 1" + ) + + sql = ( + "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " + "INNER JOIN current_state_events as c ON e.event_id = c.event_id " + "INNER JOIN state_events as s ON e.event_id = s.event_id " + "WHERE c.room_id = ? " + ) % { + "redacted": del_sql, + } + + if event_type and state_key is not None: + sql += " AND s.type = ? AND s.state_key = ? " + args = (room_id, event_type, state_key) + elif event_type: + sql += " AND s.type = ?" 
+ args = (room_id, event_type) + else: + args = (room_id, ) + + results = yield self._execute_and_decode("get_current_state", sql, *args) + + events = yield self._parse_events(results) + defer.returnValue(events) + def _make_group_id(clock): return str(int(clock.time_msec())) + random_string(5) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 64adb0c7fa..9565fc77c5 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -419,6 +419,25 @@ class StreamStore(SQLBaseStore): self._get_room_events_max_id_txn ) + @defer.inlineCallbacks + def _get_min_token(self): + row = yield self._execute( + "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events" + ) + + self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 + self.min_token = min(self.min_token, -1) + + logger.debug("min_token is: %s", self.min_token) + + defer.returnValue(self.min_token) + + def get_next_stream_id(self): + with self._next_stream_id_lock: + i = self._next_stream_id + self._next_stream_id += 1 + return i + def _get_room_events_max_id_txn(self, txn): txn.execute( "SELECT MAX(stream_ordering) as m FROM events" -- cgit 1.4.1 From 9236136f3a4f0d8119d4a6333f37378f8e259e4a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Apr 2015 14:12:33 +0100 Subject: Make work in both Maria and SQLite. Fix tests --- synapse/app/homeserver.py | 20 ++++++---- synapse/storage/__init__.py | 43 +++++++++++----------- synapse/storage/_base.py | 30 ++++++++++----- synapse/storage/engines/__init__.py | 35 ++++++++++++++++++ synapse/storage/engines/maria.py | 30 +++++++++++++++ synapse/storage/engines/sqlite3.py | 25 +++++++++++++ synapse/storage/keys.py | 4 +- synapse/storage/registration.py | 8 +++- synapse/storage/schema/delta/12/v12.sql | 8 ++-- synapse/storage/schema/delta/13/v13.sql | 4 +- synapse/storage/schema/delta/14/v14.sql | 2 +- .../storage/schema/full_schemas/11/event_edges.sql | 14 +++---- .../schema/full_schemas/11/event_signatures.sql | 8 ++-- synapse/storage/schema/full_schemas/11/im.sql | 18 ++++----- synapse/storage/schema/full_schemas/11/keys.sql | 4 +- .../schema/full_schemas/11/media_repository.sql | 8 ++-- .../storage/schema/full_schemas/11/presence.sql | 6 +-- .../storage/schema/full_schemas/11/profiles.sql | 4 +- .../storage/schema/full_schemas/11/redactions.sql | 2 +- .../schema/full_schemas/11/room_aliases.sql | 8 ++-- synapse/storage/schema/full_schemas/11/state.sql | 6 +-- .../schema/full_schemas/11/transactions.sql | 8 ++-- synapse/storage/schema/full_schemas/11/users.sql | 10 ++--- synapse/storage/signatures.py | 8 ++-- synapse/storage/stream.py | 6 --- synapse/util/retryutils.py | 2 +- tests/federation/test_federation.py | 10 +++-- tests/handlers/test_federation.py | 9 +++++ tests/handlers/test_presence.py | 7 +++- tests/handlers/test_typing.py | 7 +++- tests/rest/client/v1/test_events.py | 9 ----- tests/storage/test_appservice.py | 12 ++---- tests/storage/test_base.py | 20 +++++----- tests/storage/test_registration.py | 36 +++++++++++------- tests/storage/test_roommember.py | 14 +++---- tests/utils.py | 11 +++++- 36 files changed, 296 insertions(+), 160 deletions(-) create mode 100644 synapse/storage/engines/__init__.py create mode 100644 synapse/storage/engines/maria.py create mode 100644 synapse/storage/engines/sqlite3.py (limited to 'synapse/storage/stream.py') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index beab6ffc7a..b185b2f569 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -20,6 +20,7 @@ 
sys.dont_write_bytecode = True from synapse.storage import ( prepare_database, prepare_sqlite3_database, UpgradeDatabaseException, ) +from synapse.storage.engines import create_engine from synapse.server import HomeServer @@ -376,7 +377,7 @@ def setup(config_options): if name in ["MySQLdb", "mysql.connector"]: db_config.setdefault("args", {}).update({ "sql_mode": "TRADITIONAL", - "charset": "utf8", + "charset": "utf8mb4", "use_unicode": True, }) elif name == "sqlite3": @@ -388,6 +389,8 @@ def setup(config_options): else: raise RuntimeError("Unsupported database type '%s'" % (name,)) + database_engine = create_engine(name) + hs = SynapseHomeServer( config.server_name, domain_with_port=domain_with_port, @@ -398,6 +401,7 @@ def setup(config_options): config=config, content_addr=config.content_addr, version_string=version_string, + database_engine=database_engine, ) hs.create_resource_tree( @@ -409,12 +413,14 @@ def setup(config_options): logger.info("Preparing database: %s...", db_name) try: - # with sqlite3.connect(db_name) as db_conn: - # prepare_sqlite3_database(db_conn) - # prepare_database(db_conn) - import mysql.connector - db_conn = mysql.connector.connect(**db_config.get("args", {})) - prepare_database(db_conn) + db_conn = database_engine.module.connect(**db_config.get("args", {})) + + if name == "sqlite3": + prepare_sqlite3_database(db_conn) + + prepare_database(db_conn, database_engine) + + db_conn.commit() except UpgradeDatabaseException: sys.stderr.write( "\nFailed to upgrade database.\n" diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index abde7d0df5..f8053484cf 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -77,9 +77,6 @@ class DataStore(RoomMemberStore, RoomStore, self.min_token_deferred = self._get_min_token() self.min_token = None - self._next_stream_id_lock = threading.Lock() - self._next_stream_id = int(hs.get_clock().time_msec()) * 1000 - def insert_client_ip(self, user, access_token, device_id, ip, user_agent): return self._simple_upsert( "user_ips", @@ -127,19 +124,21 @@ class UpgradeDatabaseException(PrepareDatabaseException): pass -def prepare_database(db_conn): +def prepare_database(db_conn, database_engine): """Prepares a database for usage. Will either create all necessary tables or upgrade from an older schema version. """ try: cur = db_conn.cursor() - version_info = _get_or_create_schema_state(cur) + version_info = _get_or_create_schema_state(cur, database_engine) if version_info: user_version, delta_files, upgraded = version_info - _upgrade_existing_database(cur, user_version, delta_files, upgraded) + _upgrade_existing_database( + cur, user_version, delta_files, upgraded, database_engine + ) else: - _setup_new_database(cur) + _setup_new_database(cur, database_engine) # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,)) @@ -150,7 +149,7 @@ def prepare_database(db_conn): raise -def _setup_new_database(cur): +def _setup_new_database(cur, database_engine): """Sets up the database by finding a base set of "full schemas" and then applying any necessary deltas. 
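A minimal sketch of the seam this commit introduces, assuming the relevant driver modules are importable; the interactive calls below are illustrative rather than part of the patch:

    from synapse.storage.engines import create_engine

    sql = "SELECT file FROM applied_schema_deltas WHERE version >= ?"

    # sqlite3 executes qmark ("?") placeholders natively, so the SQL is
    # passed through unchanged.
    create_engine("sqlite3").convert_param_style(sql)

    # mysql.connector expects format ("%s") placeholders, so MariaEngine
    # rewrites every "?" before the query reaches the driver:
    create_engine("mysql.connector").convert_param_style(sql)
    # -> "SELECT file FROM applied_schema_deltas WHERE version >= %s"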
@@ -210,7 +209,7 @@ def _setup_new_database(cur): executescript(cur, sql_loc) cur.execute( - _convert_param_style( + database_engine.convert_param_style( "REPLACE INTO schema_version (version, upgraded)" " VALUES (?,?)" ), @@ -221,12 +220,13 @@ def _setup_new_database(cur): cur, current_version=max_current_ver, applied_delta_files=[], - upgraded=False + upgraded=False, + database_engine=database_engine, ) def _upgrade_existing_database(cur, current_version, applied_delta_files, - upgraded): + upgraded, database_engine): """Upgrades an existing database. Delta files can either be SQL stored in *.sql files, or python modules @@ -335,26 +335,22 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, # Mark as done. cur.execute( - _convert_param_style( + database_engine.convert_param_style( "INSERT INTO applied_schema_deltas (version, file)" - " VALUES (?,?)" + " VALUES (?,?)", ), (v, relative_path) ) cur.execute( - _convert_param_style( + database_engine.convert_param_style( "REPLACE INTO schema_version (version, upgraded)" - " VALUES (?,?)" + " VALUES (?,?)", ), (v, True) ) -def _convert_param_style(sql): - return sql.replace("?", "%s") - - def get_statements(f): statement_buffer = "" in_comment = False # If we're in a /* ... */ style comment @@ -409,7 +405,7 @@ def executescript(txn, schema_path): txn.execute(statement) -def _get_or_create_schema_state(txn): +def _get_or_create_schema_state(txn, database_engine): try: # Bluntly try creating the schema_version tables. schema_path = os.path.join( @@ -426,7 +422,7 @@ def _get_or_create_schema_state(txn): if current_version: txn.execute( - _convert_param_style( + database_engine.convert_param_style( "SELECT file FROM applied_schema_deltas WHERE version >= ?" ), (current_version,) @@ -446,6 +442,8 @@ def prepare_sqlite3_database(db_conn): new. This only affects sqlite databases since they were the only ones supported at the time. """ + import sqlite3 + with db_conn: schema_path = os.path.join( dir_path, "schema", "schema_version.sql", @@ -466,7 +464,8 @@ def prepare_sqlite3_database(db_conn): db_conn.execute( _convert_param_style( "REPLACE INTO schema_version (version, upgraded)" - " VALUES (?,?)" + " VALUES (?,?)", + sqlite3 ), (row[0], False) ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 76ec3ee93f..047d100f46 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -29,6 +29,7 @@ import functools import simplejson as json import sys import time +import threading logger = logging.getLogger(__name__) @@ -118,19 +119,16 @@ def cached(max_entries=1000, num_args=1): return wrap -def _convert_param_style(sql): - return sql.replace("?", "%s") - - class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging and metrics to the .execute() method.""" - __slots__ = ["txn", "name"] + __slots__ = ["txn", "name", "database_engine"] - def __init__(self, txn, name): + def __init__(self, txn, name, database_engine): object.__setattr__(self, "txn", txn) object.__setattr__(self, "name", name) + object.__setattr__(self, "database_engine", database_engine) def __getattr__(self, name): return getattr(self.txn, name) @@ -142,7 +140,7 @@ class LoggingTransaction(object): # TODO(paul): Maybe use 'info' and 'debug' for values? 
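        # Placeholder conversion is delegated to the engine just below:
        # sqlite3 accepts qmark ("?") parameters natively, while the MySQL
        # connector wants format ("%s") parameters, so the engine rewrites
        # the SQL string before it reaches the driver.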
sql_logger.debug("[SQL] {%s} %s", self.name, sql) - sql = _convert_param_style(sql) + sql = self.database_engine.convert_param_style(sql) try: if args and args[0]: @@ -227,9 +225,14 @@ class SQLBaseStore(object): self._get_event_cache = LruCache(hs.config.event_cache_size) + self.database_engine = hs.database_engine + # Pretend the getEventCache is just another named cache caches_by_name["*getEvent*"] = self._get_event_cache + self._next_stream_id_lock = threading.Lock() + self._next_stream_id = int(hs.get_clock().time_msec()) * 1000 + def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() @@ -281,7 +284,10 @@ class SQLBaseStore(object): sql_scheduling_timer.inc_by(time.time() * 1000 - start_time) transaction_logger.debug("[TXN START] {%s}", name) try: - return func(LoggingTransaction(txn, name), *args, **kwargs) + return func( + LoggingTransaction(txn, name, self.database_engine), + *args, **kwargs + ) except: logger.exception("[TXN FAIL] {%s}", name) raise @@ -588,7 +594,7 @@ class SQLBaseStore(object): select_sql = "SELECT %s FROM %s WHERE %s" % ( ", ".join(retcols), table, - " AND ".join("%s = ?" % (k) for k in keyvalues) + " AND ".join("%s = ?" % (k,) for k in keyvalues) ) txn.execute(select_sql, keyvalues.values()) @@ -836,6 +842,12 @@ class SQLBaseStore(object): result = txn.fetchone() return result[0] if result else None + def get_next_stream_id(self): + with self._next_stream_id_lock: + i = self._next_stream_id + self._next_stream_id += 1 + return i + class _RollbackButIsFineException(Exception): """ This exception is used to rollback a transaction without implying diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py new file mode 100644 index 0000000000..709b6f88ac --- /dev/null +++ b/synapse/storage/engines/__init__.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .maria import MariaEngine +from .sqlite3 import Sqlite3Engine + + +SUPPORTED_MODULE = { + "sqlite3": Sqlite3Engine, + "mysql.connector": MariaEngine, +} + + +def create_engine(name): + engine_class = SUPPORTED_MODULE.get(name, None) + + if engine_class: + module = __import__(name) + return engine_class(module) + + raise RuntimeError( + "Unsupported database engine '%s'" % (name,) + ) diff --git a/synapse/storage/engines/maria.py b/synapse/storage/engines/maria.py new file mode 100644 index 0000000000..df47763647 --- /dev/null +++ b/synapse/storage/engines/maria.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + + +import types + + +class MariaEngine(object): + def __init__(self, database_module): + self.module = database_module + + def convert_param_style(self, sql): + return sql.replace("?", "%s") + + def encode_parameter(self, param): + if isinstance(param, types.BufferType): + return str(param) + return param diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py new file mode 100644 index 0000000000..639cdea41d --- /dev/null +++ b/synapse/storage/engines/sqlite3.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class Sqlite3Engine(object): + def __init__(self, database_module): + self.module = database_module + + def convert_param_style(self, sql): + return sql + + def encode_parameter(self, param): + return param diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index e6975a945b..25fef79434 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -64,7 +64,7 @@ class KeyStore(SQLBaseStore): "fingerprint": fingerprint, "from_server": from_server, "ts_added_ms": time_now_ms, - "tls_certificate": tls_certificate_bytes, + "tls_certificate": buffer(tls_certificate_bytes), }, ) @@ -113,6 +113,6 @@ class KeyStore(SQLBaseStore): "key_id": "%s:%s" % (verify_key.alg, verify_key.version), "from_server": from_server, "ts_added_ms": time_now_ms, - "verify_key": verify_key.encode(), + "verify_key": buffer(verify_key.encode()), }, ) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 7258f7b2a5..0c785ec989 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -42,6 +42,7 @@ class RegistrationStore(SQLBaseStore): yield self._simple_insert( "access_tokens", { + "id": self.get_next_stream_id(), "user_id": user_id, "token": token }, @@ -78,8 +79,11 @@ class RegistrationStore(SQLBaseStore): # it's possible for this to get a conflict, but only for a single user # since tokens are namespaced based on their user ID - txn.execute("INSERT INTO access_tokens(user_id, token) " + - "VALUES (?,?)", [user_id, token]) + txn.execute( + "INSERT INTO access_tokens(id, user_id, token)" + " VALUES (?,?,?)", + (self.get_next_stream_id(), user_id, token,) + ) @defer.inlineCallbacks def get_user_by_id(self, user_id): diff --git a/synapse/storage/schema/delta/12/v12.sql b/synapse/storage/schema/delta/12/v12.sql index b526109e6e..90ac474859 100644 --- a/synapse/storage/schema/delta/12/v12.sql +++ b/synapse/storage/schema/delta/12/v12.sql @@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS rejections( reason VARCHAR(255) NOT NULL, last_check VARCHAR(255) NOT NULL, UNIQUE (event_id) -) ENGINE = INNODB; +) ; -- Push notification endpoints that users have configured CREATE TABLE IF NOT EXISTS pushers ( @@ -37,7 +37,7 @@ CREATE TABLE IF NOT EXISTS pushers ( last_success BIGINT, failing_since BIGINT, UNIQUE (app_id, pushkey) -) ENGINE = INNODB; 
+) ; CREATE TABLE IF NOT EXISTS push_rules ( id BIGINT PRIMARY KEY, @@ -48,7 +48,7 @@ CREATE TABLE IF NOT EXISTS push_rules ( conditions VARCHAR(255) NOT NULL, actions VARCHAR(255) NOT NULL, UNIQUE(user_name, rule_id) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name); @@ -56,7 +56,7 @@ CREATE TABLE IF NOT EXISTS user_filters( user_id VARCHAR(255), filter_id BIGINT, filter_json BLOB -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters( user_id, filter_id diff --git a/synapse/storage/schema/delta/13/v13.sql b/synapse/storage/schema/delta/13/v13.sql index f0a5daf445..4953b6323c 100644 --- a/synapse/storage/schema/delta/13/v13.sql +++ b/synapse/storage/schema/delta/13/v13.sql @@ -20,7 +20,7 @@ CREATE TABLE IF NOT EXISTS application_services( hs_token VARCHAR(255), sender VARCHAR(255), UNIQUE(token) -) ENGINE = INNODB; +) ; CREATE TABLE IF NOT EXISTS application_services_regex( id BIGINT PRIMARY KEY, @@ -28,4 +28,4 @@ CREATE TABLE IF NOT EXISTS application_services_regex( namespace INTEGER, /* enum[room_id|room_alias|user_id] */ regex VARCHAR(255), FOREIGN KEY(as_id) REFERENCES application_services(id) -) ENGINE = INNODB; +) ; diff --git a/synapse/storage/schema/delta/14/v14.sql b/synapse/storage/schema/delta/14/v14.sql index a1260c5c1f..3bda073c94 100644 --- a/synapse/storage/schema/delta/14/v14.sql +++ b/synapse/storage/schema/delta/14/v14.sql @@ -4,6 +4,6 @@ CREATE TABLE IF NOT EXISTS push_rules_enable ( rule_id VARCHAR(255) NOT NULL, enabled TINYINT, UNIQUE(user_name, rule_id) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS push_rules_enable_user_name on push_rules_enable (user_name); diff --git a/synapse/storage/schema/full_schemas/11/event_edges.sql b/synapse/storage/schema/full_schemas/11/event_edges.sql index 0f53488e92..336cd563df 100644 --- a/synapse/storage/schema/full_schemas/11/event_edges.sql +++ b/synapse/storage/schema/full_schemas/11/event_edges.sql @@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS event_forward_extremities( event_id VARCHAR(255) NOT NULL, room_id VARCHAR(255) NOT NULL, UNIQUE (event_id, room_id) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id); CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id); @@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS event_backward_extremities( event_id VARCHAR(255) NOT NULL, room_id VARCHAR(255) NOT NULL, UNIQUE (event_id, room_id) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id); CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id); @@ -39,7 +39,7 @@ CREATE TABLE IF NOT EXISTS event_edges( room_id VARCHAR(255) NOT NULL, is_state BOOL NOT NULL, UNIQUE (event_id, prev_event_id, room_id, is_state) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id); CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id); @@ -49,7 +49,7 @@ CREATE TABLE IF NOT EXISTS room_depth( room_id VARCHAR(255) NOT NULL, min_depth INTEGER NOT NULL, UNIQUE (room_id) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id); @@ -59,7 +59,7 @@ create TABLE IF NOT EXISTS event_destinations( destination VARCHAR(255) NOT NULL, delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered UNIQUE (event_id, destination) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id); @@ 
-70,7 +70,7 @@ CREATE TABLE IF NOT EXISTS state_forward_extremities( type VARCHAR(255) NOT NULL, state_key VARCHAR(255) NOT NULL, UNIQUE (event_id, room_id) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities( room_id, type, state_key @@ -83,7 +83,7 @@ CREATE TABLE IF NOT EXISTS event_auth( auth_id VARCHAR(255) NOT NULL, room_id VARCHAR(255) NOT NULL, UNIQUE (event_id, auth_id, room_id) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS evauth_edges_id ON event_auth(event_id); CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id); diff --git a/synapse/storage/schema/full_schemas/11/event_signatures.sql b/synapse/storage/schema/full_schemas/11/event_signatures.sql index 334d7c8680..11e611598b 100644 --- a/synapse/storage/schema/full_schemas/11/event_signatures.sql +++ b/synapse/storage/schema/full_schemas/11/event_signatures.sql @@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS event_content_hashes ( algorithm VARCHAR(255), hash BLOB, UNIQUE (event_id, algorithm) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes(event_id); @@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS event_reference_hashes ( algorithm VARCHAR(255), hash BLOB, UNIQUE (event_id, algorithm) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes(event_id); @@ -39,7 +39,7 @@ CREATE TABLE IF NOT EXISTS event_signatures ( key_id VARCHAR(255), signature BLOB, UNIQUE (event_id, signature_name, key_id) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures(event_id); @@ -50,6 +50,6 @@ CREATE TABLE IF NOT EXISTS event_edge_hashes( algorithm VARCHAR(255), hash BLOB, UNIQUE (event_id, prev_event_id, algorithm) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes(event_id); diff --git a/synapse/storage/schema/full_schemas/11/im.sql b/synapse/storage/schema/full_schemas/11/im.sql index 9849e969be..a0fb337629 100644 --- a/synapse/storage/schema/full_schemas/11/im.sql +++ b/synapse/storage/schema/full_schemas/11/im.sql @@ -25,7 +25,7 @@ CREATE TABLE IF NOT EXISTS events( outlier BOOL NOT NULL, depth BIGINT DEFAULT 0 NOT NULL, UNIQUE (event_id) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering); CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering); @@ -38,7 +38,7 @@ CREATE TABLE IF NOT EXISTS event_json( internal_metadata BLOB NOT NULL, json BLOB NOT NULL, UNIQUE (event_id) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id); @@ -50,7 +50,7 @@ CREATE TABLE IF NOT EXISTS state_events( state_key VARCHAR(255) NOT NULL, prev_state VARCHAR(255), UNIQUE (event_id) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id); CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type); @@ -64,7 +64,7 @@ CREATE TABLE IF NOT EXISTS current_state_events( state_key VARCHAR(255) NOT NULL, UNIQUE (event_id), UNIQUE (room_id, type, state_key) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS current_state_events_room_id ON current_state_events (room_id); CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (type); @@ -77,7 +77,7 @@ CREATE TABLE IF NOT EXISTS room_memberships( room_id VARCHAR(255) NOT NULL, membership VARCHAR(255) NOT NULL, UNIQUE (event_id) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS 
room_memberships_room_id ON room_memberships (room_id); CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id); @@ -89,14 +89,14 @@ CREATE TABLE IF NOT EXISTS feedback( sender VARCHAR(255), room_id VARCHAR(255), UNIQUE (event_id) -) ENGINE = INNODB; +) ; CREATE TABLE IF NOT EXISTS topics( event_id VARCHAR(255) NOT NULL, room_id VARCHAR(255) NOT NULL, topic VARCHAR(255) NOT NULL, UNIQUE (event_id) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS topics_room_id ON topics(room_id); @@ -113,12 +113,12 @@ CREATE TABLE IF NOT EXISTS rooms( room_id VARCHAR(255) PRIMARY KEY NOT NULL, is_public BOOL, creator VARCHAR(255) -) ENGINE = INNODB; +) ; CREATE TABLE IF NOT EXISTS room_hosts( room_id VARCHAR(255) NOT NULL, host VARCHAR(255) NOT NULL, UNIQUE (room_id, host) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id); diff --git a/synapse/storage/schema/full_schemas/11/keys.sql b/synapse/storage/schema/full_schemas/11/keys.sql index c0f2ec29bb..a785cdb4c5 100644 --- a/synapse/storage/schema/full_schemas/11/keys.sql +++ b/synapse/storage/schema/full_schemas/11/keys.sql @@ -19,7 +19,7 @@ CREATE TABLE IF NOT EXISTS server_tls_certificates( ts_added_ms BIGINT, -- When the certifcate was added. tls_certificate BLOB, -- DER encoded x509 certificate. UNIQUE (server_name, fingerprint) -) ENGINE = INNODB; +) ; CREATE TABLE IF NOT EXISTS server_signature_keys( server_name VARCHAR(255), -- Server name. @@ -28,4 +28,4 @@ CREATE TABLE IF NOT EXISTS server_signature_keys( ts_added_ms BIGINT, -- When the key was added. verify_key BLOB, -- NACL verification key. UNIQUE (server_name, key_id) -) ENGINE = INNODB; +) ; diff --git a/synapse/storage/schema/full_schemas/11/media_repository.sql b/synapse/storage/schema/full_schemas/11/media_repository.sql index d9559f5902..27fe297af6 100644 --- a/synapse/storage/schema/full_schemas/11/media_repository.sql +++ b/synapse/storage/schema/full_schemas/11/media_repository.sql @@ -21,7 +21,7 @@ CREATE TABLE IF NOT EXISTS local_media_repository ( upload_name VARCHAR(255), -- The name the media was uploaded with. user_id VARCHAR(255), -- The user who uploaded the file. UNIQUE (media_id) -) ENGINE = INNODB; +) ; CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails ( media_id VARCHAR(255), -- The id used to refer to the media. @@ -33,7 +33,7 @@ CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails ( UNIQUE ( media_id, thumbnail_width, thumbnail_height, thumbnail_type ) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id ON local_media_repository_thumbnails (media_id); @@ -47,7 +47,7 @@ CREATE TABLE IF NOT EXISTS remote_media_cache ( media_length INTEGER, -- Length of the media in bytes. filesystem_id VARCHAR(255), -- The name used to store the media on disk. UNIQUE (media_origin, media_id) -) ENGINE = INNODB; +) ; CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails ( media_origin VARCHAR(255), -- The remote HS the media came from. 
@@ -62,7 +62,7 @@ CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails ( media_origin, media_id, thumbnail_width, thumbnail_height, thumbnail_type ) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id ON remote_media_cache_thumbnails (media_id); diff --git a/synapse/storage/schema/full_schemas/11/presence.sql b/synapse/storage/schema/full_schemas/11/presence.sql index 8031321083..b48b110ae9 100644 --- a/synapse/storage/schema/full_schemas/11/presence.sql +++ b/synapse/storage/schema/full_schemas/11/presence.sql @@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS presence( status_msg VARCHAR(255), mtime BIGINT, -- miliseconds since last state change UNIQUE(user_id) -) ENGINE = INNODB; +) ; -- For each of /my/ users which possibly-remote users are allowed to see their -- presence state @@ -26,7 +26,7 @@ CREATE TABLE IF NOT EXISTS presence_allow_inbound( observed_user_id VARCHAR(255) NOT NULL, observer_user_id VARCHAR(255), -- a UserID, UNIQUE(observed_user_id) -) ENGINE = INNODB; +) ; -- For each of /my/ users (watcher), which possibly-remote users are they -- watching? @@ -35,4 +35,4 @@ CREATE TABLE IF NOT EXISTS presence_list( observed_user_id VARCHAR(255), -- a UserID, accepted BOOLEAN, UNIQUE(user_id) -) ENGINE = INNODB; +) ; diff --git a/synapse/storage/schema/full_schemas/11/profiles.sql b/synapse/storage/schema/full_schemas/11/profiles.sql index 552645c56f..92da48f97e 100644 --- a/synapse/storage/schema/full_schemas/11/profiles.sql +++ b/synapse/storage/schema/full_schemas/11/profiles.sql @@ -14,7 +14,7 @@ */ CREATE TABLE IF NOT EXISTS profiles( user_id VARCHAR(255) NOT NULL, - displayname VARBINARY(255), + displayname VARCHAR(255), avatar_url VARCHAR(255), UNIQUE(user_id) -) ENGINE = INNODB; +) ; diff --git a/synapse/storage/schema/full_schemas/11/redactions.sql b/synapse/storage/schema/full_schemas/11/redactions.sql index ba93e860f6..9b52a2012a 100644 --- a/synapse/storage/schema/full_schemas/11/redactions.sql +++ b/synapse/storage/schema/full_schemas/11/redactions.sql @@ -16,7 +16,7 @@ CREATE TABLE IF NOT EXISTS redactions ( event_id VARCHAR(255) NOT NULL, redacts VARCHAR(255) NOT NULL, UNIQUE (event_id) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id); CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts); diff --git a/synapse/storage/schema/full_schemas/11/room_aliases.sql b/synapse/storage/schema/full_schemas/11/room_aliases.sql index 1e706aac2b..220df87573 100644 --- a/synapse/storage/schema/full_schemas/11/room_aliases.sql +++ b/synapse/storage/schema/full_schemas/11/room_aliases.sql @@ -14,12 +14,12 @@ */ CREATE TABLE IF NOT EXISTS room_aliases( - room_alias VARCHAR(255) NOT NULL, + room_alias VARBINARY(255) NOT NULL, room_id VARCHAR(255) NOT NULL, UNIQUE (room_alias) -) ENGINE = INNODB; +) ; CREATE TABLE IF NOT EXISTS room_alias_servers( - room_alias VARCHAR(255) NOT NULL, + room_alias VARBINARY(255) NOT NULL, server VARCHAR(255) NOT NULL -) ENGINE = INNODB; +) ; diff --git a/synapse/storage/schema/full_schemas/11/state.sql b/synapse/storage/schema/full_schemas/11/state.sql index be9dc2920d..40584a325f 100644 --- a/synapse/storage/schema/full_schemas/11/state.sql +++ b/synapse/storage/schema/full_schemas/11/state.sql @@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS state_groups( id VARCHAR(20) PRIMARY KEY, room_id VARCHAR(255) NOT NULL, event_id VARCHAR(255) NOT NULL -) ENGINE = INNODB; +) ; CREATE TABLE IF NOT EXISTS state_groups_state( state_group VARCHAR(20) NOT NULL, @@ -25,13 
+25,13 @@ CREATE TABLE IF NOT EXISTS state_groups_state( type VARCHAR(255) NOT NULL, state_key VARCHAR(255) NOT NULL, event_id VARCHAR(255) NOT NULL -) ENGINE = INNODB; +) ; CREATE TABLE IF NOT EXISTS event_to_state_groups( event_id VARCHAR(255) NOT NULL, state_group VARCHAR(255) NOT NULL, UNIQUE (event_id) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id); diff --git a/synapse/storage/schema/full_schemas/11/transactions.sql b/synapse/storage/schema/full_schemas/11/transactions.sql index bd13bba8c2..d33bdfb301 100644 --- a/synapse/storage/schema/full_schemas/11/transactions.sql +++ b/synapse/storage/schema/full_schemas/11/transactions.sql @@ -21,7 +21,7 @@ CREATE TABLE IF NOT EXISTS received_transactions( response_json BLOB, has_been_referenced BOOL default 0, -- Whether thishas been referenced by a prev_tx UNIQUE (transaction_id, origin) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0; @@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS sent_transactions( response_code INTEGER DEFAULT 0, response_json BLOB, ts BIGINT -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS sent_transaction_dest ON sent_transactions(destination); CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id); @@ -51,7 +51,7 @@ CREATE TABLE IF NOT EXISTS transaction_id_to_pdu( pdu_id VARCHAR(255), pdu_origin VARCHAR(255), UNIQUE (transaction_id, destination) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); @@ -60,4 +60,4 @@ CREATE TABLE IF NOT EXISTS destinations( destination VARCHAR(255) PRIMARY KEY, retry_last_ts BIGINT, retry_interval INTEGER -) ENGINE = INNODB; +) ; diff --git a/synapse/storage/schema/full_schemas/11/users.sql b/synapse/storage/schema/full_schemas/11/users.sql index 55bffb22f3..28909f5805 100644 --- a/synapse/storage/schema/full_schemas/11/users.sql +++ b/synapse/storage/schema/full_schemas/11/users.sql @@ -14,20 +14,20 @@ */ CREATE TABLE IF NOT EXISTS users( name VARCHAR(255), - password_hash VARBINARY(255), + password_hash VARCHAR(255), creation_ts BIGINT, admin BOOL DEFAULT 0 NOT NULL, UNIQUE(name) -) ENGINE = INNODB; +) ; CREATE TABLE IF NOT EXISTS access_tokens( - id INTEGER PRIMARY KEY AUTO_INCREMENT, + id BIGINT PRIMARY KEY, user_id VARCHAR(255) NOT NULL, device_id VARCHAR(255), token VARCHAR(255) NOT NULL, last_used BIGINT, UNIQUE(token) -) ENGINE = INNODB; +) ; CREATE TABLE IF NOT EXISTS user_ips ( user VARCHAR(255) NOT NULL, @@ -37,6 +37,6 @@ CREATE TABLE IF NOT EXISTS user_ips ( user_agent VARCHAR(255) NOT NULL, last_seen BIGINT NOT NULL, UNIQUE (user, access_token, ip, user_agent) -) ENGINE = INNODB; +) ; CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user); diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 35bba854f9..f051828630 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -54,7 +54,7 @@ class SignatureStore(SQLBaseStore): { "event_id": event_id, "algorithm": algorithm, - "hash": hash_bytes, + "hash": buffer(hash_bytes), }, ) @@ -116,7 +116,7 @@ class SignatureStore(SQLBaseStore): { "event_id": event_id, "algorithm": algorithm, - "hash": hash_bytes, + "hash": buffer(hash_bytes), }, ) @@ -160,7 +160,7 @@ class SignatureStore(SQLBaseStore): "event_id": event_id, "signature_name": signature_name, "key_id": key_id, - "signature": signature_bytes, + "signature": 
buffer(signature_bytes), }, ) @@ -193,6 +193,6 @@ class SignatureStore(SQLBaseStore): "event_id": event_id, "prev_event_id": prev_event_id, "algorithm": algorithm, - "hash": hash_bytes, + "hash": buffer(hash_bytes), }, ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3a310cd003..e6bb5a8077 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -433,12 +433,6 @@ class StreamStore(SQLBaseStore): defer.returnValue(self.min_token) - def get_next_stream_id(self): - with self._next_stream_id_lock: - i = self._next_stream_id - self._next_stream_id += 1 - return i - def _get_room_events_max_id_txn(self, txn): txn.execute( "SELECT MAX(stream_ordering) as m FROM events" diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 4e82232796..a42138f556 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -60,7 +60,7 @@ def get_retry_limiter(destination, clock, store, **kwargs): if retry_timings: retry_last_ts, retry_interval = ( - retry_timings.retry_last_ts, retry_timings.retry_interval + retry_timings["retry_last_ts"], retry_timings["retry_interval"] ) now = int(clock.time_msec()) diff --git a/tests/federation/test_federation.py b/tests/federation/test_federation.py index 2ecd00d2ad..a4ef60b911 100644 --- a/tests/federation/test_federation.py +++ b/tests/federation/test_federation.py @@ -24,8 +24,6 @@ from ..utils import MockHttpResource, MockClock, setup_test_homeserver from synapse.federation import initialize_http_replication from synapse.events import FrozenEvent -from synapse.storage.transactions import DestinationsTable - def make_pdu(prev_pdus=[], **kwargs): """Provide some default fields for making a PduTuple.""" @@ -57,8 +55,14 @@ class FederationTestCase(unittest.TestCase): self.mock_persistence.get_received_txn_response.return_value = ( defer.succeed(None) ) + + retry_timings_res = { + "destination": "", + "retry_last_ts": 0, + "retry_interval": 0, + } self.mock_persistence.get_destination_retry_timings.return_value = ( - defer.succeed(DestinationsTable.EntryType("", 0, 0)) + defer.succeed(retry_timings_res) ) self.mock_persistence.get_auth_chain.return_value = [] self.clock = MockClock() diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index c13ade3286..08d2404b6c 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -87,6 +87,15 @@ class FederationTestCase(unittest.TestCase): self.datastore.get_room.return_value = defer.succeed(True) self.auth.check_host_in_room.return_value = defer.succeed(True) + retry_timings_res = { + "destination": "", + "retry_last_ts": 0, + "retry_interval": 0, + } + self.datastore.get_destination_retry_timings.return_value = ( + defer.succeed(retry_timings_res) + ) + def have_events(event_ids): return defer.succeed({}) self.datastore.have_events.side_effect = have_events diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 04eba4289e..9b0e606918 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -194,8 +194,13 @@ class MockedDatastorePresenceTestCase(PresenceTestCase): return datastore def setUp_datastore_federation_mocks(self, datastore): + retry_timings_res = { + "destination": "", + "retry_last_ts": 0, + "retry_interval": 0, + } datastore.get_destination_retry_timings.return_value = ( - defer.succeed(DestinationsTable.EntryType("", 0, 0)) + defer.succeed(retry_timings_res) ) def get_received_txn_response(*args): diff --git 
a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index bf34b7ccbd..2d76b23564 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -96,8 +96,13 @@ class TypingNotificationsTestCase(unittest.TestCase): self.event_source = hs.get_event_sources().sources["typing"] self.datastore = hs.get_datastore() + retry_timings_res = { + "destination": "", + "retry_last_ts": 0, + "retry_interval": 0, + } self.datastore.get_destination_retry_timings.return_value = ( - defer.succeed(DestinationsTable.EntryType("", 0, 0)) + defer.succeed(retry_timings_res) ) def get_received_txn_response(*args): diff --git a/tests/rest/client/v1/test_events.py b/tests/rest/client/v1/test_events.py index 36b0f2ff6d..445272e323 100644 --- a/tests/rest/client/v1/test_events.py +++ b/tests/rest/client/v1/test_events.py @@ -115,12 +115,6 @@ class EventStreamPermissionsTestCase(RestTestCase): hs = yield setup_test_homeserver( http_client=None, replication_layer=Mock(), - clock=Mock(spec=[ - "call_later", - "cancel_call_later", - "time_msec", - "time" - ]), ratelimiter=NonCallableMock(spec_set=[ "send_message", ]), @@ -132,9 +126,6 @@ class EventStreamPermissionsTestCase(RestTestCase): hs.get_handlers().federation_handler = Mock() - hs.get_clock().time_msec.return_value = 1000000 - hs.get_clock().time.return_value = 1000 - synapse.rest.client.v1.register.register_servlets(hs, self.mock_resource) synapse.rest.client.v1.events.register_servlets(hs, self.mock_resource) synapse.rest.client.v1.room.register_servlets(hs, self.mock_resource) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index ca5b92ec85..2ad55c8462 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -16,22 +16,18 @@ from tests import unittest from twisted.internet import defer from synapse.appservice import ApplicationService -from synapse.server import HomeServer from synapse.storage.appservice import ApplicationServiceStore -from mock import Mock -from tests.utils import SQLiteMemoryDbPool, MockClock +from tests.utils import setup_test_homeserver class ApplicationServiceStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): - db_pool = SQLiteMemoryDbPool() - yield db_pool.prepare() - hs = HomeServer( - "test", db_pool=db_pool, clock=MockClock(), config=Mock() - ) + hs = yield setup_test_homeserver() + db_pool = hs.get_db_pool() + self.as_token = "token1" db_pool.runQuery( "INSERT INTO application_services(token) VALUES(?)", diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index 7f5845cf0c..5c17d30148 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -24,6 +24,7 @@ from collections import OrderedDict from synapse.server import HomeServer from synapse.storage._base import SQLBaseStore +from synapse.storage.engines import create_engine class SQLBaseStoreTestCase(unittest.TestCase): @@ -40,7 +41,12 @@ class SQLBaseStoreTestCase(unittest.TestCase): config = Mock() config.event_cache_size = 1 - hs = HomeServer("test", db_pool=self.db_pool, config=config) + hs = HomeServer( + "test", + db_pool=self.db_pool, + config=config, + database_engine=create_engine("sqlite3"), + ) self.datastore = SQLBaseStore(hs) @@ -86,8 +92,7 @@ class SQLBaseStoreTestCase(unittest.TestCase): self.assertEquals("Value", value) self.mock_txn.execute.assert_called_with( - "SELECT retcol FROM tablename WHERE keycol = ? 
" - "ORDER BY rowid asc", + "SELECT retcol FROM tablename WHERE keycol = ?", ["TheKey"] ) @@ -104,8 +109,7 @@ class SQLBaseStoreTestCase(unittest.TestCase): self.assertEquals({"colA": 1, "colB": 2, "colC": 3}, ret) self.mock_txn.execute.assert_called_with( - "SELECT colA, colB, colC FROM tablename WHERE keycol = ? " - "ORDER BY rowid asc", + "SELECT colA, colB, colC FROM tablename WHERE keycol = ?", ["TheKey"] ) @@ -139,8 +143,7 @@ class SQLBaseStoreTestCase(unittest.TestCase): self.assertEquals([{"colA": 1}, {"colA": 2}, {"colA": 3}], ret) self.mock_txn.execute.assert_called_with( - "SELECT colA FROM tablename WHERE keycol = ? " - "ORDER BY rowid asc", + "SELECT colA FROM tablename WHERE keycol = ?", ["A set"] ) @@ -189,8 +192,7 @@ class SQLBaseStoreTestCase(unittest.TestCase): self.assertEquals({"columname": "Old Value"}, ret) self.mock_txn.execute.assert_has_calls([ - call('SELECT columname FROM tablename WHERE keycol = ? ' - 'ORDER BY rowid asc', + call('SELECT columname FROM tablename WHERE keycol = ?', ['TheKey']), call("UPDATE tablename SET columname = ? WHERE keycol = ?", ["New Value", "TheKey"]) diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py index e0b81f2b57..78f6004204 100644 --- a/tests/storage/test_registration.py +++ b/tests/storage/test_registration.py @@ -42,28 +42,38 @@ class RegistrationStoreTestCase(unittest.TestCase): self.assertEquals( # TODO(paul): Surely this field should be 'user_id', not 'name' # Additionally surely it shouldn't come in a 1-element list - [{"name": self.user_id, "password_hash": self.pwhash}], + {"name": self.user_id, "password_hash": self.pwhash}, (yield self.store.get_user_by_id(self.user_id)) ) - self.assertEquals( - {"admin": 0, - "device_id": None, - "name": self.user_id, - "token_id": 1}, - (yield self.store.get_user_by_token(self.tokens[0])) + result = yield self.store.get_user_by_token(self.tokens[1]) + + self.assertDictContainsSubset( + { + "admin": 0, + "device_id": None, + "name": self.user_id, + }, + result ) + self.assertTrue("token_id" in result) + @defer.inlineCallbacks def test_add_tokens(self): yield self.store.register(self.user_id, self.tokens[0], self.pwhash) yield self.store.add_access_token_to_user(self.user_id, self.tokens[1]) - self.assertEquals( - {"admin": 0, - "device_id": None, - "name": self.user_id, - "token_id": 2}, - (yield self.store.get_user_by_token(self.tokens[1])) + result = yield self.store.get_user_by_token(self.tokens[1]) + + self.assertDictContainsSubset( + { + "admin": 0, + "device_id": None, + "name": self.user_id, + }, + result ) + self.assertTrue("token_id" in result) + diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py index 811fea544b..785953cc89 100644 --- a/tests/storage/test_roommember.py +++ b/tests/storage/test_roommember.py @@ -119,7 +119,7 @@ class RoomMemberStoreTestCase(unittest.TestCase): yield self.inject_room_member(self.room, self.u_alice, Membership.JOIN) self.assertEquals( - ["test"], + {"test"}, (yield self.store.get_joined_hosts_for_room(self.room.to_string())) ) @@ -127,7 +127,7 @@ class RoomMemberStoreTestCase(unittest.TestCase): yield self.inject_room_member(self.room, self.u_bob, Membership.JOIN) self.assertEquals( - ["test"], + {"test"}, (yield self.store.get_joined_hosts_for_room(self.room.to_string())) ) @@ -136,9 +136,9 @@ class RoomMemberStoreTestCase(unittest.TestCase): self.assertEquals( {"test", "elsewhere"}, - set((yield + (yield self.store.get_joined_hosts_for_room(self.room.to_string()) - )) + ) ) # 
Should still have both hosts @@ -146,15 +146,15 @@ class RoomMemberStoreTestCase(unittest.TestCase): self.assertEquals( {"test", "elsewhere"}, - set((yield + (yield self.store.get_joined_hosts_for_room(self.room.to_string()) - )) + ) ) # Should have only one host after other leaves yield self.inject_room_member(self.room, self.u_charlie, Membership.LEAVE) self.assertEquals( - ["test"], + {"test"}, (yield self.store.get_joined_hosts_for_room(self.room.to_string())) ) diff --git a/tests/utils.py b/tests/utils.py index 81e82a80df..cc038fecf1 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -17,6 +17,7 @@ from synapse.http.server import HttpServer from synapse.api.errors import cs_error, CodeMessageException, StoreError from synapse.api.constants import EventTypes from synapse.storage import prepare_database +from synapse.storage.engines import create_engine from synapse.server import HomeServer from synapse.util.logcontext import LoggingContext @@ -44,18 +45,23 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): config.event_cache_size = 1 config.disable_registration = False + if "clock" not in kargs: + kargs["clock"] = MockClock() + if datastore is None: db_pool = SQLiteMemoryDbPool() yield db_pool.prepare() hs = HomeServer( name, db_pool=db_pool, config=config, version_string="Synapse/tests", + database_engine=create_engine("sqlite3"), **kargs ) else: hs = HomeServer( name, db_pool=None, datastore=datastore, config=config, version_string="Synapse/tests", + database_engine=create_engine("sqlite3"), **kargs ) @@ -227,7 +233,10 @@ class SQLiteMemoryDbPool(ConnectionPool, object): ) def prepare(self): - return self.runWithConnection(prepare_database) + engine = create_engine("sqlite3") + return self.runWithConnection( + lambda conn: prepare_database(conn, engine) + ) class MemoryDataStore(object): -- cgit 1.4.1 From 8ad0f4912ed72daced74ae4d1c939ebdbc517476 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Apr 2015 11:41:36 +0100 Subject: Stream ordering and out of order insertions. Handle the fact that events can be persisted out of order, so getting the "current max" stream token becomes non-trivial: we need to make sure that *all* stream tokens less than the current max have also been successfully persisted.
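The idea in miniature: hand out strictly increasing ids under a lock, remember which of them are still in flight, and report the current max as one less than the oldest outstanding id. A self-contained sketch of that approach (a simplified stand-in for the StreamIdGenerator added below; the StreamIdGen name, the current_max seed and the asserts are illustrative only, and the real generator also lazily computes its starting point from the events table):

    import contextlib
    import threading
    from collections import deque

    class StreamIdGen(object):
        def __init__(self, current_max):
            self._lock = threading.Lock()
            self._current_max = current_max
            # ids handed out whose transactions have not yet completed
            self._unfinished_ids = deque()

        @contextlib.contextmanager
        def get_next(self):
            with self._lock:
                self._current_max += 1
                next_id = self._current_max
                self._unfinished_ids.append(next_id)
            try:
                yield next_id
            finally:
                with self._lock:
                    self._unfinished_ids.remove(next_id)

        def get_max_token(self):
            # Everything below the oldest id still being persisted is safe
            # to expose as the "current max" stream token.
            with self._lock:
                if self._unfinished_ids:
                    return self._unfinished_ids[0] - 1
                return self._current_max

    gen = StreamIdGen(current_max=10)
    with gen.get_next() as stream_id:     # stream_id == 11
        assert gen.get_max_token() == 10  # 11 is still in flight
    assert gen.get_max_token() == 11      # all ids <= 11 are now done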
--- synapse/storage/_base.py | 46 +------------ synapse/storage/events.py | 9 ++- synapse/storage/stream.py | 23 +------ synapse/storage/util/__init__.py | 14 ++++ synapse/storage/util/id_generators.py | 126 ++++++++++++++++++++++++++++++++++ 5 files changed, 153 insertions(+), 65 deletions(-) create mode 100644 synapse/storage/util/__init__.py create mode 100644 synapse/storage/util/id_generators.py (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 23289bbdd4..badf9a5f40 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -22,6 +22,8 @@ from synapse.util.logcontext import PreserveLoggingContext, LoggingContext from synapse.util.lrucache import LruCache import synapse.metrics +from util.id_generators import IdGenerator, StreamIdGenerator + from twisted.internet import defer from collections import namedtuple, OrderedDict @@ -29,7 +31,6 @@ import functools import simplejson as json import sys import time -import threading logger = logging.getLogger(__name__) @@ -232,46 +233,6 @@ class PerformanceCounters(object): return top_n_counters -class IdGenerator(object): - def __init__(self, table, column, store): - self.table = table - self.column = column - self.store = store - self._lock = threading.Lock() - self._next_id = None - - @defer.inlineCallbacks - def get_next(self): - with self._lock: - if not self._next_id: - res = yield self.store._execute_and_decode( - "IdGenerator_%s" % (self.table,), - "SELECT MAX(%s) as mx FROM %s" % (self.column, self.table,) - ) - - self._next_id = (res and res[0] and res[0]["mx"]) or 1 - - i = self._next_id - self._next_id += 1 - defer.returnValue(i) - - def get_next_txn(self, txn): - with self._lock: - if self._next_id: - i = self._next_id - self._next_id += 1 - return i - else: - txn.execute( - "SELECT MAX(%s) FROM %s" % (self.column, self.table,) - ) - - val, = txn.fetchone() - self._next_id = val or 2 - - return 1 - - class SQLBaseStore(object): _TXN_ID = 0 @@ -297,7 +258,7 @@ class SQLBaseStore(object): # Pretend the getEventCache is just another named cache caches_by_name["*getEvent*"] = self._get_event_cache - self._stream_id_gen = IdGenerator("events", "stream_ordering", self) + self._stream_id_gen = StreamIdGenerator() self._transaction_id_gen = IdGenerator("sent_transactions", "id", self) self._state_groups_id_gen = IdGenerator("state_groups", "id", self) self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self) @@ -363,7 +324,6 @@ class SQLBaseStore(object): *args, **kwargs ) except self.database_engine.module.DatabaseError as e: - logger.warn("[TXN DEADLOCK] {%s} %r, %r", name, e.errno, e.sqlstate) if self.database_engine.is_deadlock(e): logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N) if i < N: diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3b3416716e..f066484c7e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -52,7 +52,6 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) - self.get_room_events_max_id.invalidate() except _RollbackButIsFineException: pass @@ -97,7 +96,13 @@ class EventsStore(SQLBaseStore): self._get_event_cache.pop(event.event_id) if stream_ordering is None: - stream_ordering = self._stream_id_gen.get_next_txn(txn) + with self._stream_id_gen.get_next_txn(txn) as stream_ordering: + return self._persist_event_txn( + txn, event, context, backfilled, + stream_ordering=stream_ordering, + is_new_state=is_new_state, + current_state=current_state, + 
) # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index e6bb5a8077..9925f04bf7 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -413,12 +413,10 @@ class StreamStore(SQLBaseStore): "get_recent_events_for_room", get_recent_events_for_room_txn ) - @cached(num_args=0) + @defer.inlineCallbacks def get_room_events_max_id(self): - return self.runInteraction( - "get_room_events_max_id", - self._get_room_events_max_id_txn - ) + token = yield self._stream_id_gen.get_max_token(self) + defer.returnValue("s%d" % (token,)) @defer.inlineCallbacks def _get_min_token(self): @@ -433,21 +431,6 @@ class StreamStore(SQLBaseStore): defer.returnValue(self.min_token) - def _get_room_events_max_id_txn(self, txn): - txn.execute( - "SELECT MAX(stream_ordering) as m FROM events" - ) - - res = self.cursor_to_dict(txn) - - logger.debug("get_room_events_max_id: %s", res) - - if not res or not res[0] or not res[0]["m"]: - return "s0" - - key = res[0]["m"] - return "s%d" % (key,) - @staticmethod def _set_before_and_after(events, rows): for event, row in zip(events, rows): diff --git a/synapse/storage/util/__init__.py b/synapse/storage/util/__init__.py new file mode 100644 index 0000000000..c488b10d3c --- /dev/null +++ b/synapse/storage/util/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py new file mode 100644 index 0000000000..8f419323a7 --- /dev/null +++ b/synapse/storage/util/id_generators.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from twisted.internet import defer + +from collections import deque +import contextlib +import threading + + +class IdGenerator(object): + def __init__(self, table, column, store): + self.table = table + self.column = column + self.store = store + self._lock = threading.Lock() + self._next_id = None + + @defer.inlineCallbacks + def get_next(self): + with self._lock: + if not self._next_id: + res = yield self.store._execute_and_decode( + "IdGenerator_%s" % (self.table,), + "SELECT MAX(%s) as mx FROM %s" % (self.column, self.table,) + ) + + self._next_id = (res and res[0] and res[0]["mx"]) or 1 + + i = self._next_id + self._next_id += 1 + defer.returnValue(i) + + def get_next_txn(self, txn): + with self._lock: + if self._next_id: + i = self._next_id + self._next_id += 1 + return i + else: + txn.execute( + "SELECT MAX(%s) FROM %s" % (self.column, self.table,) + ) + + val, = txn.fetchone() + self._next_id = val or 2 + + return 1 + + +class StreamIdGenerator(object): + """Used to generate new stream ids when persisting events while keeping + track of which transactions have been completed. + + This allows us to get the "current" stream id, i.e. the stream id such that + all ids less than or equal to it have completed. This handles the fact that + persistence of events can complete out of order. + + Usage: + with stream_id_gen.get_next_txn(txn) as stream_id: + # ... persist event ... + """ + def __init__(self): + self._lock = threading.Lock() + + self._current_max = None + self._unfinished_ids = deque() + + def get_next_txn(self, txn): + """ + Usage: + with stream_id_gen.get_next_txn(txn) as stream_id: + # ... persist event ... + """ + with self._lock: + if not self._current_max: + self._compute_current_max(txn) + + self._current_max += 1 + next_id = self._current_max + + self._unfinished_ids.append(next_id) + + @contextlib.contextmanager + def manager(): + yield next_id + with self._lock: + self._unfinished_ids.remove(next_id) + + return manager() + + def get_max_token(self, store): + """Returns the maximum stream id such that all stream ids less than or + equal to it have been successfully persisted. + """ + with self._lock: + if self._unfinished_ids: + return self._unfinished_ids[0] - 1 + + if not self._current_max: + return store.runInteraction( + "_compute_current_max", + self._compute_current_max, + ) + + return self._current_max + + def _compute_current_max(self, txn): + txn.execute("SELECT MAX(stream_ordering) FROM events") + val, = txn.fetchone() + + self._current_max = int(val) if val else 1 + + return self._current_max -- cgit 1.4.1 From 2ded3446201f833a69cdb7cf269c65e5f9de1f27 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Apr 2015 13:46:06 +0100 Subject: Remove unused import --- synapse/storage/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 9925f04bf7..57c2e4dfeb 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -35,7 +35,7 @@ what sort order was used: from twisted.internet import defer -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.util.logutils import log_function -- cgit 1.4.1 From 58d83399663a080c123d2f112b4f4d84accbc638 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 Apr 2015 13:53:20 +0100 Subject: Add support for postgres instead of mysql. Change sql accordingly.
blob + varbinary -> bytea. No support for UNSIGNED or CREATE INDEX IF NOT EXISTS. --- synapse/app/homeserver.py | 2 + synapse/storage/__init__.py | 15 +++----- synapse/storage/_base.py | 2 +- synapse/storage/engines/__init__.py | 2 + synapse/storage/event_federation.py | 10 ++--- synapse/storage/events.py | 4 +- synapse/storage/room.py | 34 ++++++++++++----- .../full_schemas/16/application_services.sql | 10 ++--- .../storage/schema/full_schemas/16/event_edges.sql | 26 ++++++------- .../schema/full_schemas/16/event_signatures.sql | 16 ++++---- synapse/storage/schema/full_schemas/16/im.sql | 44 +++++++++++----------- synapse/storage/schema/full_schemas/16/keys.sql | 8 ++-- .../schema/full_schemas/16/media_repository.sql | 8 ++-- .../storage/schema/full_schemas/16/presence.sql | 4 +- synapse/storage/schema/full_schemas/16/push.sql | 30 +++++++-------- .../storage/schema/full_schemas/16/redactions.sql | 4 +- synapse/storage/schema/full_schemas/16/state.sql | 10 ++--- .../schema/full_schemas/16/transactions.sql | 24 ++++++------ synapse/storage/schema/full_schemas/16/users.sql | 10 ++--- synapse/storage/schema/schema_version.sql | 14 +++---- synapse/storage/stream.py | 16 ++++---- 21 files changed, 153 insertions(+), 140 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index a47e548d66..033011e1d7 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -373,6 +373,8 @@ def setup(config_options): "use_unicode": True, "collation": "utf8mb4_bin", }) + elif name == "psycopg2": + pass elif name == "sqlite3": db_config.setdefault("args", {}).update({ "cp_min": 1, diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index b46cafd25e..272420194d 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -236,7 +236,7 @@ def _setup_new_database(cur, database_engine): cur.execute( database_engine.convert_param_style( - "REPLACE INTO schema_version (version, upgraded)" + "INSERT INTO schema_version (version, upgraded)" " VALUES (?,?)" ), (max_current_ver, False,) @@ -432,14 +432,11 @@ def executescript(txn, schema_path): def _get_or_create_schema_state(txn, database_engine): - try: - # Bluntly try creating the schema_version tables. - schema_path = os.path.join( - dir_path, "schema", "schema_version.sql", - ) - executescript(txn, schema_path) - except: - pass + # Bluntly try creating the schema_version tables. + schema_path = os.path.join( + dir_path, "schema", "schema_version.sql", + ) + executescript(txn, schema_path) txn.execute("SELECT version, upgraded FROM schema_version") row = txn.fetchone() diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e30514cd5e..fa5199104a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -330,7 +330,7 @@ class SQLBaseStore(object): continue raise except Exception as e: - logger.debug("[TXN FAIL] {%s}", name, e) + logger.debug("[TXN FAIL] {%s} %s", name, e) raise finally: end = time.time() * 1000 diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index 29702be923..548d4e1b42 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -14,6 +14,7 @@ # limitations under the License. 
from .maria import MariaEngine +from .postgres import PostgresEngine from .sqlite3 import Sqlite3Engine import importlib @@ -22,6 +23,7 @@ import importlib SUPPORTED_MODULE = { "sqlite3": Sqlite3Engine, "mysql.connector": MariaEngine, + "psycopg2": PostgresEngine, } diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 79ad5ddc9c..54a3c9d805 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -153,7 +153,7 @@ class EventFederationStore(SQLBaseStore): results = self._get_prev_events_and_state( txn, event_id, - is_state=1, + is_state=True, ) return [(e_id, h, ) for e_id, h, _ in results] @@ -164,7 +164,7 @@ class EventFederationStore(SQLBaseStore): } if is_state is not None: - keyvalues["is_state"] = is_state + keyvalues["is_state"] = bool(is_state) res = self._simple_select_list_txn( txn, @@ -259,7 +259,7 @@ class EventFederationStore(SQLBaseStore): "event_id": event_id, "prev_event_id": e_id, "room_id": room_id, - "is_state": 0, + "is_state": False, }, ) @@ -397,7 +397,7 @@ class EventFederationStore(SQLBaseStore): query = ( "SELECT prev_event_id FROM event_edges " - "WHERE room_id = ? AND event_id = ? AND is_state = 0 " + "WHERE room_id = ? AND event_id = ? AND is_state = ? " "LIMIT ?" ) @@ -406,7 +406,7 @@ class EventFederationStore(SQLBaseStore): for event_id in front: txn.execute( query, - (room_id, event_id, limit - len(event_results)) + (room_id, event_id, False, limit - len(event_results)) ) for e_id, in txn.fetchall(): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a2e87c27ce..9fe2effb4b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -188,12 +188,12 @@ class EventsStore(SQLBaseStore): ) sql = ( - "UPDATE events SET outlier = 0" + "UPDATE events SET outlier = ?" " WHERE event_id = ?" ) txn.execute( sql, - (event.event_id,) + (False, event.event_id,) ) return diff --git a/synapse/storage/room.py b/synapse/storage/room.py index a1a76280fe..48ebb33057 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -105,14 +105,12 @@ class RoomStore(SQLBaseStore): # We use non printing ascii character US (\x1F) as a separator sql = ( - "SELECT r.room_id, n.name, t.topic, " - "group_concat(a.room_alias, '\x1F') " - "FROM rooms AS r " - "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id " - "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id " - "INNER JOIN room_aliases AS a ON a.room_id = r.room_id " - "WHERE r.is_public = ? " - "GROUP BY r.room_id " + "SELECT r.room_id, max(n.name), max(t.topic)" + " FROM rooms AS r" + " LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id" + " LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id" + " WHERE r.is_public = ?" + " GROUP BY r.room_id" ) % { "topic": topic_subquery, "name": name_subquery, @@ -120,7 +118,22 @@ class RoomStore(SQLBaseStore): txn.execute(sql, (is_public,)) - return txn.fetchall() + rows = txn.fetchall() + + for i, row in enumerate(rows): + room_id = row[0] + aliases = self._simple_select_onecol_txn( + txn, + table="room_aliases", + keyvalues={ + "room_id": room_id + }, + retcol="room_alias", + ) + + rows[i] = list(row) + [aliases] + + return rows rows = yield self.runInteraction( "get_rooms", f @@ -131,9 +144,10 @@ class RoomStore(SQLBaseStore): "room_id": r[0], "name": r[1], "topic": r[2], - "aliases": r[3].split("\x1F"), + "aliases": r[3], } for r in rows + if r[3] # We only return rooms that have at least one alias. 
] defer.returnValue(ret) diff --git a/synapse/storage/schema/full_schemas/16/application_services.sql b/synapse/storage/schema/full_schemas/16/application_services.sql index bc709df92d..f08c5bcf76 100644 --- a/synapse/storage/schema/full_schemas/16/application_services.sql +++ b/synapse/storage/schema/full_schemas/16/application_services.sql @@ -14,7 +14,7 @@ */ CREATE TABLE IF NOT EXISTS application_services( - id BIGINT UNSIGNED PRIMARY KEY, + id BIGINT PRIMARY KEY, url VARCHAR(150), token VARCHAR(150), hs_token VARCHAR(150), @@ -23,8 +23,8 @@ CREATE TABLE IF NOT EXISTS application_services( ); CREATE TABLE IF NOT EXISTS application_services_regex( - id BIGINT UNSIGNED PRIMARY KEY, - as_id BIGINT UNSIGNED NOT NULL, + id BIGINT PRIMARY KEY, + as_id BIGINT NOT NULL, namespace INTEGER, /* enum[room_id|room_alias|user_id] */ regex VARCHAR(150), FOREIGN KEY(as_id) REFERENCES application_services(id) @@ -39,10 +39,10 @@ CREATE TABLE IF NOT EXISTS application_services_state( CREATE TABLE IF NOT EXISTS application_services_txns( as_id VARCHAR(150) NOT NULL, txn_id INTEGER NOT NULL, - event_ids LONGBLOB NOT NULL, + event_ids bytea NOT NULL, UNIQUE(as_id, txn_id) ); -CREATE INDEX IF NOT EXISTS application_services_txns_id ON application_services_txns ( +CREATE INDEX application_services_txns_id ON application_services_txns ( as_id ); diff --git a/synapse/storage/schema/full_schemas/16/event_edges.sql b/synapse/storage/schema/full_schemas/16/event_edges.sql index bdb1109094..05d0874f0d 100644 --- a/synapse/storage/schema/full_schemas/16/event_edges.sql +++ b/synapse/storage/schema/full_schemas/16/event_edges.sql @@ -19,8 +19,8 @@ CREATE TABLE IF NOT EXISTS event_forward_extremities( UNIQUE (event_id, room_id) ); -CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id); -CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id); +CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id); +CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_backward_extremities( @@ -29,8 +29,8 @@ CREATE TABLE IF NOT EXISTS event_backward_extremities( UNIQUE (event_id, room_id) ); -CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id); -CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id); +CREATE INDEX ev_b_extrem_room ON event_backward_extremities(room_id); +CREATE INDEX ev_b_extrem_id ON event_backward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_edges( @@ -41,8 +41,8 @@ CREATE TABLE IF NOT EXISTS event_edges( UNIQUE (event_id, prev_event_id, room_id, is_state) ); -CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id); -CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id); +CREATE INDEX ev_edges_id ON event_edges(event_id); +CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id); CREATE TABLE IF NOT EXISTS room_depth( @@ -51,17 +51,17 @@ CREATE TABLE IF NOT EXISTS room_depth( UNIQUE (room_id) ); -CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id); +CREATE INDEX room_depth_room ON room_depth(room_id); create TABLE IF NOT EXISTS event_destinations( event_id VARCHAR(150) NOT NULL, destination VARCHAR(150) NOT NULL, - delivered_ts BIGINT UNSIGNED DEFAULT 0, -- or 0 if not delivered + delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered UNIQUE (event_id, destination) ); -CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id); +CREATE INDEX event_destinations_id ON 
event_destinations(event_id); CREATE TABLE IF NOT EXISTS state_forward_extremities( @@ -72,10 +72,10 @@ CREATE TABLE IF NOT EXISTS state_forward_extremities( UNIQUE (event_id, room_id) ); -CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities( +CREATE INDEX st_extrem_keys ON state_forward_extremities( room_id, type, state_key ); -CREATE INDEX IF NOT EXISTS st_extrem_id ON state_forward_extremities(event_id); +CREATE INDEX st_extrem_id ON state_forward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_auth( @@ -85,5 +85,5 @@ CREATE TABLE IF NOT EXISTS event_auth( UNIQUE (event_id, auth_id, room_id) ); -CREATE INDEX IF NOT EXISTS evauth_edges_id ON event_auth(event_id); -CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id); +CREATE INDEX evauth_edges_id ON event_auth(event_id); +CREATE INDEX evauth_edges_auth_id ON event_auth(auth_id); diff --git a/synapse/storage/schema/full_schemas/16/event_signatures.sql b/synapse/storage/schema/full_schemas/16/event_signatures.sql index 09886f607c..4291827368 100644 --- a/synapse/storage/schema/full_schemas/16/event_signatures.sql +++ b/synapse/storage/schema/full_schemas/16/event_signatures.sql @@ -16,40 +16,40 @@ CREATE TABLE IF NOT EXISTS event_content_hashes ( event_id VARCHAR(150), algorithm VARCHAR(150), - hash LONGBLOB, + hash bytea, UNIQUE (event_id, algorithm) ); -CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes(event_id); +CREATE INDEX event_content_hashes_id ON event_content_hashes(event_id); CREATE TABLE IF NOT EXISTS event_reference_hashes ( event_id VARCHAR(150), algorithm VARCHAR(150), - hash LONGBLOB, + hash bytea, UNIQUE (event_id, algorithm) ); -CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes(event_id); +CREATE INDEX event_reference_hashes_id ON event_reference_hashes(event_id); CREATE TABLE IF NOT EXISTS event_signatures ( event_id VARCHAR(150), signature_name VARCHAR(150), key_id VARCHAR(150), - signature LONGBLOB, + signature bytea, UNIQUE (event_id, signature_name, key_id) ); -CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures(event_id); +CREATE INDEX event_signatures_id ON event_signatures(event_id); CREATE TABLE IF NOT EXISTS event_edge_hashes( event_id VARCHAR(150), prev_event_id VARCHAR(150), algorithm VARCHAR(150), - hash LONGBLOB, + hash bytea, UNIQUE (event_id, prev_event_id, algorithm) ); -CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes(event_id); +CREATE INDEX event_edge_hashes_id ON event_edge_hashes(event_id); diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql index 19f0f34143..a661fc160c 100644 --- a/synapse/storage/schema/full_schemas/16/im.sql +++ b/synapse/storage/schema/full_schemas/16/im.sql @@ -14,33 +14,33 @@ */ CREATE TABLE IF NOT EXISTS events( - stream_ordering BIGINT UNSIGNED PRIMARY KEY, - topological_ordering BIGINT UNSIGNED NOT NULL, + stream_ordering BIGINT PRIMARY KEY, + topological_ordering BIGINT NOT NULL, event_id VARCHAR(150) NOT NULL, type VARCHAR(150) NOT NULL, room_id VARCHAR(150) NOT NULL, - content LONGBLOB NOT NULL, - unrecognized_keys LONGBLOB, + content bytea NOT NULL, + unrecognized_keys bytea, processed BOOL NOT NULL, outlier BOOL NOT NULL, - depth BIGINT UNSIGNED DEFAULT 0 NOT NULL, + depth BIGINT DEFAULT 0 NOT NULL, UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering); -CREATE INDEX IF NOT EXISTS events_topological_ordering ON events 
(topological_ordering); -CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id); +CREATE INDEX events_stream_ordering ON events (stream_ordering); +CREATE INDEX events_topological_ordering ON events (topological_ordering); +CREATE INDEX events_room_id ON events (room_id); CREATE TABLE IF NOT EXISTS event_json( event_id VARCHAR(150) NOT NULL, room_id VARCHAR(150) NOT NULL, - internal_metadata LONGBLOB NOT NULL, - json LONGBLOB NOT NULL, + internal_metadata bytea NOT NULL, + json bytea NOT NULL, UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id); +CREATE INDEX event_json_room_id ON event_json(room_id); CREATE TABLE IF NOT EXISTS state_events( @@ -52,9 +52,9 @@ CREATE TABLE IF NOT EXISTS state_events( UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id); -CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type); -CREATE INDEX IF NOT EXISTS state_events_state_key ON state_events (state_key); +CREATE INDEX state_events_room_id ON state_events (room_id); +CREATE INDEX state_events_type ON state_events (type); +CREATE INDEX state_events_state_key ON state_events (state_key); CREATE TABLE IF NOT EXISTS current_state_events( @@ -66,9 +66,9 @@ CREATE TABLE IF NOT EXISTS current_state_events( UNIQUE (room_id, type, state_key) ); -CREATE INDEX IF NOT EXISTS current_state_events_room_id ON current_state_events (room_id); -CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (type); -CREATE INDEX IF NOT EXISTS current_state_events_state_key ON current_state_events (state_key); +CREATE INDEX current_state_events_room_id ON current_state_events (room_id); +CREATE INDEX current_state_events_type ON current_state_events (type); +CREATE INDEX current_state_events_state_key ON current_state_events (state_key); CREATE TABLE IF NOT EXISTS room_memberships( event_id VARCHAR(150) NOT NULL, @@ -79,8 +79,8 @@ CREATE TABLE IF NOT EXISTS room_memberships( UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS room_memberships_room_id ON room_memberships (room_id); -CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id); +CREATE INDEX room_memberships_room_id ON room_memberships (room_id); +CREATE INDEX room_memberships_user_id ON room_memberships (user_id); CREATE TABLE IF NOT EXISTS feedback( event_id VARCHAR(150) NOT NULL, @@ -98,7 +98,7 @@ CREATE TABLE IF NOT EXISTS topics( UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS topics_room_id ON topics(room_id); +CREATE INDEX topics_room_id ON topics(room_id); CREATE TABLE IF NOT EXISTS room_names( event_id VARCHAR(150) NOT NULL, @@ -107,7 +107,7 @@ CREATE TABLE IF NOT EXISTS room_names( UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS room_names_room_id ON room_names(room_id); +CREATE INDEX room_names_room_id ON room_names(room_id); CREATE TABLE IF NOT EXISTS rooms( room_id VARCHAR(150) PRIMARY KEY NOT NULL, @@ -121,4 +121,4 @@ CREATE TABLE IF NOT EXISTS room_hosts( UNIQUE (room_id, host) ); -CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id); +CREATE INDEX room_hosts_room_id ON room_hosts (room_id); diff --git a/synapse/storage/schema/full_schemas/16/keys.sql b/synapse/storage/schema/full_schemas/16/keys.sql index 35f141c288..459b510427 100644 --- a/synapse/storage/schema/full_schemas/16/keys.sql +++ b/synapse/storage/schema/full_schemas/16/keys.sql @@ -16,8 +16,8 @@ CREATE TABLE IF NOT EXISTS server_tls_certificates( server_name VARCHAR(150), -- Server name. 
fingerprint VARCHAR(150), -- Certificate fingerprint. from_server VARCHAR(150), -- Which key server the certificate was fetched from. - ts_added_ms BIGINT UNSIGNED, -- When the certifcate was added. - tls_certificate LONGBLOB, -- DER encoded x509 certificate. + ts_added_ms BIGINT, -- When the certifcate was added. + tls_certificate bytea, -- DER encoded x509 certificate. UNIQUE (server_name, fingerprint) ); @@ -25,7 +25,7 @@ CREATE TABLE IF NOT EXISTS server_signature_keys( server_name VARCHAR(150), -- Server name. key_id VARCHAR(150), -- Key version. from_server VARCHAR(150), -- Which key server the key was fetched form. - ts_added_ms BIGINT UNSIGNED, -- When the key was added. - verify_key LONGBLOB, -- NACL verification key. + ts_added_ms BIGINT, -- When the key was added. + verify_key bytea, -- NACL verification key. UNIQUE (server_name, key_id) ); diff --git a/synapse/storage/schema/full_schemas/16/media_repository.sql b/synapse/storage/schema/full_schemas/16/media_repository.sql index 014bce4aeb..0e819fca38 100644 --- a/synapse/storage/schema/full_schemas/16/media_repository.sql +++ b/synapse/storage/schema/full_schemas/16/media_repository.sql @@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS local_media_repository ( media_id VARCHAR(150), -- The id used to refer to the media. media_type VARCHAR(150), -- The MIME-type of the media. media_length INTEGER, -- Length of the media in bytes. - created_ts BIGINT UNSIGNED, -- When the content was uploaded in ms. + created_ts BIGINT, -- When the content was uploaded in ms. upload_name VARCHAR(150), -- The name the media was uploaded with. user_id VARCHAR(150), -- The user who uploaded the file. UNIQUE (media_id) @@ -35,14 +35,14 @@ CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails ( ) ); -CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id +CREATE INDEX local_media_repository_thumbnails_media_id ON local_media_repository_thumbnails (media_id); CREATE TABLE IF NOT EXISTS remote_media_cache ( media_origin VARCHAR(150), -- The remote HS the media came from. media_id VARCHAR(150), -- The id used to refer to the media on that server. media_type VARCHAR(150), -- The MIME-type of the media. - created_ts BIGINT UNSIGNED, -- When the content was uploaded in ms. + created_ts BIGINT, -- When the content was uploaded in ms. upload_name VARCHAR(150), -- The name the media was uploaded with. media_length INTEGER, -- Length of the media in bytes. filesystem_id VARCHAR(150), -- The name used to store the media on disk. 
@@ -64,5 +64,5 @@ CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails ( ) ); -CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id +CREATE INDEX remote_media_cache_thumbnails_media_id ON remote_media_cache_thumbnails (media_id); diff --git a/synapse/storage/schema/full_schemas/16/presence.sql b/synapse/storage/schema/full_schemas/16/presence.sql index fbe5b0af6c..9c41be296e 100644 --- a/synapse/storage/schema/full_schemas/16/presence.sql +++ b/synapse/storage/schema/full_schemas/16/presence.sql @@ -16,7 +16,7 @@ CREATE TABLE IF NOT EXISTS presence( user_id VARCHAR(150) NOT NULL, state VARCHAR(20), status_msg VARCHAR(150), - mtime BIGINT UNSIGNED, -- miliseconds since last state change + mtime BIGINT, -- miliseconds since last state change UNIQUE (user_id) ); @@ -37,4 +37,4 @@ CREATE TABLE IF NOT EXISTS presence_list( UNIQUE (user_id, observed_user_id) ); -CREATE INDEX IF NOT EXISTS presence_list_user_id ON presence_list (user_id); +CREATE INDEX presence_list_user_id ON presence_list (user_id); diff --git a/synapse/storage/schema/full_schemas/16/push.sql b/synapse/storage/schema/full_schemas/16/push.sql index 33300736f9..5c0c7bc201 100644 --- a/synapse/storage/schema/full_schemas/16/push.sql +++ b/synapse/storage/schema/full_schemas/16/push.sql @@ -22,52 +22,52 @@ CREATE TABLE IF NOT EXISTS rejections( -- Push notification endpoints that users have configured CREATE TABLE IF NOT EXISTS pushers ( - id BIGINT UNSIGNED PRIMARY KEY, + id BIGINT PRIMARY KEY, user_name VARCHAR(150) NOT NULL, profile_tag VARCHAR(32) NOT NULL, kind VARCHAR(8) NOT NULL, app_id VARCHAR(64) NOT NULL, app_display_name VARCHAR(64) NOT NULL, device_display_name VARCHAR(128) NOT NULL, - pushkey VARBINARY(512) NOT NULL, - ts BIGINT UNSIGNED NOT NULL, + pushkey bytea NOT NULL, + ts BIGINT NOT NULL, lang VARCHAR(8), - data LONGBLOB, + data bytea, last_token TEXT, - last_success BIGINT UNSIGNED, - failing_since BIGINT UNSIGNED, + last_success BIGINT, + failing_since BIGINT, UNIQUE (app_id, pushkey) ); CREATE TABLE IF NOT EXISTS push_rules ( - id BIGINT UNSIGNED PRIMARY KEY, + id BIGINT PRIMARY KEY, user_name VARCHAR(150) NOT NULL, rule_id VARCHAR(150) NOT NULL, - priority_class TINYINT NOT NULL, + priority_class SMALLINT NOT NULL, priority INTEGER NOT NULL DEFAULT 0, conditions VARCHAR(150) NOT NULL, actions VARCHAR(150) NOT NULL, UNIQUE(user_name, rule_id) ); -CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name); +CREATE INDEX push_rules_user_name on push_rules (user_name); CREATE TABLE IF NOT EXISTS user_filters( user_id VARCHAR(150), - filter_id BIGINT UNSIGNED, - filter_json LONGBLOB + filter_id BIGINT, + filter_json bytea ); -CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters( +CREATE INDEX user_filters_by_user_id_filter_id ON user_filters( user_id, filter_id ); CREATE TABLE IF NOT EXISTS push_rules_enable ( - id BIGINT UNSIGNED PRIMARY KEY, + id BIGINT PRIMARY KEY, user_name VARCHAR(150) NOT NULL, rule_id VARCHAR(150) NOT NULL, - enabled TINYINT, + enabled SMALLINT, UNIQUE(user_name, rule_id) ); -CREATE INDEX IF NOT EXISTS push_rules_enable_user_name on push_rules_enable (user_name); +CREATE INDEX push_rules_enable_user_name on push_rules_enable (user_name); diff --git a/synapse/storage/schema/full_schemas/16/redactions.sql b/synapse/storage/schema/full_schemas/16/redactions.sql index b81451eab4..492fd22033 100644 --- a/synapse/storage/schema/full_schemas/16/redactions.sql +++ b/synapse/storage/schema/full_schemas/16/redactions.sql @@ -18,5 +18,5 @@ CREATE 
TABLE IF NOT EXISTS redactions ( UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id); -CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts); +CREATE INDEX redactions_event_id ON redactions (event_id); +CREATE INDEX redactions_redacts ON redactions (redacts); diff --git a/synapse/storage/schema/full_schemas/16/state.sql b/synapse/storage/schema/full_schemas/16/state.sql index 8c51610396..3c54595e64 100644 --- a/synapse/storage/schema/full_schemas/16/state.sql +++ b/synapse/storage/schema/full_schemas/16/state.sql @@ -14,7 +14,7 @@ */ CREATE TABLE IF NOT EXISTS state_groups( - id BIGINT UNSIGNED PRIMARY KEY, + id BIGINT PRIMARY KEY, room_id VARCHAR(150) NOT NULL, event_id VARCHAR(150) NOT NULL ); @@ -33,8 +33,8 @@ CREATE TABLE IF NOT EXISTS event_to_state_groups( UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id); +CREATE INDEX state_groups_id ON state_groups(id); -CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state(state_group); -CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state(room_id, type, state_key); -CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups(event_id); +CREATE INDEX state_groups_state_id ON state_groups_state(state_group); +CREATE INDEX state_groups_state_tuple ON state_groups_state(room_id, type, state_key); +CREATE INDEX event_to_state_groups_id ON event_to_state_groups(event_id); diff --git a/synapse/storage/schema/full_schemas/16/transactions.sql b/synapse/storage/schema/full_schemas/16/transactions.sql index f381e67603..bc64064936 100644 --- a/synapse/storage/schema/full_schemas/16/transactions.sql +++ b/synapse/storage/schema/full_schemas/16/transactions.sql @@ -16,32 +16,32 @@ CREATE TABLE IF NOT EXISTS received_transactions( transaction_id VARCHAR(150), origin VARCHAR(150), - ts BIGINT UNSIGNED, + ts BIGINT, response_code INTEGER, - response_json LONGBLOB, - has_been_referenced BOOL default 0, -- Whether thishas been referenced by a prev_tx + response_json bytea, + has_been_referenced smallint default 0, -- Whether thishas been referenced by a prev_tx UNIQUE (transaction_id, origin) ); -CREATE INDEX IF NOT EXISTS transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0; +CREATE INDEX transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0; -- Stores what transactions we've sent, what their response was (if we got one) and whether we have -- since referenced the transaction in another outgoing transaction CREATE TABLE IF NOT EXISTS sent_transactions( - id BIGINT UNSIGNED PRIMARY KEY, -- This is used to apply insertion ordering + id BIGINT PRIMARY KEY, -- This is used to apply insertion ordering transaction_id VARCHAR(150), destination VARCHAR(150), response_code INTEGER DEFAULT 0, - response_json LONGBLOB, - ts BIGINT UNSIGNED + response_json bytea, + ts BIGINT ); -CREATE INDEX IF NOT EXISTS sent_transaction_dest ON sent_transactions(destination); -CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id); +CREATE INDEX sent_transaction_dest ON sent_transactions(destination); +CREATE INDEX sent_transaction_txn_id ON sent_transactions(transaction_id); -- So that we can do an efficient look up of all transactions that have yet to be successfully -- sent. 
-CREATE INDEX IF NOT EXISTS sent_transaction_sent ON sent_transactions(response_code); +CREATE INDEX sent_transaction_sent ON sent_transactions(response_code); -- For sent transactions only. @@ -53,11 +53,11 @@ CREATE TABLE IF NOT EXISTS transaction_id_to_pdu( UNIQUE (transaction_id, destination) ); -CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); +CREATE INDEX transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); -- To track destination health CREATE TABLE IF NOT EXISTS destinations( destination VARCHAR(150) PRIMARY KEY, - retry_last_ts BIGINT UNSIGNED, + retry_last_ts BIGINT, retry_interval INTEGER ); diff --git a/synapse/storage/schema/full_schemas/16/users.sql b/synapse/storage/schema/full_schemas/16/users.sql index d0011c04b4..267284d07d 100644 --- a/synapse/storage/schema/full_schemas/16/users.sql +++ b/synapse/storage/schema/full_schemas/16/users.sql @@ -15,17 +15,17 @@ CREATE TABLE IF NOT EXISTS users( name VARCHAR(150), password_hash VARCHAR(150), - creation_ts BIGINT UNSIGNED, - admin BOOL DEFAULT 0 NOT NULL, + creation_ts BIGINT, + admin SMALLINT DEFAULT 0 NOT NULL, UNIQUE(name) ); CREATE TABLE IF NOT EXISTS access_tokens( - id BIGINT UNSIGNED PRIMARY KEY, + id BIGINT PRIMARY KEY, user_id VARCHAR(150) NOT NULL, device_id VARCHAR(150), token VARCHAR(150) NOT NULL, - last_used BIGINT UNSIGNED, + last_used BIGINT, UNIQUE(token) ); @@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS user_ips ( device_id VARCHAR(150), ip VARCHAR(150) NOT NULL, user_agent VARCHAR(150) NOT NULL, - last_seen BIGINT UNSIGNED NOT NULL + last_seen BIGINT NOT NULL ); CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user); diff --git a/synapse/storage/schema/schema_version.sql b/synapse/storage/schema/schema_version.sql index e7fa6fe569..d9494611e0 100644 --- a/synapse/storage/schema/schema_version.sql +++ b/synapse/storage/schema/schema_version.sql @@ -14,16 +14,14 @@ */ CREATE TABLE IF NOT EXISTS schema_version( - `Lock` CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. - `version` INTEGER NOT NULL, - `upgraded` BOOL NOT NULL, -- Whether we reached this version from an upgrade or an initial schema. - CHECK (`Lock`='X') + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + version INTEGER NOT NULL, + upgraded BOOL NOT NULL, -- Whether we reached this version from an upgrade or an initial schema. + CHECK (Lock='X') ); CREATE TABLE IF NOT EXISTS applied_schema_deltas( - `version` INTEGER NOT NULL, - `file` VARCHAR(150) NOT NULL, + version INTEGER NOT NULL, + file VARCHAR(150) NOT NULL, UNIQUE(version, file) ); - -CREATE INDEX IF NOT EXISTS schema_deltas_ver ON applied_schema_deltas(version); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 57c2e4dfeb..df6de7cbcd 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -240,7 +240,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT e.event_id, e.stream_ordering FROM events AS e WHERE " - "(e.outlier = 0 AND (room_id IN (%(current)s)) OR " + "(e.outlier = ? AND (room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " "AND e.stream_ordering > ? AND e.stream_ordering <= ? 
" "ORDER BY stream_ordering ASC LIMIT %(limit)d " @@ -251,7 +251,7 @@ class StreamStore(SQLBaseStore): } def f(txn): - txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,)) + txn.execute(sql, (False, user_id, user_id, from_id.stream, to_id.stream,)) rows = self.cursor_to_dict(txn) @@ -283,7 +283,7 @@ class StreamStore(SQLBaseStore): # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. - args = [room_id] + args = [False, room_id] if direction == 'b': order = "DESC" bounds = _StreamToken.parse(from_key).upper_bound() @@ -307,7 +307,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT * FROM events" - " WHERE outlier = 0 AND room_id = ? AND %(bounds)s" + " WHERE outlier = ? AND room_id = ? AND %(bounds)s" " ORDER BY topological_ordering %(order)s," " stream_ordering %(order)s %(limit)s" ) % { @@ -358,7 +358,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT stream_ordering, topological_ordering, event_id" " FROM events" - " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0" + " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?" " ORDER BY topological_ordering DESC, stream_ordering DESC" " LIMIT ?" ) @@ -368,17 +368,17 @@ class StreamStore(SQLBaseStore): "SELECT stream_ordering, topological_ordering, event_id" " FROM events" " WHERE room_id = ? AND stream_ordering > ?" - " AND stream_ordering <= ? AND outlier = 0" + " AND stream_ordering <= ? AND outlier = ?" " ORDER BY topological_ordering DESC, stream_ordering DESC" " LIMIT ?" ) def get_recent_events_for_room_txn(txn): if from_token is None: - txn.execute(sql, (room_id, end_token.stream, limit,)) + txn.execute(sql, (room_id, end_token.stream, False, limit,)) else: txn.execute(sql, ( - room_id, from_token.stream, end_token.stream, limit + room_id, from_token.stream, end_token.stream, False, limit )) rows = self.cursor_to_dict(txn) -- cgit 1.4.1