diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index f33d17a31e..1773fa20aa 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -19,6 +19,7 @@ from synapse.api.errors import LimitExceededError, SynapseError
from synapse.util.async import run_on_reactor
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.api.constants import Membership, EventTypes
+from synapse.types import UserID
import logging
@@ -113,7 +114,7 @@ class BaseHandler(object):
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.INVITE:
- invitee = self.hs.parse_userid(event.state_key)
+ invitee = UserID.from_string(event.state_key)
if not self.hs.is_mine(invitee):
# TODO: Can we add signature from remote server in a nicer
# way? If we have been invited by a remote server, we need
@@ -134,7 +135,7 @@ class BaseHandler(object):
if k[0] == EventTypes.Member:
if s.content["membership"] == Membership.JOIN:
destinations.add(
- self.hs.parse_userid(s.state_key).domain
+ UserID.from_string(s.state_key).domain
)
except SynapseError:
logger.warn(
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 91fceda2ac..58e9a91562 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -19,6 +19,7 @@ from ._base import BaseHandler
from synapse.api.errors import SynapseError, Codes, CodeMessageException
from synapse.api.constants import EventTypes
+from synapse.types import RoomAlias
import logging
@@ -122,7 +123,7 @@ class DirectoryHandler(BaseHandler):
@defer.inlineCallbacks
def on_directory_query(self, args):
- room_alias = self.hs.parse_roomalias(args["room_alias"])
+ room_alias = RoomAlias.from_string(args["room_alias"])
if not self.hs.is_mine(room_alias):
raise SynapseError(
400, "Room Alias is not hosted on this Home Server"
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 103bc67c42..025e7e7e62 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -17,6 +17,8 @@ from twisted.internet import defer
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logutils import log_function
+from synapse.types import UserID
+from synapse.events.utils import serialize_event
from ._base import BaseHandler
@@ -47,24 +49,25 @@ class EventStreamHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
def get_stream(self, auth_user_id, pagin_config, timeout=0,
- as_client_event=True):
- auth_user = self.hs.parse_userid(auth_user_id)
+ as_client_event=True, affect_presence=True):
+ auth_user = UserID.from_string(auth_user_id)
try:
- if auth_user not in self._streams_per_user:
- self._streams_per_user[auth_user] = 0
- if auth_user in self._stop_timer_per_user:
- try:
- self.clock.cancel_call_later(
- self._stop_timer_per_user.pop(auth_user)
+ if affect_presence:
+ if auth_user not in self._streams_per_user:
+ self._streams_per_user[auth_user] = 0
+ if auth_user in self._stop_timer_per_user:
+ try:
+ self.clock.cancel_call_later(
+ self._stop_timer_per_user.pop(auth_user)
+ )
+ except:
+ logger.exception("Failed to cancel event timer")
+ else:
+ yield self.distributor.fire(
+ "started_user_eventstream", auth_user
)
- except:
- logger.exception("Failed to cancel event timer")
- else:
- yield self.distributor.fire(
- "started_user_eventstream", auth_user
- )
- self._streams_per_user[auth_user] += 1
+ self._streams_per_user[auth_user] += 1
if pagin_config.from_token is None:
pagin_config.from_token = None
@@ -77,8 +80,10 @@ class EventStreamHandler(BaseHandler):
auth_user, room_ids, pagin_config, timeout
)
+ time_now = self.clock.time_msec()
+
chunks = [
- self.hs.serialize_event(e, as_client_event) for e in events
+ serialize_event(e, time_now, as_client_event) for e in events
]
chunk = {
@@ -90,27 +95,28 @@ class EventStreamHandler(BaseHandler):
defer.returnValue(chunk)
finally:
- self._streams_per_user[auth_user] -= 1
- if not self._streams_per_user[auth_user]:
- del self._streams_per_user[auth_user]
-
- # 10 seconds of grace to allow the client to reconnect again
- # before we think they're gone
- def _later():
- logger.debug(
- "_later stopped_user_eventstream %s", auth_user
- )
+ if affect_presence:
+ self._streams_per_user[auth_user] -= 1
+ if not self._streams_per_user[auth_user]:
+ del self._streams_per_user[auth_user]
+
+ # 10 seconds of grace to allow the client to reconnect again
+ # before we think they're gone
+ def _later():
+ logger.debug(
+ "_later stopped_user_eventstream %s", auth_user
+ )
- self._stop_timer_per_user.pop(auth_user, None)
+ self._stop_timer_per_user.pop(auth_user, None)
- yield self.distributor.fire(
- "stopped_user_eventstream", auth_user
- )
+ return self.distributor.fire(
+ "stopped_user_eventstream", auth_user
+ )
- logger.debug("Scheduling _later: for %s", auth_user)
- self._stop_timer_per_user[auth_user] = (
- self.clock.call_later(30, _later)
- )
+ logger.debug("Scheduling _later: for %s", auth_user)
+ self._stop_timer_per_user[auth_user] = (
+ self.clock.call_later(30, _later)
+ )
class EventHandler(BaseHandler):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 81203bf1a3..bcdcc90a18 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -28,6 +28,7 @@ from synapse.crypto.event_signing import (
compute_event_signature, check_event_content_hash,
add_hashes_and_signatures,
)
+from synapse.types import UserID
from syutil.jsonutil import encode_canonical_json
from twisted.internet import defer
@@ -227,7 +228,7 @@ class FederationHandler(BaseHandler):
extra_users = []
if event.type == EventTypes.Member:
target_user_id = event.state_key
- target_user = self.hs.parse_userid(target_user_id)
+ target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
yield self.notifier.on_new_room_event(
@@ -236,7 +237,7 @@ class FederationHandler(BaseHandler):
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
- user = self.hs.parse_userid(event.state_key)
+ user = UserID.from_string(event.state_key)
yield self.distributor.fire(
"user_joined_room", user=user, room_id=event.room_id
)
@@ -491,7 +492,7 @@ class FederationHandler(BaseHandler):
extra_users = []
if event.type == EventTypes.Member:
target_user_id = event.state_key
- target_user = self.hs.parse_userid(target_user_id)
+ target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
yield self.notifier.on_new_room_event(
@@ -500,7 +501,7 @@ class FederationHandler(BaseHandler):
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
- user = self.hs.parse_userid(event.state_key)
+ user = UserID.from_string(event.state_key)
yield self.distributor.fire(
"user_joined_room", user=user, room_id=event.room_id
)
@@ -514,7 +515,7 @@ class FederationHandler(BaseHandler):
if k[0] == EventTypes.Member:
if s.content["membership"] == Membership.JOIN:
destinations.add(
- self.hs.parse_userid(s.state_key).domain
+ UserID.from_string(s.state_key).domain
)
except:
logger.warn(
@@ -565,7 +566,7 @@ class FederationHandler(BaseHandler):
backfilled=False,
)
- target_user = self.hs.parse_userid(event.state_key)
+ target_user = UserID.from_string(event.state_key)
yield self.notifier.on_new_room_event(
event, extra_users=[target_user],
)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index f2a2f16933..6fbd2af4ab 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -18,8 +18,10 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import RoomError
from synapse.streams.config import PaginationConfig
+from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.util.logcontext import PreserveLoggingContext
+from synapse.types import UserID
from ._base import BaseHandler
@@ -89,7 +91,7 @@ class MessageHandler(BaseHandler):
yield self.hs.get_event_sources().get_current_token()
)
- user = self.hs.parse_userid(user_id)
+ user = UserID.from_string(user_id)
events, next_key = yield data_source.get_pagination_rows(
user, pagin_config.get_source_config("room"), room_id
@@ -99,9 +101,11 @@ class MessageHandler(BaseHandler):
"room_key", next_key
)
+ time_now = self.clock.time_msec()
+
chunk = {
"chunk": [
- self.hs.serialize_event(e, as_client_event) for e in events
+ serialize_event(e, time_now, as_client_event) for e in events
],
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
@@ -110,7 +114,8 @@ class MessageHandler(BaseHandler):
defer.returnValue(chunk)
@defer.inlineCallbacks
- def create_and_send_event(self, event_dict, ratelimit=True):
+ def create_and_send_event(self, event_dict, ratelimit=True,
+ client=None, txn_id=None):
""" Given a dict from a client, create and handle a new event.
Creates an FrozenEvent object, filling out auth_events, prev_events,
@@ -130,13 +135,13 @@ class MessageHandler(BaseHandler):
if ratelimit:
self.ratelimit(builder.user_id)
# TODO(paul): Why does 'event' not have a 'user' object?
- user = self.hs.parse_userid(builder.user_id)
+ user = UserID.from_string(builder.user_id)
assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
if builder.type == EventTypes.Member:
membership = builder.content.get("membership", None)
if membership == Membership.JOIN:
- joinee = self.hs.parse_userid(builder.state_key)
+ joinee = UserID.from_string(builder.state_key)
# If event doesn't include a display name, add one.
yield self.distributor.fire(
"collect_presencelike_data",
@@ -144,6 +149,15 @@ class MessageHandler(BaseHandler):
builder.content
)
+ if client is not None:
+ if client.token_id is not None:
+ builder.internal_metadata.token_id = client.token_id
+ if client.device_id is not None:
+ builder.internal_metadata.device_id = client.device_id
+
+ if txn_id is not None:
+ builder.internal_metadata.txn_id = txn_id
+
event, context = yield self._create_new_client_event(
builder=builder,
)
@@ -210,7 +224,8 @@ class MessageHandler(BaseHandler):
# TODO: This is duplicating logic from snapshot_all_rooms
current_state = yield self.state_handler.get_current_state(room_id)
- defer.returnValue([self.hs.serialize_event(c) for c in current_state])
+ now = self.clock.time_msec()
+ defer.returnValue([serialize_event(c, now) for c in current_state])
@defer.inlineCallbacks
def snapshot_all_rooms(self, user_id=None, pagin_config=None,
@@ -237,7 +252,7 @@ class MessageHandler(BaseHandler):
membership_list=[Membership.INVITE, Membership.JOIN]
)
- user = self.hs.parse_userid(user_id)
+ user = UserID.from_string(user_id)
rooms_ret = []
@@ -282,10 +297,11 @@ class MessageHandler(BaseHandler):
start_token = now_token.copy_and_replace("room_key", token[0])
end_token = now_token.copy_and_replace("room_key", token[1])
+ time_now = self.clock.time_msec()
d["messages"] = {
"chunk": [
- self.hs.serialize_event(m, as_client_event)
+ serialize_event(m, time_now, as_client_event)
for m in messages
],
"start": start_token.to_string(),
@@ -296,7 +312,8 @@ class MessageHandler(BaseHandler):
event.room_id
)
d["state"] = [
- self.hs.serialize_event(c) for c in current_state
+ serialize_event(c, time_now, as_client_event)
+ for c in current_state
]
except:
logger.exception("Failed to get snapshot")
@@ -316,11 +333,12 @@ class MessageHandler(BaseHandler):
# TODO(paul): I wish I was called with user objects not user_id
# strings...
- auth_user = self.hs.parse_userid(user_id)
+ auth_user = UserID.from_string(user_id)
# TODO: These concurrently
+ time_now = self.clock.time_msec()
state_tuples = yield self.state_handler.get_current_state(room_id)
- state = [self.hs.serialize_event(x) for x in state_tuples]
+ state = [serialize_event(x, time_now) for x in state_tuples]
member_event = (yield self.store.get_room_member(
user_id=user_id,
@@ -349,7 +367,7 @@ class MessageHandler(BaseHandler):
for m in room_members:
try:
member_presence = yield presence_handler.get_state(
- target_user=self.hs.parse_userid(m.user_id),
+ target_user=UserID.from_string(m.user_id),
auth_user=auth_user,
as_event=True,
)
@@ -359,11 +377,13 @@ class MessageHandler(BaseHandler):
"Failed to get member presence of %r", m.user_id
)
+ time_now = self.clock.time_msec()
+
defer.returnValue({
"membership": member_event.membership,
"room_id": room_id,
"messages": {
- "chunk": [self.hs.serialize_event(m) for m in messages],
+ "chunk": [serialize_event(m, time_now) for m in messages],
"start": start_token.to_string(),
"end": end_token.to_string(),
},
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 8aeed99274..cd0798c2b0 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -20,6 +20,7 @@ from synapse.api.constants import PresenceState
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
+from synapse.types import UserID
from ._base import BaseHandler
@@ -86,6 +87,10 @@ class PresenceHandler(BaseHandler):
"changed_presencelike_data", self.changed_presencelike_data
)
+ # outbound signal from the presence module to advertise when a user's
+ # presence has changed
+ distributor.declare("user_presence_changed")
+
self.distributor = distributor
self.federation = hs.get_replication_layer()
@@ -96,22 +101,22 @@ class PresenceHandler(BaseHandler):
self.federation.register_edu_handler(
"m.presence_invite",
lambda origin, content: self.invite_presence(
- observed_user=hs.parse_userid(content["observed_user"]),
- observer_user=hs.parse_userid(content["observer_user"]),
+ observed_user=UserID.from_string(content["observed_user"]),
+ observer_user=UserID.from_string(content["observer_user"]),
)
)
self.federation.register_edu_handler(
"m.presence_accept",
lambda origin, content: self.accept_presence(
- observed_user=hs.parse_userid(content["observed_user"]),
- observer_user=hs.parse_userid(content["observer_user"]),
+ observed_user=UserID.from_string(content["observed_user"]),
+ observer_user=UserID.from_string(content["observer_user"]),
)
)
self.federation.register_edu_handler(
"m.presence_deny",
lambda origin, content: self.deny_presence(
- observed_user=hs.parse_userid(content["observed_user"]),
- observer_user=hs.parse_userid(content["observer_user"]),
+ observed_user=UserID.from_string(content["observed_user"]),
+ observer_user=UserID.from_string(content["observer_user"]),
)
)
@@ -418,7 +423,7 @@ class PresenceHandler(BaseHandler):
)
for p in presence:
- observed_user = self.hs.parse_userid(p.pop("observed_user_id"))
+ observed_user = UserID.from_string(p.pop("observed_user_id"))
p["observed_user"] = observed_user
p.update(self._get_or_offline_usercache(observed_user).get_state())
if "last_active" in p:
@@ -441,7 +446,7 @@ class PresenceHandler(BaseHandler):
user.localpart, accepted=True
)
target_users = set([
- self.hs.parse_userid(x["observed_user_id"]) for x in presence
+ UserID.from_string(x["observed_user_id"]) for x in presence
])
# Also include people in all my rooms
@@ -603,6 +608,7 @@ class PresenceHandler(BaseHandler):
room_ids=room_ids,
statuscache=statuscache,
)
+ yield self.distributor.fire("user_presence_changed", user, statuscache)
@defer.inlineCallbacks
def _push_presence_remote(self, user, destination, state=None):
@@ -646,7 +652,7 @@ class PresenceHandler(BaseHandler):
deferreds = []
for push in content.get("push", []):
- user = self.hs.parse_userid(push["user_id"])
+ user = UserID.from_string(push["user_id"])
logger.debug("Incoming presence update from %s", user)
@@ -694,7 +700,7 @@ class PresenceHandler(BaseHandler):
del self._user_cachemap[user]
for poll in content.get("poll", []):
- user = self.hs.parse_userid(poll)
+ user = UserID.from_string(poll)
if not self.hs.is_mine(user):
continue
@@ -709,7 +715,7 @@ class PresenceHandler(BaseHandler):
deferreds.append(self._push_presence_remote(user, origin))
for unpoll in content.get("unpoll", []):
- user = self.hs.parse_userid(unpoll)
+ user = UserID.from_string(unpoll)
if not self.hs.is_mine(user):
continue
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 7777d3cc94..03b2159c53 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError, CodeMessageException
from synapse.api.constants import EventTypes, Membership
from synapse.util.logcontext import PreserveLoggingContext
+from synapse.types import UserID
from ._base import BaseHandler
@@ -169,7 +170,7 @@ class ProfileHandler(BaseHandler):
@defer.inlineCallbacks
def on_profile_query(self, args):
- user = self.hs.parse_userid(args["user_id"])
+ user = UserID.from_string(args["user_id"])
if not self.hs.is_mine(user):
raise SynapseError(400, "User is not hosted on this Home Server")
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 732652c228..66a89c10b2 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -163,7 +163,7 @@ class RegistrationHandler(BaseHandler):
# each request
httpCli = SimpleHttpClient(self.hs)
# XXX: make this configurable!
- trustedIdServers = ['matrix.org:8090']
+ trustedIdServers = ['matrix.org:8090', 'matrix.org']
if not creds['idServer'] in trustedIdServers:
logger.warn('%s is not a trusted ID server: rejecting 3pid ' +
'credentials', creds['idServer'])
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 6d0db18e51..23821d321f 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -16,12 +16,14 @@
"""Contains functions for performing events on rooms."""
from twisted.internet import defer
+from ._base import BaseHandler
+
from synapse.types import UserID, RoomAlias, RoomID
from synapse.api.constants import EventTypes, Membership, JoinRules
from synapse.api.errors import StoreError, SynapseError
from synapse.util import stringutils
from synapse.util.async import run_on_reactor
-from ._base import BaseHandler
+from synapse.events.utils import serialize_event
import logging
@@ -64,7 +66,7 @@ class RoomCreationHandler(BaseHandler):
invite_list = config.get("invite", [])
for i in invite_list:
try:
- self.hs.parse_userid(i)
+ UserID.from_string(i)
except:
raise SynapseError(400, "Invalid user_id: %s" % (i,))
@@ -114,7 +116,7 @@ class RoomCreationHandler(BaseHandler):
servers=[self.hs.hostname],
)
- user = self.hs.parse_userid(user_id)
+ user = UserID.from_string(user_id)
creation_events = self._create_events_for_new_room(
user, room_id, is_public=is_public
)
@@ -246,11 +248,9 @@ class RoomMemberHandler(BaseHandler):
@defer.inlineCallbacks
def get_room_members(self, room_id):
- hs = self.hs
-
users = yield self.store.get_users_in_room(room_id)
- defer.returnValue([hs.parse_userid(u) for u in users])
+ defer.returnValue([UserID.from_string(u) for u in users])
@defer.inlineCallbacks
def fetch_room_distributions_into(self, room_id, localusers=None,
@@ -295,8 +295,9 @@ class RoomMemberHandler(BaseHandler):
yield self.auth.check_joined_room(room_id, user_id)
member_list = yield self.store.get_room_members(room_id=room_id)
+ time_now = self.clock.time_msec()
event_list = [
- self.hs.serialize_event(entry)
+ serialize_event(entry, time_now)
for entry in member_list
]
chunk_data = {
@@ -368,7 +369,7 @@ class RoomMemberHandler(BaseHandler):
)
if prev_state and prev_state.membership == Membership.JOIN:
- user = self.hs.parse_userid(event.user_id)
+ user = UserID.from_string(event.user_id)
self.distributor.fire(
"user_left_room", user=user, room_id=event.room_id
)
@@ -412,7 +413,7 @@ class RoomMemberHandler(BaseHandler):
@defer.inlineCallbacks
def _do_join(self, event, context, room_host=None, do_auth=True):
- joinee = self.hs.parse_userid(event.state_key)
+ joinee = UserID.from_string(event.state_key)
# room_id = RoomID.from_string(event.room_id, self.hs)
room_id = event.room_id
@@ -476,7 +477,7 @@ class RoomMemberHandler(BaseHandler):
do_auth=do_auth,
)
- user = self.hs.parse_userid(event.user_id)
+ user = UserID.from_string(event.user_id)
yield self.distributor.fire(
"user_joined_room", user=user, room_id=room_id
)
@@ -526,7 +527,7 @@ class RoomMemberHandler(BaseHandler):
do_auth):
yield run_on_reactor()
- target_user = self.hs.parse_userid(event.state_key)
+ target_user = UserID.from_string(event.state_key)
yield self.handle_new_client_event(
event,
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index cd9638dd04..c69787005f 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
from ._base import BaseHandler
from synapse.api.errors import SynapseError, AuthError
+from synapse.types import UserID
import logging
@@ -185,7 +186,7 @@ class TypingNotificationHandler(BaseHandler):
@defer.inlineCallbacks
def _recv_edu(self, origin, content):
room_id = content["room_id"]
- user = self.homeserver.parse_userid(content["user_id"])
+ user = UserID.from_string(content["user_id"])
localusers = set()
|