diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 123 | ||||
-rw-r--r-- | synapse/storage/_base.py | 27 | ||||
-rw-r--r-- | synapse/storage/feedback.py | 72 | ||||
-rw-r--r-- | synapse/storage/message.py | 81 | ||||
-rw-r--r-- | synapse/storage/room.py | 90 | ||||
-rw-r--r-- | synapse/storage/roomdata.py | 85 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 166 | ||||
-rw-r--r-- | synapse/storage/schema/im.sql | 75 | ||||
-rw-r--r-- | synapse/storage/stream.py | 282 |
9 files changed, 339 insertions, 662 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 5d5b5f7c44..f62cee3c39 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -13,21 +13,20 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer from synapse.api.events.room import ( RoomMemberEvent, MessageEvent, RoomTopicEvent, FeedbackEvent, - RoomConfigEvent + RoomConfigEvent, RoomNameEvent, ) from .directory import DirectoryStore from .feedback import FeedbackStore -from .message import MessageStore from .presence import PresenceStore from .profile import ProfileStore from .registration import RegistrationStore from .room import RoomStore from .roommember import RoomMemberStore -from .roomdata import RoomDataStore from .stream import StreamStore from .pdu import StatePduStore, PduStore from .transactions import TransactionStore @@ -36,7 +35,7 @@ import json import os -class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, +class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, PduStore, StatePduStore, TransactionStore, DirectoryStore): @@ -45,50 +44,86 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, super(DataStore, self).__init__(hs) self.event_factory = hs.get_event_factory() + @defer.inlineCallbacks def persist_event(self, event): - if event.type == MessageEvent.TYPE: - return self.store_message( - user_id=event.user_id, - room_id=event.room_id, - msg_id=event.msg_id, - content=json.dumps(event.content) - ) - elif event.type == RoomMemberEvent.TYPE: - return self.store_room_member( - user_id=event.target_user_id, - sender=event.user_id, - room_id=event.room_id, - content=event.content, - membership=event.content["membership"] - ) + if event.type == RoomMemberEvent.TYPE: + yield self._store_room_member(event) elif event.type == FeedbackEvent.TYPE: - return self.store_feedback( - room_id=event.room_id, - msg_id=event.msg_id, - msg_sender_id=event.msg_sender_id, - fb_sender_id=event.user_id, - fb_type=event.feedback_type, - content=json.dumps(event.content) - ) - elif event.type == RoomTopicEvent.TYPE: - return self.store_room_data( - room_id=event.room_id, - etype=event.type, - state_key=event.state_key, - content=json.dumps(event.content) - ) + yield self._store_feedback(event) elif event.type == RoomConfigEvent.TYPE: - if "visibility" in event.content: - visibility = event.content["visibility"] - return self.store_room_config( - room_id=event.room_id, - visibility=visibility - ) - + yield self._store_room_config(event) + elif event.type == RoomNameEvent.TYPE: + yield self._store_room_name(event) + elif event.type == RoomTopicEvent.TYPE: + yield self._store_room_topic(event) + + yield self._store_event(event) + + @defer.inlineCallbacks + def get_event(self, event_id): + events_dict = yield self._simple_select_one( + "events", + {"event_id": event_id}, + [ + "event_id", + "type", + "sender", + "room_id", + "content", + "unrecognized_keys" + ], + ) + + event = self._parse_event_from_row(events_dict) + defer.returnValue(event) + + @defer.inlineCallbacks + def _store_event(self, event): + vals = { + "event_id": event.event_id, + "type": event.type, + "room_id": event.room_id, + "content": json.dumps(event.content), + } + + unrec = {k: v for k, v in event.get_full_dict().items() if k not in vals.keys()} + vals["unrecognized_keys"] = json.dumps(unrec) + + yield self._simple_insert("events", vals) + + if hasattr(event, "state_key"): + vals = { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + + if hasattr(event, "prev_state"): + vals["prev_state"] = event.prev_state + + yield self._simple_insert("state_events", vals) + + # TODO (erikj): We also need to update the current state table? + + @defer.inlineCallbacks + def get_current_state(self, room_id, event_type=None, state_key=""): + sql = ( + "SELECT e.* FROM events as e " + "INNER JOIN current_state_events as c ON e.event_id = c.event_id " + "INNER JOIN state_events as s ON e.event_id = s.event_id " + "WHERE c.room_id = ? " + ) + + if event_type: + sql += " AND s.type = ? AND s.state_key = ? " + args = (room_id, event_type, state_key) else: - raise NotImplementedError( - "Don't know how to persist type=%s" % event.type - ) + args = (room_id, ) + + results = yield self._execute_and_decode(sql, *args) + + defer.returnValue([self._parse_event_from_row(r) for r in results]) def schema_path(schema): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index bf1800f4bf..c26e9a0f98 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from twisted.internet import defer @@ -20,6 +19,9 @@ from twisted.internet import defer from synapse.api.errors import StoreError import collections +import copy +import json + logger = logging.getLogger(__name__) @@ -29,6 +31,7 @@ class SQLBaseStore(object): def __init__(self, hs): self.hs = hs self._db_pool = hs.get_db_pool() + self.event_factory = hs.get_event_factory() self._clock = hs.get_clock() def cursor_to_dict(self, cursor): @@ -57,14 +60,21 @@ class SQLBaseStore(object): The result of decoder(results) """ logger.debug( - "[SQL] %s Args=%s Func=%s", query, args, decoder.__name__ + "[SQL] %s Args=%s Func=%s", query, args, decoder.__name__ if decoder else None ) def interaction(txn): cursor = txn.execute(query, args) - return decoder(cursor) + if decoder: + return decoder(cursor) + else: + return cursor.fetchall() + return self._db_pool.runInteraction(interaction) + def _execute_and_decode(self, query, *args): + return self._execute(self.cursor_to_dict, query, *args) + # "Simple" SQL API methods that operate on a single table with no JOINs, # no complex WHERE clauses, just a dict of values for columns. @@ -281,6 +291,17 @@ class SQLBaseStore(object): return self._db_pool.runInteraction(func) + def _parse_event_from_row(self, row_dict): + d = copy.deepcopy({k: v for k, v in row_dict.items() if v}) + d.update(json.loads(json.loads(row_dict["unrecognized_keys"]))) + d["content"] = json.loads(d["content"]) + del d["unrecognized_keys"] + + return self.event_factory.create_event( + etype=d["type"], + **d + ) + class Table(object): """ A base class used to store information about a particular table. diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py index 9bd562c762..e60f98d1e1 100644 --- a/synapse/storage/feedback.py +++ b/synapse/storage/feedback.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from ._base import SQLBaseStore, Table from synapse.api.events.room import FeedbackEvent @@ -22,54 +24,28 @@ import json class FeedbackStore(SQLBaseStore): - def store_feedback(self, room_id, msg_id, msg_sender_id, - fb_sender_id, fb_type, content): - return self._simple_insert(FeedbackTable.table_name, dict( - room_id=room_id, - msg_id=msg_id, - msg_sender_id=msg_sender_id, - fb_sender_id=fb_sender_id, - fb_type=fb_type, - content=content, - )) - - def get_feedback(self, room_id=None, msg_id=None, msg_sender_id=None, - fb_sender_id=None, fb_type=None): - query = FeedbackTable.select_statement( - "msg_sender_id = ? AND room_id = ? AND msg_id = ? " + - "AND fb_sender_id = ? AND feedback_type = ? " + - "ORDER BY id DESC LIMIT 1") - return self._execute( - FeedbackTable.decode_single_result, - query, msg_sender_id, room_id, msg_id, fb_sender_id, fb_type, + def _store_feedback(self, event): + return self._simple_insert("feedback", { + "event_id": event.event_id, + "feedback_type": event.feedback_type, + "room_id": event.room_id, + "target_event_id": event.target_event, + "sender": event.user_id, + }) + + @defer.inlineCallbacks + def get_feedback_for_event(self, event_id): + sql = ( + "SELECT events.* FROM events INNER JOIN feedback " + "ON events.event_id = feedback.event_id " + "WHERE feedback.target_event_id = ? " ) - def get_max_feedback_id(self): - return self._simple_max_id(FeedbackTable.table_name) - - -class FeedbackTable(Table): - table_name = "feedback" + rows = yield self._execute_and_decode(sql, event_id) - fields = [ - "id", - "content", - "feedback_type", - "fb_sender_id", - "msg_id", - "room_id", - "msg_sender_id" - ] - - class EntryType(collections.namedtuple("FeedbackEntry", fields)): - - def as_event(self, event_factory): - return event_factory.create_event( - etype=FeedbackEvent.TYPE, - room_id=self.room_id, - msg_id=self.msg_id, - msg_sender_id=self.msg_sender_id, - user_id=self.fb_sender_id, - feedback_type=self.feedback_type, - content=json.loads(self.content), - ) + defer.returnValue( + [ + self._parse_event_from_row(r) + for r in rows + ] + ) diff --git a/synapse/storage/message.py b/synapse/storage/message.py deleted file mode 100644 index 7bb69c1384..0000000000 --- a/synapse/storage/message.py +++ /dev/null @@ -1,81 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ._base import SQLBaseStore, Table -from synapse.api.events.room import MessageEvent - -import collections -import json - - -class MessageStore(SQLBaseStore): - - def get_message(self, user_id, room_id, msg_id): - """Get a message from the store. - - Args: - user_id (str): The ID of the user who sent the message. - room_id (str): The room the message was sent in. - msg_id (str): The unique ID for this user/room combo. - """ - query = MessagesTable.select_statement( - "user_id = ? AND room_id = ? AND msg_id = ? " + - "ORDER BY id DESC LIMIT 1") - return self._execute( - MessagesTable.decode_single_result, - query, user_id, room_id, msg_id, - ) - - def store_message(self, user_id, room_id, msg_id, content): - """Store a message in the store. - - Args: - user_id (str): The ID of the user who sent the message. - room_id (str): The room the message was sent in. - msg_id (str): The unique ID for this user/room combo. - content (str): The content of the message (JSON) - """ - return self._simple_insert(MessagesTable.table_name, dict( - user_id=user_id, - room_id=room_id, - msg_id=msg_id, - content=content, - )) - - def get_max_message_id(self): - return self._simple_max_id(MessagesTable.table_name) - - -class MessagesTable(Table): - table_name = "messages" - - fields = [ - "id", - "user_id", - "room_id", - "msg_id", - "content" - ] - - class EntryType(collections.namedtuple("MessageEntry", fields)): - - def as_event(self, event_factory): - return event_factory.create_event( - etype=MessageEvent.TYPE, - room_id=self.room_id, - user_id=self.user_id, - msg_id=self.msg_id, - content=json.loads(self.content), - ) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index a97162831b..8f44b67d8c 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -76,49 +76,73 @@ class RoomStore(SQLBaseStore): ) @defer.inlineCallbacks - def get_rooms(self, is_public, with_topics): + def get_rooms(self, is_public): """Retrieve a list of all public rooms. Args: is_public (bool): True if the rooms returned should be public. - with_topics (bool): True to include the current topic for the room - in the response. Returns: - A list of room dicts containing at least a "room_id" key, and a - "topic" key if one is set and with_topic=True. + A list of room dicts containing at least a "room_id" key, a + "topic" key if one is set, and a "name" key if one is set """ - room_data_type = RoomTopicEvent.TYPE - public = 1 if is_public else 0 - - latest_topic = ("SELECT max(room_data.id) FROM room_data WHERE " - + "room_data.type = ? GROUP BY room_id") - - query = ("SELECT rooms.*, room_data.content, room_alias FROM rooms " - + "LEFT JOIN " - + "room_aliases ON room_aliases.room_id = rooms.room_id " - + "LEFT JOIN " - + "room_data ON rooms.room_id = room_data.room_id WHERE " - + "(room_data.id IN (" + latest_topic + ") " - + "OR room_data.id IS NULL) AND rooms.is_public = ?") - - res = yield self._execute( - self.cursor_to_dict, query, room_data_type, public + + topic_subquery = ( + "SELECT topics.event_id as event_id, topics.room_id as room_id, topic FROM topics " + "INNER JOIN current_state_events as c " + "ON c.event_id = topics.event_id " ) - # return only the keys the specification expects - ret_keys = ["room_id", "topic", "room_alias"] + name_subquery = ( + "SELECT room_names.event_id as event_id, room_names.room_id as room_id, name FROM room_names " + "INNER JOIN current_state_events as c " + "ON c.event_id = room_names.event_id " + ) - # extract topic from the json (icky) FIXME - for i, room_row in enumerate(res): - try: - content_json = json.loads(room_row["content"]) - room_row["topic"] = content_json["topic"] - except: - pass # no topic set - # filter the dict based on ret_keys - res[i] = {k: v for k, v in room_row.iteritems() if k in ret_keys} + sql = ( + "SELECT r.room_id, n.name, t.topic, group_concat(a.room_alias) FROM rooms AS r " + "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id " + "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id " + "INNER JOIN room_aliases AS a ON a.room_id = r.room_id " + "WHERE r.is_public = ? " + "GROUP BY r.room_id " + ) % { + "topic": topic_subquery, + "name": name_subquery, + } + + rows = yield self._execute(None, sql, is_public) + + ret = [ + { + "room_id": r[0], + "name": r[1], + "topic": r[2], + "aliases": r[3].split(","), + } + for r in rows + ] + + defer.returnValue(ret) + + def _store_room_topic(self, event): + return self._simple_insert( + "topics", + { + "event_id": event.event_id, + "room_id": event.room_id, + "topic": event.topic, + } + ) - defer.returnValue(res) + def _store_room_name(self, event): + return self._simple_insert( + "room_names", + { + "event_id": event.event_id, + "room_id": event.room_id, + "name": event.name, + } + ) class RoomsTable(Table): diff --git a/synapse/storage/roomdata.py b/synapse/storage/roomdata.py deleted file mode 100644 index cc04d1ba14..0000000000 --- a/synapse/storage/roomdata.py +++ /dev/null @@ -1,85 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ._base import SQLBaseStore, Table - -import collections -import json - - -class RoomDataStore(SQLBaseStore): - - """Provides various CRUD operations for Room Events. """ - - def get_room_data(self, room_id, etype, state_key=""): - """Retrieve the data stored under this type and state_key. - - Args: - room_id (str) - etype (str) - state_key (str) - Returns: - namedtuple: Or None if nothing exists at this path. - """ - query = RoomDataTable.select_statement( - "room_id = ? AND type = ? AND state_key = ? " - "ORDER BY id DESC LIMIT 1" - ) - return self._execute( - RoomDataTable.decode_single_result, - query, room_id, etype, state_key, - ) - - def store_room_data(self, room_id, etype, state_key="", content=None): - """Stores room specific data. - - Args: - room_id (str) - etype (str) - state_key (str) - data (str)- The data to store for this path in JSON. - Returns: - The store ID for this data. - """ - return self._simple_insert(RoomDataTable.table_name, dict( - etype=etype, - state_key=state_key, - room_id=room_id, - content=content, - )) - - def get_max_room_data_id(self): - return self._simple_max_id(RoomDataTable.table_name) - - -class RoomDataTable(Table): - table_name = "room_data" - - fields = [ - "id", - "room_id", - "type", - "state_key", - "content" - ] - - class EntryType(collections.namedtuple("RoomDataEntry", fields)): - - def as_event(self, event_factory): - return event_factory.create_event( - etype=self.type, - room_id=self.room_id, - content=json.loads(self.content), - ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index ef73be4af4..8c4b04f190 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -31,6 +31,38 @@ logger = logging.getLogger(__name__) class RoomMemberStore(SQLBaseStore): + @defer.inlineCallbacks + def _store_room_member(self, event): + """Store a room member in the database. + """ + domain = self.hs.parse_userid(event.target_user_id).domain + + yield self._simple_insert( + "room_memberships", + { + "event_id": event.event_id, + "user_id": event.target_user_id, + "sender": event.user_id, + "room_id": event.room_id, + "membership": event.membership, + } + ) + + # Update room hosts table + if event.membership == Membership.JOIN: + sql = ( + "INSERT OR IGNORE INTO room_hosts (room_id, host) " + "VALUES (?, ?)" + ) + yield self._execute(None, sql, event.room_id, domain) + else: + sql = ( + "DELETE FROM room_hosts WHERE room_id = ? AND host = ?" + ) + + yield self._execute(None, sql, event.room_id, domain) + + def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. @@ -38,36 +70,13 @@ class RoomMemberStore(SQLBaseStore): user_id (str): The member's user ID. room_id (str): The room the member is in. Returns: - namedtuple: The room member from the database, or None if this - member does not exist. + Deferred: Results in a MembershipEvent or None. """ - query = RoomMemberTable.select_statement( - "room_id = ? AND user_id = ? ORDER BY id DESC LIMIT 1") - return self._execute( - RoomMemberTable.decode_single_result, - query, room_id, user_id, - ) - - def store_room_member(self, user_id, sender, room_id, membership, content): - """Store a room member in the database. + return self._get_members_by_dict({ + "e.room_id": room_id, + "m.user_id": user_id, + }) - Args: - user_id (str): The member's user ID. - room_id (str): The room in relation to the member. - membership (synapse.api.constants.Membership): The new membership - state. - content (dict): The content of the membership (JSON). - """ - content_json = json.dumps(content) - return self._simple_insert(RoomMemberTable.table_name, dict( - user_id=user_id, - sender=sender, - room_id=room_id, - membership=membership, - content=content_json, - )) - - @defer.inlineCallbacks def get_room_members(self, room_id, membership=None): """Retrieve the current room member list for a room. @@ -79,17 +88,12 @@ class RoomMemberStore(SQLBaseStore): Returns: list of namedtuples representing the members in this room. """ - query = RoomMemberTable.select_statement( - "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name - + " WHERE room_id = ? GROUP BY user_id)" - ) - res = yield self._execute( - RoomMemberTable.decode_results, query, room_id, - ) - # strip memberships which don't match + + where = {"m.room_id": room_id} if membership: - res = [entry for entry in res if entry.membership == membership] - defer.returnValue(res) + where["m.membership"] = membership + + return self._get_members_by_dict(where) def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user @@ -106,67 +110,37 @@ class RoomMemberStore(SQLBaseStore): return defer.succeed(None) args = [user_id] - membership_placeholder = ["membership=?"] * len(membership_list) - where_membership = "(" + " OR ".join(membership_placeholder) + ")" - for membership in membership_list: - args.append(membership) - - query = ("SELECT room_id, membership FROM room_memberships" - + " WHERE user_id=? AND " + where_membership - + " GROUP BY room_id ORDER BY id DESC") - return self._execute( - self.cursor_to_dict, query, *args - ) + args.extend(membership_list) - @defer.inlineCallbacks - def get_joined_hosts_for_room(self, room_id): - query = RoomMemberTable.select_statement( - "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name - + " WHERE room_id = ? GROUP BY user_id)" - ) - - res = yield self._execute( - RoomMemberTable.decode_results, query, room_id, + where_clause = "user_id = ? AND (%s)" % ( + " OR ".join(["membership = ?" for _ in membership_list]), ) - def host_from_user_id_string(user_id): - domain = UserID.from_string(entry.user_id, self.hs).domain - return domain - - # strip memberships which don't match - hosts = [ - host_from_user_id_string(entry.user_id) - for entry in res - if entry.membership == Membership.JOIN - ] + return self._get_members_query(where_clause, args) - logger.debug("Returning hosts: %s from results: %s", hosts, res) - - defer.returnValue(hosts) - - def get_max_room_member_id(self): - return self._simple_max_id(RoomMemberTable.table_name) - - -class RoomMemberTable(Table): - table_name = "room_memberships" - - fields = [ - "id", - "user_id", - "sender", - "room_id", - "membership", - "content" - ] + def get_joined_hosts_for_room(self, room_id): + return self._simple_select_onecol( + "room_hosts", + {"room_id": room_id}, + "host" + ) - class EntryType(collections.namedtuple("RoomMemberEntry", fields)): + def _get_members_by_dict(self, where_dict): + clause = " AND ".join("%s = ?" % k for k in where_dict.keys()) + vals = where_dict.values() + return self._get_members_query(clause, vals) - def as_event(self, event_factory): - return event_factory.create_event( - etype=RoomMemberEvent.TYPE, - room_id=self.room_id, - target_user_id=self.user_id, - user_id=self.sender, - content=json.loads(self.content), - ) + @defer.inlineCallbacks + def _get_members_query(self, where_clause, where_values): + sql = ( + "SELECT e.* FROM events as e " + "INNER JOIN room_memberships as m " + "ON e.event_id = m.event_id " + "INNER JOIN current_state_events as c " + "ON m.event_id = c.event_id " + "WHERE %s " + ) % (where_clause,) + + rows = yield self._execute_and_decode(sql, *where_values) + results = [self._parse_event_from_row(r) for r in rows] + defer.returnValue(results) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index 77096546b2..9a0f2145d5 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -12,43 +12,64 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -CREATE TABLE IF NOT EXISTS rooms( - room_id TEXT PRIMARY KEY NOT NULL, - is_public INTEGER, - creator TEXT + +CREATE TABLE IF NOT EXISTS events( + ordering INTEGER PRIMARY KEY AUTOINCREMENT, + event_id TEXT NOT NULL, + type TEXT NOT NULL, + room_id TEXT, + content TEXT, + unrecognized_keys TEXT ); -CREATE TABLE IF NOT EXISTS room_memberships( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id TEXT NOT NULL, -- no foreign key to users table, it could be an id belonging to another home server - sender TEXT NOT NULL, +CREATE TABLE IF NOT EXISTS state_events( + event_id TEXT NOT NULL, room_id TEXT NOT NULL, - membership TEXT NOT NULL, - content TEXT NOT NULL + type TEXT NOT NULL, + state_key TEXT NOT NULL, + prev_state TEXT ); -CREATE TABLE IF NOT EXISTS messages( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id TEXT, - room_id TEXT, - msg_id TEXT, - content TEXT +CREATE TABLE IF NOT EXISTS current_state_events( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS room_memberships( + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + sender TEXT NOT NULL, + room_id TEXT NOT NULL, + membership TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS feedback( - id INTEGER PRIMARY KEY AUTOINCREMENT, - content TEXT, + event_id TEXT NOT NULL, feedback_type TEXT, - fb_sender_id TEXT, - msg_id TEXT, - room_id TEXT, - msg_sender_id TEXT + target_event_id TEXT, + sender TEXT, + room_id TEXT ); -CREATE TABLE IF NOT EXISTS room_data( - id INTEGER PRIMARY KEY AUTOINCREMENT, +CREATE TABLE IF NOT EXISTS topics( + event_id TEXT NOT NULL, room_id TEXT NOT NULL, - type TEXT NOT NULL, - state_key TEXT NOT NULL, - content TEXT + topic TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS room_names( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + name TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS rooms( + room_id TEXT PRIMARY KEY NOT NULL, + is_public INTEGER, + creator TEXT +); + +CREATE TABLE IF NOT EXISTS room_hosts( + room_id TEXT NOT NULL, + host TEXT NOT NULL ); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 47a1f2c45a..9937239c22 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -13,267 +13,59 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer from ._base import SQLBaseStore -from .message import MessagesTable -from .feedback import FeedbackTable -from .roomdata import RoomDataTable -from .roommember import RoomMemberTable + +from synapse.api.constants import Membership import json import logging -logger = logging.getLogger(__name__) - - -class StreamStore(SQLBaseStore): - - def get_message_stream(self, user_id, from_key, to_key, room_id, limit=0, - with_feedback=False): - """Get all messages for this user between the given keys. - - Args: - user_id (str): The user who is requesting messages. - from_key (int): The ID to start returning results from (exclusive). - to_key (int): The ID to stop returning results (exclusive). - room_id (str): Gets messages only for this room. Can be None, in - which case all room messages will be returned. - Returns: - A tuple of rows (list of namedtuples), new_id(int) - """ - if with_feedback and room_id: # with fb MUST specify a room ID - return self._db_pool.runInteraction( - self._get_message_rows_with_feedback, - user_id, from_key, to_key, room_id, limit - ) - else: - return self._db_pool.runInteraction( - self._get_message_rows, - user_id, from_key, to_key, room_id, limit - ) - - def _get_message_rows(self, txn, user_id, from_pkey, to_pkey, room_id, - limit): - # work out which rooms this user is joined in on and join them with - # the room id on the messages table, bounded by the specified pkeys - - # get all messages where the *current* membership state is 'join' for - # this user in that room. - query = ("SELECT messages.* FROM messages WHERE ? IN" - + " (SELECT membership from room_memberships WHERE user_id=?" - + " AND room_id = messages.room_id ORDER BY id DESC LIMIT 1)") - query_args = ["join", user_id] - - if room_id: - query += " AND messages.room_id=?" - query_args.append(room_id) - - (query, query_args) = self._append_stream_operations( - "messages", query, query_args, from_pkey, to_pkey, limit=limit - ) - - cursor = txn.execute(query, query_args) - return self._as_events(cursor, MessagesTable, from_pkey) - - def _get_message_rows_with_feedback(self, txn, user_id, from_pkey, to_pkey, - room_id, limit): - # this col represents the compressed feedback JSON as per spec - compressed_feedback_col = ( - "'[' || group_concat('{\"sender_id\":\"' || f.fb_sender_id" - + " || '\",\"feedback_type\":\"' || f.feedback_type" - + " || '\",\"content\":' || f.content || '}') || ']'" - ) - - global_msg_id_join = ("f.room_id = messages.room_id" - + " and f.msg_id = messages.msg_id" - + " and messages.user_id = f.msg_sender_id") - - select_query = ( - "SELECT messages.*, f.content AS fb_content, f.fb_sender_id" - + ", " + compressed_feedback_col + " AS compressed_fb" - + " FROM messages LEFT JOIN feedback f ON " + global_msg_id_join) - - current_membership_sub_query = ( - "(SELECT membership from room_memberships rm" - + " WHERE user_id=? AND room_id = rm.room_id" - + " ORDER BY id DESC LIMIT 1)") - - where = (" WHERE ? IN " + current_membership_sub_query - + " AND messages.room_id=?") - - query = select_query + where - query_args = ["join", user_id, room_id] - - (query, query_args) = self._append_stream_operations( - "messages", query, query_args, from_pkey, to_pkey, - limit=limit, group_by=" GROUP BY messages.id " - ) - - cursor = txn.execute(query, query_args) - # convert the result set into events - entries = self.cursor_to_dict(cursor) - events = [] - for entry in entries: - # TODO we should spec the cursor > event mapping somewhere else. - event = {} - straight_mappings = ["msg_id", "user_id", "room_id"] - for key in straight_mappings: - event[key] = entry[key] - event["content"] = json.loads(entry["content"]) - if entry["compressed_fb"]: - event["feedback"] = json.loads(entry["compressed_fb"]) - events.append(event) - - latest_pkey = from_pkey if len(entries) == 0 else entries[-1]["id"] - - return (events, latest_pkey) - - def get_room_member_stream(self, user_id, from_key, to_key): - """Get all room membership events for this user between the given keys. - - Args: - user_id (str): The user who is requesting membership events. - from_key (int): The ID to start returning results from (exclusive). - to_key (int): The ID to stop returning results (exclusive). - Returns: - A tuple of rows (list of namedtuples), new_id(int) - """ - return self._db_pool.runInteraction( - self._get_room_member_rows, user_id, from_key, to_key - ) - - def _get_room_member_rows(self, txn, user_id, from_pkey, to_pkey): - # get all room membership events for rooms which the user is - # *currently* joined in on, or all invite events for this user. - current_membership_sub_query = ( - "(SELECT membership FROM room_memberships" - + " WHERE user_id=? AND room_id = rm.room_id" - + " ORDER BY id DESC LIMIT 1)") - - query = ("SELECT rm.* FROM room_memberships rm " - # all membership events for rooms you've currently joined. - + " WHERE (? IN " + current_membership_sub_query - # all invite membership events for this user - + " OR rm.membership=? AND user_id=?)" - + " AND rm.id > ?") - query_args = ["join", user_id, "invite", user_id, from_pkey] - - if to_pkey != -1: - query += " AND rm.id < ?" - query_args.append(to_pkey) +logger = logging.getLogger(__name__) - cursor = txn.execute(query, query_args) - return self._as_events(cursor, RoomMemberTable, from_pkey) - def get_feedback_stream(self, user_id, from_key, to_key, room_id, limit=0): - return self._db_pool.runInteraction( - self._get_feedback_rows, - user_id, from_key, to_key, room_id, limit - ) +MAX_STREAM_SIZE = 1000 - def _get_feedback_rows(self, txn, user_id, from_pkey, to_pkey, room_id, - limit): - # work out which rooms this user is joined in on and join them with - # the room id on the feedback table, bounded by the specified pkeys - # get all messages where the *current* membership state is 'join' for - # this user in that room. - query = ( - "SELECT feedback.* FROM feedback WHERE ? IN " - + "(SELECT membership from room_memberships WHERE user_id=?" - + " AND room_id = feedback.room_id ORDER BY id DESC LIMIT 1)") - query_args = ["join", user_id] +class StreamStore(SQLBaseStore): - if room_id: - query += " AND feedback.room_id=?" - query_args.append(room_id) + @defer.inlineCallbacks + def get_room_events_stream(self, user_id, from_key, to_key, room_id, + limit=0, with_feedback=False): - (query, query_args) = self._append_stream_operations( - "feedback", query, query_args, from_pkey, to_pkey, limit=limit + current_room_membership_sql = ( + "SELECT m.room_id FROM room_memberships as m " + "INNER JOIN current_state_events as c ON m.event_id = c.event_id " + "WHERE m.user_id = ?" ) - cursor = txn.execute(query, query_args) - return self._as_events(cursor, FeedbackTable, from_pkey) - - def get_room_data_stream(self, user_id, from_key, to_key, room_id, - limit=0): - return self._db_pool.runInteraction( - self._get_room_data_rows, - user_id, from_key, to_key, room_id, limit + invites_sql = ( + "SELECT m.event_id FROM room_membershipas as m " + "INNER JOIN current_state_events as c ON m.event_id = c.event_id " + "WHERE m.user_id = ? AND m.membership = ?" ) - def _get_room_data_rows(self, txn, user_id, from_pkey, to_pkey, room_id, - limit): - # work out which rooms this user is joined in on and join them with - # the room id on the feedback table, bounded by the specified pkeys - - # get all messages where the *current* membership state is 'join' for - # this user in that room. - query = ( - "SELECT room_data.* FROM room_data WHERE ? IN " - + "(SELECT membership from room_memberships WHERE user_id=?" - + " AND room_id = room_data.room_id ORDER BY id DESC LIMIT 1)") - query_args = ["join", user_id] - - if room_id: - query += " AND room_data.room_id=?" - query_args.append(room_id) - - (query, query_args) = self._append_stream_operations( - "room_data", query, query_args, from_pkey, to_pkey, limit=limit + if limit: + limit = max(limit, MAX_STREAM_SIZE) + else: + limit = 1000 + + sql = ( + "SELECT * FROM events as e WHERE " + "(room_id IN (%(current)s)) OR " + "(event_id IN (%(invites)s)) " + "ORDER BY ordering ASC LIMIT %(limit)d" + ) % { + "current": current_room_membership_sql, + "invites": invites_sql, + "limit": limit, + } + + rows = yield self._execute_and_decode( + sql, + user_id, user_id, Membership.INVITE ) - cursor = txn.execute(query, query_args) - return self._as_events(cursor, RoomDataTable, from_pkey) - - def _append_stream_operations(self, table_name, query, query_args, - from_pkey, to_pkey, limit=None, - group_by=""): - LATEST_ROW = -1 - order_by = "" - if to_pkey > from_pkey: - if from_pkey != LATEST_ROW: - # e.g. from=5 to=9 >> from 5 to 9 >> id>5 AND id<9 - query += (" AND %s.id > ? AND %s.id < ?" % - (table_name, table_name)) - query_args.append(from_pkey) - query_args.append(to_pkey) - else: - # e.g. from=-1 to=5 >> from now to 5 >> id>5 ORDER BY id DESC - query += " AND %s.id > ? " % table_name - order_by = "ORDER BY id DESC" - query_args.append(to_pkey) - elif from_pkey > to_pkey: - if to_pkey != LATEST_ROW: - # from=9 to=5 >> from 9 to 5 >> id>5 AND id<9 ORDER BY id DESC - query += (" AND %s.id > ? AND %s.id < ? " % - (table_name, table_name)) - order_by = "ORDER BY id DESC" - query_args.append(to_pkey) - query_args.append(from_pkey) - else: - # from=5 to=-1 >> from 5 to now >> id>5 - query += " AND %s.id > ?" % table_name - query_args.append(from_pkey) - - query += group_by + order_by - - if limit and limit > 0: - query += " LIMIT ?" - query_args.append(str(limit)) - - return (query, query_args) - - def _as_events(self, cursor, table, from_pkey): - data_entries = table.decode_results(cursor) - last_pkey = from_pkey - if data_entries: - last_pkey = data_entries[-1].id - - events = [ - entry.as_event(self.event_factory).get_dict() - for entry in data_entries - ] - - return (events, last_pkey) + defer.returnValue([self._parse_event_from_row(r) for r in rows]) |