diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 5c50c611ba..fd09397226 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,27 +16,29 @@
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.streams.config import PaginationConfig
+from synapse.api.errors import AuthError, Codes, SynapseError, LimitExceededError
+from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
-from synapse.util import unwrapFirstError
-from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.types import UserID, RoomStreamToken, StreamToken
+from synapse.push.action_generator import ActionGenerator
+from synapse.types import (
+ UserID, RoomAlias, RoomStreamToken,
+)
+from synapse.util.async import run_on_reactor, ReadWriteLock
+from synapse.util.logcontext import preserve_fn
+from synapse.util.metrics import measure_func
+from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
from canonicaljson import encode_canonical_json
import logging
+import random
logger = logging.getLogger(__name__)
-def collect_presencelike_data(distributor, user, content):
- return distributor.fire("collect_presencelike_data", user, content)
-
-
class MessageHandler(BaseHandler):
def __init__(self, hs):
@@ -45,40 +47,24 @@ class MessageHandler(BaseHandler):
self.state = hs.get_state_handler()
self.clock = hs.get_clock()
self.validator = EventValidator()
- self.snapshot_cache = SnapshotCache()
- @defer.inlineCallbacks
- def get_message(self, msg_id=None, room_id=None, sender_id=None,
- user_id=None):
- """ Retrieve a message.
+ self.pagination_lock = ReadWriteLock()
- Args:
- msg_id (str): The message ID to obtain.
- room_id (str): The room where the message resides.
- sender_id (str): The user ID of the user who sent the message.
- user_id (str): The user ID of the user making this request.
- Returns:
- The message, or None if no message exists.
- Raises:
- SynapseError if something went wrong.
- """
- yield self.auth.check_joined_room(room_id, user_id)
+ @defer.inlineCallbacks
+ def purge_history(self, room_id, event_id):
+ event = yield self.store.get_event(event_id)
- # Pull out the message from the db
-# msg = yield self.store.get_message(
-# room_id=room_id,
-# msg_id=msg_id,
-# user_id=sender_id
-# )
+ if event.room_id != room_id:
+ raise SynapseError(400, "Event is for wrong room.")
- # TODO (erikj): Once we work out the correct c-s api we need to think
- # on how to do this.
+ depth = event.depth
- defer.returnValue(None)
+ with (yield self.pagination_lock.write(room_id)):
+ yield self.store.delete_old_state(room_id, depth)
@defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
- as_client_event=True):
+ as_client_event=True, event_filter=None):
"""Get messages in a room.
Args:
@@ -87,18 +73,18 @@ class MessageHandler(BaseHandler):
pagin_config (synapse.api.streams.PaginationConfig): The pagination
config rules to apply, if any.
as_client_event (bool): True to get events in client-server format.
+ event_filter (Filter): Filter to apply to results or None
Returns:
dict: Pagination API results
"""
user_id = requester.user.to_string()
- data_source = self.hs.get_event_sources().sources["room"]
if pagin_config.from_token:
room_token = pagin_config.from_token.room_key
else:
pagin_config.from_token = (
- yield self.hs.get_event_sources().get_current_token(
- direction='b'
+ yield self.hs.get_event_sources().get_current_token_for_room(
+ room_id=room_id
)
)
room_token = pagin_config.from_token.room_key
@@ -111,42 +97,48 @@ class MessageHandler(BaseHandler):
source_config = pagin_config.get_source_config("room")
- membership, member_event_id = yield self._check_in_room_or_world_readable(
- room_id, user_id
- )
+ with (yield self.pagination_lock.read(room_id)):
+ membership, member_event_id = yield self._check_in_room_or_world_readable(
+ room_id, user_id
+ )
- if source_config.direction == 'b':
- # if we're going backwards, we might need to backfill. This
- # requires that we have a topo token.
- if room_token.topological:
- max_topo = room_token.topological
- else:
- max_topo = yield self.store.get_max_topological_token_for_stream_and_room(
- room_id, room_token.stream
- )
+ if source_config.direction == 'b':
+ # if we're going backwards, we might need to backfill. This
+ # requires that we have a topo token.
+ if room_token.topological:
+ max_topo = room_token.topological
+ else:
+ max_topo = yield self.store.get_max_topological_token(
+ room_id, room_token.stream
+ )
- if membership == Membership.LEAVE:
- # If they have left the room then clamp the token to be before
- # they left the room, to save the effort of loading from the
- # database.
- leave_token = yield self.store.get_topological_token_for_event(
- member_event_id
+ if membership == Membership.LEAVE:
+ # If they have left the room then clamp the token to be before
+ # they left the room, to save the effort of loading from the
+ # database.
+ leave_token = yield self.store.get_topological_token_for_event(
+ member_event_id
+ )
+ leave_token = RoomStreamToken.parse(leave_token)
+ if leave_token.topological < max_topo:
+ source_config.from_key = str(leave_token)
+
+ yield self.hs.get_handlers().federation_handler.maybe_backfill(
+ room_id, max_topo
)
- leave_token = RoomStreamToken.parse(leave_token)
- if leave_token.topological < max_topo:
- source_config.from_key = str(leave_token)
- yield self.hs.get_handlers().federation_handler.maybe_backfill(
- room_id, max_topo
+ events, next_key = yield self.store.paginate_room_events(
+ room_id=room_id,
+ from_key=source_config.from_key,
+ to_key=source_config.to_key,
+ direction=source_config.direction,
+ limit=source_config.limit,
+ event_filter=event_filter,
)
- events, next_key = yield data_source.get_pagination_rows(
- requester.user, source_config, room_id
- )
-
- next_token = pagin_config.from_token.copy_and_replace(
- "room_key", next_key
- )
+ next_token = pagin_config.from_token.copy_and_replace(
+ "room_key", next_key
+ )
if not events:
defer.returnValue({
@@ -155,7 +147,11 @@ class MessageHandler(BaseHandler):
"end": next_token.to_string(),
})
- events = yield self._filter_events_for_client(
+ if event_filter:
+ events = event_filter.filter(events)
+
+ events = yield filter_events_for_client(
+ self.store,
user_id,
events,
is_peeking=(member_event_id is None),
@@ -175,7 +171,7 @@ class MessageHandler(BaseHandler):
defer.returnValue(chunk)
@defer.inlineCallbacks
- def create_event(self, event_dict, token_id=None, txn_id=None):
+ def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None):
"""
Given a dict from a client, create a new event.
@@ -186,6 +182,9 @@ class MessageHandler(BaseHandler):
Args:
event_dict (dict): An entire event
+ token_id (str)
+ txn_id (str)
+ prev_event_ids (list): The prev event ids to use when creating the event
Returns:
Tuple of created event (FrozenEvent), Context
@@ -198,12 +197,8 @@ class MessageHandler(BaseHandler):
membership = builder.content.get("membership", None)
target = UserID.from_string(builder.state_key)
- if membership == Membership.JOIN:
+ if membership in {Membership.JOIN, Membership.INVITE}:
# If event doesn't include a display name, add one.
- yield collect_presencelike_data(
- self.distributor, target, builder.content
- )
- elif membership == Membership.INVITE:
profile = self.hs.get_handlers().profile_handler
content = builder.content
@@ -224,6 +219,7 @@ class MessageHandler(BaseHandler):
event, context = yield self._create_new_client_event(
builder=builder,
+ prev_event_ids=prev_event_ids,
)
defer.returnValue((event, context))
@@ -244,12 +240,27 @@ class MessageHandler(BaseHandler):
"Tried to send member event through non-member codepath"
)
+ # We check here if we are currently being rate limited, so that we
+ # don't do unnecessary work. We check again just before we actually
+ # send the event.
+ time_now = self.clock.time()
+ allowed, time_allowed = self.ratelimiter.send_message(
+ event.sender, time_now,
+ msg_rate_hz=self.hs.config.rc_messages_per_second,
+ burst_count=self.hs.config.rc_message_burst_count,
+ update=False,
+ )
+ if not allowed:
+ raise LimitExceededError(
+ retry_after_ms=int(1000 * (time_allowed - time_now)),
+ )
+
user = UserID.from_string(event.sender)
assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
if event.is_state():
- prev_state = self.deduplicate_state_event(event, context)
+ prev_state = yield self.deduplicate_state_event(event, context)
if prev_state is not None:
defer.returnValue(prev_state)
@@ -261,9 +272,10 @@ class MessageHandler(BaseHandler):
)
if event.type == EventTypes.Message:
- presence = self.hs.get_handlers().presence_handler
+ presence = self.hs.get_presence_handler()
yield presence.bump_presence_active_time(user)
+ @defer.inlineCallbacks
def deduplicate_state_event(self, event, context):
"""
Checks whether event is in the latest resolved state in context.
@@ -271,13 +283,17 @@ class MessageHandler(BaseHandler):
If so, returns the version of the event in context.
Otherwise, returns None.
"""
- prev_event = context.current_state.get((event.type, event.state_key))
+ prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
+ prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+ if not prev_event:
+ return
+
if prev_event and event.user_id == prev_event.user_id:
prev_content = encode_canonical_json(prev_event.content)
next_content = encode_canonical_json(event.content)
if prev_content == next_content:
- return prev_event
- return None
+ defer.returnValue(prev_event)
+ return
@defer.inlineCallbacks
def create_and_send_nonmember_event(
@@ -388,378 +404,210 @@ class MessageHandler(BaseHandler):
[serialize_event(c, now) for c in room_state.values()]
)
- def snapshot_all_rooms(self, user_id=None, pagin_config=None,
- as_client_event=True, include_archived=False):
- """Retrieve a snapshot of all rooms the user is invited or has joined.
-
- This snapshot may include messages for all rooms where the user is
- joined, depending on the pagination config.
-
- Args:
- user_id (str): The ID of the user making the request.
- pagin_config (synapse.api.streams.PaginationConfig): The pagination
- config used to determine how many messages *PER ROOM* to return.
- as_client_event (bool): True to get events in client-server format.
- include_archived (bool): True to get rooms that the user has left
- Returns:
- A list of dicts with "room_id" and "membership" keys for all rooms
- the user is currently invited or joined in on. Rooms where the user
- is joined on, may return a "messages" key with messages, depending
- on the specified PaginationConfig.
- """
- key = (
- user_id,
- pagin_config.from_token,
- pagin_config.to_token,
- pagin_config.direction,
- pagin_config.limit,
- as_client_event,
- include_archived,
- )
- now_ms = self.clock.time_msec()
- result = self.snapshot_cache.get(now_ms, key)
- if result is not None:
- return result
-
- return self.snapshot_cache.set(now_ms, key, self._snapshot_all_rooms(
- user_id, pagin_config, as_client_event, include_archived
- ))
-
+ @measure_func("_create_new_client_event")
@defer.inlineCallbacks
- def _snapshot_all_rooms(self, user_id=None, pagin_config=None,
- as_client_event=True, include_archived=False):
-
- memberships = [Membership.INVITE, Membership.JOIN]
- if include_archived:
- memberships.append(Membership.LEAVE)
+ def _create_new_client_event(self, builder, prev_event_ids=None):
+ if prev_event_ids:
+ prev_events = yield self.store.add_event_hashes(prev_event_ids)
+ prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
+ depth = prev_max_depth + 1
+ else:
+ latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
+ builder.room_id,
+ )
- room_list = yield self.store.get_rooms_for_user_where_membership_is(
- user_id=user_id, membership_list=memberships
- )
+ # We want to limit the max number of prev events we point to in our
+ # new event
+ if len(latest_ret) > 10:
+ # Sort by reverse depth, so we point to the most recent.
+ latest_ret.sort(key=lambda a: -a[2])
+ new_latest_ret = latest_ret[:5]
+
+ # We also randomly point to some of the older events, to make
+ # sure that we don't completely ignore the older events.
+ if latest_ret[5:]:
+ sample_size = min(5, len(latest_ret[5:]))
+ new_latest_ret.extend(random.sample(latest_ret[5:], sample_size))
+ latest_ret = new_latest_ret
+
+ if latest_ret:
+ depth = max([d for _, _, d in latest_ret]) + 1
+ else:
+ depth = 1
- user = UserID.from_string(user_id)
+ prev_events = [
+ (event_id, prev_hashes)
+ for event_id, prev_hashes, _ in latest_ret
+ ]
- rooms_ret = []
+ builder.prev_events = prev_events
+ builder.depth = depth
- now_token = yield self.hs.get_event_sources().get_current_token()
+ state_handler = self.state_handler
- presence_stream = self.hs.get_event_sources().sources["presence"]
- pagination_config = PaginationConfig(from_token=now_token)
- presence, _ = yield presence_stream.get_pagination_rows(
- user, pagination_config.get_source_config("presence"), None
- )
+ context = yield state_handler.compute_event_context(builder)
- receipt_stream = self.hs.get_event_sources().sources["receipt"]
- receipt, _ = yield receipt_stream.get_pagination_rows(
- user, pagination_config.get_source_config("receipt"), None
- )
+ if builder.is_state():
+ builder.prev_state = yield self.store.add_event_hashes(
+ context.prev_state_events
+ )
- tags_by_room = yield self.store.get_tags_for_user(user_id)
+ yield self.auth.add_auth_events(builder, context)
- account_data, account_data_by_room = (
- yield self.store.get_account_data_for_user(user_id)
+ signing_key = self.hs.config.signing_key[0]
+ add_hashes_and_signatures(
+ builder, self.server_name, signing_key
)
- public_room_ids = yield self.store.get_public_room_ids()
+ event = builder.build()
- limit = pagin_config.limit
- if limit is None:
- limit = 10
+ logger.debug(
+ "Created event %s with state: %s",
+ event.event_id, context.prev_state_ids,
+ )
- @defer.inlineCallbacks
- def handle_room(event):
- d = {
- "room_id": event.room_id,
- "membership": event.membership,
- "visibility": (
- "public" if event.room_id in public_room_ids
- else "private"
- ),
- }
+ defer.returnValue(
+ (event, context,)
+ )
- if event.membership == Membership.INVITE:
- time_now = self.clock.time_msec()
- d["inviter"] = event.sender
+ @measure_func("handle_new_client_event")
+ @defer.inlineCallbacks
+ def handle_new_client_event(
+ self,
+ requester,
+ event,
+ context,
+ ratelimit=True,
+ extra_users=[]
+ ):
+ # We now need to go and hit out to wherever we need to hit out to.
- invite_event = yield self.store.get_event(event.event_id)
- d["invite"] = serialize_event(invite_event, time_now, as_client_event)
+ if ratelimit:
+ self.ratelimit(requester)
- rooms_ret.append(d)
+ try:
+ yield self.auth.check_from_context(event, context)
+ except AuthError as err:
+ logger.warn("Denying new event %r because %s", event, err)
+ raise err
+
+ yield self.maybe_kick_guest_users(event, context)
+
+ if event.type == EventTypes.CanonicalAlias:
+ # Check the alias is acually valid (at this time at least)
+ room_alias_str = event.content.get("alias", None)
+ if room_alias_str:
+ room_alias = RoomAlias.from_string(room_alias_str)
+ directory_handler = self.hs.get_handlers().directory_handler
+ mapping = yield directory_handler.get_association(room_alias)
+
+ if mapping["room_id"] != event.room_id:
+ raise SynapseError(
+ 400,
+ "Room alias %s does not point to the room" % (
+ room_alias_str,
+ )
+ )
- if event.membership not in (Membership.JOIN, Membership.LEAVE):
- return
+ federation_handler = self.hs.get_handlers().federation_handler
- try:
- if event.membership == Membership.JOIN:
- room_end_token = now_token.room_key
- deferred_room_state = self.state_handler.get_current_state(
- event.room_id
- )
- elif event.membership == Membership.LEAVE:
- room_end_token = "s%d" % (event.stream_ordering,)
- deferred_room_state = self.store.get_state_for_events(
- [event.event_id], None
- )
- deferred_room_state.addCallback(
- lambda states: states[event.event_id]
+ if event.type == EventTypes.Member:
+ if event.content["membership"] == Membership.INVITE:
+ def is_inviter_member_event(e):
+ return (
+ e.type == EventTypes.Member and
+ e.sender == event.sender
)
- (messages, token), current_state = yield defer.gatherResults(
- [
- self.store.get_recent_events_for_room(
- event.room_id,
- limit=limit,
- end_token=room_end_token,
- ),
- deferred_room_state,
- ]
- ).addErrback(unwrapFirstError)
-
- messages = yield self._filter_events_for_client(
- user_id, messages
- )
-
- start_token = now_token.copy_and_replace("room_key", token[0])
- end_token = now_token.copy_and_replace("room_key", token[1])
- time_now = self.clock.time_msec()
-
- d["messages"] = {
- "chunk": [
- serialize_event(m, time_now, as_client_event)
- for m in messages
- ],
- "start": start_token.to_string(),
- "end": end_token.to_string(),
- }
-
- d["state"] = [
- serialize_event(c, time_now, as_client_event)
- for c in current_state.values()
+ state_to_include_ids = [
+ e_id
+ for k, e_id in context.current_state_ids.items()
+ if k[0] in self.hs.config.room_invite_state_types
+ or k[0] == EventTypes.Member and k[1] == event.sender
]
- account_data_events = []
- tags = tags_by_room.get(event.room_id)
- if tags:
- account_data_events.append({
- "type": "m.tag",
- "content": {"tags": tags},
- })
-
- account_data = account_data_by_room.get(event.room_id, {})
- for account_data_type, content in account_data.items():
- account_data_events.append({
- "type": account_data_type,
- "content": content,
- })
-
- d["account_data"] = account_data_events
- except:
- logger.exception("Failed to get snapshot")
-
- # Only do N rooms at once
- n = 5
- d_list = [handle_room(e) for e in room_list]
- for i in range(0, len(d_list), n):
- yield defer.gatherResults(
- d_list[i:i + n],
- consumeErrors=True
- ).addErrback(unwrapFirstError)
-
- account_data_events = []
- for account_data_type, content in account_data.items():
- account_data_events.append({
- "type": account_data_type,
- "content": content,
- })
-
- ret = {
- "rooms": rooms_ret,
- "presence": presence,
- "account_data": account_data_events,
- "receipts": receipt,
- "end": now_token.to_string(),
- }
+ state_to_include = yield self.store.get_events(state_to_include_ids)
- defer.returnValue(ret)
+ event.unsigned["invite_room_state"] = [
+ {
+ "type": e.type,
+ "state_key": e.state_key,
+ "content": e.content,
+ "sender": e.sender,
+ }
+ for e in state_to_include.values()
+ ]
- @defer.inlineCallbacks
- def room_initial_sync(self, requester, room_id, pagin_config=None):
- """Capture the a snapshot of a room. If user is currently a member of
- the room this will be what is currently in the room. If the user left
- the room this will be what was in the room when they left.
+ invitee = UserID.from_string(event.state_key)
+ if not self.hs.is_mine(invitee):
+ # TODO: Can we add signature from remote server in a nicer
+ # way? If we have been invited by a remote server, we need
+ # to get them to sign the event.
- Args:
- requester(Requester): The user to get a snapshot for.
- room_id(str): The room to get a snapshot of.
- pagin_config(synapse.streams.config.PaginationConfig):
- The pagination config used to determine how many messages to
- return.
- Raises:
- AuthError if the user wasn't in the room.
- Returns:
- A JSON serialisable dict with the snapshot of the room.
- """
+ returned_invite = yield federation_handler.send_invite(
+ invitee.domain,
+ event,
+ )
- user_id = requester.user.to_string()
+ event.unsigned.pop("room_state", None)
- membership, member_event_id = yield self._check_in_room_or_world_readable(
- room_id, user_id,
- )
- is_peeking = member_event_id is None
+ # TODO: Make sure the signatures actually are correct.
+ event.signatures.update(
+ returned_invite.signatures
+ )
- if membership == Membership.JOIN:
- result = yield self._room_initial_sync_joined(
- user_id, room_id, pagin_config, membership, is_peeking
- )
- elif membership == Membership.LEAVE:
- result = yield self._room_initial_sync_parted(
- user_id, room_id, pagin_config, membership, member_event_id, is_peeking
+ if event.type == EventTypes.Redaction:
+ auth_events_ids = yield self.auth.compute_auth_events(
+ event, context.prev_state_ids, for_verification=True,
)
+ auth_events = yield self.store.get_events(auth_events_ids)
+ auth_events = {
+ (e.type, e.state_key): e for e in auth_events.values()
+ }
+ if self.auth.check_redaction(event, auth_events=auth_events):
+ original_event = yield self.store.get_event(
+ event.redacts,
+ check_redacted=False,
+ get_prev_content=False,
+ allow_rejected=False,
+ allow_none=False
+ )
+ if event.user_id != original_event.user_id:
+ raise AuthError(
+ 403,
+ "You don't have permission to redact events"
+ )
- account_data_events = []
- tags = yield self.store.get_tags_for_room(user_id, room_id)
- if tags:
- account_data_events.append({
- "type": "m.tag",
- "content": {"tags": tags},
- })
-
- account_data = yield self.store.get_account_data_for_room(user_id, room_id)
- for account_data_type, content in account_data.items():
- account_data_events.append({
- "type": account_data_type,
- "content": content,
- })
-
- result["account_data"] = account_data_events
-
- defer.returnValue(result)
-
- @defer.inlineCallbacks
- def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
- membership, member_event_id, is_peeking):
- room_state = yield self.store.get_state_for_events(
- [member_event_id], None
- )
-
- room_state = room_state[member_event_id]
-
- limit = pagin_config.limit if pagin_config else None
- if limit is None:
- limit = 10
+ if event.type == EventTypes.Create and context.prev_state_ids:
+ raise AuthError(
+ 403,
+ "Changing the room create event is forbidden",
+ )
- stream_token = yield self.store.get_stream_token_for_event(
- member_event_id
+ action_generator = ActionGenerator(self.hs)
+ yield action_generator.handle_push_actions_for_event(
+ event, context
)
- messages, token = yield self.store.get_recent_events_for_room(
- room_id,
- limit=limit,
- end_token=stream_token
+ (event_stream_id, max_stream_id) = yield self.store.persist_event(
+ event, context=context
)
- messages = yield self._filter_events_for_client(
- user_id, messages, is_peeking=is_peeking
+ # this intentionally does not yield: we don't care about the result
+ # and don't need to wait for it.
+ preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+ event_stream_id, max_stream_id
)
- start_token = StreamToken.START.copy_and_replace("room_key", token[0])
- end_token = StreamToken.START.copy_and_replace("room_key", token[1])
-
- time_now = self.clock.time_msec()
-
- defer.returnValue({
- "membership": membership,
- "room_id": room_id,
- "messages": {
- "chunk": [serialize_event(m, time_now) for m in messages],
- "start": start_token.to_string(),
- "end": end_token.to_string(),
- },
- "state": [serialize_event(s, time_now) for s in room_state.values()],
- "presence": [],
- "receipts": [],
- })
-
- @defer.inlineCallbacks
- def _room_initial_sync_joined(self, user_id, room_id, pagin_config,
- membership, is_peeking):
- current_state = yield self.state.get_current_state(
- room_id=room_id,
- )
-
- # TODO: These concurrently
- time_now = self.clock.time_msec()
- state = [
- serialize_event(x, time_now)
- for x in current_state.values()
- ]
-
- now_token = yield self.hs.get_event_sources().get_current_token()
-
- limit = pagin_config.limit if pagin_config else None
- if limit is None:
- limit = 10
-
- room_members = [
- m for m in current_state.values()
- if m.type == EventTypes.Member
- and m.content["membership"] == Membership.JOIN
- ]
-
- presence_handler = self.hs.get_handlers().presence_handler
-
@defer.inlineCallbacks
- def get_presence():
- states = yield presence_handler.get_states(
- [m.user_id for m in room_members],
- as_event=True,
+ def _notify():
+ yield run_on_reactor()
+ yield self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
)
- defer.returnValue(states)
-
- @defer.inlineCallbacks
- def get_receipts():
- receipts_handler = self.hs.get_handlers().receipts_handler
- receipts = yield receipts_handler.get_receipts_for_room(
- room_id,
- now_token.receipt_key
- )
- defer.returnValue(receipts)
-
- presence, receipts, (messages, token) = yield defer.gatherResults(
- [
- get_presence(),
- get_receipts(),
- self.store.get_recent_events_for_room(
- room_id,
- limit=limit,
- end_token=now_token.room_key,
- )
- ],
- consumeErrors=True,
- ).addErrback(unwrapFirstError)
-
- messages = yield self._filter_events_for_client(
- user_id, messages, is_peeking=is_peeking,
- )
-
- start_token = now_token.copy_and_replace("room_key", token[0])
- end_token = now_token.copy_and_replace("room_key", token[1])
-
- time_now = self.clock.time_msec()
-
- ret = {
- "room_id": room_id,
- "messages": {
- "chunk": [serialize_event(m, time_now) for m in messages],
- "start": start_token.to_string(),
- "end": end_token.to_string(),
- },
- "state": state,
- "presence": presence,
- "receipts": receipts,
- }
- if not is_peeking:
- ret["membership"] = membership
+ preserve_fn(_notify)()
- defer.returnValue(ret)
+ # If invite, remove room_state from unsigned before sending.
+ event.unsigned.pop("invite_room_state", None)
|