diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index c2f4685c92..3f07b5aa4a 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -24,4 +24,5 @@ class BaseHandler(object):
self.notifier = hs.get_notifier()
self.room_lock = hs.get_room_lock_manager()
self.state_handler = hs.get_state_handler()
+ self.distributor = hs.get_distributor()
self.hs = hs
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 351bb3c084..16bac95331 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -32,6 +32,15 @@ logger = logging.getLogger(__name__)
class FederationHandler(BaseHandler):
"""Handles events that originated from federation."""
+ def __init__(self, hs):
+ super(FederationHandler, self).__init__(hs)
+
+ self.distributor.observe(
+ "user_joined_room",
+ self._on_user_joined
+ )
+
+ self.waiting_for_join_list = {}
@log_function
@defer.inlineCallbacks
@@ -103,6 +112,13 @@ class FederationHandler(BaseHandler):
if not backfilled:
yield self.notifier.on_new_room_event(event, store_id)
+ if event.type == RoomMemberEvent.TYPE:
+ if event.membership == Membership.JOIN:
+ user = self.hs.parse_userid(event.target_user_id)
+ self.distributor.fire(
+ "user_joined_room", user=user, room_id=event.room_id
+ )
+
@log_function
@defer.inlineCallbacks
@@ -152,8 +168,10 @@ class FederationHandler(BaseHandler):
yield federation.handle_new_event(new_event)
- store_id = yield self.store.persist_event(new_event)
- self.notifier.on_new_room_event(new_event, store_id)
+ # TODO (erikj): Time out here.
+ d = defer.Deferred()
+ self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d)
+ yield d
try:
yield self.store.store_room(
@@ -166,3 +184,10 @@ class FederationHandler(BaseHandler):
defer.returnValue(True)
+
+
+ @log_function
+ def _on_user_joined(self, user, room_id):
+ waiters = self.waiting_for_join_list.get((user.to_string(), room_id), [])
+ while waiters:
+ waiters.pop().callback(None)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 540e114b82..c88cc18788 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -142,6 +142,10 @@ class PresenceHandler(BaseHandler):
@defer.inlineCallbacks
def is_presence_visible(self, observer_user, observed_user):
+ defer.returnValue(True)
+ return
+ # FIXME (erikj): This code path absolutely kills the database.
+
assert(observed_user.is_mine)
if observer_user == observed_user:
@@ -187,6 +191,10 @@ class PresenceHandler(BaseHandler):
@defer.inlineCallbacks
def set_state(self, target_user, auth_user, state):
+ return
+ # TODO (erikj): Turn this back on. Why did we end up sending EDUs
+ # everywhere?
+
if not target_user.is_mine:
raise SynapseError(400, "User is not hosted on this Home Server")
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 049b4884af..5489de841f 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -24,6 +24,7 @@ from synapse.api.events.room import (
RoomConfigEvent
)
from synapse.api.streams.event import EventStream, EventsStreamData
+from synapse.handlers.presence import PresenceStreamData
from synapse.util import stringutils
from ._base import BaseHandler
@@ -257,21 +258,38 @@ class MessageHandler(BaseHandler):
membership_list=[Membership.INVITE, Membership.JOIN]
)
- ret = []
+ rooms_ret = []
+
+ now_rooms_token = yield self.store.get_room_events_max_id()
+
+ # FIXME (erikj): Fix this.
+ presence_stream = PresenceStreamData(self.hs)
+ now_presence_token = yield presence_stream.max_token()
+ presence = yield presence_stream.get_rows(
+ user_id, 0, now_presence_token, None, None
+ )
+
+ # FIXME (erikj): We need to not generate this token,
+ now_token = "%s_%s" % (now_rooms_token, now_presence_token)
for event in room_list:
d = {
"room_id": event.room_id,
"membership": event.membership,
}
- ret.append(d)
+
+ if event.membership == Membership.INVITE:
+ d["inviter"] = event.user_id
+
+ rooms_ret.append(d)
if event.membership != Membership.JOIN:
continue
try:
messages, token = yield self.store.get_recent_events_for_room(
event.room_id,
- limit=50,
+ limit=10,
+ end_token=now_rooms_token,
)
d["messages"] = {
@@ -279,10 +297,17 @@ class MessageHandler(BaseHandler):
"start": token[0],
"end": token[1],
}
+
+ current_state = yield self.store.get_current_state(event.room_id)
+ d["state"] = [c.get_dict() for c in current_state]
except:
logger.exception("Failed to get snapshot")
- logger.debug("snapshot_all_rooms returning: %s", ret)
+ user = self.hs.parse_userid(user_id)
+
+ ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token}
+
+ # logger.debug("snapshot_all_rooms returning: %s", ret)
defer.returnValue(ret)
|