diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index c2f4685c92..3f07b5aa4a 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -24,4 +24,5 @@ class BaseHandler(object):
self.notifier = hs.get_notifier()
self.room_lock = hs.get_room_lock_manager()
self.state_handler = hs.get_state_handler()
+ self.distributor = hs.get_distributor()
self.hs = hs
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 3af7d824a2..6bb797caf2 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -17,8 +17,7 @@ from twisted.internet import defer
from ._base import BaseHandler
from synapse.api.streams.event import (
- EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData,
- RoomDataStreamData
+ EventStream, EventsStreamData
)
from synapse.handlers.presence import PresenceStreamData
@@ -26,10 +25,7 @@ from synapse.handlers.presence import PresenceStreamData
class EventStreamHandler(BaseHandler):
stream_data_classes = [
- MessagesStreamData,
- RoomMemberStreamData,
- FeedbackStreamData,
- RoomDataStreamData,
+ EventsStreamData,
PresenceStreamData,
]
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 7026df90a2..9cff444779 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -32,10 +32,19 @@ logger = logging.getLogger(__name__)
class FederationHandler(BaseHandler):
"""Handles events that originated from federation."""
+ def __init__(self, hs):
+ super(FederationHandler, self).__init__(hs)
+
+ self.distributor.observe(
+ "user_joined_room",
+ self._on_user_joined
+ )
+
+ self.waiting_for_join_list = {}
@log_function
@defer.inlineCallbacks
- def on_receive(self, event, is_new_state):
+ def on_receive(self, event, is_new_state, backfilled):
if hasattr(event, "state_key") and not is_new_state:
logger.debug("Ignoring old state.")
return
@@ -70,6 +79,115 @@ class FederationHandler(BaseHandler):
else:
with (yield self.room_lock.lock(event.room_id)):
- store_id = yield self.store.persist_event(event)
+ store_id = yield self.store.persist_event(event, backfilled)
+
+ room = yield self.store.get_room(event.room_id)
+
+ if not room:
+ # Huh, let's try and get the current state
+ try:
+ federation = self.hs.get_federation()
+ yield federation.get_state_for_room(
+ event.origin, event.room_id
+ )
+
+ hosts = yield self.store.get_joined_hosts_for_room(
+ event.room_id
+ )
+ if self.hs.hostname in hosts:
+ try:
+ yield self.store.store_room(
+ event.room_id,
+ "",
+ is_public=False
+ )
+ except:
+ pass
+ except:
+ logger.exception(
+ "Failed to get current state for room %s",
+ event.room_id
+ )
+
+ if not backfilled:
+ yield self.notifier.on_new_room_event(event, store_id)
+
+ if event.type == RoomMemberEvent.TYPE:
+ if event.membership == Membership.JOIN:
+ user = self.hs.parse_userid(event.target_user_id)
+ self.distributor.fire(
+ "user_joined_room", user=user, room_id=event.room_id
+ )
+
+
+ @log_function
+ @defer.inlineCallbacks
+ def backfill(self, dest, room_id, limit):
+ events = yield self.hs.get_federation().backfill(dest, room_id, limit)
+
+ for event in events:
+ try:
+ yield self.store.persist_event(event, backfilled=True)
+ except:
+ logger.exception("Failed to persist event: %s", event)
+
+ defer.returnValue(events)
- yield self.notifier.on_new_room_event(event, store_id)
+ @log_function
+ @defer.inlineCallbacks
+ def do_invite_join(self, target_host, room_id, joinee, content):
+ federation = self.hs.get_federation()
+
+ hosts = yield self.store.get_joined_hosts_for_room(room_id)
+ if self.hs.hostname in hosts:
+ # We are already in the room.
+ logger.debug("We're already in the room apparently")
+ defer.returnValue(False)
+
+ # First get current state to see if we are already joined.
+ try:
+ yield federation.get_state_for_room(target_host, room_id)
+
+ hosts = yield self.store.get_joined_hosts_for_room(room_id)
+ if self.hs.hostname in hosts:
+ # Oh, we were actually in the room already.
+ logger.debug("We're already in the room apparently")
+ defer.returnValue(False)
+ except Exception:
+ logger.exception("Failed to get current state")
+
+ new_event = self.event_factory.create_event(
+ etype=InviteJoinEvent.TYPE,
+ target_host=target_host,
+ room_id=room_id,
+ user_id=joinee,
+ content=content
+ )
+
+ new_event.destinations = [target_host]
+
+ yield federation.handle_new_event(new_event)
+
+ # TODO (erikj): Time out here.
+ d = defer.Deferred()
+ self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d)
+ yield d
+
+ try:
+ yield self.store.store_room(
+ event.room_id,
+ "",
+ is_public=False
+ )
+ except:
+ pass
+
+
+ defer.returnValue(True)
+
+
+ @log_function
+ def _on_user_joined(self, user, room_id):
+ waiters = self.waiting_for_join_list.get((user.to_string(), room_id), [])
+ while waiters:
+ waiters.pop().callback(None)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index a2152c99cf..540e114b82 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -685,7 +685,10 @@ class PresenceStreamData(StreamData):
super(PresenceStreamData, self).__init__(hs)
self.presence = hs.get_handlers().presence_handler
- def get_rows(self, user_id, from_key, to_key, limit):
+ def get_rows(self, user_id, from_key, to_key, limit, direction):
+ from_key = int(from_key)
+ to_key = int(to_key)
+
cachemap = self.presence._user_cachemap
# TODO(paul): limit, and filter by visibility
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5d0379254b..899b653fb7 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -23,7 +23,8 @@ from synapse.api.events.room import (
RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent,
RoomConfigEvent
)
-from synapse.api.streams.event import EventStream, MessagesStreamData
+from synapse.api.streams.event import EventStream, EventsStreamData
+from synapse.handlers.presence import PresenceStreamData
from synapse.util import stringutils
from ._base import BaseHandler
@@ -59,12 +60,14 @@ class MessageHandler(BaseHandler):
yield self.auth.check_joined_room(room_id, user_id)
# Pull out the message from the db
- msg = yield self.store.get_message(room_id=room_id,
- msg_id=msg_id,
- user_id=sender_id)
+# msg = yield self.store.get_message(
+# room_id=room_id,
+# msg_id=msg_id,
+# user_id=sender_id
+# )
+
+ # TODO (erikj): Once we work out the correct c-s api we need to think on how to do this.
- if msg:
- defer.returnValue(msg)
defer.returnValue(None)
@defer.inlineCallbacks
@@ -114,8 +117,9 @@ class MessageHandler(BaseHandler):
"""
yield self.auth.check_joined_room(room_id, user_id)
- data_source = [MessagesStreamData(self.hs, room_id=room_id,
- feedback=feedback)]
+ data_source = [
+ EventsStreamData(self.hs, room_id=room_id, feedback=feedback)
+ ]
event_stream = EventStream(user_id, data_source)
pagin_config = yield event_stream.fix_tokens(pagin_config)
data_chunk = yield event_stream.get_chunk(config=pagin_config)
@@ -141,12 +145,7 @@ class MessageHandler(BaseHandler):
yield self.state_handler.handle_new_event(event)
# store in db
- store_id = yield self.store.store_room_data(
- room_id=event.room_id,
- etype=event.type,
- state_key=event.state_key,
- content=json.dumps(event.content)
- )
+ store_id = yield self.store.persist_event(event)
event.destinations = yield self.store.get_joined_hosts_for_room(
event.room_id
@@ -201,19 +200,17 @@ class MessageHandler(BaseHandler):
raise RoomError(
403, "Member does not meet private room rules.")
- data = yield self.store.get_room_data(room_id, event_type, state_key)
+ data = yield self.store.get_current_state(
+ room_id, event_type, state_key
+ )
defer.returnValue(data)
@defer.inlineCallbacks
- def get_feedback(self, room_id=None, msg_sender_id=None, msg_id=None,
- user_id=None, fb_sender_id=None, fb_type=None):
- yield self.auth.check_joined_room(room_id, user_id)
+ def get_feedback(self, event_id):
+ # yield self.auth.check_joined_room(room_id, user_id)
# Pull out the feedback from the db
- fb = yield self.store.get_feedback(
- room_id=room_id, msg_id=msg_id, msg_sender_id=msg_sender_id,
- fb_sender_id=fb_sender_id, fb_type=fb_type
- )
+ fb = yield self.store.get_feedback(event_id)
if fb:
defer.returnValue(fb)
@@ -260,20 +257,59 @@ class MessageHandler(BaseHandler):
user_id=user_id,
membership_list=[Membership.INVITE, Membership.JOIN]
)
- for room_info in room_list:
- if room_info["membership"] != Membership.JOIN:
+
+ rooms_ret = []
+
+ now_rooms_token = yield self.store.get_room_events_max_id()
+
+ # FIXME (erikj): Fix this.
+ presence_stream = PresenceStreamData(self.hs)
+ now_presence_token = yield presence_stream.max_token()
+ presence = yield presence_stream.get_rows(
+ user_id, 0, now_presence_token, None, None
+ )
+
+ # FIXME (erikj): We need to not generate this token,
+ now_token = "%s_%s" % (now_rooms_token, now_presence_token)
+
+ for event in room_list:
+ d = {
+ "room_id": event.room_id,
+ "membership": event.membership,
+ }
+
+ if event.membership == Membership.INVITE:
+ d["inviter"] = event.user_id
+
+ rooms_ret.append(d)
+
+ if event.membership != Membership.JOIN:
continue
try:
- event_chunk = yield self.get_messages(
- user_id=user_id,
- pagin_config=pagin_config,
- feedback=feedback,
- room_id=room_info["room_id"]
+ messages, token = yield self.store.get_recent_events_for_room(
+ event.room_id,
+ limit=10,
+ end_token=now_rooms_token,
)
- room_info["messages"] = event_chunk
+
+ d["messages"] = {
+ "chunk": [m.get_dict() for m in messages],
+ "start": token[0],
+ "end": token[1],
+ }
+
+ current_state = yield self.store.get_current_state(event.room_id)
+ d["state"] = [c.get_dict() for c in current_state]
except:
- pass
- defer.returnValue(room_list)
+ logger.exception("Failed to get snapshot")
+
+ user = self.hs.parse_userid(user_id)
+
+ ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token}
+
+ logger.debug("snapshot_all_rooms returning: %s", ret)
+
+ defer.returnValue(ret)
class RoomCreationHandler(BaseHandler):
@@ -372,7 +408,6 @@ class RoomCreationHandler(BaseHandler):
yield self.hs.get_handlers().room_member_handler.change_membership(
join_event,
- broadcast_msg=True,
do_auth=False
)
@@ -451,11 +486,11 @@ class RoomMemberHandler(BaseHandler):
member_list = yield self.store.get_room_members(room_id=room_id)
event_list = [
- entry.as_event(self.event_factory).get_dict()
+ entry.get_dict()
for entry in member_list
]
chunk_data = {
- "start": "START",
+ "start": "START", # FIXME (erikj): START is no longer a valid value
"end": "END",
"chunk": event_list
}
@@ -484,29 +519,28 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue(member)
@defer.inlineCallbacks
- def change_membership(self, event=None, broadcast_msg=False, do_auth=True):
+ def change_membership(self, event=None, do_auth=True):
""" Change the membership status of a user in a room.
Args:
event (SynapseEvent): The membership event
- broadcast_msg (bool): True to inject a membership message into this
- room on success.
Raises:
SynapseError if there was a problem changing the membership.
"""
- #broadcast_msg = False
-
prev_state = yield self.store.get_room_member(
event.target_user_id, event.room_id
)
- if prev_state and prev_state.membership == event.membership:
- # treat this event as a NOOP.
- if do_auth: # This is mainly to fix a unit test.
- yield self.auth.check(event, raises=True)
- defer.returnValue({})
- return
+ if prev_state:
+ event.content["prev"] = prev_state.membership
+
+# if prev_state and prev_state.membership == event.membership:
+# # treat this event as a NOOP.
+# if do_auth: # This is mainly to fix a unit test.
+# yield self.auth.check(event, raises=True)
+# defer.returnValue({})
+# return
room_id = event.room_id
@@ -514,9 +548,7 @@ class RoomMemberHandler(BaseHandler):
# if this HS is not currently in the room, i.e. we have to do the
# invite/join dance.
if event.membership == Membership.JOIN:
- yield self._do_join(
- event, do_auth=do_auth, broadcast_msg=broadcast_msg
- )
+ yield self._do_join(event, do_auth=do_auth)
else:
# This is not a JOIN, so we can handle it normally.
if do_auth:
@@ -534,7 +566,6 @@ class RoomMemberHandler(BaseHandler):
yield self._do_local_membership_update(
event,
membership=event.content["membership"],
- broadcast_msg=broadcast_msg,
)
defer.returnValue({"room_id": room_id})
@@ -569,14 +600,14 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue({"room_id": room_id})
@defer.inlineCallbacks
- def _do_join(self, event, room_host=None, do_auth=True, broadcast_msg=True):
+ def _do_join(self, event, room_host=None, do_auth=True):
joinee = self.hs.parse_userid(event.target_user_id)
# room_id = RoomID.from_string(event.room_id, self.hs)
room_id = event.room_id
# If event doesn't include a display name, add one.
- yield self._fill_out_join_content(
- joinee, event.content
+ yield self.distributor.fire(
+ "collect_presencelike_data", joinee, event.content
)
# XXX: We don't do an auth check if we are doing an invite
@@ -584,9 +615,9 @@ class RoomMemberHandler(BaseHandler):
# that we are allowed to join when we decide whether or not we
# need to do the invite/join dance.
- room = yield self.store.get_room(room_id)
+ hosts = yield self.store.get_joined_hosts_for_room(room_id)
- if room:
+ if self.hs.hostname in hosts:
should_do_dance = False
elif room_host:
should_do_dance = True
@@ -598,7 +629,7 @@ class RoomMemberHandler(BaseHandler):
if prev_state and prev_state.membership == Membership.INVITE:
room = yield self.store.get_room(room_id)
inviter = UserID.from_string(
- prev_state.sender, self.hs
+ prev_state.user_id, self.hs
)
should_do_dance = not inviter.is_mine and not room
@@ -606,8 +637,15 @@ class RoomMemberHandler(BaseHandler):
else:
should_do_dance = False
+ have_joined = False
+ if should_do_dance:
+ handler = self.hs.get_handlers().federation_handler
+ have_joined = yield handler.do_invite_join(
+ room_host, room_id, event.user_id, event.content
+ )
+
# We want to do the _do_update inside the room lock.
- if not should_do_dance:
+ if not have_joined:
logger.debug("Doing normal join")
if do_auth:
@@ -617,16 +655,6 @@ class RoomMemberHandler(BaseHandler):
yield self._do_local_membership_update(
event,
membership=event.content["membership"],
- broadcast_msg=broadcast_msg,
- )
-
-
- if should_do_dance:
- yield self._do_invite_join_dance(
- room_id=room_id,
- joinee=event.user_id,
- target_host=room_host,
- content=event.content,
)
user = self.hs.parse_userid(event.user_id)
@@ -635,32 +663,6 @@ class RoomMemberHandler(BaseHandler):
)
@defer.inlineCallbacks
- def _fill_out_join_content(self, user_id, content):
- # If event doesn't include a display name, add one.
- profile_handler = self.hs.get_handlers().profile_handler
- if "displayname" not in content:
- try:
- display_name = yield profile_handler.get_displayname(
- user_id
- )
-
- if display_name:
- content["displayname"] = display_name
- except:
- logger.exception("Failed to set display_name")
-
- if "avatar_url" not in content:
- try:
- avatar_url = yield profile_handler.get_avatar_url(
- user_id
- )
-
- if avatar_url:
- content["avatar_url"] = avatar_url
- except:
- logger.exception("Failed to set display_name")
-
- @defer.inlineCallbacks
def _should_invite_join(self, room_id, prev_state, do_auth):
logger.debug("_should_invite_join: room_id: %s", room_id)
@@ -694,18 +696,12 @@ class RoomMemberHandler(BaseHandler):
user_id=user.to_string(), membership_list=membership_list
)
- defer.returnValue([r["room_id"] for r in rooms])
+ defer.returnValue([r.room_id for r in rooms])
@defer.inlineCallbacks
- def _do_local_membership_update(self, event, membership, broadcast_msg):
+ def _do_local_membership_update(self, event, membership):
# store membership
- store_id = yield self.store.store_room_member(
- user_id=event.target_user_id,
- sender=event.user_id,
- room_id=event.room_id,
- content=event.content,
- membership=membership
- )
+ store_id = yield self.store.persist_event(event)
# Send a PDU to all hosts who have joined the room.
destinations = yield self.store.get_joined_hosts_for_room(
@@ -732,78 +728,11 @@ class RoomMemberHandler(BaseHandler):
yield self.hs.get_federation().handle_new_event(event)
self.notifier.on_new_room_event(event, store_id)
- if broadcast_msg:
- yield self._inject_membership_msg(
- source=event.user_id,
- target=event.target_user_id,
- room_id=event.room_id,
- membership=event.content["membership"]
- )
-
- @defer.inlineCallbacks
- def _do_invite_join_dance(self, room_id, joinee, target_host, content):
- logger.debug("Doing remote join dance")
-
- # do invite join dance
- federation = self.hs.get_federation()
- new_event = self.event_factory.create_event(
- etype=InviteJoinEvent.TYPE,
- target_host=target_host,
- room_id=room_id,
- user_id=joinee,
- content=content
- )
-
- new_event.destinations = [target_host]
-
- yield self.store.store_room(
- room_id, "", is_public=False
- )
-
- #yield self.state_handler.handle_new_event(event)
- yield federation.handle_new_event(new_event)
- yield federation.get_state_for_room(
- target_host, room_id
- )
-
- @defer.inlineCallbacks
- def _inject_membership_msg(self, room_id=None, source=None, target=None,
- membership=None):
- # TODO this should be a different type of message, not m.text
- if membership == Membership.INVITE:
- body = "%s invited %s to the room." % (source, target)
- elif membership == Membership.JOIN:
- body = "%s joined the room." % (target)
- elif membership == Membership.LEAVE:
- body = "%s left the room." % (target)
- else:
- raise RoomError(500, "Unknown membership value %s" % membership)
-
- membership_json = {
- "msgtype": u"m.text",
- "body": body,
- "membership_source": source,
- "membership_target": target,
- "membership": membership,
- }
-
- msg_id = "m%s" % int(self.clock.time_msec())
-
- event = self.event_factory.create_event(
- etype=MessageEvent.TYPE,
- room_id=room_id,
- user_id="_homeserver_",
- msg_id=msg_id,
- content=membership_json
- )
-
- handler = self.hs.get_handlers().message_handler
- yield handler.send_message(event, suppress_auth=True)
-
class RoomListHandler(BaseHandler):
@defer.inlineCallbacks
def get_public_room_list(self):
- chunk = yield self.store.get_rooms(is_public=True, with_topics=True)
+ chunk = yield self.store.get_rooms(is_public=True)
+ # FIXME (erikj): START is no longer a valid value
defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
|