diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 11650dc80c..683da6bf32 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -34,9 +34,10 @@ from synapse.api.errors import (
from synapse.api.room_versions import RoomVersions
from synapse.api.urls import ConsentURIBuilder
from synapse.events.validator import EventValidator
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.state import StateFilter
-from synapse.types import RoomAlias, UserID
+from synapse.types import RoomAlias, UserID, create_requester
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
@@ -60,8 +61,9 @@ class MessageHandler(object):
self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks
- def get_room_data(self, user_id=None, room_id=None,
- event_type=None, state_key="", is_guest=False):
+ def get_room_data(
+ self, user_id=None, room_id=None, event_type=None, state_key="", is_guest=False
+ ):
""" Get data from a room.
Args:
@@ -76,9 +78,7 @@ class MessageHandler(object):
)
if membership == Membership.JOIN:
- data = yield self.state.get_current_state(
- room_id, event_type, state_key
- )
+ data = yield self.state.get_current_state(room_id, event_type, state_key)
elif membership == Membership.LEAVE:
key = (event_type, state_key)
room_state = yield self.store.get_state_for_events(
@@ -90,8 +90,12 @@ class MessageHandler(object):
@defer.inlineCallbacks
def get_state_events(
- self, user_id, room_id, state_filter=StateFilter.all(),
- at_token=None, is_guest=False,
+ self,
+ user_id,
+ room_id,
+ state_filter=StateFilter.all(),
+ at_token=None,
+ is_guest=False,
):
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
@@ -123,50 +127,48 @@ class MessageHandler(object):
# does not reliably give you the state at the given stream position.
# (https://github.com/matrix-org/synapse/issues/3305)
last_events, _ = yield self.store.get_recent_events_for_room(
- room_id, end_token=at_token.room_key, limit=1,
+ room_id, end_token=at_token.room_key, limit=1
)
if not last_events:
- raise NotFoundError("Can't find event for token %s" % (at_token, ))
+ raise NotFoundError("Can't find event for token %s" % (at_token,))
visible_events = yield filter_events_for_client(
- self.store, user_id, last_events,
+ self.store, user_id, last_events
)
event = last_events[0]
if visible_events:
room_state = yield self.store.get_state_for_events(
- [event.event_id], state_filter=state_filter,
+ [event.event_id], state_filter=state_filter
)
room_state = room_state[event.event_id]
else:
raise AuthError(
403,
- "User %s not allowed to view events in room %s at token %s" % (
- user_id, room_id, at_token,
- )
+ "User %s not allowed to view events in room %s at token %s"
+ % (user_id, room_id, at_token),
)
else:
membership, membership_event_id = (
- yield self.auth.check_in_room_or_world_readable(
- room_id, user_id,
- )
+ yield self.auth.check_in_room_or_world_readable(room_id, user_id)
)
if membership == Membership.JOIN:
state_ids = yield self.store.get_filtered_current_state_ids(
- room_id, state_filter=state_filter,
+ room_id, state_filter=state_filter
)
room_state = yield self.store.get_events(state_ids.values())
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
- [membership_event_id], state_filter=state_filter,
+ [membership_event_id], state_filter=state_filter
)
room_state = room_state[membership_event_id]
now = self.clock.time_msec()
events = yield self._event_serializer.serialize_events(
- room_state.values(), now,
+ room_state.values(),
+ now,
# We don't bother bundling aggregations in when asked for state
# events, as clients won't use them.
bundle_aggregations=False,
@@ -210,13 +212,15 @@ class MessageHandler(object):
# Loop fell through, AS has no interested users in room
raise AuthError(403, "Appservice not in room")
- defer.returnValue({
- user_id: {
- "avatar_url": profile.avatar_url,
- "display_name": profile.display_name,
+ defer.returnValue(
+ {
+ user_id: {
+ "avatar_url": profile.avatar_url,
+ "display_name": profile.display_name,
+ }
+ for user_id, profile in iteritems(users_with_profile)
}
- for user_id, profile in iteritems(users_with_profile)
- })
+ )
class EventCreationHandler(object):
@@ -261,9 +265,28 @@ class EventCreationHandler(object):
if self._block_events_without_consent_error:
self._consent_uri_builder = ConsentURIBuilder(self.config)
+ if (
+ not self.config.worker_app
+ and self.config.cleanup_extremities_with_dummy_events
+ ):
+ self.clock.looping_call(
+ lambda: run_as_background_process(
+ "send_dummy_events_to_fill_extremities",
+ self._send_dummy_events_to_fill_extremities,
+ ),
+ 5 * 60 * 1000,
+ )
+
@defer.inlineCallbacks
- def create_event(self, requester, event_dict, token_id=None, txn_id=None,
- prev_events_and_hashes=None, require_consent=True):
+ def create_event(
+ self,
+ requester,
+ event_dict,
+ token_id=None,
+ txn_id=None,
+ prev_events_and_hashes=None,
+ require_consent=True,
+ ):
"""
Given a dict from a client, create a new event.
@@ -323,8 +346,7 @@ class EventCreationHandler(object):
content["avatar_url"] = yield profile.get_avatar_url(target)
except Exception as e:
logger.info(
- "Failed to get profile information for %r: %s",
- target, e
+ "Failed to get profile information for %r: %s", target, e
)
is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
@@ -360,16 +382,17 @@ class EventCreationHandler(object):
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
if not prev_event or prev_event.membership != Membership.JOIN:
logger.warning(
- ("Attempt to send `m.room.aliases` in room %s by user %s but"
- " membership is %s"),
+ (
+ "Attempt to send `m.room.aliases` in room %s by user %s but"
+ " membership is %s"
+ ),
event.room_id,
event.sender,
prev_event.membership if prev_event else None,
)
raise AuthError(
- 403,
- "You must be in the room to create an alias for it",
+ 403, "You must be in the room to create an alias for it"
)
self.validator.validate_new(event)
@@ -436,8 +459,8 @@ class EventCreationHandler(object):
# exempt the system notices user
if (
- self.config.server_notices_mxid is not None and
- user_id == self.config.server_notices_mxid
+ self.config.server_notices_mxid is not None
+ and user_id == self.config.server_notices_mxid
):
return
@@ -450,15 +473,10 @@ class EventCreationHandler(object):
return
consent_uri = self._consent_uri_builder.build_user_consent_uri(
- requester.user.localpart,
- )
- msg = self._block_events_without_consent_error % {
- 'consent_uri': consent_uri,
- }
- raise ConsentNotGivenError(
- msg=msg,
- consent_uri=consent_uri,
+ requester.user.localpart
)
+ msg = self._block_events_without_consent_error % {"consent_uri": consent_uri}
+ raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)
@defer.inlineCallbacks
def send_nonmember_event(self, requester, event, context, ratelimit=True):
@@ -473,8 +491,7 @@ class EventCreationHandler(object):
"""
if event.type == EventTypes.Member:
raise SynapseError(
- 500,
- "Tried to send member event through non-member codepath"
+ 500, "Tried to send member event through non-member codepath"
)
user = UserID.from_string(event.sender)
@@ -486,15 +503,13 @@ class EventCreationHandler(object):
if prev_state is not None:
logger.info(
"Not bothering to persist state event %s duplicated by %s",
- event.event_id, prev_state.event_id,
+ event.event_id,
+ prev_state.event_id,
)
defer.returnValue(prev_state)
yield self.handle_new_client_event(
- requester=requester,
- event=event,
- context=context,
- ratelimit=ratelimit,
+ requester=requester, event=event, context=context, ratelimit=ratelimit
)
@defer.inlineCallbacks
@@ -520,11 +535,7 @@ class EventCreationHandler(object):
@defer.inlineCallbacks
def create_and_send_nonmember_event(
- self,
- requester,
- event_dict,
- ratelimit=True,
- txn_id=None
+ self, requester, event_dict, ratelimit=True, txn_id=None
):
"""
Creates an event, then sends it.
@@ -539,32 +550,25 @@ class EventCreationHandler(object):
# taking longer.
with (yield self.limiter.queue(event_dict["room_id"])):
event, context = yield self.create_event(
- requester,
- event_dict,
- token_id=requester.access_token_id,
- txn_id=txn_id
+ requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
)
spam_error = self.spam_checker.check_event_for_spam(event)
if spam_error:
if not isinstance(spam_error, string_types):
spam_error = "Spam is not permitted here"
- raise SynapseError(
- 403, spam_error, Codes.FORBIDDEN
- )
+ raise SynapseError(403, spam_error, Codes.FORBIDDEN)
yield self.send_nonmember_event(
- requester,
- event,
- context,
- ratelimit=ratelimit,
+ requester, event, context, ratelimit=ratelimit
)
defer.returnValue(event)
@measure_func("create_new_client_event")
@defer.inlineCallbacks
- def create_new_client_event(self, builder, requester=None,
- prev_events_and_hashes=None):
+ def create_new_client_event(
+ self, builder, requester=None, prev_events_and_hashes=None
+ ):
"""Create a new event for a local client
Args:
@@ -584,22 +588,21 @@ class EventCreationHandler(object):
"""
if prev_events_and_hashes is not None:
- assert len(prev_events_and_hashes) <= 10, \
- "Attempting to create an event with %i prev_events" % (
- len(prev_events_and_hashes),
+ assert len(prev_events_and_hashes) <= 10, (
+ "Attempting to create an event with %i prev_events"
+ % (len(prev_events_and_hashes),)
)
else:
- prev_events_and_hashes = \
- yield self.store.get_prev_events_for_room(builder.room_id)
+ prev_events_and_hashes = yield self.store.get_prev_events_for_room(
+ builder.room_id
+ )
prev_events = [
(event_id, prev_hashes)
for event_id, prev_hashes, _ in prev_events_and_hashes
]
- event = yield builder.build(
- prev_event_ids=[p for p, _ in prev_events],
- )
+ event = yield builder.build(prev_event_ids=[p for p, _ in prev_events])
context = yield self.state.compute_event_context(event)
if requester:
context.app_service = requester.app_service
@@ -615,29 +618,19 @@ class EventCreationHandler(object):
aggregation_key = relation["key"]
already_exists = yield self.store.has_user_annotated_event(
- relates_to, event.type, aggregation_key, event.sender,
+ relates_to, event.type, aggregation_key, event.sender
)
if already_exists:
raise SynapseError(400, "Can't send same reaction twice")
- logger.debug(
- "Created event %s",
- event.event_id,
- )
+ logger.debug("Created event %s", event.event_id)
- defer.returnValue(
- (event, context,)
- )
+ defer.returnValue((event, context))
@measure_func("handle_new_client_event")
@defer.inlineCallbacks
def handle_new_client_event(
- self,
- requester,
- event,
- context,
- ratelimit=True,
- extra_users=[],
+ self, requester, event, context, ratelimit=True, extra_users=[]
):
"""Processes a new event. This includes checking auth, persisting it,
notifying users, sending to remote servers, etc.
@@ -653,19 +646,20 @@ class EventCreationHandler(object):
extra_users (list(UserID)): Any extra users to notify about event
"""
- if event.is_state() and (event.type, event.state_key) == (EventTypes.Create, ""):
- room_version = event.content.get(
- "room_version", RoomVersions.V1.identifier
- )
+ if event.is_state() and (event.type, event.state_key) == (
+ EventTypes.Create,
+ "",
+ ):
+ room_version = event.content.get("room_version", RoomVersions.V1.identifier)
else:
room_version = yield self.store.get_room_version(event.room_id)
event_allowed = yield self.third_party_event_rules.check_event_allowed(
- event, context,
+ event, context
)
if not event_allowed:
raise SynapseError(
- 403, "This event is not allowed in this context", Codes.FORBIDDEN,
+ 403, "This event is not allowed in this context", Codes.FORBIDDEN
)
try:
@@ -682,9 +676,7 @@ class EventCreationHandler(object):
logger.exception("Failed to encode content: %r", event.content)
raise
- yield self.action_generator.handle_push_actions_for_event(
- event, context
- )
+ yield self.action_generator.handle_push_actions_for_event(event, context)
# reraise does not allow inlineCallbacks to preserve the stacktrace, so we
# hack around with a try/finally instead.
@@ -705,11 +697,7 @@ class EventCreationHandler(object):
return
yield self.persist_and_notify_client_event(
- requester,
- event,
- context,
- ratelimit=ratelimit,
- extra_users=extra_users,
+ requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)
success = True
@@ -718,18 +706,12 @@ class EventCreationHandler(object):
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
run_in_background(
- self.store.remove_push_actions_from_staging,
- event.event_id,
+ self.store.remove_push_actions_from_staging, event.event_id
)
@defer.inlineCallbacks
def persist_and_notify_client_event(
- self,
- requester,
- event,
- context,
- ratelimit=True,
- extra_users=[],
+ self, requester, event, context, ratelimit=True, extra_users=[]
):
"""Called when we have fully built the event, have already
calculated the push actions for the event, and checked auth.
@@ -754,20 +736,16 @@ class EventCreationHandler(object):
if mapping["room_id"] != event.room_id:
raise SynapseError(
400,
- "Room alias %s does not point to the room" % (
- room_alias_str,
- )
+ "Room alias %s does not point to the room" % (room_alias_str,),
)
federation_handler = self.hs.get_handlers().federation_handler
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.INVITE:
+
def is_inviter_member_event(e):
- return (
- e.type == EventTypes.Member and
- e.sender == event.sender
- )
+ return e.type == EventTypes.Member and e.sender == event.sender
current_state_ids = yield context.get_current_state_ids(self.store)
@@ -797,26 +775,21 @@ class EventCreationHandler(object):
# to get them to sign the event.
returned_invite = yield federation_handler.send_invite(
- invitee.domain,
- event,
+ invitee.domain, event
)
event.unsigned.pop("room_state", None)
# TODO: Make sure the signatures actually are correct.
- event.signatures.update(
- returned_invite.signatures
- )
+ event.signatures.update(returned_invite.signatures)
if event.type == EventTypes.Redaction:
prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
- event, prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True
)
auth_events = yield self.store.get_events(auth_events_ids)
- auth_events = {
- (e.type, e.state_key): e for e in auth_events.values()
- }
+ auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
room_version = yield self.store.get_room_version(event.room_id)
if self.auth.check_redaction(room_version, event, auth_events=auth_events):
original_event = yield self.store.get_event(
@@ -824,13 +797,10 @@ class EventCreationHandler(object):
check_redacted=False,
get_prev_content=False,
allow_rejected=False,
- allow_none=False
+ allow_none=False,
)
if event.user_id != original_event.user_id:
- raise AuthError(
- 403,
- "You don't have permission to redact events"
- )
+ raise AuthError(403, "You don't have permission to redact events")
# We've already checked.
event.internal_metadata.recheck_redaction = False
@@ -838,24 +808,18 @@ class EventCreationHandler(object):
if event.type == EventTypes.Create:
prev_state_ids = yield context.get_prev_state_ids(self.store)
if prev_state_ids:
- raise AuthError(
- 403,
- "Changing the room create event is forbidden",
- )
+ raise AuthError(403, "Changing the room create event is forbidden")
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
)
- yield self.pusher_pool.on_new_notifications(
- event_stream_id, max_stream_id,
- )
+ yield self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
def _notify():
try:
self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=extra_users
+ event, event_stream_id, max_stream_id, extra_users=extra_users
)
except Exception:
logger.exception("Error notifying about new room event")
@@ -874,3 +838,54 @@ class EventCreationHandler(object):
yield presence.bump_presence_active_time(user)
except Exception:
logger.exception("Error bumping presence active time")
+
+ @defer.inlineCallbacks
+ def _send_dummy_events_to_fill_extremities(self):
+ """Background task to send dummy events into rooms that have a large
+ number of extremities
+ """
+
+ room_ids = yield self.store.get_rooms_with_many_extremities(
+ min_count=10, limit=5
+ )
+
+ for room_id in room_ids:
+ # For each room we need to find a joined member we can use to send
+ # the dummy event with.
+
+ prev_events_and_hashes = yield self.store.get_prev_events_for_room(room_id)
+
+ latest_event_ids = (event_id for (event_id, _, _) in prev_events_and_hashes)
+
+ members = yield self.state.get_current_users_in_room(
+ room_id, latest_event_ids=latest_event_ids
+ )
+
+ user_id = None
+ for member in members:
+ if self.hs.is_mine_id(member):
+ user_id = member
+ break
+
+ if not user_id:
+ # We don't have a joined user.
+ # TODO: We should do something here to stop the room from
+ # appearing next time.
+ continue
+
+ requester = create_requester(user_id)
+
+ event, context = yield self.create_event(
+ requester,
+ {
+ "type": "org.matrix.dummy_event",
+ "content": {},
+ "room_id": room_id,
+ "sender": user_id,
+ },
+ prev_events_and_hashes=prev_events_and_hashes,
+ )
+
+ event.internal_metadata.proactively_send = False
+
+ yield self.send_nonmember_event(requester, event, context, ratelimit=False)
|