diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/events.py | 8 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 21 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 2 | ||||
-rw-r--r-- | synapse/handlers/room.py | 102 |
4 files changed, 75 insertions, 58 deletions
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 3af7d824a2..6bb797caf2 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -17,8 +17,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.streams.event import ( - EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData, - RoomDataStreamData + EventStream, EventsStreamData ) from synapse.handlers.presence import PresenceStreamData @@ -26,10 +25,7 @@ from synapse.handlers.presence import PresenceStreamData class EventStreamHandler(BaseHandler): stream_data_classes = [ - MessagesStreamData, - RoomMemberStreamData, - FeedbackStreamData, - RoomDataStreamData, + EventsStreamData, PresenceStreamData, ] diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7026df90a2..0430a8307e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -35,7 +35,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def on_receive(self, event, is_new_state): + def on_receive(self, event, is_new_state, backfilled): if hasattr(event, "state_key") and not is_new_state: logger.debug("Ignoring old state.") return @@ -70,6 +70,21 @@ class FederationHandler(BaseHandler): else: with (yield self.room_lock.lock(event.room_id)): - store_id = yield self.store.persist_event(event) + store_id = yield self.store.persist_event(event, backfilled) - yield self.notifier.on_new_room_event(event, store_id) + if not backfilled: + yield self.notifier.on_new_room_event(event, store_id) + + + @log_function + @defer.inlineCallbacks + def backfill(self, dest, room_id, limit): + events = yield self.hs.get_federation().backfill(dest, room_id, limit) + + for event in events: + try: + yield self.store.persist_event(event, backfilled=True) + except: + logger.exception("Failed to persist event: %s", event) + + defer.returnValue(events) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7e725d1027..60684f17d7 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -688,7 +688,7 @@ class PresenceStreamData(StreamData): super(PresenceStreamData, self).__init__(hs) self.presence = hs.get_handlers().presence_handler - def get_rows(self, user_id, from_key, to_key, limit): + def get_rows(self, user_id, from_key, to_key, limit, direction): cachemap = self.presence._user_cachemap # TODO(paul): limit, and filter by visibility diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5d0379254b..40867ae2e0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -23,7 +23,7 @@ from synapse.api.events.room import ( RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent, RoomConfigEvent ) -from synapse.api.streams.event import EventStream, MessagesStreamData +from synapse.api.streams.event import EventStream, EventsStreamData from synapse.util import stringutils from ._base import BaseHandler @@ -59,12 +59,14 @@ class MessageHandler(BaseHandler): yield self.auth.check_joined_room(room_id, user_id) # Pull out the message from the db - msg = yield self.store.get_message(room_id=room_id, - msg_id=msg_id, - user_id=sender_id) +# msg = yield self.store.get_message( +# room_id=room_id, +# msg_id=msg_id, +# user_id=sender_id +# ) + + # TODO (erikj): Once we work out the correct c-s api we need to think on how to do this. - if msg: - defer.returnValue(msg) defer.returnValue(None) @defer.inlineCallbacks @@ -114,8 +116,9 @@ class MessageHandler(BaseHandler): """ yield self.auth.check_joined_room(room_id, user_id) - data_source = [MessagesStreamData(self.hs, room_id=room_id, - feedback=feedback)] + data_source = [ + EventsStreamData(self.hs, room_id=room_id, feedback=feedback) + ] event_stream = EventStream(user_id, data_source) pagin_config = yield event_stream.fix_tokens(pagin_config) data_chunk = yield event_stream.get_chunk(config=pagin_config) @@ -141,12 +144,7 @@ class MessageHandler(BaseHandler): yield self.state_handler.handle_new_event(event) # store in db - store_id = yield self.store.store_room_data( - room_id=event.room_id, - etype=event.type, - state_key=event.state_key, - content=json.dumps(event.content) - ) + store_id = yield self.store.persist_event(event) event.destinations = yield self.store.get_joined_hosts_for_room( event.room_id @@ -201,19 +199,17 @@ class MessageHandler(BaseHandler): raise RoomError( 403, "Member does not meet private room rules.") - data = yield self.store.get_room_data(room_id, event_type, state_key) + data = yield self.store.get_current_state( + room_id, event_type, state_key + ) defer.returnValue(data) @defer.inlineCallbacks - def get_feedback(self, room_id=None, msg_sender_id=None, msg_id=None, - user_id=None, fb_sender_id=None, fb_type=None): - yield self.auth.check_joined_room(room_id, user_id) + def get_feedback(self, event_id): + # yield self.auth.check_joined_room(room_id, user_id) # Pull out the feedback from the db - fb = yield self.store.get_feedback( - room_id=room_id, msg_id=msg_id, msg_sender_id=msg_sender_id, - fb_sender_id=fb_sender_id, fb_type=fb_type - ) + fb = yield self.store.get_feedback(event_id) if fb: defer.returnValue(fb) @@ -260,20 +256,35 @@ class MessageHandler(BaseHandler): user_id=user_id, membership_list=[Membership.INVITE, Membership.JOIN] ) - for room_info in room_list: - if room_info["membership"] != Membership.JOIN: + + ret = [] + + for event in room_list: + d = { + "room_id": event.room_id, + "membership": event.membership, + } + ret.append(d) + + if event.membership != Membership.JOIN: continue try: - event_chunk = yield self.get_messages( - user_id=user_id, - pagin_config=pagin_config, - feedback=feedback, - room_id=room_info["room_id"] + messages, token = yield self.store.get_recent_events_for_room( + event.room_id, + limit=50, ) - room_info["messages"] = event_chunk + + d["messages"] = { + "chunk": [m.get_dict() for m in messages], + "start": token[0], + "end": token[1], + } except: - pass - defer.returnValue(room_list) + logger.exception("Failed to get snapshot") + + logger.debug("snapshot_all_rooms returning: %s", ret) + + defer.returnValue(ret) class RoomCreationHandler(BaseHandler): @@ -451,11 +462,11 @@ class RoomMemberHandler(BaseHandler): member_list = yield self.store.get_room_members(room_id=room_id) event_list = [ - entry.as_event(self.event_factory).get_dict() + entry.get_dict() for entry in member_list ] chunk_data = { - "start": "START", + "start": "START", # FIXME (erikj): START is no longer a valid value "end": "END", "chunk": event_list } @@ -495,7 +506,7 @@ class RoomMemberHandler(BaseHandler): SynapseError if there was a problem changing the membership. """ - #broadcast_msg = False + # broadcast_msg = False prev_state = yield self.store.get_room_member( event.target_user_id, event.room_id @@ -569,7 +580,8 @@ class RoomMemberHandler(BaseHandler): defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def _do_join(self, event, room_host=None, do_auth=True, broadcast_msg=True): + def _do_join(self, event, room_host=None, do_auth=True, + broadcast_msg=True): joinee = self.hs.parse_userid(event.target_user_id) # room_id = RoomID.from_string(event.room_id, self.hs) room_id = event.room_id @@ -598,7 +610,7 @@ class RoomMemberHandler(BaseHandler): if prev_state and prev_state.membership == Membership.INVITE: room = yield self.store.get_room(room_id) inviter = UserID.from_string( - prev_state.sender, self.hs + prev_state.user_id, self.hs ) should_do_dance = not inviter.is_mine and not room @@ -620,7 +632,6 @@ class RoomMemberHandler(BaseHandler): broadcast_msg=broadcast_msg, ) - if should_do_dance: yield self._do_invite_join_dance( room_id=room_id, @@ -694,18 +705,12 @@ class RoomMemberHandler(BaseHandler): user_id=user.to_string(), membership_list=membership_list ) - defer.returnValue([r["room_id"] for r in rooms]) + defer.returnValue([r.room_id for r in rooms]) @defer.inlineCallbacks def _do_local_membership_update(self, event, membership, broadcast_msg): # store membership - store_id = yield self.store.store_room_member( - user_id=event.target_user_id, - sender=event.user_id, - room_id=event.room_id, - content=event.content, - membership=membership - ) + store_id = yield self.store.persist_event(event) # Send a PDU to all hosts who have joined the room. destinations = yield self.store.get_joined_hosts_for_room( @@ -760,7 +765,7 @@ class RoomMemberHandler(BaseHandler): room_id, "", is_public=False ) - #yield self.state_handler.handle_new_event(event) + # yield self.state_handler.handle_new_event(event) yield federation.handle_new_event(new_event) yield federation.get_state_for_room( target_host, room_id @@ -805,5 +810,6 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_public_room_list(self): - chunk = yield self.store.get_rooms(is_public=True, with_topics=True) + chunk = yield self.store.get_rooms(is_public=True) + # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) |