summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/handlers/_base.py1
-rw-r--r--synapse/handlers/federation.py29
2 files changed, 28 insertions, 2 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index c2f4685c92..3f07b5aa4a 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -24,4 +24,5 @@ class BaseHandler(object):
         self.notifier = hs.get_notifier()
         self.room_lock = hs.get_room_lock_manager()
         self.state_handler = hs.get_state_handler()
+        self.distributor = hs.get_distributor()
         self.hs = hs
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index aa3bf273f7..9cff444779 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -32,6 +32,15 @@ logger = logging.getLogger(__name__)
 class FederationHandler(BaseHandler):
 
     """Handles events that originated from federation."""
+    def __init__(self, hs):
+        super(FederationHandler, self).__init__(hs)
+
+        self.distributor.observe(
+            "user_joined_room",
+            self._on_user_joined
+        )
+
+        self.waiting_for_join_list = {}
 
     @log_function
     @defer.inlineCallbacks
@@ -103,6 +112,13 @@ class FederationHandler(BaseHandler):
             if not backfilled:
                 yield self.notifier.on_new_room_event(event, store_id)
 
+        if event.type == RoomMemberEvent.TYPE:
+            if event.membership == Membership.JOIN:
+                user = self.hs.parse_userid(event.target_user_id)
+                self.distributor.fire(
+                    "user_joined_room", user=user, room_id=event.room_id
+                )
+
 
     @log_function
     @defer.inlineCallbacks
@@ -152,8 +168,10 @@ class FederationHandler(BaseHandler):
 
         yield federation.handle_new_event(new_event)
 
-        store_id = yield self.store.persist_event(new_event)
-        self.notifier.on_new_room_event(new_event, store_id)
+        # TODO (erikj): Time out here.
+        d = defer.Deferred()
+        self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d)
+        yield d
 
         try:
             yield self.store.store_room(
@@ -166,3 +184,10 @@ class FederationHandler(BaseHandler):
 
 
         defer.returnValue(True)
+
+
+    @log_function
+    def _on_user_joined(self, user, room_id):
+        waiters = self.waiting_for_join_list.get((user.to_string(), room_id), [])
+        while waiters:
+            waiters.pop().callback(None)