diff options
Diffstat (limited to 'synapse/storage/__init__.py')
-rw-r--r-- | synapse/storage/__init__.py | 122 |
1 files changed, 100 insertions, 22 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d97014f4da..66658f6721 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -36,7 +36,7 @@ from .registration import RegistrationStore from .room import RoomStore from .roommember import RoomMemberStore from .stream import StreamStore -from .pdu import StatePduStore, PduStore +from .pdu import StatePduStore, PduStore, PdusTable from .transactions import TransactionStore from .keys import KeyStore @@ -48,6 +48,28 @@ import os logger = logging.getLogger(__name__) +SCHEMAS = [ + "transactions", + "pdu", + "users", + "profiles", + "presence", + "im", + "room_aliases", +] + + +# Remember to update this number every time an incompatible change is made to +# database schema files, so the users will be informed on server restarts. +SCHEMA_VERSION = 3 + + +class _RollbackButIsFineException(Exception): + """ This exception is used to rollback a transaction without implying + something went wrong. + """ + pass + class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, PduStore, StatePduStore, TransactionStore, @@ -63,7 +85,8 @@ class DataStore(RoomMemberStore, RoomStore, @defer.inlineCallbacks @log_function - def persist_event(self, event=None, backfilled=False, pdu=None): + def persist_event(self, event=None, backfilled=False, pdu=None, + is_new_state=True): stream_ordering = None if backfilled: if not self.min_token_deferred.called: @@ -71,17 +94,20 @@ class DataStore(RoomMemberStore, RoomStore, self.min_token -= 1 stream_ordering = self.min_token - latest = yield self._db_pool.runInteraction( - self._persist_pdu_event_txn, - pdu=pdu, - event=event, - backfilled=backfilled, - stream_ordering=stream_ordering, - ) - defer.returnValue(latest) + try: + yield self.runInteraction( + self._persist_pdu_event_txn, + pdu=pdu, + event=event, + backfilled=backfilled, + stream_ordering=stream_ordering, + is_new_state=is_new_state, + ) + except _RollbackButIsFineException as e: + pass @defer.inlineCallbacks - def get_event(self, event_id): + def get_event(self, event_id, allow_none=False): events_dict = yield self._simple_select_one( "events", {"event_id": event_id}, @@ -92,18 +118,24 @@ class DataStore(RoomMemberStore, RoomStore, "content", "unrecognized_keys" ], + allow_none=allow_none, ) + if not events_dict: + defer.returnValue(None) + event = self._parse_event_from_row(events_dict) defer.returnValue(event) def _persist_pdu_event_txn(self, txn, pdu=None, event=None, - backfilled=False, stream_ordering=None): + backfilled=False, stream_ordering=None, + is_new_state=True): if pdu is not None: self._persist_event_pdu_txn(txn, pdu) if event is not None: return self._persist_event_txn( - txn, event, backfilled, stream_ordering + txn, event, backfilled, stream_ordering, + is_new_state=is_new_state, ) def _persist_event_pdu_txn(self, txn, pdu): @@ -112,6 +144,12 @@ class DataStore(RoomMemberStore, RoomStore, del cols["content"] del cols["prev_pdus"] cols["content_json"] = json.dumps(pdu.content) + + unrec_keys.update({ + k: v for k, v in cols.items() + if k not in PdusTable.fields + }) + cols["unrecognized_keys"] = json.dumps(unrec_keys) logger.debug("Persisting: %s", repr(cols)) @@ -124,7 +162,8 @@ class DataStore(RoomMemberStore, RoomStore, self._update_min_depth_for_context_txn(txn, pdu.context, pdu.depth) @log_function - def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None): + def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None, + is_new_state=True): if event.type == RoomMemberEvent.TYPE: self._store_room_member_txn(txn, event) elif event.type == FeedbackEvent.TYPE: @@ -171,13 +210,14 @@ class DataStore(RoomMemberStore, RoomStore, try: self._simple_insert_txn(txn, "events", vals) except: - logger.exception( + logger.warn( "Failed to persist, probably duplicate: %s", - event.event_id + event.event_id, + exc_info=True, ) - return + raise _RollbackButIsFineException("_persist_event") - if not backfilled and hasattr(event, "state_key"): + if is_new_state and hasattr(event, "state_key"): vals = { "event_id": event.event_id, "room_id": event.room_id, @@ -201,8 +241,6 @@ class DataStore(RoomMemberStore, RoomStore, } ) - return self._get_room_events_max_id_txn(txn) - @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): sql = ( @@ -220,7 +258,8 @@ class DataStore(RoomMemberStore, RoomStore, results = yield self._execute_and_decode(sql, *args) - defer.returnValue([self._parse_event_from_row(r) for r in results]) + events = yield self._parse_events(results) + defer.returnValue(events) @defer.inlineCallbacks def _get_min_token(self): @@ -269,7 +308,7 @@ class DataStore(RoomMemberStore, RoomStore, prev_state_pdu=prev_state_pdu, ) - return self._db_pool.runInteraction(_snapshot) + return self.runInteraction(_snapshot) class Snapshot(object): @@ -339,3 +378,42 @@ def read_schema(schema): """ with open(schema_path(schema)) as schema_file: return schema_file.read() + + +def prepare_database(db_conn): + """ Set up all the dbs. Since all the *.sql have IF NOT EXISTS, so we + don't have to worry about overwriting existing content. + """ + c = db_conn.cursor() + c.execute("PRAGMA user_version") + row = c.fetchone() + + if row and row[0]: + user_version = row[0] + + if user_version > SCHEMA_VERSION: + raise ValueError("Cannot use this database as it is too " + + "new for the server to understand" + ) + elif user_version < SCHEMA_VERSION: + logging.info("Upgrading database from version %d", + user_version + ) + + # Run every version since after the current version. + for v in range(user_version + 1, SCHEMA_VERSION + 1): + sql_script = read_schema("delta/v%d" % (v)) + c.executescript(sql_script) + + db_conn.commit() + + else: + for sql_loc in SCHEMAS: + sql_script = read_schema(sql_loc) + + c.executescript(sql_script) + db_conn.commit() + c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION) + + c.close() + |