diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 198 | ||||
-rw-r--r-- | synapse/storage/_base.py | 158 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 11 | ||||
-rw-r--r-- | synapse/storage/media_repository.py | 129 | ||||
-rw-r--r-- | synapse/storage/schema/delta/v9.sql | 79 | ||||
-rw-r--r-- | synapse/storage/schema/im.sql | 13 | ||||
-rw-r--r-- | synapse/storage/schema/media_repository.sql | 68 | ||||
-rw-r--r-- | synapse/storage/schema/state.sql | 3 | ||||
-rw-r--r-- | synapse/storage/schema/transactions.sql | 6 | ||||
-rw-r--r-- | synapse/storage/signatures.py | 19 | ||||
-rw-r--r-- | synapse/storage/state.py | 13 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 106 |
12 files changed, 560 insertions, 243 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f15e3dfe62..e6bb665932 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -15,12 +15,8 @@ from twisted.internet import defer -from synapse.api.events.room import ( - RoomMemberEvent, RoomTopicEvent, FeedbackEvent, RoomNameEvent, - RoomRedactionEvent, -) - from synapse.util.logutils import log_function +from synapse.api.constants import EventTypes from .directory import DirectoryStore from .feedback import FeedbackStore @@ -33,11 +29,13 @@ from .stream import StreamStore from .transactions import TransactionStore from .keys import KeyStore from .event_federation import EventFederationStore +from .media_repository import MediaRepositoryStore from .state import StateStore from .signatures import SignatureStore from syutil.base64util import decode_base64 +from syutil.jsonutil import encode_canonical_json from synapse.crypto.event_signing import compute_event_reference_hash @@ -62,12 +60,13 @@ SCHEMAS = [ "state", "event_edges", "event_signatures", + "media_repository", ] # Remember to update this number every time an incompatible change is made to # database schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 8 +SCHEMA_VERSION = 10 class _RollbackButIsFineException(Exception): @@ -81,11 +80,12 @@ class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, TransactionStore, DirectoryStore, KeyStore, StateStore, SignatureStore, - EventFederationStore, ): + EventFederationStore, + MediaRepositoryStore, + ): def __init__(self, hs): super(DataStore, self).__init__(hs) - self.event_factory = hs.get_event_factory() self.hs = hs self.min_token_deferred = self._get_min_token() @@ -93,8 +93,8 @@ class DataStore(RoomMemberStore, RoomStore, @defer.inlineCallbacks @log_function - def persist_event(self, event, backfilled=False, is_new_state=True, - current_state=None): + def persist_event(self, event, context, backfilled=False, + is_new_state=True, current_state=None): stream_ordering = None if backfilled: if not self.min_token_deferred.called: @@ -107,6 +107,7 @@ class DataStore(RoomMemberStore, RoomStore, "persist_event", self._persist_event_txn, event=event, + context=context, backfilled=backfilled, stream_ordering=stream_ordering, is_new_state=is_new_state, @@ -117,50 +118,64 @@ class DataStore(RoomMemberStore, RoomStore, @defer.inlineCallbacks def get_event(self, event_id, allow_none=False): - events_dict = yield self._simple_select_one( - "events", - {"event_id": event_id}, - [ - "event_id", - "type", - "room_id", - "content", - "unrecognized_keys", - "depth", - ], - allow_none=allow_none, - ) + events = yield self._get_events([event_id]) - if not events_dict: - defer.returnValue(None) + if not events: + if allow_none: + defer.returnValue(None) + else: + raise RuntimeError("Could not find event %s" % (event_id,)) - event = yield self._parse_events([events_dict]) - defer.returnValue(event[0]) + defer.returnValue(events[0]) @log_function - def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None, - is_new_state=True, current_state=None): - if event.type == RoomMemberEvent.TYPE: + def _persist_event_txn(self, txn, event, context, backfilled, + stream_ordering=None, is_new_state=True, + current_state=None): + if event.type == EventTypes.Member: self._store_room_member_txn(txn, event) - elif event.type == FeedbackEvent.TYPE: + elif event.type == EventTypes.Feedback: self._store_feedback_txn(txn, event) - elif event.type == RoomNameEvent.TYPE: + elif event.type == EventTypes.Name: self._store_room_name_txn(txn, event) - elif event.type == RoomTopicEvent.TYPE: + elif event.type == EventTypes.Topic: self._store_room_topic_txn(txn, event) - elif event.type == RoomRedactionEvent.TYPE: + elif event.type == EventTypes.Redaction: self._store_redaction(txn, event) - outlier = False - if hasattr(event, "outlier"): - outlier = event.outlier + outlier = event.internal_metadata.is_outlier() + + event_dict = { + k: v + for k, v in event.get_dict().items() + if k not in [ + "redacted", + "redacted_because", + ] + } + + metadata_json = encode_canonical_json( + event.internal_metadata.get_dict() + ) + + self._simple_insert_txn( + txn, + table="event_json", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "internal_metadata": metadata_json.decode("UTF-8"), + "json": encode_canonical_json(event_dict).decode("UTF-8"), + }, + or_replace=True, + ) vals = { "topological_ordering": event.depth, "event_id": event.event_id, "type": event.type, "room_id": event.room_id, - "content": json.dumps(event.content), + "content": json.dumps(event.get_dict()["content"]), "processed": True, "outlier": outlier, "depth": event.depth, @@ -171,7 +186,7 @@ class DataStore(RoomMemberStore, RoomStore, unrec = { k: v - for k, v in event.get_full_dict().items() + for k, v in event.get_dict().items() if k not in vals.keys() and k not in [ "redacted", "redacted_because", @@ -206,7 +221,8 @@ class DataStore(RoomMemberStore, RoomStore, room_id=event.room_id, ) - self._store_state_groups_txn(txn, event) + if not outlier: + self._store_state_groups_txn(txn, event, context) if current_state: txn.execute( @@ -300,16 +316,6 @@ class DataStore(RoomMemberStore, RoomStore, txn, event.event_id, hash_alg, hash_bytes, ) - if hasattr(event, "signatures"): - logger.debug("sigs: %s", event.signatures) - for name, sigs in event.signatures.items(): - for key_id, signature_base64 in sigs.items(): - signature_bytes = decode_base64(signature_base64) - self._store_event_signature_txn( - txn, event.event_id, name, key_id, - signature_bytes, - ) - for prev_event_id, prev_hashes in event.prev_events: for alg, hash_base64 in prev_hashes.items(): hash_bytes = decode_base64(hash_base64) @@ -411,86 +417,6 @@ class DataStore(RoomMemberStore, RoomStore, ], ) - def snapshot_room(self, event): - """Snapshot the room for an update by a user - Args: - room_id (synapse.types.RoomId): The room to snapshot. - user_id (synapse.types.UserId): The user to snapshot the room for. - state_type (str): Optional state type to snapshot. - state_key (str): Optional state key to snapshot. - Returns: - synapse.storage.Snapshot: A snapshot of the state of the room. - """ - def _snapshot(txn): - prev_events = self._get_latest_events_in_room( - txn, - event.room_id - ) - - prev_state = None - state_key = None - if hasattr(event, "state_key"): - state_key = event.state_key - prev_state = self._get_latest_state_in_room( - txn, - event.room_id, - type=event.type, - state_key=state_key, - ) - - return Snapshot( - store=self, - room_id=event.room_id, - user_id=event.user_id, - prev_events=prev_events, - prev_state=prev_state, - state_type=event.type, - state_key=state_key, - ) - - return self.runInteraction("snapshot_room", _snapshot) - - -class Snapshot(object): - """Snapshot of the state of a room - Args: - store (DataStore): The datastore. - room_id (RoomId): The room of the snapshot. - user_id (UserId): The user this snapshot is for. - prev_events (list): The list of event ids this snapshot is after. - membership_state (RoomMemberEvent): The current state of the user in - the room. - state_type (str, optional): State type captured by the snapshot - state_key (str, optional): State key captured by the snapshot - prev_state_pdu (PduEntry, optional): pdu id of - the previous value of the state type and key in the room. - """ - - def __init__(self, store, room_id, user_id, prev_events, - prev_state, state_type=None, state_key=None): - self.store = store - self.room_id = room_id - self.user_id = user_id - self.prev_events = prev_events - self.prev_state = prev_state - self.state_type = state_type - self.state_key = state_key - - def fill_out_prev_events(self, event): - if not hasattr(event, "prev_events"): - event.prev_events = [ - (event_id, hashes) - for event_id, hashes, _ in self.prev_events - ] - - if self.prev_events: - event.depth = max([int(v) for _, _, v in self.prev_events]) + 1 - else: - event.depth = 0 - - if not hasattr(event, "prev_state") and self.prev_state is not None: - event.prev_state = self.prev_state - def schema_path(schema): """ Get a filesystem path for the named database schema @@ -518,6 +444,14 @@ def read_schema(schema): return schema_file.read() +class PrepareDatabaseException(Exception): + pass + + +class UpgradeDatabaseException(PrepareDatabaseException): + pass + + def prepare_database(db_conn): """ Set up all the dbs. Since all the *.sql have IF NOT EXISTS, so we don't have to worry about overwriting existing content. @@ -542,6 +476,10 @@ def prepare_database(db_conn): # Run every version since after the current version. for v in range(user_version + 1, SCHEMA_VERSION + 1): + if v == 10: + raise UpgradeDatabaseException( + "No delta for version 10" + ) sql_script = read_schema("delta/v%d" % (v)) c.executescript(sql_script) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 4881f03368..e0d97f440b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -15,15 +15,14 @@ import logging from synapse.api.errors import StoreError -from synapse.api.events.utils import prune_event +from synapse.events import FrozenEvent +from synapse.events.utils import prune_event from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext, LoggingContext -from syutil.base64util import encode_base64 from twisted.internet import defer import collections -import copy import json import sys import time @@ -84,7 +83,6 @@ class SQLBaseStore(object): def __init__(self, hs): self.hs = hs self._db_pool = hs.get_db_pool() - self.event_factory = hs.get_event_factory() self._clock = hs.get_clock() @defer.inlineCallbacks @@ -436,123 +434,79 @@ class SQLBaseStore(object): return self.runInteraction("_simple_max_id", func) - def _parse_event_from_row(self, row_dict): - d = copy.deepcopy({k: v for k, v in row_dict.items()}) - - d.pop("stream_ordering", None) - d.pop("topological_ordering", None) - d.pop("processed", None) - d["origin_server_ts"] = d.pop("ts", 0) - replaces_state = d.pop("prev_state", None) + def _get_events(self, event_ids): + return self.runInteraction( + "_get_events", self._get_events_txn, event_ids + ) - if replaces_state: - d["replaces_state"] = replaces_state + def _get_events_txn(self, txn, event_ids): + events = [] + for e_id in event_ids: + ev = self._get_event_txn(txn, e_id) - d.update(json.loads(row_dict["unrecognized_keys"])) - d["content"] = json.loads(d["content"]) - del d["unrecognized_keys"] + if ev: + events.append(ev) - if "age_ts" not in d: - # For compatibility - d["age_ts"] = d.get("origin_server_ts", 0) + return events - return self.event_factory.create_event( - etype=d["type"], - **d + def _get_event_txn(self, txn, event_id, check_redacted=True, + get_prev_content=True): + sql = ( + "SELECT internal_metadata, json, r.event_id FROM event_json as e " + "LEFT JOIN redactions as r ON e.event_id = r.redacts " + "WHERE e.event_id = ? " + "LIMIT 1 " ) - def _get_events_txn(self, txn, event_ids): - # FIXME (erikj): This should be batched? + txn.execute(sql, (event_id,)) - sql = "SELECT * FROM events WHERE event_id = ? ORDER BY rowid asc" + res = txn.fetchone() - event_rows = [] - for e_id in event_ids: - c = txn.execute(sql, (e_id,)) - event_rows.extend(self.cursor_to_dict(c)) + if not res: + return None - return self._parse_events_txn(txn, event_rows) + internal_metadata, js, redacted = res - def _parse_events(self, rows): - return self.runInteraction( - "_parse_events", self._parse_events_txn, rows - ) + d = json.loads(js) + internal_metadata = json.loads(internal_metadata) - def _parse_events_txn(self, txn, rows): - events = [self._parse_event_from_row(r) for r in rows] + ev = FrozenEvent(d, internal_metadata_dict=internal_metadata) - select_event_sql = ( - "SELECT * FROM events WHERE event_id = ? ORDER BY rowid asc" - ) + if check_redacted and redacted: + ev = prune_event(ev) + + ev.unsigned["redacted_by"] = redacted + # Get the redaction event. - for i, ev in enumerate(events): - signatures = self._get_event_signatures_txn( - txn, ev.event_id, + because = self._get_event_txn( + txn, + redacted, + check_redacted=False ) - ev.signatures = { - n: { - k: encode_base64(v) for k, v in s.items() - } - for n, s in signatures.items() - } + if because: + ev.unsigned["redacted_because"] = because - hashes = self._get_event_content_hashes_txn( - txn, ev.event_id, + if get_prev_content and "replaces_state" in ev.unsigned: + prev = self._get_event_txn( + txn, + ev.unsigned["replaces_state"], + get_prev_content=False, ) + if prev: + ev.unsigned["prev_content"] = prev.get_dict()["content"] - ev.hashes = { - k: encode_base64(v) for k, v in hashes.items() - } - - prevs = self._get_prev_events_and_state(txn, ev.event_id) - - ev.prev_events = [ - (e_id, h) - for e_id, h, is_state in prevs - if is_state == 0 - ] - - ev.auth_events = self._get_auth_events(txn, ev.event_id) - - if hasattr(ev, "state_key"): - ev.prev_state = [ - (e_id, h) - for e_id, h, is_state in prevs - if is_state == 1 - ] - - if hasattr(ev, "replaces_state"): - # Load previous state_content. - # FIXME (erikj): Handle multiple prev_states. - cursor = txn.execute( - select_event_sql, - (ev.replaces_state,) - ) - prevs = self.cursor_to_dict(cursor) - if prevs: - prev = self._parse_event_from_row(prevs[0]) - ev.prev_content = prev.content - - if not hasattr(ev, "redacted"): - logger.debug("Doesn't have redacted key: %s", ev) - ev.redacted = self._has_been_redacted_txn(txn, ev) - - if ev.redacted: - # Get the redaction event. - select_event_sql = "SELECT * FROM events WHERE event_id = ?" - txn.execute(select_event_sql, (ev.redacted,)) - - del_evs = self._parse_events_txn( - txn, self.cursor_to_dict(txn) - ) + return ev - if del_evs: - ev = prune_event(ev) - events[i] = ev - ev.redacted_because = del_evs[0] + def _parse_events(self, rows): + return self.runInteraction( + "_parse_events", self._parse_events_txn, rows + ) - return events + def _parse_events_txn(self, txn, rows): + event_ids = [r["event_id"] for r in rows] + + return self._get_events_txn(txn, event_ids) def _has_been_redacted_txn(self, txn, event): sql = "SELECT event_id FROM redactions WHERE redacts = ?" @@ -650,7 +604,7 @@ class JoinHelper(object): to dump the results into. Attributes: - taples (list): List of `Table` classes + tables (list): List of `Table` classes EntryType (type) """ diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 0ff9a23ee0..7a6009c9ee 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -177,14 +177,15 @@ class EventFederationStore(SQLBaseStore): retcols=["prev_event_id", "is_state"], ) + hashes = self._get_prev_event_hashes_txn(txn, event_id) + results = [] for d in res: - hashes = self._get_event_reference_hashes_txn( - txn, - d["prev_event_id"] - ) + edge_hash = self._get_event_reference_hashes_txn(txn, d["prev_event_id"]) + edge_hash.update(hashes.get(d["prev_event_id"], {})) prev_hashes = { - k: encode_base64(v) for k, v in hashes.items() + k: encode_base64(v) + for k, v in edge_hash.items() if k == "sha256" } results.append((d["prev_event_id"], prev_hashes, d["is_state"])) diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py new file mode 100644 index 0000000000..18c068d3d9 --- /dev/null +++ b/synapse/storage/media_repository.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from _base import SQLBaseStore + + +class MediaRepositoryStore(SQLBaseStore): + """Persistence for attachments and avatars""" + + def get_default_thumbnails(self, top_level_type, sub_type): + return [] + + def get_local_media(self, media_id): + """Get the metadata for a local piece of media + Returns: + None if the meia_id doesn't exist. + """ + return self._simple_select_one( + "local_media_repository", + {"media_id": media_id}, + ("media_type", "media_length", "upload_name", "created_ts"), + allow_none=True, + ) + + def store_local_media(self, media_id, media_type, time_now_ms, upload_name, + media_length, user_id): + return self._simple_insert( + "local_media_repository", + { + "media_id": media_id, + "media_type": media_type, + "created_ts": time_now_ms, + "upload_name": upload_name, + "media_length": media_length, + "user_id": user_id.to_string(), + } + ) + + def get_local_media_thumbnails(self, media_id): + return self._simple_select_list( + "local_media_repository_thumbnails", + {"media_id": media_id}, + ( + "thumbnail_width", "thumbnail_height", "thumbnail_method", + "thumbnail_type", "thumbnail_length", + ) + ) + + def store_local_thumbnail(self, media_id, thumbnail_width, + thumbnail_height, thumbnail_type, + thumbnail_method, thumbnail_length): + return self._simple_insert( + "local_media_repository_thumbnails", + { + "media_id": media_id, + "thumbnail_width": thumbnail_width, + "thumbnail_height": thumbnail_height, + "thumbnail_method": thumbnail_method, + "thumbnail_type": thumbnail_type, + "thumbnail_length": thumbnail_length, + } + ) + + def get_cached_remote_media(self, origin, media_id): + return self._simple_select_one( + "remote_media_cache", + {"media_origin": origin, "media_id": media_id}, + ( + "media_type", "media_length", "upload_name", "created_ts", + "filesystem_id", + ), + allow_none=True, + ) + + def store_cached_remote_media(self, origin, media_id, media_type, + media_length, time_now_ms, upload_name, + filesystem_id): + return self._simple_insert( + "remote_media_cache", + { + "media_origin": origin, + "media_id": media_id, + "media_type": media_type, + "media_length": media_length, + "created_ts": time_now_ms, + "upload_name": upload_name, + "filesystem_id": filesystem_id, + } + ) + + def get_remote_media_thumbnails(self, origin, media_id): + return self._simple_select_list( + "remote_media_cache_thumbnails", + {"media_origin": origin, "media_id": media_id}, + ( + "thumbnail_width", "thumbnail_height", "thumbnail_method", + "thumbnail_type", "thumbnail_length", "filesystem_id", + ) + ) + + def store_remote_media_thumbnail(self, origin, media_id, filesystem_id, + thumbnail_width, thumbnail_height, + thumbnail_type, thumbnail_method, + thumbnail_length): + return self._simple_insert( + "remote_media_cache_thumbnails", + { + "media_origin": origin, + "media_id": media_id, + "thumbnail_width": thumbnail_width, + "thumbnail_height": thumbnail_height, + "thumbnail_method": thumbnail_method, + "thumbnail_type": thumbnail_type, + "thumbnail_length": thumbnail_length, + "filesystem_id": filesystem_id, + } + ) diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql new file mode 100644 index 0000000000..0af29733a0 --- /dev/null +++ b/synapse/storage/schema/delta/v9.sql @@ -0,0 +1,79 @@ +/* Copyright 2014 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- To track destination health +CREATE TABLE IF NOT EXISTS destinations( + destination TEXT PRIMARY KEY, + retry_last_ts INTEGER, + retry_interval INTEGER +); + + +CREATE TABLE IF NOT EXISTS local_media_repository ( + media_id TEXT, -- The id used to refer to the media. + media_type TEXT, -- The MIME-type of the media. + media_length INTEGER, -- Length of the media in bytes. + created_ts INTEGER, -- When the content was uploaded in ms. + upload_name TEXT, -- The name the media was uploaded with. + user_id TEXT, -- The user who uploaded the file. + CONSTRAINT uniqueness UNIQUE (media_id) +); + +CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails ( + media_id TEXT, -- The id used to refer to the media. + thumbnail_width INTEGER, -- The width of the thumbnail in pixels. + thumbnail_height INTEGER, -- The height of the thumbnail in pixels. + thumbnail_type TEXT, -- The MIME-type of the thumbnail. + thumbnail_method TEXT, -- The method used to make the thumbnail. + thumbnail_length INTEGER, -- The length of the thumbnail in bytes. + CONSTRAINT uniqueness UNIQUE ( + media_id, thumbnail_width, thumbnail_height, thumbnail_type + ) +); + +CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id + ON local_media_repository_thumbnails (media_id); + +CREATE TABLE IF NOT EXISTS remote_media_cache ( + media_origin TEXT, -- The remote HS the media came from. + media_id TEXT, -- The id used to refer to the media on that server. + media_type TEXT, -- The MIME-type of the media. + created_ts INTEGER, -- When the content was uploaded in ms. + upload_name TEXT, -- The name the media was uploaded with. + media_length INTEGER, -- Length of the media in bytes. + filesystem_id TEXT, -- The name used to store the media on disk. + CONSTRAINT uniqueness UNIQUE (media_origin, media_id) +); + +CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails ( + media_origin TEXT, -- The remote HS the media came from. + media_id TEXT, -- The id used to refer to the media. + thumbnail_width INTEGER, -- The width of the thumbnail in pixels. + thumbnail_height INTEGER, -- The height of the thumbnail in pixels. + thumbnail_method TEXT, -- The method used to make the thumbnail + thumbnail_type TEXT, -- The MIME-type of the thumbnail. + thumbnail_length INTEGER, -- The length of the thumbnail in bytes. + filesystem_id TEXT, -- The name used to store the media on disk. + CONSTRAINT uniqueness UNIQUE ( + media_origin, media_id, thumbnail_width, thumbnail_height, + thumbnail_type, thumbnail_type + ) +); + +CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id + ON local_media_repository_thumbnails (media_id); + + +PRAGMA user_version = 9; diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 8ba732a23b..253f9f779b 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -32,6 +32,19 @@ CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering); CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering); CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id); + +CREATE TABLE IF NOT EXISTS event_json( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + internal_metadata NOT NULL, + json BLOB NOT NULL, + CONSTRAINT ev_j_uniq UNIQUE (event_id) +); + +CREATE INDEX IF NOT EXISTS event_json_id ON event_json(event_id); +CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id); + + CREATE TABLE IF NOT EXISTS state_events( event_id TEXT NOT NULL, room_id TEXT NOT NULL, diff --git a/synapse/storage/schema/media_repository.sql b/synapse/storage/schema/media_repository.sql new file mode 100644 index 0000000000..b785fa0208 --- /dev/null +++ b/synapse/storage/schema/media_repository.sql @@ -0,0 +1,68 @@ +/* Copyright 2014 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS local_media_repository ( + media_id TEXT, -- The id used to refer to the media. + media_type TEXT, -- The MIME-type of the media. + media_length INTEGER, -- Length of the media in bytes. + created_ts INTEGER, -- When the content was uploaded in ms. + upload_name TEXT, -- The name the media was uploaded with. + user_id TEXT, -- The user who uploaded the file. + CONSTRAINT uniqueness UNIQUE (media_id) +); + +CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails ( + media_id TEXT, -- The id used to refer to the media. + thumbnail_width INTEGER, -- The width of the thumbnail in pixels. + thumbnail_height INTEGER, -- The height of the thumbnail in pixels. + thumbnail_type TEXT, -- The MIME-type of the thumbnail. + thumbnail_method TEXT, -- The method used to make the thumbnail. + thumbnail_length INTEGER, -- The length of the thumbnail in bytes. + CONSTRAINT uniqueness UNIQUE ( + media_id, thumbnail_width, thumbnail_height, thumbnail_type + ) +); + +CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id + ON local_media_repository_thumbnails (media_id); + +CREATE TABLE IF NOT EXISTS remote_media_cache ( + media_origin TEXT, -- The remote HS the media came from. + media_id TEXT, -- The id used to refer to the media on that server. + media_type TEXT, -- The MIME-type of the media. + created_ts INTEGER, -- When the content was uploaded in ms. + upload_name TEXT, -- The name the media was uploaded with. + media_length INTEGER, -- Length of the media in bytes. + filesystem_id TEXT, -- The name used to store the media on disk. + CONSTRAINT uniqueness UNIQUE (media_origin, media_id) +); + +CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails ( + media_origin TEXT, -- The remote HS the media came from. + media_id TEXT, -- The id used to refer to the media. + thumbnail_width INTEGER, -- The width of the thumbnail in pixels. + thumbnail_height INTEGER, -- The height of the thumbnail in pixels. + thumbnail_method TEXT, -- The method used to make the thumbnail + thumbnail_type TEXT, -- The MIME-type of the thumbnail. + thumbnail_length INTEGER, -- The length of the thumbnail in bytes. + filesystem_id TEXT, -- The name used to store the media on disk. + CONSTRAINT uniqueness UNIQUE ( + media_origin, media_id, thumbnail_width, thumbnail_height, + thumbnail_type, thumbnail_type + ) +); + +CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id + ON local_media_repository_thumbnails (media_id); diff --git a/synapse/storage/schema/state.sql b/synapse/storage/schema/state.sql index 44f7aafb27..2c48d6daca 100644 --- a/synapse/storage/schema/state.sql +++ b/synapse/storage/schema/state.sql @@ -29,7 +29,8 @@ CREATE TABLE IF NOT EXISTS state_groups_state( CREATE TABLE IF NOT EXISTS event_to_state_groups( event_id TEXT NOT NULL, - state_group INTEGER NOT NULL + state_group INTEGER NOT NULL, + CONSTRAINT event_to_state_groups_uniq UNIQUE (event_id) ); CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id); diff --git a/synapse/storage/schema/transactions.sql b/synapse/storage/schema/transactions.sql index 88e3e4e04d..de461bfa15 100644 --- a/synapse/storage/schema/transactions.sql +++ b/synapse/storage/schema/transactions.sql @@ -59,3 +59,9 @@ CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(tra CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination); +-- To track destination health +CREATE TABLE IF NOT EXISTS destinations( + destination TEXT PRIMARY KEY, + retry_last_ts INTEGER, + retry_interval INTEGER +); diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index eea4f21065..3a705119fd 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -13,8 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from _base import SQLBaseStore +from syutil.base64util import encode_base64 + class SignatureStore(SQLBaseStore): """Persistence for event signatures and hashes""" @@ -67,6 +71,21 @@ class SignatureStore(SQLBaseStore): f ) + @defer.inlineCallbacks + def add_event_hashes(self, event_ids): + hashes = yield self.get_event_reference_hashes( + event_ids + ) + hashes = [ + { + k: encode_base64(v) for k, v in h.items() + if k == "sha256" + } + for h in hashes + ] + + defer.returnValue(zip(event_ids, hashes)) + def _get_event_reference_hashes_txn(self, txn, event_id): """Get all the hashes for a given PDU. Args: diff --git a/synapse/storage/state.py b/synapse/storage/state.py index e0f44b3e59..afe3e5edea 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -86,11 +86,16 @@ class StateStore(SQLBaseStore): self._store_state_groups_txn, event ) - def _store_state_groups_txn(self, txn, event): - if event.state_events is None: + def _store_state_groups_txn(self, txn, event, context): + if context.current_state is None: return - state_group = event.state_group + state_events = context.current_state + + if event.is_state(): + state_events[(event.type, event.state_key)] = event + + state_group = context.state_group if not state_group: state_group = self._simple_insert_txn( txn, @@ -102,7 +107,7 @@ class StateStore(SQLBaseStore): or_ignore=True, ) - for state in event.state_events.values(): + for state in state_events.values(): self._simple_insert_txn( txn, table="state_groups_state", diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 00d0f48082..423cc3f02a 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -17,6 +17,8 @@ from ._base import SQLBaseStore, Table from collections import namedtuple +from twisted.internet import defer + import logging logger = logging.getLogger(__name__) @@ -26,6 +28,10 @@ class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ + # a write-through cache of DestinationsTable.EntryType indexed by + # destination string + destination_retry_cache = {} + def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -114,7 +120,7 @@ class TransactionStore(SQLBaseStore): def _prep_send_transaction(self, txn, transaction_id, destination, origin_server_ts): - # First we find out what the prev_txs should be. + # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. query = "%s ORDER BY id DESC LIMIT 1" % ( @@ -205,6 +211,92 @@ class TransactionStore(SQLBaseStore): return ReceivedTransactionsTable.decode_results(txn.fetchall()) + def get_destination_retry_timings(self, destination): + """Gets the current retry timings (if any) for a given destination. + + Args: + destination (str) + + Returns: + None if not retrying + Otherwise a DestinationsTable.EntryType for the retry scheme + """ + if destination in self.destination_retry_cache: + return defer.succeed(self.destination_retry_cache[destination]) + + return self.runInteraction( + "get_destination_retry_timings", + self._get_destination_retry_timings, destination) + + def _get_destination_retry_timings(cls, txn, destination): + query = DestinationsTable.select_statement("destination = ?") + txn.execute(query, (destination,)) + result = txn.fetchall() + if result: + result = DestinationsTable.decode_single_result(result) + if result.retry_last_ts > 0: + return result + else: + return None + + def set_destination_retry_timings(self, destination, + retry_last_ts, retry_interval): + """Sets the current retry timings for a given destination. + Both timings should be zero if retrying is no longer occuring. + + Args: + destination (str) + retry_last_ts (int) - time of last retry attempt in unix epoch ms + retry_interval (int) - how long until next retry in ms + """ + + self.destination_retry_cache[destination] = ( + DestinationsTable.EntryType( + destination, + retry_last_ts, + retry_interval + ) + ) + + # XXX: we could chose to not bother persisting this if our cache thinks + # this is a NOOP + return self.runInteraction( + "set_destination_retry_timings", + self._set_destination_retry_timings, + destination, + retry_last_ts, + retry_interval, + ) + + def _set_destination_retry_timings(cls, txn, destination, + retry_last_ts, retry_interval): + + query = ( + "INSERT OR REPLACE INTO %s " + "(destination, retry_last_ts, retry_interval) " + "VALUES (?, ?, ?) " + ) % DestinationsTable.table_name + + txn.execute(query, (destination, retry_last_ts, retry_interval)) + + def get_destinations_needing_retry(self): + """Get all destinations which are due a retry for sending a transaction. + + Returns: + list: A list of `DestinationsTable.EntryType` + """ + + return self.runInteraction( + "get_destinations_needing_retry", + self._get_destinations_needing_retry + ) + + def _get_destinations_needing_retry(cls, txn): + where = "retry_last_ts > 0 and retry_next_ts < now()" + query = DestinationsTable.select_statement(where) + txn.execute(query) + return DestinationsTable.decode_results(txn.fetchall()) + class ReceivedTransactionsTable(Table): table_name = "received_transactions" @@ -247,3 +339,15 @@ class TransactionsToPduTable(Table): ] EntryType = namedtuple("TransactionsToPduEntry", fields) + + +class DestinationsTable(Table): + table_name = "destinations" + + fields = [ + "destination", + "retry_last_ts", + "retry_interval", + ] + + EntryType = namedtuple("DestinationsEntry", fields) |