summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/_base.py2
-rw-r--r--synapse/handlers/events.py6
-rw-r--r--synapse/handlers/federation.py51
-rw-r--r--synapse/handlers/message.py7
-rw-r--r--synapse/handlers/presence.py228
-rw-r--r--synapse/handlers/room.py3
-rw-r--r--synapse/handlers/typing.py3
7 files changed, 166 insertions, 134 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index f141e92ce2..b37c8be964 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -35,7 +35,7 @@ class BaseRoomHandler(BaseHandler):
                            extra_users=[]):
         snapshot.fill_out_prev_events(event)
 
-        store_id = yield self.store.persist_event(event)
+        yield self.store.persist_event(event)
 
         destinations = set(extra_destinations)
         # Send a PDU to all hosts who have joined the room.
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index e08231406d..980a169b25 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -16,6 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.events import SynapseEvent
+from synapse.util.logutils import log_function
 
 from ._base import BaseHandler
 
@@ -44,6 +45,7 @@ class EventStreamHandler(BaseHandler):
         self.notifier = hs.get_notifier()
 
     @defer.inlineCallbacks
+    @log_function
     def get_stream(self, auth_user_id, pagin_config, timeout=0):
         auth_user = self.hs.parse_userid(auth_user_id)
 
@@ -90,13 +92,15 @@ class EventStreamHandler(BaseHandler):
                 # 10 seconds of grace to allow the client to reconnect again
                 #   before we think they're gone
                 def _later():
+                    logger.debug("_later stopped_user_eventstream %s", auth_user)
                     self.distributor.fire(
                         "stopped_user_eventstream", auth_user
                     )
                     del self._stop_timer_per_user[auth_user]
 
+                logger.debug("Scheduling _later: for %s", auth_user)
                 self._stop_timer_per_user[auth_user] = (
-                    self.clock.call_later(5, _later)
+                    self.clock.call_later(30, _later)
                 )
 
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 9023c3d403..eac110419c 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -22,8 +22,6 @@ from synapse.api.constants import Membership
 from synapse.util.logutils import log_function
 from synapse.federation.pdu_codec import PduCodec
 
-from synapse.api.errors import AuthError
-
 from twisted.internet import defer
 
 import logging
@@ -87,12 +85,6 @@ class FederationHandler(BaseHandler):
         yield self.replication_layer.send_pdu(pdu)
 
     @log_function
-    def get_state_for_room(self, destination, room_id):
-        return self.replication_layer.get_state_for_context(
-            destination, room_id
-        )
-
-    @log_function
     @defer.inlineCallbacks
     def on_receive_pdu(self, pdu, backfilled):
         """ Called by the ReplicationLayer when we have a new pdu. We need to
@@ -141,19 +133,19 @@ class FederationHandler(BaseHandler):
 
             yield self.hs.get_handlers().room_member_handler.change_membership(
                 new_event,
-                True
+                do_auth=True
             )
 
         else:
             with (yield self.room_lock.lock(event.room_id)):
-                store_id = yield self.store.persist_event(event, backfilled)
+                yield self.store.persist_event(event, backfilled)
 
             room = yield self.store.get_room(event.room_id)
 
             if not room:
                 # Huh, let's try and get the current state
                 try:
-                    yield self.get_state_for_room(
+                    yield self.replication_layer.get_state_for_context(
                         event.origin, event.room_id
                     )
 
@@ -163,9 +155,9 @@ class FederationHandler(BaseHandler):
                     if self.hs.hostname in hosts:
                         try:
                             yield self.store.store_room(
-                                event.room_id,
-                                "",
-                                is_public=False
+                                room_id=event.room_id,
+                                room_creator_user_id="",
+                                is_public=False,
                             )
                         except:
                             pass
@@ -188,27 +180,14 @@ class FederationHandler(BaseHandler):
     @log_function
     @defer.inlineCallbacks
     def backfill(self, dest, room_id, limit):
-        events = yield self._backfill(dest, room_id, limit)
-
-        for event in events:
-            try:
-                yield self.store.persist_event(event, backfilled=True)
-            except:
-                logger.exception("Failed to persist event: %s", event)
-
-        defer.returnValue(events)
-
-    @defer.inlineCallbacks
-    def _backfill(self, dest, room_id, limit):
         pdus = yield self.replication_layer.backfill(dest, room_id, limit)
 
-        if not pdus:
-            defer.returnValue([])
+        events = []
 
-        events = [
-            self.pdu_codec.event_from_pdu(pdu)
-            for pdu in pdus
-        ]
+        for pdu in pdus:
+            event = self.pdu_codec.event_from_pdu(pdu)
+            events.append(event)
+            yield self.store.persist_event(event, backfilled=True)
 
         defer.returnValue(events)
 
@@ -224,7 +203,9 @@ class FederationHandler(BaseHandler):
 
         # First get current state to see if we are already joined.
         try:
-            yield self.get_state_for_room(target_host, room_id)
+            yield self.replication_layer.get_state_for_context(
+                target_host, room_id
+            )
 
             hosts = yield self.store.get_joined_hosts_for_room(room_id)
             if self.hs.hostname in hosts:
@@ -254,8 +235,8 @@ class FederationHandler(BaseHandler):
 
         try:
             yield self.store.store_room(
-                room_id,
-                "",
+                room_id=room_id,
+                room_creator_user_id="",
                 is_public=False
             )
         except:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 32548e66fb..3d7f97bcff 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -277,10 +277,13 @@ class MessageHandler(BaseRoomHandler):
                     end_token=now_token.events_key,
                 )
 
+                start_token = now_token.copy_and_replace("events_key", token[0])
+                end_token = now_token.copy_and_replace("events_key", token[1])
+
                 d["messages"] = {
                     "chunk": [m.get_dict() for m in messages],
-                    "start": token[0],
-                    "end": token[1],
+                    "start": start_token.to_string(),
+                    "end": end_token.to_string(),
                 }
 
                 current_state = yield self.store.get_current_state(
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index c479908f61..7731de85c0 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -18,6 +18,8 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError, AuthError
 from synapse.api.constants import PresenceState
 
+from synapse.util.logutils import log_function
+
 from ._base import BaseHandler
 
 import logging
@@ -142,7 +144,7 @@ class PresenceHandler(BaseHandler):
     @defer.inlineCallbacks
     def is_presence_visible(self, observer_user, observed_user):
         defer.returnValue(True)
-        return
+        # return
         # FIXME (erikj): This code path absolutely kills the database.
 
         assert(observed_user.is_mine)
@@ -188,8 +190,9 @@ class PresenceHandler(BaseHandler):
         defer.returnValue(state)
 
     @defer.inlineCallbacks
+    @log_function
     def set_state(self, target_user, auth_user, state):
-        return
+        # return
         # TODO (erikj): Turn this back on. Why did we end up sending EDUs
         # everywhere?
 
@@ -245,33 +248,42 @@ class PresenceHandler(BaseHandler):
 
         self.push_presence(user, statuscache=statuscache)
 
+    @log_function
     def started_user_eventstream(self, user):
         # TODO(paul): Use "last online" state
         self.set_state(user, user, {"state": PresenceState.ONLINE})
 
+    @log_function
     def stopped_user_eventstream(self, user):
         # TODO(paul): Save current state as "last online" state
         self.set_state(user, user, {"state": PresenceState.OFFLINE})
 
     @defer.inlineCallbacks
     def user_joined_room(self, user, room_id):
-        localusers = set()
-        remotedomains = set()
-
-        rm_handler = self.homeserver.get_handlers().room_member_handler
-        yield rm_handler.fetch_room_distributions_into(room_id,
-                localusers=localusers, remotedomains=remotedomains,
-                ignore_user=user)
 
         if user.is_mine:
-            yield self._send_presence_to_distribution(srcuser=user,
-                localusers=localusers, remotedomains=remotedomains,
+            self.push_update_to_local_and_remote(
+                observed_user=user,
+                room_ids=[room_id],
+                statuscache=self._get_or_offline_usercache(user),
+            )
+
+        else:
+            self.push_update_to_clients(
+                observed_user=user,
+                room_ids=[room_id],
                 statuscache=self._get_or_offline_usercache(user),
             )
 
-        for srcuser in localusers:
-            yield self._send_presence(srcuser=srcuser, destuser=user,
-                statuscache=self._get_or_offline_usercache(srcuser),
+        # We also want to tell them about current presence of people.
+        rm_handler = self.homeserver.get_handlers().room_member_handler
+        curr_users = yield rm_handler.get_room_members(room_id)
+
+        for local_user in [c for c in curr_users if c.is_mine]:
+            self.push_update_to_local_and_remote(
+                observed_user=local_user,
+                users_to_push=[user],
+                statuscache=self._get_or_offline_usercache(local_user),
             )
 
     @defer.inlineCallbacks
@@ -382,11 +394,13 @@ class PresenceHandler(BaseHandler):
         defer.returnValue(presence)
 
     @defer.inlineCallbacks
+    @log_function
     def start_polling_presence(self, user, target_user=None, state=None):
         logger.debug("Start polling for presence from %s", user)
 
         if target_user:
             target_users = set([target_user])
+            room_ids = []
         else:
             presence = yield self.store.get_presence_list(
                 user.localpart, accepted=True
@@ -400,23 +414,37 @@ class PresenceHandler(BaseHandler):
             rm_handler = self.homeserver.get_handlers().room_member_handler
             room_ids = yield rm_handler.get_rooms_for_user(user)
 
-            for room_id in room_ids:
-                for member in (yield rm_handler.get_room_members(room_id)):
-                    target_users.add(member)
-
         if state is None:
             state = yield self.store.get_presence_state(user.localpart)
-
-        localusers, remoteusers = partitionbool(
-            target_users,
-            lambda u: u.is_mine
+        else:
+#            statuscache = self._get_or_make_usercache(user)
+#            self._user_cachemap_latest_serial += 1
+#            statuscache.update(state, self._user_cachemap_latest_serial)
+            pass
+
+        yield self.push_update_to_local_and_remote(
+            observed_user=user,
+            users_to_push=target_users,
+            room_ids=room_ids,
+            statuscache=self._get_or_make_usercache(user),
         )
 
-        for target_user in localusers:
-            self._start_polling_local(user, target_user)
+        for target_user in target_users:
+            if target_user.is_mine:
+                self._start_polling_local(user, target_user)
+
+                # We want to tell the person that just came online
+                # presence state of people they are interested in?
+                self.push_update_to_clients(
+                    observed_user=target_user,
+                    users_to_push=[user],
+                    statuscache=self._get_or_offline_usercache(target_user),
+                )
 
         deferreds = []
-        remoteusers_by_domain = partition(remoteusers, lambda u: u.domain)
+        remote_users = [u for u in target_users if not u.is_mine]
+        remoteusers_by_domain = partition(remote_users, lambda u: u.domain)
+        # Only poll for people in our get_presence_list
         for domain in remoteusers_by_domain:
             remoteusers = remoteusers_by_domain[domain]
 
@@ -438,25 +466,26 @@ class PresenceHandler(BaseHandler):
 
         self._local_pushmap[target_localpart].add(user)
 
-        self.push_update_to_clients(
-            observer_user=user,
-            observed_user=target_user,
-            statuscache=self._get_or_offline_usercache(target_user),
-        )
-
     def _start_polling_remote(self, user, domain, remoteusers):
+        to_poll = set()
+
         for u in remoteusers:
             if u not in self._remote_recvmap:
                 self._remote_recvmap[u] = set()
+                to_poll.add(u)
 
             self._remote_recvmap[u].add(user)
 
+        if not to_poll:
+            return defer.succeed(None)
+
         return self.federation.send_edu(
             destination=domain,
             edu_type="m.presence",
-            content={"poll": [u.to_string() for u in remoteusers]}
+            content={"poll": [u.to_string() for u in to_poll]}
         )
 
+    @log_function
     def stop_polling_presence(self, user, target_user=None):
         logger.debug("Stop polling for presence from %s", user)
 
@@ -496,20 +525,28 @@ class PresenceHandler(BaseHandler):
             if not self._local_pushmap[localpart]:
                 del self._local_pushmap[localpart]
 
+    @log_function
     def _stop_polling_remote(self, user, domain, remoteusers):
+        to_unpoll = set()
+
         for u in remoteusers:
             self._remote_recvmap[u].remove(user)
 
             if not self._remote_recvmap[u]:
                 del self._remote_recvmap[u]
+                to_unpoll.add(u)
+
+        if not to_unpoll:
+            return defer.succeed(None)
 
         return self.federation.send_edu(
             destination=domain,
             edu_type="m.presence",
-            content={"unpoll": [u.to_string() for u in remoteusers]}
+            content={"unpoll": [u.to_string() for u in to_unpoll]}
         )
 
     @defer.inlineCallbacks
+    @log_function
     def push_presence(self, user, statuscache):
         assert(user.is_mine)
 
@@ -525,53 +562,17 @@ class PresenceHandler(BaseHandler):
         rm_handler = self.homeserver.get_handlers().room_member_handler
         room_ids = yield rm_handler.get_rooms_for_user(user)
 
-        for room_id in room_ids:
-            yield rm_handler.fetch_room_distributions_into(
-                room_id, localusers=localusers, remotedomains=remotedomains,
-                ignore_user=user,
-            )
-
-        if not localusers and not remotedomains:
+        if not localusers and not room_ids:
             defer.returnValue(None)
 
-        yield self._send_presence_to_distribution(user,
-            localusers=localusers, remotedomains=remotedomains,
-            statuscache=statuscache
+        yield self.push_update_to_local_and_remote(
+            observed_user=user,
+            users_to_push=localusers,
+            remote_domains=remotedomains,
+            room_ids=room_ids,
+            statuscache=statuscache,
         )
 
-    def _send_presence(self, srcuser, destuser, statuscache):
-        if destuser.is_mine:
-            self.push_update_to_clients(
-                observer_user=destuser,
-                observed_user=srcuser,
-                statuscache=statuscache)
-            return defer.succeed(None)
-        else:
-            return self._push_presence_remote(srcuser, destuser.domain,
-                state=statuscache.get_state()
-            )
-
-    @defer.inlineCallbacks
-    def _send_presence_to_distribution(self, srcuser, localusers=set(),
-            remotedomains=set(), statuscache=None):
-
-        for u in localusers:
-            logger.debug(" | push to local user %s", u)
-            self.push_update_to_clients(
-                observer_user=u,
-                observed_user=srcuser,
-                statuscache=statuscache,
-            )
-
-        deferreds = []
-        for domain in remotedomains:
-            logger.debug(" | push to remote domain %s", domain)
-            deferreds.append(self._push_presence_remote(srcuser, domain,
-                state=statuscache.get_state())
-            )
-
-        yield defer.DeferredList(deferreds)
-
     @defer.inlineCallbacks
     def _push_presence_remote(self, user, destination, state=None):
         if state is None:
@@ -587,12 +588,17 @@ class PresenceHandler(BaseHandler):
                 self.clock.time_msec() - state.pop("mtime")
             )
 
+        user_state = {
+            "user_id": user.to_string(),
+        }
+        user_state.update(**state)
+
         yield self.federation.send_edu(
             destination=destination,
             edu_type="m.presence",
             content={
                 "push": [
-                    dict(user_id=user.to_string(), **state),
+                    user_state,
                 ],
             }
         )
@@ -611,12 +617,7 @@ class PresenceHandler(BaseHandler):
             rm_handler = self.homeserver.get_handlers().room_member_handler
             room_ids = yield rm_handler.get_rooms_for_user(user)
 
-            for room_id in room_ids:
-                yield rm_handler.fetch_room_distributions_into(
-                    room_id, localusers=observers, ignore_user=user
-                )
-
-            if not observers:
+            if not observers and not room_ids:
                 break
 
             state = dict(push)
@@ -632,12 +633,12 @@ class PresenceHandler(BaseHandler):
             self._user_cachemap_latest_serial += 1
             statuscache.update(state, serial=self._user_cachemap_latest_serial)
 
-            for observer_user in observers:
-                self.push_update_to_clients(
-                    observer_user=observer_user,
-                    observed_user=user,
-                    statuscache=statuscache,
-                )
+            self.push_update_to_clients(
+                observed_user=user,
+                users_to_push=observers,
+                room_ids=room_ids,
+                statuscache=statuscache,
+            )
 
             if state["state"] == PresenceState.OFFLINE:
                 del self._user_cachemap[user]
@@ -671,12 +672,53 @@ class PresenceHandler(BaseHandler):
 
         yield defer.DeferredList(deferreds)
 
-    def push_update_to_clients(self, observer_user, observed_user,
-                               statuscache):
-        statuscache.make_event(user=observed_user, clock=self.clock)
+    @defer.inlineCallbacks
+    def push_update_to_local_and_remote(self, observed_user,
+                                        users_to_push=[], room_ids=[],
+                                        remote_domains=[],
+                                        statuscache=None):
+
+        localusers, remoteusers = partitionbool(
+            users_to_push,
+            lambda u: u.is_mine
+        )
+
+        localusers = set(localusers)
+
+        self.push_update_to_clients(
+            observed_user=observed_user,
+            users_to_push=localusers,
+            room_ids=room_ids,
+            statuscache=statuscache,
+        )
+
+        remote_domains = set(remote_domains)
+        remote_domains |= set([r.domain for r in remoteusers])
+        for room_id in room_ids:
+            remote_domains.update(
+                (yield self.store.get_joined_hosts_for_room(room_id))
+            )
+
+        remote_domains.discard(self.hs.hostname)
+
+        deferreds = []
+        for domain in remote_domains:
+            logger.debug(" | push to remote domain %s", domain)
+            deferreds.append(
+                self._push_presence_remote(
+                    observed_user, domain, state=statuscache.get_state()
+                )
+            )
+
+        yield defer.DeferredList(deferreds)
+
+        defer.returnValue((localusers, remote_domains))
 
+    def push_update_to_clients(self, observed_user, users_to_push=[],
+                                 room_ids=[], statuscache=None):
         self.notifier.on_new_user_event(
-            [observer_user],
+            users_to_push,
+            room_ids,
         )
 
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 1ced7d0613..d46bc308b4 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -360,7 +360,8 @@ class RoomMemberHandler(BaseRoomHandler):
         )
 
         snapshot = yield self.store.snapshot_room(
-            room_id, joinee, RoomMemberEvent.TYPE, joinee
+            room_id, joinee.to_string(), RoomMemberEvent.TYPE,
+            joinee.to_string()
         )
 
         yield self._do_join(new_event, snapshot, room_host=host, do_auth=True)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 9d38a7336e..9fab0ff37c 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -17,11 +17,12 @@ from twisted.internet import defer
 
 from ._base import BaseHandler
 
+from synapse.api.errors import SynapseError, AuthError
+
 import logging
 
 from collections import namedtuple
 
-
 logger = logging.getLogger(__name__)