summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/_base.py9
-rw-r--r--synapse/handlers/directory.py3
-rw-r--r--synapse/handlers/events.py79
-rw-r--r--synapse/handlers/federation.py29
-rw-r--r--synapse/handlers/message.py45
-rw-r--r--synapse/handlers/presence.py28
-rw-r--r--synapse/handlers/profile.py3
-rw-r--r--synapse/handlers/room.py40
-rw-r--r--synapse/handlers/typing.py21
9 files changed, 158 insertions, 99 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 38af034b4d..1773fa20aa 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -19,6 +19,7 @@ from synapse.api.errors import LimitExceededError, SynapseError
 from synapse.util.async import run_on_reactor
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.api.constants import Membership, EventTypes
+from synapse.types import UserID
 
 import logging
 
@@ -113,7 +114,7 @@ class BaseHandler(object):
 
         if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.INVITE:
-                invitee = self.hs.parse_userid(event.state_key)
+                invitee = UserID.from_string(event.state_key)
                 if not self.hs.is_mine(invitee):
                     # TODO: Can we add signature from remote server in a nicer
                     # way? If we have been invited by a remote server, we need
@@ -134,7 +135,7 @@ class BaseHandler(object):
                 if k[0] == EventTypes.Member:
                     if s.content["membership"] == Membership.JOIN:
                         destinations.add(
-                            self.hs.parse_userid(s.state_key).domain
+                            UserID.from_string(s.state_key).domain
                         )
             except SynapseError:
                 logger.warn(
@@ -144,7 +145,5 @@ class BaseHandler(object):
         yield self.notifier.on_new_room_event(event, extra_users=extra_users)
 
         yield federation_handler.handle_new_event(
-            event,
-            None,
-            destinations=destinations,
+            event, destinations=destinations,
         )
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 91fceda2ac..58e9a91562 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -19,6 +19,7 @@ from ._base import BaseHandler
 
 from synapse.api.errors import SynapseError, Codes, CodeMessageException
 from synapse.api.constants import EventTypes
+from synapse.types import RoomAlias
 
 import logging
 
@@ -122,7 +123,7 @@ class DirectoryHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def on_directory_query(self, args):
-        room_alias = self.hs.parse_roomalias(args["room_alias"])
+        room_alias = RoomAlias.from_string(args["room_alias"])
         if not self.hs.is_mine(room_alias):
             raise SynapseError(
                 400, "Room Alias is not hosted on this Home Server"
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 808219bd10..025e7e7e62 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -17,6 +17,8 @@ from twisted.internet import defer
 
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.logutils import log_function
+from synapse.types import UserID
+from synapse.events.utils import serialize_event
 
 from ._base import BaseHandler
 
@@ -46,39 +48,43 @@ class EventStreamHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def get_stream(self, auth_user_id, pagin_config, timeout=0):
-        auth_user = self.hs.parse_userid(auth_user_id)
+    def get_stream(self, auth_user_id, pagin_config, timeout=0,
+                   as_client_event=True, affect_presence=True):
+        auth_user = UserID.from_string(auth_user_id)
 
         try:
-            if auth_user not in self._streams_per_user:
-                self._streams_per_user[auth_user] = 0
-                if auth_user in self._stop_timer_per_user:
-                    try:
-                        self.clock.cancel_call_later(
-                            self._stop_timer_per_user.pop(auth_user)
+            if affect_presence:
+                if auth_user not in self._streams_per_user:
+                    self._streams_per_user[auth_user] = 0
+                    if auth_user in self._stop_timer_per_user:
+                        try:
+                            self.clock.cancel_call_later(
+                                self._stop_timer_per_user.pop(auth_user)
+                            )
+                        except:
+                            logger.exception("Failed to cancel event timer")
+                    else:
+                        yield self.distributor.fire(
+                            "started_user_eventstream", auth_user
                         )
-                    except:
-                        logger.exception("Failed to cancel event timer")
-                else:
-                    yield self.distributor.fire(
-                        "started_user_eventstream", auth_user
-                    )
-            self._streams_per_user[auth_user] += 1
+                self._streams_per_user[auth_user] += 1
 
             if pagin_config.from_token is None:
                 pagin_config.from_token = None
 
             rm_handler = self.hs.get_handlers().room_member_handler
-            logger.debug("BETA")
             room_ids = yield rm_handler.get_rooms_for_user(auth_user)
 
-            logger.debug("ALPHA")
             with PreserveLoggingContext():
                 events, tokens = yield self.notifier.get_events_for(
                     auth_user, room_ids, pagin_config, timeout
                 )
 
-            chunks = [self.hs.serialize_event(e) for e in events]
+            time_now = self.clock.time_msec()
+
+            chunks = [
+                serialize_event(e, time_now, as_client_event) for e in events
+            ]
 
             chunk = {
                 "chunk": chunks,
@@ -89,27 +95,28 @@ class EventStreamHandler(BaseHandler):
             defer.returnValue(chunk)
 
         finally:
-            self._streams_per_user[auth_user] -= 1
-            if not self._streams_per_user[auth_user]:
-                del self._streams_per_user[auth_user]
-
-                # 10 seconds of grace to allow the client to reconnect again
-                #   before we think they're gone
-                def _later():
-                    logger.debug(
-                        "_later stopped_user_eventstream %s", auth_user
-                    )
+            if affect_presence:
+                self._streams_per_user[auth_user] -= 1
+                if not self._streams_per_user[auth_user]:
+                    del self._streams_per_user[auth_user]
+
+                    # 10 seconds of grace to allow the client to reconnect again
+                    #   before we think they're gone
+                    def _later():
+                        logger.debug(
+                            "_later stopped_user_eventstream %s", auth_user
+                        )
 
-                    self._stop_timer_per_user.pop(auth_user, None)
+                        self._stop_timer_per_user.pop(auth_user, None)
 
-                    yield self.distributor.fire(
-                        "stopped_user_eventstream", auth_user
-                    )
+                        return self.distributor.fire(
+                            "stopped_user_eventstream", auth_user
+                        )
 
-                logger.debug("Scheduling _later: for %s", auth_user)
-                self._stop_timer_per_user[auth_user] = (
-                    self.clock.call_later(30, _later)
-                )
+                    logger.debug("Scheduling _later: for %s", auth_user)
+                    self._stop_timer_per_user[auth_user] = (
+                        self.clock.call_later(30, _later)
+                    )
 
 
 class EventHandler(BaseHandler):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d26975a88a..bcdcc90a18 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -28,6 +28,7 @@ from synapse.crypto.event_signing import (
     compute_event_signature, check_event_content_hash,
     add_hashes_and_signatures,
 )
+from synapse.types import UserID
 from syutil.jsonutil import encode_canonical_json
 
 from twisted.internet import defer
@@ -75,14 +76,14 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def handle_new_event(self, event, snapshot, destinations):
+    def handle_new_event(self, event, destinations):
         """ Takes in an event from the client to server side, that has already
         been authed and handled by the state module, and sends it to any
         remote home servers that may be interested.
 
         Args:
-            event
-            snapshot (.storage.Snapshot): THe snapshot the event happened after
+            event: The event to send
+            destinations: A list of destinations to send it to
 
         Returns:
             Deferred: Resolved when it has successfully been queued for
@@ -154,7 +155,7 @@ class FederationHandler(BaseHandler):
             replication = self.replication_layer
 
             if not state:
-                state, auth_chain = yield replication.get_state_for_context(
+                state, auth_chain = yield replication.get_state_for_room(
                     origin, context=event.room_id, event_id=event.event_id,
                 )
 
@@ -227,7 +228,7 @@ class FederationHandler(BaseHandler):
             extra_users = []
             if event.type == EventTypes.Member:
                 target_user_id = event.state_key
-                target_user = self.hs.parse_userid(target_user_id)
+                target_user = UserID.from_string(target_user_id)
                 extra_users.append(target_user)
 
             yield self.notifier.on_new_room_event(
@@ -236,7 +237,7 @@ class FederationHandler(BaseHandler):
 
         if event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
-                user = self.hs.parse_userid(event.state_key)
+                user = UserID.from_string(event.state_key)
                 yield self.distributor.fire(
                     "user_joined_room", user=user, room_id=event.room_id
                 )
@@ -281,7 +282,7 @@ class FederationHandler(BaseHandler):
         """
         pdu = yield self.replication_layer.send_invite(
             destination=target_host,
-            context=event.room_id,
+            room_id=event.room_id,
             event_id=event.event_id,
             pdu=event
         )
@@ -491,7 +492,7 @@ class FederationHandler(BaseHandler):
         extra_users = []
         if event.type == EventTypes.Member:
             target_user_id = event.state_key
-            target_user = self.hs.parse_userid(target_user_id)
+            target_user = UserID.from_string(target_user_id)
             extra_users.append(target_user)
 
         yield self.notifier.on_new_room_event(
@@ -500,7 +501,7 @@ class FederationHandler(BaseHandler):
 
         if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.JOIN:
-                user = self.hs.parse_userid(event.state_key)
+                user = UserID.from_string(event.state_key)
                 yield self.distributor.fire(
                     "user_joined_room", user=user, room_id=event.room_id
                 )
@@ -514,7 +515,7 @@ class FederationHandler(BaseHandler):
                 if k[0] == EventTypes.Member:
                     if s.content["membership"] == Membership.JOIN:
                         destinations.add(
-                            self.hs.parse_userid(s.state_key).domain
+                            UserID.from_string(s.state_key).domain
                         )
             except:
                 logger.warn(
@@ -565,7 +566,7 @@ class FederationHandler(BaseHandler):
             backfilled=False,
         )
 
-        target_user = self.hs.parse_userid(event.state_key)
+        target_user = UserID.from_string(event.state_key)
         yield self.notifier.on_new_room_event(
             event, extra_users=[target_user],
         )
@@ -617,13 +618,13 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def on_backfill_request(self, origin, context, pdu_list, limit):
-        in_room = yield self.auth.check_host_in_room(context, origin)
+    def on_backfill_request(self, origin, room_id, pdu_list, limit):
+        in_room = yield self.auth.check_host_in_room(room_id, origin)
         if not in_room:
             raise AuthError(403, "Host not in room.")
 
         events = yield self.store.get_backfill_events(
-            context,
+            room_id,
             pdu_list,
             limit
         )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7195de98b5..9c3271fe88 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -18,8 +18,10 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import RoomError
 from synapse.streams.config import PaginationConfig
+from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.util.logcontext import PreserveLoggingContext
+from synapse.types import UserID
 
 from ._base import BaseHandler
 
@@ -67,7 +69,7 @@ class MessageHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_messages(self, user_id=None, room_id=None, pagin_config=None,
-                     feedback=False):
+                     feedback=False, as_client_event=True):
         """Get messages in a room.
 
         Args:
@@ -76,6 +78,7 @@ class MessageHandler(BaseHandler):
             pagin_config (synapse.api.streams.PaginationConfig): The pagination
             config rules to apply, if any.
             feedback (bool): True to get compressed feedback with the messages
+            as_client_event (bool): True to get events in client-server format.
         Returns:
             dict: Pagination API results
         """
@@ -88,7 +91,7 @@ class MessageHandler(BaseHandler):
                 yield self.hs.get_event_sources().get_current_token()
             )
 
-        user = self.hs.parse_userid(user_id)
+        user = UserID.from_string(user_id)
 
         events, next_key = yield data_source.get_pagination_rows(
             user, pagin_config.get_source_config("room"), room_id
@@ -98,8 +101,12 @@ class MessageHandler(BaseHandler):
             "room_key", next_key
         )
 
+        time_now = self.clock.time_msec()
+
         chunk = {
-            "chunk": [self.hs.serialize_event(e) for e in events],
+            "chunk": [
+                serialize_event(e, time_now, as_client_event) for e in events
+            ],
             "start": pagin_config.from_token.to_string(),
             "end": next_token.to_string(),
         }
@@ -127,13 +134,13 @@ class MessageHandler(BaseHandler):
         if ratelimit:
             self.ratelimit(builder.user_id)
         # TODO(paul): Why does 'event' not have a 'user' object?
-        user = self.hs.parse_userid(builder.user_id)
+        user = UserID.from_string(builder.user_id)
         assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
 
         if builder.type == EventTypes.Member:
             membership = builder.content.get("membership", None)
             if membership == Membership.JOIN:
-                joinee = self.hs.parse_userid(builder.state_key)
+                joinee = UserID.from_string(builder.state_key)
                 # If event doesn't include a display name, add one.
                 yield self.distributor.fire(
                     "collect_presencelike_data",
@@ -207,11 +214,12 @@ class MessageHandler(BaseHandler):
 
         # TODO: This is duplicating logic from snapshot_all_rooms
         current_state = yield self.state_handler.get_current_state(room_id)
-        defer.returnValue([self.hs.serialize_event(c) for c in current_state])
+        now = self.clock.time_msec()
+        defer.returnValue([serialize_event(c, now) for c in current_state])
 
     @defer.inlineCallbacks
     def snapshot_all_rooms(self, user_id=None, pagin_config=None,
-                           feedback=False):
+                           feedback=False, as_client_event=True):
         """Retrieve a snapshot of all rooms the user is invited or has joined.
 
         This snapshot may include messages for all rooms where the user is
@@ -222,6 +230,7 @@ class MessageHandler(BaseHandler):
             pagin_config (synapse.api.streams.PaginationConfig): The pagination
             config used to determine how many messages *PER ROOM* to return.
             feedback (bool): True to get feedback along with these messages.
+            as_client_event (bool): True to get events in client-server format.
         Returns:
             A list of dicts with "room_id" and "membership" keys for all rooms
             the user is currently invited or joined in on. Rooms where the user
@@ -233,7 +242,7 @@ class MessageHandler(BaseHandler):
             membership_list=[Membership.INVITE, Membership.JOIN]
         )
 
-        user = self.hs.parse_userid(user_id)
+        user = UserID.from_string(user_id)
 
         rooms_ret = []
 
@@ -278,9 +287,13 @@ class MessageHandler(BaseHandler):
 
                 start_token = now_token.copy_and_replace("room_key", token[0])
                 end_token = now_token.copy_and_replace("room_key", token[1])
+                time_now = self.clock.time_msec()
 
                 d["messages"] = {
-                    "chunk": [self.hs.serialize_event(m) for m in messages],
+                    "chunk": [
+                        serialize_event(m, time_now, as_client_event)
+                        for m in messages
+                    ],
                     "start": start_token.to_string(),
                     "end": end_token.to_string(),
                 }
@@ -289,7 +302,8 @@ class MessageHandler(BaseHandler):
                     event.room_id
                 )
                 d["state"] = [
-                    self.hs.serialize_event(c) for c in current_state
+                    serialize_event(c, time_now, as_client_event)
+                    for c in current_state
                 ]
             except:
                 logger.exception("Failed to get snapshot")
@@ -309,11 +323,12 @@ class MessageHandler(BaseHandler):
 
         # TODO(paul): I wish I was called with user objects not user_id
         #   strings...
-        auth_user = self.hs.parse_userid(user_id)
+        auth_user = UserID.from_string(user_id)
 
         # TODO: These concurrently
+        time_now = self.clock.time_msec()
         state_tuples = yield self.state_handler.get_current_state(room_id)
-        state = [self.hs.serialize_event(x) for x in state_tuples]
+        state = [serialize_event(x, time_now) for x in state_tuples]
 
         member_event = (yield self.store.get_room_member(
             user_id=user_id,
@@ -342,7 +357,7 @@ class MessageHandler(BaseHandler):
         for m in room_members:
             try:
                 member_presence = yield presence_handler.get_state(
-                    target_user=self.hs.parse_userid(m.user_id),
+                    target_user=UserID.from_string(m.user_id),
                     auth_user=auth_user,
                     as_event=True,
                 )
@@ -352,11 +367,13 @@ class MessageHandler(BaseHandler):
                     "Failed to get member presence of %r", m.user_id
                 )
 
+        time_now = self.clock.time_msec()
+
         defer.returnValue({
             "membership": member_event.membership,
             "room_id": room_id,
             "messages": {
-                "chunk": [self.hs.serialize_event(m) for m in messages],
+                "chunk": [serialize_event(m, time_now) for m in messages],
                 "start": start_token.to_string(),
                 "end": end_token.to_string(),
             },
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 8aeed99274..cd0798c2b0 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -20,6 +20,7 @@ from synapse.api.constants import PresenceState
 
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
+from synapse.types import UserID
 
 from ._base import BaseHandler
 
@@ -86,6 +87,10 @@ class PresenceHandler(BaseHandler):
             "changed_presencelike_data", self.changed_presencelike_data
         )
 
+        # outbound signal from the presence module to advertise when a user's
+        # presence has changed
+        distributor.declare("user_presence_changed")
+
         self.distributor = distributor
 
         self.federation = hs.get_replication_layer()
@@ -96,22 +101,22 @@ class PresenceHandler(BaseHandler):
         self.federation.register_edu_handler(
             "m.presence_invite",
             lambda origin, content: self.invite_presence(
-                observed_user=hs.parse_userid(content["observed_user"]),
-                observer_user=hs.parse_userid(content["observer_user"]),
+                observed_user=UserID.from_string(content["observed_user"]),
+                observer_user=UserID.from_string(content["observer_user"]),
             )
         )
         self.federation.register_edu_handler(
             "m.presence_accept",
             lambda origin, content: self.accept_presence(
-                observed_user=hs.parse_userid(content["observed_user"]),
-                observer_user=hs.parse_userid(content["observer_user"]),
+                observed_user=UserID.from_string(content["observed_user"]),
+                observer_user=UserID.from_string(content["observer_user"]),
             )
         )
         self.federation.register_edu_handler(
             "m.presence_deny",
             lambda origin, content: self.deny_presence(
-                observed_user=hs.parse_userid(content["observed_user"]),
-                observer_user=hs.parse_userid(content["observer_user"]),
+                observed_user=UserID.from_string(content["observed_user"]),
+                observer_user=UserID.from_string(content["observer_user"]),
             )
         )
 
@@ -418,7 +423,7 @@ class PresenceHandler(BaseHandler):
         )
 
         for p in presence:
-            observed_user = self.hs.parse_userid(p.pop("observed_user_id"))
+            observed_user = UserID.from_string(p.pop("observed_user_id"))
             p["observed_user"] = observed_user
             p.update(self._get_or_offline_usercache(observed_user).get_state())
             if "last_active" in p:
@@ -441,7 +446,7 @@ class PresenceHandler(BaseHandler):
                 user.localpart, accepted=True
             )
             target_users = set([
-                self.hs.parse_userid(x["observed_user_id"]) for x in presence
+                UserID.from_string(x["observed_user_id"]) for x in presence
             ])
 
             # Also include people in all my rooms
@@ -603,6 +608,7 @@ class PresenceHandler(BaseHandler):
             room_ids=room_ids,
             statuscache=statuscache,
         )
+        yield self.distributor.fire("user_presence_changed", user, statuscache)
 
     @defer.inlineCallbacks
     def _push_presence_remote(self, user, destination, state=None):
@@ -646,7 +652,7 @@ class PresenceHandler(BaseHandler):
         deferreds = []
 
         for push in content.get("push", []):
-            user = self.hs.parse_userid(push["user_id"])
+            user = UserID.from_string(push["user_id"])
 
             logger.debug("Incoming presence update from %s", user)
 
@@ -694,7 +700,7 @@ class PresenceHandler(BaseHandler):
                 del self._user_cachemap[user]
 
         for poll in content.get("poll", []):
-            user = self.hs.parse_userid(poll)
+            user = UserID.from_string(poll)
 
             if not self.hs.is_mine(user):
                 continue
@@ -709,7 +715,7 @@ class PresenceHandler(BaseHandler):
             deferreds.append(self._push_presence_remote(user, origin))
 
         for unpoll in content.get("unpoll", []):
-            user = self.hs.parse_userid(unpoll)
+            user = UserID.from_string(unpoll)
 
             if not self.hs.is_mine(user):
                 continue
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 7777d3cc94..03b2159c53 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError, AuthError, CodeMessageException
 from synapse.api.constants import EventTypes, Membership
 from synapse.util.logcontext import PreserveLoggingContext
+from synapse.types import UserID
 
 from ._base import BaseHandler
 
@@ -169,7 +170,7 @@ class ProfileHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def on_profile_query(self, args):
-        user = self.hs.parse_userid(args["user_id"])
+        user = UserID.from_string(args["user_id"])
         if not self.hs.is_mine(user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 59719a1fae..23821d321f 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -16,12 +16,14 @@
 """Contains functions for performing events on rooms."""
 from twisted.internet import defer
 
+from ._base import BaseHandler
+
 from synapse.types import UserID, RoomAlias, RoomID
 from synapse.api.constants import EventTypes, Membership, JoinRules
 from synapse.api.errors import StoreError, SynapseError
 from synapse.util import stringutils
 from synapse.util.async import run_on_reactor
-from ._base import BaseHandler
+from synapse.events.utils import serialize_event
 
 import logging
 
@@ -64,7 +66,7 @@ class RoomCreationHandler(BaseHandler):
         invite_list = config.get("invite", [])
         for i in invite_list:
             try:
-                self.hs.parse_userid(i)
+                UserID.from_string(i)
             except:
                 raise SynapseError(400, "Invalid user_id: %s" % (i,))
 
@@ -114,7 +116,7 @@ class RoomCreationHandler(BaseHandler):
                 servers=[self.hs.hostname],
             )
 
-        user = self.hs.parse_userid(user_id)
+        user = UserID.from_string(user_id)
         creation_events = self._create_events_for_new_room(
             user, room_id, is_public=is_public
         )
@@ -246,11 +248,9 @@ class RoomMemberHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_room_members(self, room_id):
-        hs = self.hs
-
         users = yield self.store.get_users_in_room(room_id)
 
-        defer.returnValue([hs.parse_userid(u) for u in users])
+        defer.returnValue([UserID.from_string(u) for u in users])
 
     @defer.inlineCallbacks
     def fetch_room_distributions_into(self, room_id, localusers=None,
@@ -295,8 +295,9 @@ class RoomMemberHandler(BaseHandler):
         yield self.auth.check_joined_room(room_id, user_id)
 
         member_list = yield self.store.get_room_members(room_id=room_id)
+        time_now = self.clock.time_msec()
         event_list = [
-            self.hs.serialize_event(entry)
+            serialize_event(entry, time_now)
             for entry in member_list
         ]
         chunk_data = {
@@ -368,7 +369,7 @@ class RoomMemberHandler(BaseHandler):
             )
 
             if prev_state and prev_state.membership == Membership.JOIN:
-                user = self.hs.parse_userid(event.user_id)
+                user = UserID.from_string(event.user_id)
                 self.distributor.fire(
                     "user_left_room", user=user, room_id=event.room_id
                 )
@@ -412,7 +413,7 @@ class RoomMemberHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _do_join(self, event, context, room_host=None, do_auth=True):
-        joinee = self.hs.parse_userid(event.state_key)
+        joinee = UserID.from_string(event.state_key)
         # room_id = RoomID.from_string(event.room_id, self.hs)
         room_id = event.room_id
 
@@ -425,10 +426,22 @@ class RoomMemberHandler(BaseHandler):
             event.room_id,
             self.hs.hostname
         )
+        if not is_host_in_room:
+            # is *anyone* in the room?
+            room_member_keys = [
+                v for (k, v) in context.current_state.keys() if (
+                    k == "m.room.member"
+                )
+            ]
+            if len(room_member_keys) == 0:
+                # has the room been created so we can join it?
+                create_event = context.current_state.get(("m.room.create", ""))
+                if create_event:
+                    is_host_in_room = True
 
         if is_host_in_room:
             should_do_dance = False
-        elif room_host:
+        elif room_host:  # TODO: Shouldn't this be remote_room_host?
             should_do_dance = True
         else:
             # TODO(markjh): get prev_state from snapshot
@@ -442,7 +455,8 @@ class RoomMemberHandler(BaseHandler):
                 should_do_dance = not self.hs.is_mine(inviter)
                 room_host = inviter.domain
             else:
-                should_do_dance = False
+                # return the same error as join_room_alias does
+                raise SynapseError(404, "No known servers")
 
         if should_do_dance:
             handler = self.hs.get_handlers().federation_handler
@@ -463,7 +477,7 @@ class RoomMemberHandler(BaseHandler):
                 do_auth=do_auth,
             )
 
-        user = self.hs.parse_userid(event.user_id)
+        user = UserID.from_string(event.user_id)
         yield self.distributor.fire(
             "user_joined_room", user=user, room_id=room_id
         )
@@ -513,7 +527,7 @@ class RoomMemberHandler(BaseHandler):
                                     do_auth):
         yield run_on_reactor()
 
-        target_user = self.hs.parse_userid(event.state_key)
+        target_user = UserID.from_string(event.state_key)
 
         yield self.handle_new_client_event(
             event,
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index ab698b36e1..c69787005f 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from ._base import BaseHandler
 
 from synapse.api.errors import SynapseError, AuthError
+from synapse.types import UserID
 
 import logging
 
@@ -83,9 +84,15 @@ class TypingNotificationHandler(BaseHandler):
         if member in self._member_typing_timer:
             self.clock.cancel_call_later(self._member_typing_timer[member])
 
+        def _cb():
+            logger.debug(
+                "%s has timed out in %s", target_user.to_string(), room_id
+            )
+            self._stopped_typing(member)
+
         self._member_typing_until[member] = until
         self._member_typing_timer[member] = self.clock.call_later(
-            timeout / 1000, lambda: self._stopped_typing(member)
+            timeout / 1000.0, _cb
         )
 
         if was_present:
@@ -114,6 +121,10 @@ class TypingNotificationHandler(BaseHandler):
 
         member = RoomMember(room_id=room_id, user=target_user)
 
+        if member in self._member_typing_timer:
+            self.clock.cancel_call_later(self._member_typing_timer[member])
+            del self._member_typing_timer[member]
+
         yield self._stopped_typing(member)
 
     @defer.inlineCallbacks
@@ -136,8 +147,10 @@ class TypingNotificationHandler(BaseHandler):
 
         del self._member_typing_until[member]
 
-        self.clock.cancel_call_later(self._member_typing_timer[member])
-        del self._member_typing_timer[member]
+        if member in self._member_typing_timer:
+            # Don't cancel it - either it already expired, or the real
+            # stopped_typing() will cancel it
+            del self._member_typing_timer[member]
 
     @defer.inlineCallbacks
     def _push_update(self, room_id, user, typing):
@@ -173,7 +186,7 @@ class TypingNotificationHandler(BaseHandler):
     @defer.inlineCallbacks
     def _recv_edu(self, origin, content):
         room_id = content["room_id"]
-        user = self.homeserver.parse_userid(content["user_id"])
+        user = UserID.from_string(content["user_id"])
 
         localusers = set()