diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 01e67b0818..025e7e7e62 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logutils import log_function
from synapse.types import UserID
+from synapse.events.utils import serialize_event
from ._base import BaseHandler
@@ -48,24 +49,25 @@ class EventStreamHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
def get_stream(self, auth_user_id, pagin_config, timeout=0,
- as_client_event=True):
+ as_client_event=True, affect_presence=True):
auth_user = UserID.from_string(auth_user_id)
try:
- if auth_user not in self._streams_per_user:
- self._streams_per_user[auth_user] = 0
- if auth_user in self._stop_timer_per_user:
- try:
- self.clock.cancel_call_later(
- self._stop_timer_per_user.pop(auth_user)
+ if affect_presence:
+ if auth_user not in self._streams_per_user:
+ self._streams_per_user[auth_user] = 0
+ if auth_user in self._stop_timer_per_user:
+ try:
+ self.clock.cancel_call_later(
+ self._stop_timer_per_user.pop(auth_user)
+ )
+ except:
+ logger.exception("Failed to cancel event timer")
+ else:
+ yield self.distributor.fire(
+ "started_user_eventstream", auth_user
)
- except:
- logger.exception("Failed to cancel event timer")
- else:
- yield self.distributor.fire(
- "started_user_eventstream", auth_user
- )
- self._streams_per_user[auth_user] += 1
+ self._streams_per_user[auth_user] += 1
if pagin_config.from_token is None:
pagin_config.from_token = None
@@ -78,8 +80,10 @@ class EventStreamHandler(BaseHandler):
auth_user, room_ids, pagin_config, timeout
)
+ time_now = self.clock.time_msec()
+
chunks = [
- self.hs.serialize_event(e, as_client_event) for e in events
+ serialize_event(e, time_now, as_client_event) for e in events
]
chunk = {
@@ -91,27 +95,28 @@ class EventStreamHandler(BaseHandler):
defer.returnValue(chunk)
finally:
- self._streams_per_user[auth_user] -= 1
- if not self._streams_per_user[auth_user]:
- del self._streams_per_user[auth_user]
-
- # 10 seconds of grace to allow the client to reconnect again
- # before we think they're gone
- def _later():
- logger.debug(
- "_later stopped_user_eventstream %s", auth_user
- )
+ if affect_presence:
+ self._streams_per_user[auth_user] -= 1
+ if not self._streams_per_user[auth_user]:
+ del self._streams_per_user[auth_user]
+
+ # 10 seconds of grace to allow the client to reconnect again
+ # before we think they're gone
+ def _later():
+ logger.debug(
+ "_later stopped_user_eventstream %s", auth_user
+ )
- self._stop_timer_per_user.pop(auth_user, None)
+ self._stop_timer_per_user.pop(auth_user, None)
- yield self.distributor.fire(
- "stopped_user_eventstream", auth_user
- )
+ return self.distributor.fire(
+ "stopped_user_eventstream", auth_user
+ )
- logger.debug("Scheduling _later: for %s", auth_user)
- self._stop_timer_per_user[auth_user] = (
- self.clock.call_later(30, _later)
- )
+ logger.debug("Scheduling _later: for %s", auth_user)
+ self._stop_timer_per_user[auth_user] = (
+ self.clock.call_later(30, _later)
+ )
class EventHandler(BaseHandler):
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6a1104a890..6fbd2af4ab 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import RoomError
from synapse.streams.config import PaginationConfig
+from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.util.logcontext import PreserveLoggingContext
from synapse.types import UserID
@@ -100,9 +101,11 @@ class MessageHandler(BaseHandler):
"room_key", next_key
)
+ time_now = self.clock.time_msec()
+
chunk = {
"chunk": [
- self.hs.serialize_event(e, as_client_event) for e in events
+ serialize_event(e, time_now, as_client_event) for e in events
],
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
@@ -111,7 +114,8 @@ class MessageHandler(BaseHandler):
defer.returnValue(chunk)
@defer.inlineCallbacks
- def create_and_send_event(self, event_dict, ratelimit=True):
+ def create_and_send_event(self, event_dict, ratelimit=True,
+ client=None, txn_id=None):
""" Given a dict from a client, create and handle a new event.
Creates an FrozenEvent object, filling out auth_events, prev_events,
@@ -145,6 +149,15 @@ class MessageHandler(BaseHandler):
builder.content
)
+ if client is not None:
+ if client.token_id is not None:
+ builder.internal_metadata.token_id = client.token_id
+ if client.device_id is not None:
+ builder.internal_metadata.device_id = client.device_id
+
+ if txn_id is not None:
+ builder.internal_metadata.txn_id = txn_id
+
event, context = yield self._create_new_client_event(
builder=builder,
)
@@ -211,7 +224,8 @@ class MessageHandler(BaseHandler):
# TODO: This is duplicating logic from snapshot_all_rooms
current_state = yield self.state_handler.get_current_state(room_id)
- defer.returnValue([self.hs.serialize_event(c) for c in current_state])
+ now = self.clock.time_msec()
+ defer.returnValue([serialize_event(c, now) for c in current_state])
@defer.inlineCallbacks
def snapshot_all_rooms(self, user_id=None, pagin_config=None,
@@ -283,10 +297,11 @@ class MessageHandler(BaseHandler):
start_token = now_token.copy_and_replace("room_key", token[0])
end_token = now_token.copy_and_replace("room_key", token[1])
+ time_now = self.clock.time_msec()
d["messages"] = {
"chunk": [
- self.hs.serialize_event(m, as_client_event)
+ serialize_event(m, time_now, as_client_event)
for m in messages
],
"start": start_token.to_string(),
@@ -297,7 +312,8 @@ class MessageHandler(BaseHandler):
event.room_id
)
d["state"] = [
- self.hs.serialize_event(c) for c in current_state
+ serialize_event(c, time_now, as_client_event)
+ for c in current_state
]
except:
logger.exception("Failed to get snapshot")
@@ -320,8 +336,9 @@ class MessageHandler(BaseHandler):
auth_user = UserID.from_string(user_id)
# TODO: These concurrently
+ time_now = self.clock.time_msec()
state_tuples = yield self.state_handler.get_current_state(room_id)
- state = [self.hs.serialize_event(x) for x in state_tuples]
+ state = [serialize_event(x, time_now) for x in state_tuples]
member_event = (yield self.store.get_room_member(
user_id=user_id,
@@ -360,11 +377,13 @@ class MessageHandler(BaseHandler):
"Failed to get member presence of %r", m.user_id
)
+ time_now = self.clock.time_msec()
+
defer.returnValue({
"membership": member_event.membership,
"room_id": room_id,
"messages": {
- "chunk": [self.hs.serialize_event(m) for m in messages],
+ "chunk": [serialize_event(m, time_now) for m in messages],
"start": start_token.to_string(),
"end": end_token.to_string(),
},
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index d66bfea7b1..cd0798c2b0 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -87,6 +87,10 @@ class PresenceHandler(BaseHandler):
"changed_presencelike_data", self.changed_presencelike_data
)
+ # outbound signal from the presence module to advertise when a user's
+ # presence has changed
+ distributor.declare("user_presence_changed")
+
self.distributor = distributor
self.federation = hs.get_replication_layer()
@@ -604,6 +608,7 @@ class PresenceHandler(BaseHandler):
room_ids=room_ids,
statuscache=statuscache,
)
+ yield self.distributor.fire("user_presence_changed", user, statuscache)
@defer.inlineCallbacks
def _push_presence_remote(self, user, destination, state=None):
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 732652c228..66a89c10b2 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -163,7 +163,7 @@ class RegistrationHandler(BaseHandler):
# each request
httpCli = SimpleHttpClient(self.hs)
# XXX: make this configurable!
- trustedIdServers = ['matrix.org:8090']
+ trustedIdServers = ['matrix.org:8090', 'matrix.org']
if not creds['idServer'] in trustedIdServers:
logger.warn('%s is not a trusted ID server: rejecting 3pid ' +
'credentials', creds['idServer'])
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index edb96cec83..23821d321f 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -16,12 +16,14 @@
"""Contains functions for performing events on rooms."""
from twisted.internet import defer
+from ._base import BaseHandler
+
from synapse.types import UserID, RoomAlias, RoomID
from synapse.api.constants import EventTypes, Membership, JoinRules
from synapse.api.errors import StoreError, SynapseError
from synapse.util import stringutils
from synapse.util.async import run_on_reactor
-from ._base import BaseHandler
+from synapse.events.utils import serialize_event
import logging
@@ -293,8 +295,9 @@ class RoomMemberHandler(BaseHandler):
yield self.auth.check_joined_room(room_id, user_id)
member_list = yield self.store.get_room_members(room_id=room_id)
+ time_now = self.clock.time_msec()
event_list = [
- self.hs.serialize_event(entry)
+ serialize_event(entry, time_now)
for entry in member_list
]
chunk_data = {
|