diff options
Diffstat (limited to 'synapse/storage/__init__.py')
-rw-r--r-- | synapse/storage/__init__.py | 187 |
1 files changed, 144 insertions, 43 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 5d5b5f7c44..d06033b980 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -13,30 +13,35 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer from synapse.api.events.room import ( RoomMemberEvent, MessageEvent, RoomTopicEvent, FeedbackEvent, - RoomConfigEvent + RoomConfigEvent, RoomNameEvent, ) +from synapse.util.logutils import log_function + from .directory import DirectoryStore from .feedback import FeedbackStore -from .message import MessageStore from .presence import PresenceStore from .profile import ProfileStore from .registration import RegistrationStore from .room import RoomStore from .roommember import RoomMemberStore -from .roomdata import RoomDataStore from .stream import StreamStore from .pdu import StatePduStore, PduStore from .transactions import TransactionStore import json +import logging import os -class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, +logger = logging.getLogger(__name__) + + +class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, PduStore, StatePduStore, TransactionStore, DirectoryStore): @@ -44,51 +49,147 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, def __init__(self, hs): super(DataStore, self).__init__(hs) self.event_factory = hs.get_event_factory() + self.hs = hs - def persist_event(self, event): - if event.type == MessageEvent.TYPE: - return self.store_message( - user_id=event.user_id, - room_id=event.room_id, - msg_id=event.msg_id, - content=json.dumps(event.content) - ) - elif event.type == RoomMemberEvent.TYPE: - return self.store_room_member( - user_id=event.target_user_id, - sender=event.user_id, - room_id=event.room_id, - content=event.content, - membership=event.content["membership"] - ) + self.min_token_deferred = self._get_min_token() + self.min_token = None + + @defer.inlineCallbacks + @log_function + def persist_event(self, event, backfilled=False): + if event.type == RoomMemberEvent.TYPE: + yield self._store_room_member(event) elif event.type == FeedbackEvent.TYPE: - return self.store_feedback( - room_id=event.room_id, - msg_id=event.msg_id, - msg_sender_id=event.msg_sender_id, - fb_sender_id=event.user_id, - fb_type=event.feedback_type, - content=json.dumps(event.content) - ) + yield self._store_feedback(event) +# elif event.type == RoomConfigEvent.TYPE: +# yield self._store_room_config(event) + elif event.type == RoomNameEvent.TYPE: + yield self._store_room_name(event) elif event.type == RoomTopicEvent.TYPE: - return self.store_room_data( - room_id=event.room_id, - etype=event.type, - state_key=event.state_key, - content=json.dumps(event.content) + yield self._store_room_topic(event) + + ret = yield self._store_event(event, backfilled) + defer.returnValue(ret) + + @defer.inlineCallbacks + def get_event(self, event_id): + events_dict = yield self._simple_select_one( + "events", + {"event_id": event_id}, + [ + "event_id", + "type", + "sender", + "room_id", + "content", + "unrecognized_keys" + ], + ) + + event = self._parse_event_from_row(events_dict) + defer.returnValue(event) + + @defer.inlineCallbacks + @log_function + def _store_event(self, event, backfilled): + # FIXME (erikj): This should be removed when we start amalgamating + # event and pdu storage + yield self.hs.get_federation().fill_out_prev_events(event) + + vals = { + "topological_ordering": event.depth, + "event_id": event.event_id, + "type": event.type, + "room_id": event.room_id, + "content": json.dumps(event.content), + "processed": True, + } + + if hasattr(event, "outlier"): + vals["outlier"] = event.outlier + else: + vals["outlier"] = False + + if backfilled: + if not self.min_token_deferred.called: + yield self.min_token_deferred + self.min_token -= 1 + vals["stream_ordering"] = self.min_token + + unrec = { + k: v + for k, v in event.get_full_dict().items() + if k not in vals.keys() + } + vals["unrecognized_keys"] = json.dumps(unrec) + + try: + yield self._simple_insert("events", vals) + except: + logger.exception( + "Failed to persist, probably duplicate: %s", + event.event_id + ) + return + + if not backfilled and hasattr(event, "state_key"): + vals = { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + + if hasattr(event, "prev_state"): + vals["prev_state"] = event.prev_state + + yield self._simple_insert("state_events", vals) + + yield self._simple_insert( + "current_state_events", + { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } ) - elif event.type == RoomConfigEvent.TYPE: - if "visibility" in event.content: - visibility = event.content["visibility"] - return self.store_room_config( - room_id=event.room_id, - visibility=visibility - ) + latest = yield self.get_room_events_max_id() + defer.returnValue(latest) + + @defer.inlineCallbacks + def get_current_state(self, room_id, event_type=None, state_key=""): + sql = ( + "SELECT e.* FROM events as e " + "INNER JOIN current_state_events as c ON e.event_id = c.event_id " + "INNER JOIN state_events as s ON e.event_id = s.event_id " + "WHERE c.room_id = ? " + ) + + if event_type: + sql += " AND s.type = ? AND s.state_key = ? " + args = (room_id, event_type, state_key) else: - raise NotImplementedError( - "Don't know how to persist type=%s" % event.type - ) + args = (room_id, ) + + results = yield self._execute_and_decode(sql, *args) + + defer.returnValue([self._parse_event_from_row(r) for r in results]) + + @defer.inlineCallbacks + def _get_min_token(self): + row = yield self._execute( + None, + "SELECT MIN(stream_ordering) FROM events" + ) + + self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 + self.min_token = min(self.min_token, -1) + + logger.debug("min_token is: %s", self.min_token) + + defer.returnValue(self.min_token) def schema_path(schema): |