diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 3af7d824a2..6bb797caf2 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -17,8 +17,7 @@ from twisted.internet import defer
from ._base import BaseHandler
from synapse.api.streams.event import (
- EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData,
- RoomDataStreamData
+ EventStream, EventsStreamData
)
from synapse.handlers.presence import PresenceStreamData
@@ -26,10 +25,7 @@ from synapse.handlers.presence import PresenceStreamData
class EventStreamHandler(BaseHandler):
stream_data_classes = [
- MessagesStreamData,
- RoomMemberStreamData,
- FeedbackStreamData,
- RoomDataStreamData,
+ EventsStreamData,
PresenceStreamData,
]
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5d0379254b..14ffddc630 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -23,7 +23,7 @@ from synapse.api.events.room import (
RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent,
RoomConfigEvent
)
-from synapse.api.streams.event import EventStream, MessagesStreamData
+from synapse.api.streams.event import EventStream, EventsStreamData
from synapse.util import stringutils
from ._base import BaseHandler
@@ -114,8 +114,9 @@ class MessageHandler(BaseHandler):
"""
yield self.auth.check_joined_room(room_id, user_id)
- data_source = [MessagesStreamData(self.hs, room_id=room_id,
- feedback=feedback)]
+ data_source = [
+ EventsStreamData(self.hs, room_id=room_id, feedback=feedback)
+ ]
event_stream = EventStream(user_id, data_source)
pagin_config = yield event_stream.fix_tokens(pagin_config)
data_chunk = yield event_stream.get_chunk(config=pagin_config)
@@ -141,12 +142,7 @@ class MessageHandler(BaseHandler):
yield self.state_handler.handle_new_event(event)
# store in db
- store_id = yield self.store.store_room_data(
- room_id=event.room_id,
- etype=event.type,
- state_key=event.state_key,
- content=json.dumps(event.content)
- )
+ store_id = yield self.store.persist_event(event)
event.destinations = yield self.store.get_joined_hosts_for_room(
event.room_id
@@ -201,19 +197,17 @@ class MessageHandler(BaseHandler):
raise RoomError(
403, "Member does not meet private room rules.")
- data = yield self.store.get_room_data(room_id, event_type, state_key)
+ data = yield self.store.get_current_state(
+ room_id, event_type, state_key
+ )
defer.returnValue(data)
@defer.inlineCallbacks
- def get_feedback(self, room_id=None, msg_sender_id=None, msg_id=None,
- user_id=None, fb_sender_id=None, fb_type=None):
- yield self.auth.check_joined_room(room_id, user_id)
+ def get_feedback(self, event_id):
+ # yield self.auth.check_joined_room(room_id, user_id)
# Pull out the feedback from the db
- fb = yield self.store.get_feedback(
- room_id=room_id, msg_id=msg_id, msg_sender_id=msg_sender_id,
- fb_sender_id=fb_sender_id, fb_type=fb_type
- )
+ fb = yield self.store.get_feedback(event_id)
if fb:
defer.returnValue(fb)
@@ -260,20 +254,30 @@ class MessageHandler(BaseHandler):
user_id=user_id,
membership_list=[Membership.INVITE, Membership.JOIN]
)
- for room_info in room_list:
- if room_info["membership"] != Membership.JOIN:
+
+ ret = []
+
+ for event in room_list:
+ d = {
+ "room_id": event.room_id,
+ "membership": event.membership,
+ }
+ ret.append(d)
+
+ if event.membership != Membership.JOIN:
continue
try:
- event_chunk = yield self.get_messages(
- user_id=user_id,
- pagin_config=pagin_config,
- feedback=feedback,
- room_id=room_info["room_id"]
+ messages = yield self.store.get_recent_events_for_room(
+ event.room_id,
+ limit=50,
)
- room_info["messages"] = event_chunk
+ d["messages"] = [m.get_dict() for m in messages]
except:
pass
- defer.returnValue(room_list)
+
+ logger.debug("snapshot_all_rooms returning: %s", ret)
+
+ defer.returnValue(ret)
class RoomCreationHandler(BaseHandler):
@@ -451,7 +455,7 @@ class RoomMemberHandler(BaseHandler):
member_list = yield self.store.get_room_members(room_id=room_id)
event_list = [
- entry.as_event(self.event_factory).get_dict()
+ entry.get_dict()
for entry in member_list
]
chunk_data = {
@@ -495,7 +499,7 @@ class RoomMemberHandler(BaseHandler):
SynapseError if there was a problem changing the membership.
"""
- #broadcast_msg = False
+ # broadcast_msg = False
prev_state = yield self.store.get_room_member(
event.target_user_id, event.room_id
@@ -569,7 +573,8 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue({"room_id": room_id})
@defer.inlineCallbacks
- def _do_join(self, event, room_host=None, do_auth=True, broadcast_msg=True):
+ def _do_join(self, event, room_host=None, do_auth=True,
+ broadcast_msg=True):
joinee = self.hs.parse_userid(event.target_user_id)
# room_id = RoomID.from_string(event.room_id, self.hs)
room_id = event.room_id
@@ -598,7 +603,7 @@ class RoomMemberHandler(BaseHandler):
if prev_state and prev_state.membership == Membership.INVITE:
room = yield self.store.get_room(room_id)
inviter = UserID.from_string(
- prev_state.sender, self.hs
+ prev_state.user_id, self.hs
)
should_do_dance = not inviter.is_mine and not room
@@ -620,7 +625,6 @@ class RoomMemberHandler(BaseHandler):
broadcast_msg=broadcast_msg,
)
-
if should_do_dance:
yield self._do_invite_join_dance(
room_id=room_id,
@@ -694,18 +698,12 @@ class RoomMemberHandler(BaseHandler):
user_id=user.to_string(), membership_list=membership_list
)
- defer.returnValue([r["room_id"] for r in rooms])
+ defer.returnValue([r.room_id for r in rooms])
@defer.inlineCallbacks
def _do_local_membership_update(self, event, membership, broadcast_msg):
# store membership
- store_id = yield self.store.store_room_member(
- user_id=event.target_user_id,
- sender=event.user_id,
- room_id=event.room_id,
- content=event.content,
- membership=membership
- )
+ store_id = yield self.store.persist_event(event)
# Send a PDU to all hosts who have joined the room.
destinations = yield self.store.get_joined_hosts_for_room(
@@ -760,7 +758,7 @@ class RoomMemberHandler(BaseHandler):
room_id, "", is_public=False
)
- #yield self.state_handler.handle_new_event(event)
+ # yield self.state_handler.handle_new_event(event)
yield federation.handle_new_event(new_event)
yield federation.get_state_for_room(
target_host, room_id
@@ -805,5 +803,5 @@ class RoomListHandler(BaseHandler):
@defer.inlineCallbacks
def get_public_room_list(self):
- chunk = yield self.store.get_rooms(is_public=True, with_topics=True)
+ chunk = yield self.store.get_rooms(is_public=True)
defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
|