diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py
index 4b6d739e54..427363cad4 100644
--- a/synapse/api/streams/event.py
+++ b/synapse/api/streams/event.py
@@ -28,17 +28,17 @@ import logging
logger = logging.getLogger(__name__)
-class MessagesStreamData(StreamData):
- EVENT_TYPE = MessageEvent.TYPE
+class EventsStreamData(StreamData):
+ EVENT_TYPE = "EventsStream"
def __init__(self, hs, room_id=None, feedback=False):
- super(MessagesStreamData, self).__init__(hs)
+ super(EventsStreamData, self).__init__(hs)
self.room_id = room_id
self.with_feedback = feedback
@defer.inlineCallbacks
def get_rows(self, user_id, from_key, to_key, limit):
- (data, latest_ver) = yield self.store.get_message_stream(
+ data, latest_ver = yield self.store.get_room_events_stream(
user_id=user_id,
from_key=from_key,
to_key=to_key,
@@ -50,74 +50,7 @@ class MessagesStreamData(StreamData):
@defer.inlineCallbacks
def max_token(self):
- val = yield self.store.get_max_message_id()
- defer.returnValue(val)
-
-
-class RoomMemberStreamData(StreamData):
- EVENT_TYPE = RoomMemberEvent.TYPE
-
- @defer.inlineCallbacks
- def get_rows(self, user_id, from_key, to_key, limit):
- (data, latest_ver) = yield self.store.get_room_member_stream(
- user_id=user_id,
- from_key=from_key,
- to_key=to_key
- )
-
- defer.returnValue((data, latest_ver))
-
- @defer.inlineCallbacks
- def max_token(self):
- val = yield self.store.get_max_room_member_id()
- defer.returnValue(val)
-
-
-class FeedbackStreamData(StreamData):
- EVENT_TYPE = FeedbackEvent.TYPE
-
- def __init__(self, hs, room_id=None):
- super(FeedbackStreamData, self).__init__(hs)
- self.room_id = room_id
-
- @defer.inlineCallbacks
- def get_rows(self, user_id, from_key, to_key, limit):
- (data, latest_ver) = yield self.store.get_feedback_stream(
- user_id=user_id,
- from_key=from_key,
- to_key=to_key,
- limit=limit,
- room_id=self.room_id
- )
- defer.returnValue((data, latest_ver))
-
- @defer.inlineCallbacks
- def max_token(self):
- val = yield self.store.get_max_feedback_id()
- defer.returnValue(val)
-
-
-class RoomDataStreamData(StreamData):
- EVENT_TYPE = RoomTopicEvent.TYPE # TODO need multiple event types
-
- def __init__(self, hs, room_id=None):
- super(RoomDataStreamData, self).__init__(hs)
- self.room_id = room_id
-
- @defer.inlineCallbacks
- def get_rows(self, user_id, from_key, to_key, limit):
- (data, latest_ver) = yield self.store.get_room_data_stream(
- user_id=user_id,
- from_key=from_key,
- to_key=to_key,
- limit=limit,
- room_id=self.room_id
- )
- defer.returnValue((data, latest_ver))
-
- @defer.inlineCallbacks
- def max_token(self):
- val = yield self.store.get_max_room_data_id()
+ val = yield self.store.get_room_events_max_id()
defer.returnValue(val)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 3af7d824a2..6bb797caf2 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -17,8 +17,7 @@ from twisted.internet import defer
from ._base import BaseHandler
from synapse.api.streams.event import (
- EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData,
- RoomDataStreamData
+ EventStream, EventsStreamData
)
from synapse.handlers.presence import PresenceStreamData
@@ -26,10 +25,7 @@ from synapse.handlers.presence import PresenceStreamData
class EventStreamHandler(BaseHandler):
stream_data_classes = [
- MessagesStreamData,
- RoomMemberStreamData,
- FeedbackStreamData,
- RoomDataStreamData,
+ EventsStreamData,
PresenceStreamData,
]
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 432d13982a..3451250008 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -23,7 +23,7 @@ from synapse.api.events.room import (
RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent,
RoomConfigEvent
)
-from synapse.api.streams.event import EventStream, MessagesStreamData
+from synapse.api.streams.event import EventStream, EventsStreamData
from synapse.util import stringutils
from ._base import BaseHandler
@@ -97,30 +97,30 @@ class MessageHandler(BaseHandler):
self.notifier.on_new_room_event(event, store_id)
yield self.hs.get_federation().handle_new_event(event)
-
- @defer.inlineCallbacks
- def get_messages(self, user_id=None, room_id=None, pagin_config=None,
- feedback=False):
- """Get messages in a room.
-
- Args:
- user_id (str): The user requesting messages.
- room_id (str): The room they want messages from.
- pagin_config (synapse.api.streams.PaginationConfig): The pagination
- config rules to apply, if any.
- feedback (bool): True to get compressed feedback with the messages
- Returns:
- dict: Pagination API results
- """
- yield self.auth.check_joined_room(room_id, user_id)
-
- data_source = [MessagesStreamData(self.hs, room_id=room_id,
- feedback=feedback)]
- event_stream = EventStream(user_id, data_source)
- pagin_config = yield event_stream.fix_tokens(pagin_config)
- data_chunk = yield event_stream.get_chunk(config=pagin_config)
- defer.returnValue(data_chunk)
-
+#
+# @defer.inlineCallbacks
+# def get_messages(self, user_id=None, room_id=None, pagin_config=None,
+# feedback=False):
+# """Get messages in a room.
+#
+# Args:
+# user_id (str): The user requesting messages.
+# room_id (str): The room they want messages from.
+# pagin_config (synapse.api.streams.PaginationConfig): The pagination
+# config rules to apply, if any.
+# feedback (bool): True to get compressed feedback with the messages
+# Returns:
+# dict: Pagination API results
+# """
+# yield self.auth.check_joined_room(room_id, user_id)
+#
+# data_source = [MessagesStreamData(self.hs, room_id=room_id,
+# feedback=feedback)]
+# event_stream = EventStream(user_id, data_source)
+# pagin_config = yield event_stream.fix_tokens(pagin_config)
+# data_chunk = yield event_stream.get_chunk(config=pagin_config)
+# defer.returnValue(data_chunk)
+#
@defer.inlineCallbacks
def store_room_data(self, event=None, stamp_event=True):
""" Stores data for a room.
@@ -251,20 +251,27 @@ class MessageHandler(BaseHandler):
user_id=user_id,
membership_list=[Membership.INVITE, Membership.JOIN]
)
- for room_info in room_list:
- if room_info["membership"] != Membership.JOIN:
+
+ ret = []
+
+ for event in room_list:
+ d = event.get_dict()
+ ret.append(d)
+
+ if event.membership != Membership.JOIN:
continue
try:
- event_chunk = yield self.get_messages(
- user_id=user_id,
- pagin_config=pagin_config,
- feedback=feedback,
- room_id=room_info["room_id"]
+ messages = yield self.store.get_recent_events_for_room(
+ event.room_id,
+ limit=50,
)
- room_info["messages"] = event_chunk
+ d["messages"] = [m.get_dict() for m in messages]
except:
pass
- defer.returnValue(room_list)
+
+ logger.debug("snapshot_all_rooms returning: %s", ret)
+
+ defer.returnValue(ret)
class RoomCreationHandler(BaseHandler):
@@ -442,7 +449,7 @@ class RoomMemberHandler(BaseHandler):
member_list = yield self.store.get_room_members(room_id=room_id)
event_list = [
- entry.as_event(self.event_factory).get_dict()
+ entry.get_dict()
for entry in member_list
]
chunk_data = {
@@ -685,7 +692,7 @@ class RoomMemberHandler(BaseHandler):
user_id=user.to_string(), membership_list=membership_list
)
- defer.returnValue([r["room_id"] for r in rooms])
+ defer.returnValue([r.room_id for r in rooms])
@defer.inlineCallbacks
def _do_local_membership_update(self, event, membership, broadcast_msg):
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f62cee3c39..46b9dbcbbf 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -104,7 +104,15 @@ class DataStore(RoomMemberStore, RoomStore,
yield self._simple_insert("state_events", vals)
- # TODO (erikj): We also need to update the current state table?
+ yield self._simple_insert(
+ "current_state_events",
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ }
+ )
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key=""):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index c26e9a0f98..413838f798 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -293,7 +293,7 @@ class SQLBaseStore(object):
def _parse_event_from_row(self, row_dict):
d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
- d.update(json.loads(json.loads(row_dict["unrecognized_keys"])))
+ d.update(json.loads(row_dict["unrecognized_keys"]))
d["content"] = json.loads(d["content"])
del d["unrecognized_keys"]
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 8c4b04f190..a0620677b9 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -63,6 +63,7 @@ class RoomMemberStore(SQLBaseStore):
yield self._execute(None, sql, event.room_id, domain)
+ @defer.inlineCallbacks
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.
@@ -72,11 +73,13 @@ class RoomMemberStore(SQLBaseStore):
Returns:
Deferred: Results in a MembershipEvent or None.
"""
- return self._get_members_by_dict({
+ rows = yield self._get_members_by_dict({
"e.room_id": room_id,
"m.user_id": user_id,
})
+ defer.returnValue(rows[0] if rows else None)
+
def get_room_members(self, room_id, membership=None):
"""Retrieve the current room member list for a room.
@@ -142,5 +145,8 @@ class RoomMemberStore(SQLBaseStore):
) % (where_clause,)
rows = yield self._execute_and_decode(sql, *where_values)
+
+ logger.debug("_get_members_query Got rows %s", rows)
+
results = [self._parse_event_from_row(r) for r in rows]
defer.returnValue(results)
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index 9a0f2145d5..2452890ea4 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -32,7 +32,10 @@ CREATE TABLE IF NOT EXISTS state_events(
CREATE TABLE IF NOT EXISTS current_state_events(
event_id TEXT NOT NULL,
- room_id TEXT NOT NULL
+ room_id TEXT NOT NULL,
+ type TEXT NOT NULL,
+ state_key TEXT NOT NULL,
+ CONSTRAINT uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
);
CREATE TABLE IF NOT EXISTS room_memberships(
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 9937239c22..c5c3770a40 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -34,6 +34,7 @@ class StreamStore(SQLBaseStore):
@defer.inlineCallbacks
def get_room_events_stream(self, user_id, from_key, to_key, room_id,
limit=0, with_feedback=False):
+ # TODO (erikj): Handle compressed feedback
current_room_membership_sql = (
"SELECT m.room_id FROM room_memberships as m "
@@ -69,3 +70,33 @@ class StreamStore(SQLBaseStore):
)
defer.returnValue([self._parse_event_from_row(r) for r in rows])
+
+ @defer.inlineCallbacks
+ def get_recent_events_for_room(self, room_id, limit, with_feedback=False):
+ # TODO (erikj): Handle compressed feedback
+
+ sql = (
+ "SELECT * FROM events WHERE room_id = ? "
+ "ORDER BY ordering DESC LIMIT ? "
+ )
+
+ rows = yield self._execute_and_decode(
+ sql,
+ room_id, limit
+ )
+
+ rows.reverse() # As we selected with reverse ordering
+
+ defer.returnValue([self._parse_event_from_row(r) for r in rows])
+
+ @defer.inlineCallbacks
+ def get_room_events_max_id(self):
+ res = yield self._execute_and_decode(
+ "SELECT MAX(ordering) as m FROM events"
+ )
+
+ if not res:
+ defer.returnValue(0)
+ return
+
+ defer.returnValue(res[0]["m"])
|