diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py
index 12aa04fc6e..b61dac7acd 100644
--- a/synapse/api/events/factory.py
+++ b/synapse/api/events/factory.py
@@ -15,7 +15,7 @@
from synapse.api.events.room import (
RoomTopicEvent, MessageEvent, RoomMemberEvent, FeedbackEvent,
- InviteJoinEvent, RoomConfigEvent
+ InviteJoinEvent, RoomConfigEvent, RoomNameEvent, GenericEvent,
)
from synapse.util.stringutils import random_string
@@ -25,6 +25,7 @@ class EventFactory(object):
_event_classes = [
RoomTopicEvent,
+ RoomNameEvent,
MessageEvent,
RoomMemberEvent,
FeedbackEvent,
@@ -42,10 +43,9 @@ class EventFactory(object):
if "event_id" not in kwargs:
kwargs["event_id"] = random_string(10)
- try:
+ if etype in self._event_list:
handler = self._event_list[etype]
- except KeyError: # unknown event type
- # TODO allow custom event types.
- raise NotImplementedError("Unknown etype=%s" % etype)
+ else:
+ handler = GenericEvent
return handler(**kwargs)
diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py
index f3df849af2..dbf537fb88 100644
--- a/synapse/api/events/room.py
+++ b/synapse/api/events/room.py
@@ -16,17 +16,45 @@
from . import SynapseEvent
+class GenericEvent(SynapseEvent):
+ def get_content_template(self):
+ return {}
+
+
class RoomTopicEvent(SynapseEvent):
TYPE = "m.room.topic"
+ internal_keys = SynapseEvent.internal_keys + [
+ "topic",
+ ]
+
def __init__(self, **kwargs):
kwargs["state_key"] = ""
+ if "topic" in kwargs["content"]:
+ kwargs["topic"] = kwargs["content"]["topic"]
super(RoomTopicEvent, self).__init__(**kwargs)
def get_content_template(self):
return {"topic": u"string"}
+class RoomNameEvent(SynapseEvent):
+ TYPE = "m.room.name"
+
+ internal_keys = SynapseEvent.internal_keys + [
+ "name",
+ ]
+
+ def __init__(self, **kwargs):
+ kwargs["state_key"] = ""
+ if "name" in kwargs["content"]:
+ kwargs["name"] = kwargs["content"]["name"]
+ super(RoomNameEvent, self).__init__(**kwargs)
+
+ def get_content_template(self):
+ return {"name": u"string"}
+
+
class RoomMemberEvent(SynapseEvent):
TYPE = "m.room.member"
diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py
index 65b5a4ebb3..9f622df6bb 100644
--- a/synapse/api/notifier.py
+++ b/synapse/api/notifier.py
@@ -15,6 +15,7 @@
from synapse.api.constants import Membership
from synapse.api.events.room import RoomMemberEvent
+from synapse.api.streams.event import EventsStreamData
from twisted.internet import defer
from twisted.internet import reactor
@@ -66,7 +67,7 @@ class Notifier(object):
self._notify_and_callback(
user_id=user_id,
event_data=event.get_dict(),
- stream_type=event.type,
+ stream_type=EventsStreamData.EVENT_TYPE,
store_id=store_id)
def on_new_user_event(self, user_id, event_data, stream_type, store_id):
diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py
index 4b6d739e54..895a96b5b9 100644
--- a/synapse/api/streams/event.py
+++ b/synapse/api/streams/event.py
@@ -28,17 +28,17 @@ import logging
logger = logging.getLogger(__name__)
-class MessagesStreamData(StreamData):
- EVENT_TYPE = MessageEvent.TYPE
+class EventsStreamData(StreamData):
+ EVENT_TYPE = "EventsStream"
def __init__(self, hs, room_id=None, feedback=False):
- super(MessagesStreamData, self).__init__(hs)
+ super(EventsStreamData, self).__init__(hs)
self.room_id = room_id
self.with_feedback = feedback
@defer.inlineCallbacks
def get_rows(self, user_id, from_key, to_key, limit):
- (data, latest_ver) = yield self.store.get_message_stream(
+ data, latest_ver = yield self.store.get_room_events_stream(
user_id=user_id,
from_key=from_key,
to_key=to_key,
@@ -50,74 +50,7 @@ class MessagesStreamData(StreamData):
@defer.inlineCallbacks
def max_token(self):
- val = yield self.store.get_max_message_id()
- defer.returnValue(val)
-
-
-class RoomMemberStreamData(StreamData):
- EVENT_TYPE = RoomMemberEvent.TYPE
-
- @defer.inlineCallbacks
- def get_rows(self, user_id, from_key, to_key, limit):
- (data, latest_ver) = yield self.store.get_room_member_stream(
- user_id=user_id,
- from_key=from_key,
- to_key=to_key
- )
-
- defer.returnValue((data, latest_ver))
-
- @defer.inlineCallbacks
- def max_token(self):
- val = yield self.store.get_max_room_member_id()
- defer.returnValue(val)
-
-
-class FeedbackStreamData(StreamData):
- EVENT_TYPE = FeedbackEvent.TYPE
-
- def __init__(self, hs, room_id=None):
- super(FeedbackStreamData, self).__init__(hs)
- self.room_id = room_id
-
- @defer.inlineCallbacks
- def get_rows(self, user_id, from_key, to_key, limit):
- (data, latest_ver) = yield self.store.get_feedback_stream(
- user_id=user_id,
- from_key=from_key,
- to_key=to_key,
- limit=limit,
- room_id=self.room_id
- )
- defer.returnValue((data, latest_ver))
-
- @defer.inlineCallbacks
- def max_token(self):
- val = yield self.store.get_max_feedback_id()
- defer.returnValue(val)
-
-
-class RoomDataStreamData(StreamData):
- EVENT_TYPE = RoomTopicEvent.TYPE # TODO need multiple event types
-
- def __init__(self, hs, room_id=None):
- super(RoomDataStreamData, self).__init__(hs)
- self.room_id = room_id
-
- @defer.inlineCallbacks
- def get_rows(self, user_id, from_key, to_key, limit):
- (data, latest_ver) = yield self.store.get_room_data_stream(
- user_id=user_id,
- from_key=from_key,
- to_key=to_key,
- limit=limit,
- room_id=self.room_id
- )
- defer.returnValue((data, latest_ver))
-
- @defer.inlineCallbacks
- def max_token(self):
- val = yield self.store.get_max_room_data_id()
+ val = yield self.store.get_room_events_max_id()
defer.returnValue(val)
@@ -227,7 +160,7 @@ class EventStream(PaginationStream):
self.user_id, from_pkey, to_pkey, limit
)
- chunk += event_chunk
+ chunk += [e.get_dict() for e in event_chunk]
next_ver.append(str(max_pkey))
defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver)))
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 3af7d824a2..6bb797caf2 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -17,8 +17,7 @@ from twisted.internet import defer
from ._base import BaseHandler
from synapse.api.streams.event import (
- EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData,
- RoomDataStreamData
+ EventStream, EventsStreamData
)
from synapse.handlers.presence import PresenceStreamData
@@ -26,10 +25,7 @@ from synapse.handlers.presence import PresenceStreamData
class EventStreamHandler(BaseHandler):
stream_data_classes = [
- MessagesStreamData,
- RoomMemberStreamData,
- FeedbackStreamData,
- RoomDataStreamData,
+ EventsStreamData,
PresenceStreamData,
]
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5d0379254b..14ffddc630 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -23,7 +23,7 @@ from synapse.api.events.room import (
RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent,
RoomConfigEvent
)
-from synapse.api.streams.event import EventStream, MessagesStreamData
+from synapse.api.streams.event import EventStream, EventsStreamData
from synapse.util import stringutils
from ._base import BaseHandler
@@ -114,8 +114,9 @@ class MessageHandler(BaseHandler):
"""
yield self.auth.check_joined_room(room_id, user_id)
- data_source = [MessagesStreamData(self.hs, room_id=room_id,
- feedback=feedback)]
+ data_source = [
+ EventsStreamData(self.hs, room_id=room_id, feedback=feedback)
+ ]
event_stream = EventStream(user_id, data_source)
pagin_config = yield event_stream.fix_tokens(pagin_config)
data_chunk = yield event_stream.get_chunk(config=pagin_config)
@@ -141,12 +142,7 @@ class MessageHandler(BaseHandler):
yield self.state_handler.handle_new_event(event)
# store in db
- store_id = yield self.store.store_room_data(
- room_id=event.room_id,
- etype=event.type,
- state_key=event.state_key,
- content=json.dumps(event.content)
- )
+ store_id = yield self.store.persist_event(event)
event.destinations = yield self.store.get_joined_hosts_for_room(
event.room_id
@@ -201,19 +197,17 @@ class MessageHandler(BaseHandler):
raise RoomError(
403, "Member does not meet private room rules.")
- data = yield self.store.get_room_data(room_id, event_type, state_key)
+ data = yield self.store.get_current_state(
+ room_id, event_type, state_key
+ )
defer.returnValue(data)
@defer.inlineCallbacks
- def get_feedback(self, room_id=None, msg_sender_id=None, msg_id=None,
- user_id=None, fb_sender_id=None, fb_type=None):
- yield self.auth.check_joined_room(room_id, user_id)
+ def get_feedback(self, event_id):
+ # yield self.auth.check_joined_room(room_id, user_id)
# Pull out the feedback from the db
- fb = yield self.store.get_feedback(
- room_id=room_id, msg_id=msg_id, msg_sender_id=msg_sender_id,
- fb_sender_id=fb_sender_id, fb_type=fb_type
- )
+ fb = yield self.store.get_feedback(event_id)
if fb:
defer.returnValue(fb)
@@ -260,20 +254,30 @@ class MessageHandler(BaseHandler):
user_id=user_id,
membership_list=[Membership.INVITE, Membership.JOIN]
)
- for room_info in room_list:
- if room_info["membership"] != Membership.JOIN:
+
+ ret = []
+
+ for event in room_list:
+ d = {
+ "room_id": event.room_id,
+ "membership": event.membership,
+ }
+ ret.append(d)
+
+ if event.membership != Membership.JOIN:
continue
try:
- event_chunk = yield self.get_messages(
- user_id=user_id,
- pagin_config=pagin_config,
- feedback=feedback,
- room_id=room_info["room_id"]
+ messages = yield self.store.get_recent_events_for_room(
+ event.room_id,
+ limit=50,
)
- room_info["messages"] = event_chunk
+ d["messages"] = [m.get_dict() for m in messages]
except:
pass
- defer.returnValue(room_list)
+
+ logger.debug("snapshot_all_rooms returning: %s", ret)
+
+ defer.returnValue(ret)
class RoomCreationHandler(BaseHandler):
@@ -451,7 +455,7 @@ class RoomMemberHandler(BaseHandler):
member_list = yield self.store.get_room_members(room_id=room_id)
event_list = [
- entry.as_event(self.event_factory).get_dict()
+ entry.get_dict()
for entry in member_list
]
chunk_data = {
@@ -495,7 +499,7 @@ class RoomMemberHandler(BaseHandler):
SynapseError if there was a problem changing the membership.
"""
- #broadcast_msg = False
+ # broadcast_msg = False
prev_state = yield self.store.get_room_member(
event.target_user_id, event.room_id
@@ -569,7 +573,8 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue({"room_id": room_id})
@defer.inlineCallbacks
- def _do_join(self, event, room_host=None, do_auth=True, broadcast_msg=True):
+ def _do_join(self, event, room_host=None, do_auth=True,
+ broadcast_msg=True):
joinee = self.hs.parse_userid(event.target_user_id)
# room_id = RoomID.from_string(event.room_id, self.hs)
room_id = event.room_id
@@ -598,7 +603,7 @@ class RoomMemberHandler(BaseHandler):
if prev_state and prev_state.membership == Membership.INVITE:
room = yield self.store.get_room(room_id)
inviter = UserID.from_string(
- prev_state.sender, self.hs
+ prev_state.user_id, self.hs
)
should_do_dance = not inviter.is_mine and not room
@@ -620,7 +625,6 @@ class RoomMemberHandler(BaseHandler):
broadcast_msg=broadcast_msg,
)
-
if should_do_dance:
yield self._do_invite_join_dance(
room_id=room_id,
@@ -694,18 +698,12 @@ class RoomMemberHandler(BaseHandler):
user_id=user.to_string(), membership_list=membership_list
)
- defer.returnValue([r["room_id"] for r in rooms])
+ defer.returnValue([r.room_id for r in rooms])
@defer.inlineCallbacks
def _do_local_membership_update(self, event, membership, broadcast_msg):
# store membership
- store_id = yield self.store.store_room_member(
- user_id=event.target_user_id,
- sender=event.user_id,
- room_id=event.room_id,
- content=event.content,
- membership=membership
- )
+ store_id = yield self.store.persist_event(event)
# Send a PDU to all hosts who have joined the room.
destinations = yield self.store.get_joined_hosts_for_room(
@@ -760,7 +758,7 @@ class RoomMemberHandler(BaseHandler):
room_id, "", is_public=False
)
- #yield self.state_handler.handle_new_event(event)
+ # yield self.state_handler.handle_new_event(event)
yield federation.handle_new_event(new_event)
yield federation.get_state_for_room(
target_host, room_id
@@ -805,5 +803,5 @@ class RoomListHandler(BaseHandler):
@defer.inlineCallbacks
def get_public_room_list(self):
- chunk = yield self.store.get_rooms(is_public=True, with_topics=True)
+ chunk = yield self.store.get_rooms(is_public=True)
defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
diff --git a/synapse/rest/room.py b/synapse/rest/room.py
index 799dd58b2d..303759b718 100644
--- a/synapse/rest/room.py
+++ b/synapse/rest/room.py
@@ -285,25 +285,28 @@ class FeedbackRestServlet(RestServlet):
feedback_type):
user = yield (self.auth.get_user_by_req(request))
- if feedback_type not in Feedback.LIST:
- raise SynapseError(400, "Bad feedback type.",
- errcode=Codes.BAD_JSON)
-
- msg_handler = self.handlers.message_handler
- feedback = yield msg_handler.get_feedback(
- room_id=urllib.unquote(room_id),
- msg_sender_id=msg_sender_id,
- msg_id=msg_id,
- user_id=user.to_string(),
- fb_sender_id=fb_sender_id,
- fb_type=feedback_type
- )
-
- if not feedback:
- raise SynapseError(404, "Feedback not found.",
- errcode=Codes.NOT_FOUND)
+ # TODO (erikj): Implement this?
+ raise NotImplementedError("Getting feedback is not supported")
- defer.returnValue((200, json.loads(feedback.content)))
+# if feedback_type not in Feedback.LIST:
+# raise SynapseError(400, "Bad feedback type.",
+# errcode=Codes.BAD_JSON)
+#
+# msg_handler = self.handlers.message_handler
+# feedback = yield msg_handler.get_feedback(
+# room_id=urllib.unquote(room_id),
+# msg_sender_id=msg_sender_id,
+# msg_id=msg_id,
+# user_id=user.to_string(),
+# fb_sender_id=fb_sender_id,
+# fb_type=feedback_type
+# )
+#
+# if not feedback:
+# raise SynapseError(404, "Feedback not found.",
+# errcode=Codes.NOT_FOUND)
+#
+# defer.returnValue((200, json.loads(feedback.content)))
@defer.inlineCallbacks
def on_PUT(self, request, room_id, sender_id, msg_id, fb_sender_id,
diff --git a/synapse/state.py b/synapse/state.py
index 4f8b4d9760..ca8e1ca630 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -86,7 +86,7 @@ class StateHandler(object):
else:
event.depth = 0
- current_state = yield self.store.get_current_state(
+ current_state = yield self.store.get_current_state_pdu(
key.context, key.type, key.state_key
)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 5d5b5f7c44..841ad8f132 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -13,21 +13,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer
from synapse.api.events.room import (
RoomMemberEvent, MessageEvent, RoomTopicEvent, FeedbackEvent,
- RoomConfigEvent
+ RoomConfigEvent, RoomNameEvent,
)
from .directory import DirectoryStore
from .feedback import FeedbackStore
-from .message import MessageStore
from .presence import PresenceStore
from .profile import ProfileStore
from .registration import RegistrationStore
from .room import RoomStore
from .roommember import RoomMemberStore
-from .roomdata import RoomDataStore
from .stream import StreamStore
from .pdu import StatePduStore, PduStore
from .transactions import TransactionStore
@@ -36,7 +35,7 @@ import json
import os
-class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore,
+class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
PresenceStore, PduStore, StatePduStore, TransactionStore,
DirectoryStore):
@@ -45,50 +44,102 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore,
super(DataStore, self).__init__(hs)
self.event_factory = hs.get_event_factory()
+ @defer.inlineCallbacks
def persist_event(self, event):
- if event.type == MessageEvent.TYPE:
- return self.store_message(
- user_id=event.user_id,
- room_id=event.room_id,
- msg_id=event.msg_id,
- content=json.dumps(event.content)
- )
- elif event.type == RoomMemberEvent.TYPE:
- return self.store_room_member(
- user_id=event.target_user_id,
- sender=event.user_id,
- room_id=event.room_id,
- content=event.content,
- membership=event.content["membership"]
- )
+ if event.type == RoomMemberEvent.TYPE:
+ yield self._store_room_member(event)
elif event.type == FeedbackEvent.TYPE:
- return self.store_feedback(
- room_id=event.room_id,
- msg_id=event.msg_id,
- msg_sender_id=event.msg_sender_id,
- fb_sender_id=event.user_id,
- fb_type=event.feedback_type,
- content=json.dumps(event.content)
- )
+ yield self._store_feedback(event)
+# elif event.type == RoomConfigEvent.TYPE:
+# yield self._store_room_config(event)
+ elif event.type == RoomNameEvent.TYPE:
+ yield self._store_room_name(event)
elif event.type == RoomTopicEvent.TYPE:
- return self.store_room_data(
- room_id=event.room_id,
- etype=event.type,
- state_key=event.state_key,
- content=json.dumps(event.content)
+ yield self._store_room_topic(event)
+
+ ret = yield self._store_event(event)
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def get_event(self, event_id):
+ events_dict = yield self._simple_select_one(
+ "events",
+ {"event_id": event_id},
+ [
+ "event_id",
+ "type",
+ "sender",
+ "room_id",
+ "content",
+ "unrecognized_keys"
+ ],
+ )
+
+ event = self._parse_event_from_row(events_dict)
+ defer.returnValue(event)
+
+ @defer.inlineCallbacks
+ def _store_event(self, event):
+ vals = {
+ "event_id": event.event_id,
+ "type": event.type,
+ "room_id": event.room_id,
+ "content": json.dumps(event.content),
+ }
+
+ unrec = {
+ k: v
+ for k, v in event.get_full_dict().items()
+ if k not in vals.keys()
+ }
+ vals["unrecognized_keys"] = json.dumps(unrec)
+
+ yield self._simple_insert("events", vals)
+
+ if hasattr(event, "state_key"):
+ vals = {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ }
+
+ if hasattr(event, "prev_state"):
+ vals["prev_state"] = event.prev_state
+
+ yield self._simple_insert("state_events", vals)
+
+ yield self._simple_insert(
+ "current_state_events",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ }
)
- elif event.type == RoomConfigEvent.TYPE:
- if "visibility" in event.content:
- visibility = event.content["visibility"]
- return self.store_room_config(
- room_id=event.room_id,
- visibility=visibility
- )
+ latest = yield self.get_room_events_max_id()
+ defer.returnValue(latest)
+
+ @defer.inlineCallbacks
+ def get_current_state(self, room_id, event_type=None, state_key=""):
+ sql = (
+ "SELECT e.* FROM events as e "
+ "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+ "INNER JOIN state_events as s ON e.event_id = s.event_id "
+ "WHERE c.room_id = ? "
+ )
+
+ if event_type:
+ sql += " AND s.type = ? AND s.state_key = ? "
+ args = (room_id, event_type, state_key)
else:
- raise NotImplementedError(
- "Don't know how to persist type=%s" % event.type
- )
+ args = (room_id, )
+
+ results = yield self._execute_and_decode(sql, *args)
+
+ defer.returnValue([self._parse_event_from_row(r) for r in results])
def schema_path(schema):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index bf1800f4bf..36cc57c1b8 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
from twisted.internet import defer
@@ -20,6 +19,9 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
import collections
+import copy
+import json
+
logger = logging.getLogger(__name__)
@@ -29,6 +31,7 @@ class SQLBaseStore(object):
def __init__(self, hs):
self.hs = hs
self._db_pool = hs.get_db_pool()
+ self.event_factory = hs.get_event_factory()
self._clock = hs.get_clock()
def cursor_to_dict(self, cursor):
@@ -57,14 +60,22 @@ class SQLBaseStore(object):
The result of decoder(results)
"""
logger.debug(
- "[SQL] %s Args=%s Func=%s", query, args, decoder.__name__
+ "[SQL] %s Args=%s Func=%s",
+ query, args, decoder.__name__ if decoder else None
)
def interaction(txn):
cursor = txn.execute(query, args)
- return decoder(cursor)
+ if decoder:
+ return decoder(cursor)
+ else:
+ return cursor.fetchall()
+
return self._db_pool.runInteraction(interaction)
+ def _execute_and_decode(self, query, *args):
+ return self._execute(self.cursor_to_dict, query, *args)
+
# "Simple" SQL API methods that operate on a single table with no JOINs,
# no complex WHERE clauses, just a dict of values for columns.
@@ -281,6 +292,17 @@ class SQLBaseStore(object):
return self._db_pool.runInteraction(func)
+ def _parse_event_from_row(self, row_dict):
+ d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
+ d.update(json.loads(row_dict["unrecognized_keys"]))
+ d["content"] = json.loads(d["content"])
+ del d["unrecognized_keys"]
+
+ return self.event_factory.create_event(
+ etype=d["type"],
+ **d
+ )
+
class Table(object):
""" A base class used to store information about a particular table.
diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py
index 9bd562c762..e60f98d1e1 100644
--- a/synapse/storage/feedback.py
+++ b/synapse/storage/feedback.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer
+
from ._base import SQLBaseStore, Table
from synapse.api.events.room import FeedbackEvent
@@ -22,54 +24,28 @@ import json
class FeedbackStore(SQLBaseStore):
- def store_feedback(self, room_id, msg_id, msg_sender_id,
- fb_sender_id, fb_type, content):
- return self._simple_insert(FeedbackTable.table_name, dict(
- room_id=room_id,
- msg_id=msg_id,
- msg_sender_id=msg_sender_id,
- fb_sender_id=fb_sender_id,
- fb_type=fb_type,
- content=content,
- ))
-
- def get_feedback(self, room_id=None, msg_id=None, msg_sender_id=None,
- fb_sender_id=None, fb_type=None):
- query = FeedbackTable.select_statement(
- "msg_sender_id = ? AND room_id = ? AND msg_id = ? " +
- "AND fb_sender_id = ? AND feedback_type = ? " +
- "ORDER BY id DESC LIMIT 1")
- return self._execute(
- FeedbackTable.decode_single_result,
- query, msg_sender_id, room_id, msg_id, fb_sender_id, fb_type,
+ def _store_feedback(self, event):
+ return self._simple_insert("feedback", {
+ "event_id": event.event_id,
+ "feedback_type": event.feedback_type,
+ "room_id": event.room_id,
+ "target_event_id": event.target_event,
+ "sender": event.user_id,
+ })
+
+ @defer.inlineCallbacks
+ def get_feedback_for_event(self, event_id):
+ sql = (
+ "SELECT events.* FROM events INNER JOIN feedback "
+ "ON events.event_id = feedback.event_id "
+ "WHERE feedback.target_event_id = ? "
)
- def get_max_feedback_id(self):
- return self._simple_max_id(FeedbackTable.table_name)
-
-
-class FeedbackTable(Table):
- table_name = "feedback"
+ rows = yield self._execute_and_decode(sql, event_id)
- fields = [
- "id",
- "content",
- "feedback_type",
- "fb_sender_id",
- "msg_id",
- "room_id",
- "msg_sender_id"
- ]
-
- class EntryType(collections.namedtuple("FeedbackEntry", fields)):
-
- def as_event(self, event_factory):
- return event_factory.create_event(
- etype=FeedbackEvent.TYPE,
- room_id=self.room_id,
- msg_id=self.msg_id,
- msg_sender_id=self.msg_sender_id,
- user_id=self.fb_sender_id,
- feedback_type=self.feedback_type,
- content=json.loads(self.content),
- )
+ defer.returnValue(
+ [
+ self._parse_event_from_row(r)
+ for r in rows
+ ]
+ )
diff --git a/synapse/storage/message.py b/synapse/storage/message.py
deleted file mode 100644
index 7bb69c1384..0000000000
--- a/synapse/storage/message.py
+++ /dev/null
@@ -1,81 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014 matrix.org
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ._base import SQLBaseStore, Table
-from synapse.api.events.room import MessageEvent
-
-import collections
-import json
-
-
-class MessageStore(SQLBaseStore):
-
- def get_message(self, user_id, room_id, msg_id):
- """Get a message from the store.
-
- Args:
- user_id (str): The ID of the user who sent the message.
- room_id (str): The room the message was sent in.
- msg_id (str): The unique ID for this user/room combo.
- """
- query = MessagesTable.select_statement(
- "user_id = ? AND room_id = ? AND msg_id = ? " +
- "ORDER BY id DESC LIMIT 1")
- return self._execute(
- MessagesTable.decode_single_result,
- query, user_id, room_id, msg_id,
- )
-
- def store_message(self, user_id, room_id, msg_id, content):
- """Store a message in the store.
-
- Args:
- user_id (str): The ID of the user who sent the message.
- room_id (str): The room the message was sent in.
- msg_id (str): The unique ID for this user/room combo.
- content (str): The content of the message (JSON)
- """
- return self._simple_insert(MessagesTable.table_name, dict(
- user_id=user_id,
- room_id=room_id,
- msg_id=msg_id,
- content=content,
- ))
-
- def get_max_message_id(self):
- return self._simple_max_id(MessagesTable.table_name)
-
-
-class MessagesTable(Table):
- table_name = "messages"
-
- fields = [
- "id",
- "user_id",
- "room_id",
- "msg_id",
- "content"
- ]
-
- class EntryType(collections.namedtuple("MessageEntry", fields)):
-
- def as_event(self, event_factory):
- return event_factory.create_event(
- etype=MessageEvent.TYPE,
- room_id=self.room_id,
- user_id=self.user_id,
- msg_id=self.msg_id,
- content=json.loads(self.content),
- )
diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py
index 13adc581e1..a24ce7ab78 100644
--- a/synapse/storage/pdu.py
+++ b/synapse/storage/pdu.py
@@ -580,7 +580,7 @@ class StatePduStore(SQLBaseStore):
txn.execute(query, query_args)
- def get_current_state(self, context, pdu_type, state_key):
+ def get_current_state_pdu(self, context, pdu_type, state_key):
"""For a given context, pdu_type, state_key 3-tuple, return what is
currently considered the current state.
@@ -595,10 +595,10 @@ class StatePduStore(SQLBaseStore):
"""
return self._db_pool.runInteraction(
- self._get_current_state, context, pdu_type, state_key
+ self._get_current_state_pdu, context, pdu_type, state_key
)
- def _get_current_state(self, txn, context, pdu_type, state_key):
+ def _get_current_state_pdu(self, txn, context, pdu_type, state_key):
return self._get_current_interaction(txn, context, pdu_type, state_key)
def _get_current_interaction(self, txn, context, pdu_type, state_key):
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index a97162831b..22f2dcca45 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -76,49 +76,80 @@ class RoomStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def get_rooms(self, is_public, with_topics):
+ def get_rooms(self, is_public):
"""Retrieve a list of all public rooms.
Args:
is_public (bool): True if the rooms returned should be public.
- with_topics (bool): True to include the current topic for the room
- in the response.
Returns:
- A list of room dicts containing at least a "room_id" key, and a
- "topic" key if one is set and with_topic=True.
+ A list of room dicts containing at least a "room_id" key, a
+ "topic" key if one is set, and a "name" key if one is set
"""
- room_data_type = RoomTopicEvent.TYPE
- public = 1 if is_public else 0
-
- latest_topic = ("SELECT max(room_data.id) FROM room_data WHERE "
- + "room_data.type = ? GROUP BY room_id")
-
- query = ("SELECT rooms.*, room_data.content, room_alias FROM rooms "
- + "LEFT JOIN "
- + "room_aliases ON room_aliases.room_id = rooms.room_id "
- + "LEFT JOIN "
- + "room_data ON rooms.room_id = room_data.room_id WHERE "
- + "(room_data.id IN (" + latest_topic + ") "
- + "OR room_data.id IS NULL) AND rooms.is_public = ?")
-
- res = yield self._execute(
- self.cursor_to_dict, query, room_data_type, public
+
+ topic_subquery = (
+ "SELECT topics.event_id as event_id, "
+ "topics.room_id as room_id, topic "
+ "FROM topics "
+ "INNER JOIN current_state_events as c "
+ "ON c.event_id = topics.event_id "
)
- # return only the keys the specification expects
- ret_keys = ["room_id", "topic", "room_alias"]
+ name_subquery = (
+ "SELECT room_names.event_id as event_id, "
+ "room_names.room_id as room_id, name "
+ "FROM room_names "
+ "INNER JOIN current_state_events as c "
+ "ON c.event_id = room_names.event_id "
+ )
- # extract topic from the json (icky) FIXME
- for i, room_row in enumerate(res):
- try:
- content_json = json.loads(room_row["content"])
- room_row["topic"] = content_json["topic"]
- except:
- pass # no topic set
- # filter the dict based on ret_keys
- res[i] = {k: v for k, v in room_row.iteritems() if k in ret_keys}
+ # We use non printing ascii character US () as a seperator
+ sql = (
+ "SELECT r.room_id, n.name, t.topic, "
+ "group_concat(a.room_alias, '') "
+ "FROM rooms AS r "
+ "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id "
+ "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id "
+ "INNER JOIN room_aliases AS a ON a.room_id = r.room_id "
+ "WHERE r.is_public = ? "
+ "GROUP BY r.room_id "
+ ) % {
+ "topic": topic_subquery,
+ "name": name_subquery,
+ }
+
+ rows = yield self._execute(None, sql, is_public)
+
+ ret = [
+ {
+ "room_id": r[0],
+ "name": r[1],
+ "topic": r[2],
+ "aliases": r[3].split(""),
+ }
+ for r in rows
+ ]
+
+ defer.returnValue(ret)
+
+ def _store_room_topic(self, event):
+ return self._simple_insert(
+ "topics",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "topic": event.topic,
+ }
+ )
- defer.returnValue(res)
+ def _store_room_name(self, event):
+ return self._simple_insert(
+ "room_names",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "name": event.name,
+ }
+ )
class RoomsTable(Table):
diff --git a/synapse/storage/roomdata.py b/synapse/storage/roomdata.py
deleted file mode 100644
index cc04d1ba14..0000000000
--- a/synapse/storage/roomdata.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014 matrix.org
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ._base import SQLBaseStore, Table
-
-import collections
-import json
-
-
-class RoomDataStore(SQLBaseStore):
-
- """Provides various CRUD operations for Room Events. """
-
- def get_room_data(self, room_id, etype, state_key=""):
- """Retrieve the data stored under this type and state_key.
-
- Args:
- room_id (str)
- etype (str)
- state_key (str)
- Returns:
- namedtuple: Or None if nothing exists at this path.
- """
- query = RoomDataTable.select_statement(
- "room_id = ? AND type = ? AND state_key = ? "
- "ORDER BY id DESC LIMIT 1"
- )
- return self._execute(
- RoomDataTable.decode_single_result,
- query, room_id, etype, state_key,
- )
-
- def store_room_data(self, room_id, etype, state_key="", content=None):
- """Stores room specific data.
-
- Args:
- room_id (str)
- etype (str)
- state_key (str)
- data (str)- The data to store for this path in JSON.
- Returns:
- The store ID for this data.
- """
- return self._simple_insert(RoomDataTable.table_name, dict(
- etype=etype,
- state_key=state_key,
- room_id=room_id,
- content=content,
- ))
-
- def get_max_room_data_id(self):
- return self._simple_max_id(RoomDataTable.table_name)
-
-
-class RoomDataTable(Table):
- table_name = "room_data"
-
- fields = [
- "id",
- "room_id",
- "type",
- "state_key",
- "content"
- ]
-
- class EntryType(collections.namedtuple("RoomDataEntry", fields)):
-
- def as_event(self, event_factory):
- return event_factory.create_event(
- etype=self.type,
- room_id=self.room_id,
- content=json.loads(self.content),
- )
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index c45d128f1b..89c87290cf 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -31,6 +31,38 @@ logger = logging.getLogger(__name__)
class RoomMemberStore(SQLBaseStore):
+ @defer.inlineCallbacks
+ def _store_room_member(self, event):
+ """Store a room member in the database.
+ """
+ domain = self.hs.parse_userid(event.target_user_id).domain
+
+ yield self._simple_insert(
+ "room_memberships",
+ {
+ "event_id": event.event_id,
+ "user_id": event.target_user_id,
+ "sender": event.user_id,
+ "room_id": event.room_id,
+ "membership": event.membership,
+ }
+ )
+
+ # Update room hosts table
+ if event.membership == Membership.JOIN:
+ sql = (
+ "INSERT OR IGNORE INTO room_hosts (room_id, host) "
+ "VALUES (?, ?)"
+ )
+ yield self._execute(None, sql, event.room_id, domain)
+ else:
+ sql = (
+ "DELETE FROM room_hosts WHERE room_id = ? AND host = ?"
+ )
+
+ yield self._execute(None, sql, event.room_id, domain)
+
+ @defer.inlineCallbacks
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.
@@ -38,36 +70,15 @@ class RoomMemberStore(SQLBaseStore):
user_id (str): The member's user ID.
room_id (str): The room the member is in.
Returns:
- namedtuple: The room member from the database, or None if this
- member does not exist.
+ Deferred: Results in a MembershipEvent or None.
"""
- query = RoomMemberTable.select_statement(
- "room_id = ? AND user_id = ? ORDER BY id DESC LIMIT 1")
- return self._execute(
- RoomMemberTable.decode_single_result,
- query, room_id, user_id,
- )
+ rows = yield self._get_members_by_dict({
+ "e.room_id": room_id,
+ "m.user_id": user_id,
+ })
- def store_room_member(self, user_id, sender, room_id, membership, content):
- """Store a room member in the database.
+ defer.returnValue(rows[0] if rows else None)
- Args:
- user_id (str): The member's user ID.
- room_id (str): The room in relation to the member.
- membership (synapse.api.constants.Membership): The new membership
- state.
- content (dict): The content of the membership (JSON).
- """
- content_json = json.dumps(content)
- return self._simple_insert(RoomMemberTable.table_name, dict(
- user_id=user_id,
- sender=sender,
- room_id=room_id,
- membership=membership,
- content=content_json,
- ))
-
- @defer.inlineCallbacks
def get_room_members(self, room_id, membership=None):
"""Retrieve the current room member list for a room.
@@ -79,17 +90,12 @@ class RoomMemberStore(SQLBaseStore):
Returns:
list of namedtuples representing the members in this room.
"""
- query = RoomMemberTable.select_statement(
- "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name
- + " WHERE room_id = ? GROUP BY user_id)"
- )
- res = yield self._execute(
- RoomMemberTable.decode_results, query, room_id,
- )
- # strip memberships which don't match
+
+ where = {"m.room_id": room_id}
if membership:
- res = [entry for entry in res if entry.membership == membership]
- defer.returnValue(res)
+ where["m.membership"] = membership
+
+ return self._get_members_by_dict(where)
def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
""" Get all the rooms for this user where the membership for this user
@@ -106,70 +112,40 @@ class RoomMemberStore(SQLBaseStore):
return defer.succeed(None)
args = [user_id]
- membership_placeholder = ["membership=?"] * len(membership_list)
- where_membership = "(" + " OR ".join(membership_placeholder) + ")"
- for membership in membership_list:
- args.append(membership)
-
- # sub-select finds the row ID for the most recent (i.e. current)
- # state change of this user per room, then the outer select finds those
- query = ("SELECT room_id, membership FROM room_memberships"
- + " WHERE id IN (SELECT MAX(id) FROM room_memberships"
- + " WHERE user_id=? GROUP BY room_id)"
- + " AND " + where_membership)
- return self._execute(
- self.cursor_to_dict, query, *args
- )
+ args.extend(membership_list)
- @defer.inlineCallbacks
- def get_joined_hosts_for_room(self, room_id):
- query = RoomMemberTable.select_statement(
- "id IN (SELECT MAX(id) FROM " + RoomMemberTable.table_name
- + " WHERE room_id = ? GROUP BY user_id)"
- )
-
- res = yield self._execute(
- RoomMemberTable.decode_results, query, room_id,
+ where_clause = "user_id = ? AND (%s)" % (
+ " OR ".join(["membership = ?" for _ in membership_list]),
)
- def host_from_user_id_string(user_id):
- domain = UserID.from_string(entry.user_id, self.hs).domain
- return domain
-
- # strip memberships which don't match
- hosts = [
- host_from_user_id_string(entry.user_id)
- for entry in res
- if entry.membership == Membership.JOIN
- ]
+ return self._get_members_query(where_clause, args)
- logger.debug("Returning hosts: %s from results: %s", hosts, res)
-
- defer.returnValue(hosts)
-
- def get_max_room_member_id(self):
- return self._simple_max_id(RoomMemberTable.table_name)
-
-
-class RoomMemberTable(Table):
- table_name = "room_memberships"
-
- fields = [
- "id",
- "user_id",
- "sender",
- "room_id",
- "membership",
- "content"
- ]
+ def get_joined_hosts_for_room(self, room_id):
+ return self._simple_select_onecol(
+ "room_hosts",
+ {"room_id": room_id},
+ "host"
+ )
- class EntryType(collections.namedtuple("RoomMemberEntry", fields)):
+ def _get_members_by_dict(self, where_dict):
+ clause = " AND ".join("%s = ?" % k for k in where_dict.keys())
+ vals = where_dict.values()
+ return self._get_members_query(clause, vals)
- def as_event(self, event_factory):
- return event_factory.create_event(
- etype=RoomMemberEvent.TYPE,
- room_id=self.room_id,
- target_user_id=self.user_id,
- user_id=self.sender,
- content=json.loads(self.content),
- )
+ @defer.inlineCallbacks
+ def _get_members_query(self, where_clause, where_values):
+ sql = (
+ "SELECT e.* FROM events as e "
+ "INNER JOIN room_memberships as m "
+ "ON e.event_id = m.event_id "
+ "INNER JOIN current_state_events as c "
+ "ON m.event_id = c.event_id "
+ "WHERE %s "
+ ) % (where_clause,)
+
+ rows = yield self._execute_and_decode(sql, *where_values)
+
+ logger.debug("_get_members_query Got rows %s", rows)
+
+ results = [self._parse_event_from_row(r) for r in rows]
+ defer.returnValue(results)
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index 77096546b2..2452890ea4 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -12,43 +12,67 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-CREATE TABLE IF NOT EXISTS rooms(
- room_id TEXT PRIMARY KEY NOT NULL,
- is_public INTEGER,
- creator TEXT
+
+CREATE TABLE IF NOT EXISTS events(
+ ordering INTEGER PRIMARY KEY AUTOINCREMENT,
+ event_id TEXT NOT NULL,
+ type TEXT NOT NULL,
+ room_id TEXT,
+ content TEXT,
+ unrecognized_keys TEXT
);
-CREATE TABLE IF NOT EXISTS room_memberships(
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- user_id TEXT NOT NULL, -- no foreign key to users table, it could be an id belonging to another home server
- sender TEXT NOT NULL,
+CREATE TABLE IF NOT EXISTS state_events(
+ event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
- membership TEXT NOT NULL,
- content TEXT NOT NULL
+ type TEXT NOT NULL,
+ state_key TEXT NOT NULL,
+ prev_state TEXT
);
-CREATE TABLE IF NOT EXISTS messages(
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- user_id TEXT,
- room_id TEXT,
- msg_id TEXT,
- content TEXT
+CREATE TABLE IF NOT EXISTS current_state_events(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ type TEXT NOT NULL,
+ state_key TEXT NOT NULL,
+ CONSTRAINT uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
+);
+
+CREATE TABLE IF NOT EXISTS room_memberships(
+ event_id TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ sender TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ membership TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS feedback(
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- content TEXT,
+ event_id TEXT NOT NULL,
feedback_type TEXT,
- fb_sender_id TEXT,
- msg_id TEXT,
- room_id TEXT,
- msg_sender_id TEXT
+ target_event_id TEXT,
+ sender TEXT,
+ room_id TEXT
);
-CREATE TABLE IF NOT EXISTS room_data(
- id INTEGER PRIMARY KEY AUTOINCREMENT,
+CREATE TABLE IF NOT EXISTS topics(
+ event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
- type TEXT NOT NULL,
- state_key TEXT NOT NULL,
- content TEXT
+ topic TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS room_names(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ name TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS rooms(
+ room_id TEXT PRIMARY KEY NOT NULL,
+ is_public INTEGER,
+ creator TEXT
+);
+
+CREATE TABLE IF NOT EXISTS room_hosts(
+ room_id TEXT NOT NULL,
+ host TEXT NOT NULL
);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 47a1f2c45a..cf4b1682b6 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -13,267 +13,118 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer
from ._base import SQLBaseStore
-from .message import MessagesTable
-from .feedback import FeedbackTable
-from .roomdata import RoomDataTable
-from .roommember import RoomMemberTable
+
+from synapse.api.constants import Membership
import json
import logging
-logger = logging.getLogger(__name__)
-
-
-class StreamStore(SQLBaseStore):
-
- def get_message_stream(self, user_id, from_key, to_key, room_id, limit=0,
- with_feedback=False):
- """Get all messages for this user between the given keys.
- Args:
- user_id (str): The user who is requesting messages.
- from_key (int): The ID to start returning results from (exclusive).
- to_key (int): The ID to stop returning results (exclusive).
- room_id (str): Gets messages only for this room. Can be None, in
- which case all room messages will be returned.
- Returns:
- A tuple of rows (list of namedtuples), new_id(int)
- """
- if with_feedback and room_id: # with fb MUST specify a room ID
- return self._db_pool.runInteraction(
- self._get_message_rows_with_feedback,
- user_id, from_key, to_key, room_id, limit
- )
- else:
- return self._db_pool.runInteraction(
- self._get_message_rows,
- user_id, from_key, to_key, room_id, limit
- )
+logger = logging.getLogger(__name__)
- def _get_message_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
- limit):
- # work out which rooms this user is joined in on and join them with
- # the room id on the messages table, bounded by the specified pkeys
- # get all messages where the *current* membership state is 'join' for
- # this user in that room.
- query = ("SELECT messages.* FROM messages WHERE ? IN"
- + " (SELECT membership from room_memberships WHERE user_id=?"
- + " AND room_id = messages.room_id ORDER BY id DESC LIMIT 1)")
- query_args = ["join", user_id]
+MAX_STREAM_SIZE = 1000
- if room_id:
- query += " AND messages.room_id=?"
- query_args.append(room_id)
- (query, query_args) = self._append_stream_operations(
- "messages", query, query_args, from_pkey, to_pkey, limit=limit
- )
+class StreamStore(SQLBaseStore):
- cursor = txn.execute(query, query_args)
- return self._as_events(cursor, MessagesTable, from_pkey)
+ @defer.inlineCallbacks
+ def get_room_events_stream(self, user_id, from_key, to_key, room_id,
+ limit=0, with_feedback=False):
+ # TODO (erikj): Handle compressed feedback
- def _get_message_rows_with_feedback(self, txn, user_id, from_pkey, to_pkey,
- room_id, limit):
- # this col represents the compressed feedback JSON as per spec
- compressed_feedback_col = (
- "'[' || group_concat('{\"sender_id\":\"' || f.fb_sender_id"
- + " || '\",\"feedback_type\":\"' || f.feedback_type"
- + " || '\",\"content\":' || f.content || '}') || ']'"
+ current_room_membership_sql = (
+ "SELECT m.room_id FROM room_memberships as m "
+ "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+ "WHERE m.user_id = ?"
)
- global_msg_id_join = ("f.room_id = messages.room_id"
- + " and f.msg_id = messages.msg_id"
- + " and messages.user_id = f.msg_sender_id")
-
- select_query = (
- "SELECT messages.*, f.content AS fb_content, f.fb_sender_id"
- + ", " + compressed_feedback_col + " AS compressed_fb"
- + " FROM messages LEFT JOIN feedback f ON " + global_msg_id_join)
-
- current_membership_sub_query = (
- "(SELECT membership from room_memberships rm"
- + " WHERE user_id=? AND room_id = rm.room_id"
- + " ORDER BY id DESC LIMIT 1)")
-
- where = (" WHERE ? IN " + current_membership_sub_query
- + " AND messages.room_id=?")
-
- query = select_query + where
- query_args = ["join", user_id, room_id]
-
- (query, query_args) = self._append_stream_operations(
- "messages", query, query_args, from_pkey, to_pkey,
- limit=limit, group_by=" GROUP BY messages.id "
+ invites_sql = (
+ "SELECT m.event_id FROM room_memberships as m "
+ "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
+ "WHERE m.user_id = ? AND m.membership = ?"
)
- cursor = txn.execute(query, query_args)
-
- # convert the result set into events
- entries = self.cursor_to_dict(cursor)
- events = []
- for entry in entries:
- # TODO we should spec the cursor > event mapping somewhere else.
- event = {}
- straight_mappings = ["msg_id", "user_id", "room_id"]
- for key in straight_mappings:
- event[key] = entry[key]
- event["content"] = json.loads(entry["content"])
- if entry["compressed_fb"]:
- event["feedback"] = json.loads(entry["compressed_fb"])
- events.append(event)
-
- latest_pkey = from_pkey if len(entries) == 0 else entries[-1]["id"]
-
- return (events, latest_pkey)
-
- def get_room_member_stream(self, user_id, from_key, to_key):
- """Get all room membership events for this user between the given keys.
-
- Args:
- user_id (str): The user who is requesting membership events.
- from_key (int): The ID to start returning results from (exclusive).
- to_key (int): The ID to stop returning results (exclusive).
- Returns:
- A tuple of rows (list of namedtuples), new_id(int)
- """
- return self._db_pool.runInteraction(
- self._get_room_member_rows, user_id, from_key, to_key
+ if limit:
+ limit = max(limit, MAX_STREAM_SIZE)
+ else:
+ limit = MAX_STREAM_SIZE
+
+ # From and to keys should be integers from ordering.
+ from_key = int(from_key)
+ to_key = int(to_key)
+
+ if from_key == to_key:
+ defer.returnValue(([], to_key))
+ return
+
+ sql = (
+ "SELECT * FROM events as e WHERE "
+ "((room_id IN (%(current)s)) OR "
+ "(event_id IN (%(invites)s))) "
+ ) % {
+ "current": current_room_membership_sql,
+ "invites": invites_sql,
+ }
+
+ # Constraints and ordering depend on direction.
+ if from_key < to_key:
+ sql += (
+ "AND e.ordering > ? AND e.ordering < ? "
+ "ORDER BY ordering ASC LIMIT %(limit)d "
+ ) % {"limit": limit}
+ else:
+ sql += (
+ "AND e.ordering < ? AND e.ordering > ? "
+ "ORDER BY ordering DESC LIMIT %(limit)d "
+ ) % {"limit": int(limit)}
+
+ rows = yield self._execute_and_decode(
+ sql,
+ user_id, user_id, Membership.INVITE, from_key, to_key
)
- def _get_room_member_rows(self, txn, user_id, from_pkey, to_pkey):
- # get all room membership events for rooms which the user is
- # *currently* joined in on, or all invite events for this user.
- current_membership_sub_query = (
- "(SELECT membership FROM room_memberships"
- + " WHERE user_id=? AND room_id = rm.room_id"
- + " ORDER BY id DESC LIMIT 1)")
-
- query = ("SELECT rm.* FROM room_memberships rm "
- # all membership events for rooms you've currently joined.
- + " WHERE (? IN " + current_membership_sub_query
- # all invite membership events for this user
- + " OR rm.membership=? AND user_id=?)"
- + " AND rm.id > ?")
- query_args = ["join", user_id, "invite", user_id, from_pkey]
-
- if to_pkey != -1:
- query += " AND rm.id < ?"
- query_args.append(to_pkey)
-
- cursor = txn.execute(query, query_args)
- return self._as_events(cursor, RoomMemberTable, from_pkey)
+ ret = [self._parse_event_from_row(r) for r in rows]
- def get_feedback_stream(self, user_id, from_key, to_key, room_id, limit=0):
- return self._db_pool.runInteraction(
- self._get_feedback_rows,
- user_id, from_key, to_key, room_id, limit
- )
-
- def _get_feedback_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
- limit):
- # work out which rooms this user is joined in on and join them with
- # the room id on the feedback table, bounded by the specified pkeys
+ if rows:
+ if from_key < to_key:
+ key = max([r["ordering"] for r in rows])
+ else:
+ key = min([r["ordering"] for r in rows])
+ else:
+ key = to_key
- # get all messages where the *current* membership state is 'join' for
- # this user in that room.
- query = (
- "SELECT feedback.* FROM feedback WHERE ? IN "
- + "(SELECT membership from room_memberships WHERE user_id=?"
- + " AND room_id = feedback.room_id ORDER BY id DESC LIMIT 1)")
- query_args = ["join", user_id]
+ defer.returnValue((ret, key))
- if room_id:
- query += " AND feedback.room_id=?"
- query_args.append(room_id)
+ @defer.inlineCallbacks
+ def get_recent_events_for_room(self, room_id, limit, with_feedback=False):
+ # TODO (erikj): Handle compressed feedback
- (query, query_args) = self._append_stream_operations(
- "feedback", query, query_args, from_pkey, to_pkey, limit=limit
+ sql = (
+ "SELECT * FROM events WHERE room_id = ? "
+ "ORDER BY ordering DESC LIMIT ? "
)
- cursor = txn.execute(query, query_args)
- return self._as_events(cursor, FeedbackTable, from_pkey)
-
- def get_room_data_stream(self, user_id, from_key, to_key, room_id,
- limit=0):
- return self._db_pool.runInteraction(
- self._get_room_data_rows,
- user_id, from_key, to_key, room_id, limit
+ rows = yield self._execute_and_decode(
+ sql,
+ room_id, limit
)
- def _get_room_data_rows(self, txn, user_id, from_pkey, to_pkey, room_id,
- limit):
- # work out which rooms this user is joined in on and join them with
- # the room id on the feedback table, bounded by the specified pkeys
+ rows.reverse() # As we selected with reverse ordering
- # get all messages where the *current* membership state is 'join' for
- # this user in that room.
- query = (
- "SELECT room_data.* FROM room_data WHERE ? IN "
- + "(SELECT membership from room_memberships WHERE user_id=?"
- + " AND room_id = room_data.room_id ORDER BY id DESC LIMIT 1)")
- query_args = ["join", user_id]
+ defer.returnValue([self._parse_event_from_row(r) for r in rows])
- if room_id:
- query += " AND room_data.room_id=?"
- query_args.append(room_id)
-
- (query, query_args) = self._append_stream_operations(
- "room_data", query, query_args, from_pkey, to_pkey, limit=limit
+ @defer.inlineCallbacks
+ def get_room_events_max_id(self):
+ res = yield self._execute_and_decode(
+ "SELECT MAX(ordering) as m FROM events"
)
- cursor = txn.execute(query, query_args)
- return self._as_events(cursor, RoomDataTable, from_pkey)
-
- def _append_stream_operations(self, table_name, query, query_args,
- from_pkey, to_pkey, limit=None,
- group_by=""):
- LATEST_ROW = -1
- order_by = ""
- if to_pkey > from_pkey:
- if from_pkey != LATEST_ROW:
- # e.g. from=5 to=9 >> from 5 to 9 >> id>5 AND id<9
- query += (" AND %s.id > ? AND %s.id < ?" %
- (table_name, table_name))
- query_args.append(from_pkey)
- query_args.append(to_pkey)
- else:
- # e.g. from=-1 to=5 >> from now to 5 >> id>5 ORDER BY id DESC
- query += " AND %s.id > ? " % table_name
- order_by = "ORDER BY id DESC"
- query_args.append(to_pkey)
- elif from_pkey > to_pkey:
- if to_pkey != LATEST_ROW:
- # from=9 to=5 >> from 9 to 5 >> id>5 AND id<9 ORDER BY id DESC
- query += (" AND %s.id > ? AND %s.id < ? " %
- (table_name, table_name))
- order_by = "ORDER BY id DESC"
- query_args.append(to_pkey)
- query_args.append(from_pkey)
- else:
- # from=5 to=-1 >> from 5 to now >> id>5
- query += " AND %s.id > ?" % table_name
- query_args.append(from_pkey)
-
- query += group_by + order_by
-
- if limit and limit > 0:
- query += " LIMIT ?"
- query_args.append(str(limit))
-
- return (query, query_args)
-
- def _as_events(self, cursor, table, from_pkey):
- data_entries = table.decode_results(cursor)
- last_pkey = from_pkey
- if data_entries:
- last_pkey = data_entries[-1].id
-
- events = [
- entry.as_event(self.event_factory).get_dict()
- for entry in data_entries
- ]
+ if not res:
+ defer.returnValue(0)
+ return
- return (events, last_pkey)
+ defer.returnValue(res[0]["m"])
|