diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7195de98b5..3355adefcf 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,10 +16,12 @@
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import RoomError
+from synapse.api.errors import RoomError, SynapseError
from synapse.streams.config import PaginationConfig
+from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.util.logcontext import PreserveLoggingContext
+from synapse.types import UserID
from ._base import BaseHandler
@@ -33,6 +35,7 @@ class MessageHandler(BaseHandler):
def __init__(self, hs):
super(MessageHandler, self).__init__(hs)
self.hs = hs
+ self.state = hs.get_state_handler()
self.clock = hs.get_clock()
self.validator = EventValidator()
@@ -67,7 +70,7 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def get_messages(self, user_id=None, room_id=None, pagin_config=None,
- feedback=False):
+ feedback=False, as_client_event=True):
"""Get messages in a room.
Args:
@@ -76,6 +79,7 @@ class MessageHandler(BaseHandler):
pagin_config (synapse.api.streams.PaginationConfig): The pagination
config rules to apply, if any.
feedback (bool): True to get compressed feedback with the messages
+ as_client_event (bool): True to get events in client-server format.
Returns:
dict: Pagination API results
"""
@@ -88,7 +92,7 @@ class MessageHandler(BaseHandler):
yield self.hs.get_event_sources().get_current_token()
)
- user = self.hs.parse_userid(user_id)
+ user = UserID.from_string(user_id)
events, next_key = yield data_source.get_pagination_rows(
user, pagin_config.get_source_config("room"), room_id
@@ -98,8 +102,12 @@ class MessageHandler(BaseHandler):
"room_key", next_key
)
+ time_now = self.clock.time_msec()
+
chunk = {
- "chunk": [self.hs.serialize_event(e) for e in events],
+ "chunk": [
+ serialize_event(e, time_now, as_client_event) for e in events
+ ],
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
}
@@ -107,7 +115,8 @@ class MessageHandler(BaseHandler):
defer.returnValue(chunk)
@defer.inlineCallbacks
- def create_and_send_event(self, event_dict, ratelimit=True):
+ def create_and_send_event(self, event_dict, ratelimit=True,
+ client=None, txn_id=None):
""" Given a dict from a client, create and handle a new event.
Creates an FrozenEvent object, filling out auth_events, prev_events,
@@ -127,13 +136,13 @@ class MessageHandler(BaseHandler):
if ratelimit:
self.ratelimit(builder.user_id)
# TODO(paul): Why does 'event' not have a 'user' object?
- user = self.hs.parse_userid(builder.user_id)
+ user = UserID.from_string(builder.user_id)
assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
if builder.type == EventTypes.Member:
membership = builder.content.get("membership", None)
if membership == Membership.JOIN:
- joinee = self.hs.parse_userid(builder.state_key)
+ joinee = UserID.from_string(builder.state_key)
# If event doesn't include a display name, add one.
yield self.distributor.fire(
"collect_presencelike_data",
@@ -141,6 +150,15 @@ class MessageHandler(BaseHandler):
builder.content
)
+ if client is not None:
+ if client.token_id is not None:
+ builder.internal_metadata.token_id = client.token_id
+ if client.device_id is not None:
+ builder.internal_metadata.device_id = client.device_id
+
+ if txn_id is not None:
+ builder.internal_metadata.txn_id = txn_id
+
event, context = yield self._create_new_client_event(
builder=builder,
)
@@ -207,11 +225,14 @@ class MessageHandler(BaseHandler):
# TODO: This is duplicating logic from snapshot_all_rooms
current_state = yield self.state_handler.get_current_state(room_id)
- defer.returnValue([self.hs.serialize_event(c) for c in current_state])
+ now = self.clock.time_msec()
+ defer.returnValue(
+ [serialize_event(c, now) for c in current_state.values()]
+ )
@defer.inlineCallbacks
def snapshot_all_rooms(self, user_id=None, pagin_config=None,
- feedback=False):
+ feedback=False, as_client_event=True):
"""Retrieve a snapshot of all rooms the user is invited or has joined.
This snapshot may include messages for all rooms where the user is
@@ -222,6 +243,7 @@ class MessageHandler(BaseHandler):
pagin_config (synapse.api.streams.PaginationConfig): The pagination
config used to determine how many messages *PER ROOM* to return.
feedback (bool): True to get feedback along with these messages.
+ as_client_event (bool): True to get events in client-server format.
Returns:
A list of dicts with "room_id" and "membership" keys for all rooms
the user is currently invited or joined in on. Rooms where the user
@@ -233,7 +255,7 @@ class MessageHandler(BaseHandler):
membership_list=[Membership.INVITE, Membership.JOIN]
)
- user = self.hs.parse_userid(user_id)
+ user = UserID.from_string(user_id)
rooms_ret = []
@@ -278,9 +300,13 @@ class MessageHandler(BaseHandler):
start_token = now_token.copy_and_replace("room_key", token[0])
end_token = now_token.copy_and_replace("room_key", token[1])
+ time_now = self.clock.time_msec()
d["messages"] = {
- "chunk": [self.hs.serialize_event(m) for m in messages],
+ "chunk": [
+ serialize_event(m, time_now, as_client_event)
+ for m in messages
+ ],
"start": start_token.to_string(),
"end": end_token.to_string(),
}
@@ -289,7 +315,8 @@ class MessageHandler(BaseHandler):
event.room_id
)
d["state"] = [
- self.hs.serialize_event(c) for c in current_state
+ serialize_event(c, time_now, as_client_event)
+ for c in current_state.values()
]
except:
logger.exception("Failed to get snapshot")
@@ -305,20 +332,27 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def room_initial_sync(self, user_id, room_id, pagin_config=None,
feedback=False):
- yield self.auth.check_joined_room(room_id, user_id)
+ current_state = yield self.state.get_current_state(
+ room_id=room_id,
+ )
+
+ yield self.auth.check_joined_room(
+ room_id, user_id,
+ current_state=current_state
+ )
# TODO(paul): I wish I was called with user objects not user_id
# strings...
- auth_user = self.hs.parse_userid(user_id)
+ auth_user = UserID.from_string(user_id)
# TODO: These concurrently
- state_tuples = yield self.state_handler.get_current_state(room_id)
- state = [self.hs.serialize_event(x) for x in state_tuples]
+ time_now = self.clock.time_msec()
+ state = [
+ serialize_event(x, time_now)
+ for x in current_state.values()
+ ]
- member_event = (yield self.store.get_room_member(
- user_id=user_id,
- room_id=room_id
- ))
+ member_event = current_state.get((EventTypes.Member, user_id,))
now_token = yield self.hs.get_event_sources().get_current_token()
@@ -335,28 +369,40 @@ class MessageHandler(BaseHandler):
start_token = now_token.copy_and_replace("room_key", token[0])
end_token = now_token.copy_and_replace("room_key", token[1])
- room_members = yield self.store.get_room_members(room_id)
+ room_members = [
+ m for m in current_state.values()
+ if m.type == EventTypes.Member
+ ]
presence_handler = self.hs.get_handlers().presence_handler
presence = []
for m in room_members:
try:
member_presence = yield presence_handler.get_state(
- target_user=self.hs.parse_userid(m.user_id),
+ target_user=UserID.from_string(m.user_id),
auth_user=auth_user,
as_event=True,
)
presence.append(member_presence)
- except Exception:
- logger.exception(
- "Failed to get member presence of %r", m.user_id
- )
+ except SynapseError as e:
+ if e.code == 404:
+ # FIXME: We are doing this as a warn since this gets hit a
+ # lot and spams the logs. Why is this happening?
+ logger.warn(
+ "Failed to get member presence of %r", m.user_id
+ )
+ else:
+ logger.exception(
+ "Failed to get member presence of %r", m.user_id
+ )
+
+ time_now = self.clock.time_msec()
defer.returnValue({
"membership": member_event.membership,
"room_id": room_id,
"messages": {
- "chunk": [self.hs.serialize_event(m) for m in messages],
+ "chunk": [serialize_event(m, time_now) for m in messages],
"start": start_token.to_string(),
"end": end_token.to_string(),
},
|