diff options
Diffstat (limited to 'synapse/storage/__init__.py')
-rw-r--r-- | synapse/storage/__init__.py | 313 |
1 files changed, 233 insertions, 80 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 4beb951b9f..02b1f06854 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -29,10 +29,14 @@ from .stream import StreamStore from .transactions import TransactionStore from .keys import KeyStore from .event_federation import EventFederationStore +from .pusher import PusherStore +from .push_rule import PushRuleStore from .media_repository import MediaRepositoryStore +from .rejections import RejectionsStore from .state import StateStore from .signatures import SignatureStore +from .filtering import FilteringStore from syutil.base64util import decode_base64 from syutil.jsonutil import encode_canonical_json @@ -40,7 +44,6 @@ from syutil.jsonutil import encode_canonical_json from synapse.crypto.event_signing import compute_event_reference_hash -import json import logging import os @@ -60,13 +63,16 @@ SCHEMAS = [ "state", "event_edges", "event_signatures", + "pusher", "media_repository", + "filtering", + "rejections", ] # Remember to update this number every time an incompatible change is made to # database schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 11 +SCHEMA_VERSION = 12 class _RollbackButIsFineException(Exception): @@ -82,6 +88,10 @@ class DataStore(RoomMemberStore, RoomStore, DirectoryStore, KeyStore, StateStore, SignatureStore, EventFederationStore, MediaRepositoryStore, + RejectionsStore, + FilteringStore, + PusherStore, + PushRuleStore ): def __init__(self, hs): @@ -117,21 +127,147 @@ class DataStore(RoomMemberStore, RoomStore, pass @defer.inlineCallbacks - def get_event(self, event_id, allow_none=False): - events = yield self._get_events([event_id]) + def get_event(self, event_id, check_redacted=True, + get_prev_content=False, allow_rejected=False, + allow_none=False): + """Get an event from the database by event_id. + + Args: + event_id (str): The event_id of the event to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + allow_none (bool): If True, return None if no event found, if + False throw an exception. + + Returns: + Deferred : A FrozenEvent. + """ + event = yield self.runInteraction( + "get_event", self._get_event_txn, + event_id, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) - if not events: - if allow_none: - defer.returnValue(None) - else: - raise RuntimeError("Could not find event %s" % (event_id,)) + if not event and not allow_none: + raise RuntimeError("Could not find event %s" % (event_id,)) - defer.returnValue(events[0]) + defer.returnValue(event) @log_function def _persist_event_txn(self, txn, event, context, backfilled, stream_ordering=None, is_new_state=True, current_state=None): + + # Remove the any existing cache entries for the event_id + self._get_event_cache.pop(event.event_id) + + # We purposefully do this first since if we include a `current_state` + # key, we *want* to update the `current_state_events` table + if current_state: + txn.execute( + "DELETE FROM current_state_events WHERE room_id = ?", + (event.room_id,) + ) + + for s in current_state: + self._simple_insert_txn( + txn, + "current_state_events", + { + "event_id": s.event_id, + "room_id": s.room_id, + "type": s.type, + "state_key": s.state_key, + }, + or_replace=True, + ) + + if event.is_state() and is_new_state: + if not backfilled and not context.rejected: + self._simple_insert_txn( + txn, + table="state_forward_extremities", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + or_replace=True, + ) + + for prev_state_id, _ in event.prev_state: + self._simple_delete_txn( + txn, + table="state_forward_extremities", + keyvalues={ + "event_id": prev_state_id, + } + ) + + outlier = event.internal_metadata.is_outlier() + + if not outlier: + self._store_state_groups_txn(txn, event, context) + + self._update_min_depth_for_room_txn( + txn, + event.room_id, + event.depth + ) + + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + + have_persisted = self._simple_select_one_onecol_txn( + txn, + table="event_json", + keyvalues={"event_id": event.event_id}, + retcol="event_id", + allow_none=True, + ) + + metadata_json = encode_canonical_json( + event.internal_metadata.get_dict() + ) + + # If we have already persisted this event, we don't need to do any + # more processing. + # The processing above must be done on every call to persist event, + # since they might not have happened on previous calls. For example, + # if we are persisting an event that we had persisted as an outlier, + # but is no longer one. + if have_persisted: + if not outlier: + sql = ( + "UPDATE event_json SET internal_metadata = ?" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (metadata_json.decode("UTF-8"), event.event_id,) + ) + + sql = ( + "UPDATE events SET outlier = 0" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (event.event_id,) + ) + return + if event.type == EventTypes.Member: self._store_room_member_txn(txn, event) elif event.type == EventTypes.Feedback: @@ -143,8 +279,6 @@ class DataStore(RoomMemberStore, RoomStore, elif event.type == EventTypes.Redaction: self._store_redaction(txn, event) - outlier = event.internal_metadata.is_outlier() - event_dict = { k: v for k, v in event.get_dict().items() @@ -154,10 +288,6 @@ class DataStore(RoomMemberStore, RoomStore, ] } - metadata_json = encode_canonical_json( - event.internal_metadata.get_dict() - ) - self._simple_insert_txn( txn, table="event_json", @@ -170,12 +300,16 @@ class DataStore(RoomMemberStore, RoomStore, or_replace=True, ) + content = encode_canonical_json( + event.content + ).decode("UTF-8") + vals = { "topological_ordering": event.depth, "event_id": event.event_id, "type": event.type, "room_id": event.room_id, - "content": json.dumps(event.get_dict()["content"]), + "content": content, "processed": True, "outlier": outlier, "depth": event.depth, @@ -195,7 +329,10 @@ class DataStore(RoomMemberStore, RoomStore, "prev_events", ] } - vals["unrecognized_keys"] = json.dumps(unrec) + + vals["unrecognized_keys"] = encode_canonical_json( + unrec + ).decode("UTF-8") try: self._simple_insert_txn( @@ -213,38 +350,10 @@ class DataStore(RoomMemberStore, RoomStore, ) raise _RollbackButIsFineException("_persist_event") - self._handle_prev_events( - txn, - outlier=outlier, - event_id=event.event_id, - prev_events=event.prev_events, - room_id=event.room_id, - ) - - if not outlier: - self._store_state_groups_txn(txn, event, context) - - if current_state: - txn.execute( - "DELETE FROM current_state_events WHERE room_id = ?", - (event.room_id,) - ) + if context.rejected: + self._store_rejections_txn(txn, event.event_id, context.rejected) - for s in current_state: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": s.event_id, - "room_id": s.room_id, - "type": s.type, - "state_key": s.state_key, - }, - or_replace=True, - ) - - is_state = hasattr(event, "state_key") and event.state_key is not None - if is_state: + if event.is_state(): vals = { "event_id": event.event_id, "room_id": event.room_id, @@ -252,6 +361,7 @@ class DataStore(RoomMemberStore, RoomStore, "state_key": event.state_key, } + # TODO: How does this work with backfilling? if hasattr(event, "replaces_state"): vals["prev_state"] = event.replaces_state @@ -262,7 +372,7 @@ class DataStore(RoomMemberStore, RoomStore, or_replace=True, ) - if is_new_state: + if is_new_state and not context.rejected: self._simple_insert_txn( txn, "current_state_events", @@ -288,28 +398,6 @@ class DataStore(RoomMemberStore, RoomStore, or_ignore=True, ) - if not backfilled: - self._simple_insert_txn( - txn, - table="state_forward_extremities", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - or_replace=True, - ) - - for prev_state_id, _ in event.prev_state: - self._simple_delete_txn( - txn, - table="state_forward_extremities", - keyvalues={ - "event_id": prev_state_id, - } - ) - for hash_alg, hash_base64 in event.hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_event_content_hash_txn( @@ -340,14 +428,9 @@ class DataStore(RoomMemberStore, RoomStore, txn, event.event_id, ref_alg, ref_hash_bytes ) - if not outlier: - self._update_min_depth_for_room_txn( - txn, - event.room_id, - event.depth - ) - def _store_redaction(self, txn, event): + # invalidate the cache for the redacted event + self._get_event_cache.pop(event.redacts) txn.execute( "INSERT OR IGNORE INTO redactions " "(event_id, redacts) VALUES (?,?)", @@ -370,9 +453,12 @@ class DataStore(RoomMemberStore, RoomStore, "redacted": del_sql, } - if event_type: + if event_type and state_key is not None: sql += " AND s.type = ? AND s.state_key = ? " args = (room_id, event_type, state_key) + elif event_type: + sql += " AND s.type = ?" + args = (room_id, event_type) else: args = (room_id, ) @@ -382,6 +468,41 @@ class DataStore(RoomMemberStore, RoomStore, defer.returnValue(events) @defer.inlineCallbacks + def get_room_name_and_aliases(self, room_id): + del_sql = ( + "SELECT event_id FROM redactions WHERE redacts = e.event_id " + "LIMIT 1" + ) + + sql = ( + "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " + "INNER JOIN current_state_events as c ON e.event_id = c.event_id " + "INNER JOIN state_events as s ON e.event_id = s.event_id " + "WHERE c.room_id = ? " + ) % { + "redacted": del_sql, + } + + sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')" + sql += " OR s.type = 'm.room.aliases')" + args = (room_id,) + + results = yield self._execute_and_decode(sql, *args) + + events = yield self._parse_events(results) + + name = None + aliases = [] + + for e in events: + if e.type == 'm.room.name': + name = e.content['name'] + elif e.type == 'm.room.aliases': + aliases.extend(e.content['aliases']) + + defer.returnValue((name, aliases)) + + @defer.inlineCallbacks def _get_min_token(self): row = yield self._execute( None, @@ -417,6 +538,38 @@ class DataStore(RoomMemberStore, RoomStore, ], ) + def have_events(self, event_ids): + """Given a list of event ids, check if we have already processed them. + + Returns: + dict: Has an entry for each event id we already have seen. Maps to + the rejected reason string if we rejected the event, else maps to + None. + """ + if not event_ids: + return defer.succeed({}) + + def f(txn): + sql = ( + "SELECT e.event_id, reason FROM events as e " + "LEFT JOIN rejections as r ON e.event_id = r.event_id " + "WHERE e.event_id = ?" + ) + + res = {} + for event_id in event_ids: + txn.execute(sql, (event_id,)) + row = txn.fetchone() + if row: + _, rejected = row + res[event_id] = rejected + + return res + + return self.runInteraction( + "have_events", f, + ) + def schema_path(schema): """ Get a filesystem path for the named database schema |