diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a5e8dc1323..8949343801 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,9 +15,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, List, Optional, Tuple
+import random
+from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
-from canonicaljson import encode_canonical_json, json
+from canonicaljson import encode_canonical_json
from twisted.internet.interfaces import IDelayedCall
@@ -34,6 +35,7 @@ from synapse.api.errors import (
Codes,
ConsentNotGivenError,
NotFoundError,
+ ShadowBanError,
SynapseError,
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
@@ -45,16 +47,10 @@ from synapse.events.validator import EventValidator
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
-from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
+from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
-from synapse.types import (
- Collection,
- Requester,
- RoomAlias,
- StreamToken,
- UserID,
- create_requester,
-)
+from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
+from synapse.util import json_decoder
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.metrics import measure_func
@@ -69,7 +65,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
-class MessageHandler(object):
+class MessageHandler:
"""Contains some read only APIs to get state about a room
"""
@@ -93,12 +89,7 @@ class MessageHandler(object):
)
async def get_room_data(
- self,
- user_id: str = None,
- room_id: str = None,
- event_type: Optional[str] = None,
- state_key: str = "",
- is_guest: bool = False,
+ self, user_id: str, room_id: str, event_type: str, state_key: str,
) -> dict:
""" Get data from a room.
@@ -107,11 +98,10 @@ class MessageHandler(object):
room_id
event_type
state_key
- is_guest
Returns:
The path data content.
Raises:
- SynapseError if something went wrong.
+ SynapseError or AuthError if the user is not in the room
"""
(
membership,
@@ -128,6 +118,16 @@ class MessageHandler(object):
[membership_event_id], StateFilter.from_types([key])
)
data = room_state[membership_event_id].get(key)
+ else:
+ # check_user_in_room_or_world_readable, if it doesn't raise an AuthError, should
+ # only ever return a Membership.JOIN/LEAVE object
+ #
+ # Safeguard in case it returned something else
+ logger.error(
+ "Attempted to retrieve data from a room for a user that has never been in it. "
+ "This should not have happened."
+ )
+ raise SynapseError(403, "User not in room", errcode=Codes.FORBIDDEN)
return data
@@ -362,7 +362,7 @@ class MessageHandler(object):
_DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
-class EventCreationHandler(object):
+class EventCreationHandler:
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
@@ -377,9 +377,8 @@ class EventCreationHandler(object):
self.notifier = hs.get_notifier()
self.config = hs.config
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
- self._is_event_writer = (
- self.config.worker.writers.events == hs.get_instance_name()
- )
+ self._events_shard_config = self.config.worker.events_shard_config
+ self._instance_name = hs.get_instance_name()
self.room_invite_state_types = self.hs.config.room_invite_state_types
@@ -388,8 +387,6 @@ class EventCreationHandler(object):
# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)
- self.pusher_pool = hs.get_pusherpool()
-
# We arbitrarily limit concurrent event creation for a room to 5.
# This is to stop us from diverging history *too* much.
self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
@@ -410,7 +407,7 @@ class EventCreationHandler(object):
#
# map from room id to time-of-last-attempt.
#
- self._rooms_to_exclude_from_dummy_event_insertion = {} # type: dict[str, int]
+ self._rooms_to_exclude_from_dummy_event_insertion = {} # type: Dict[str, int]
# we need to construct a ConsentURIBuilder here, as it checks that the necessary
# config options, but *only* if we have a configuration for which we are
@@ -442,7 +439,7 @@ class EventCreationHandler(object):
event_dict: dict,
token_id: Optional[str] = None,
txn_id: Optional[str] = None,
- prev_event_ids: Optional[Collection[str]] = None,
+ prev_event_ids: Optional[List[str]] = None,
require_consent: bool = True,
) -> Tuple[EventBase, EventContext]:
"""
@@ -647,37 +644,48 @@ class EventCreationHandler(object):
event: EventBase,
context: EventContext,
ratelimit: bool = True,
+ ignore_shadow_ban: bool = False,
) -> int:
"""
Persists and notifies local clients and federation of an event.
Args:
- requester
- event the event to send.
- context: the context of the event.
+ requester: The requester sending the event.
+ event: The event to send.
+ context: The context of the event.
ratelimit: Whether to rate limit this send.
+ ignore_shadow_ban: True if shadow-banned users should be allowed to
+ send this event.
Return:
The stream_id of the persisted event.
+
+ Raises:
+ ShadowBanError if the requester has been shadow-banned.
"""
if event.type == EventTypes.Member:
raise SynapseError(
500, "Tried to send member event through non-member codepath"
)
+ if not ignore_shadow_ban and requester.shadow_banned:
+ # We randomly sleep a bit just to annoy the requester.
+ await self.clock.sleep(random.randint(1, 10))
+ raise ShadowBanError()
+
user = UserID.from_string(event.sender)
assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
if event.is_state():
- prev_state = await self.deduplicate_state_event(event, context)
- if prev_state is not None:
+ prev_event = await self.deduplicate_state_event(event, context)
+ if prev_event is not None:
logger.info(
"Not bothering to persist state event %s duplicated by %s",
event.event_id,
- prev_state.event_id,
+ prev_event.event_id,
)
- return prev_state
+ return await self.store.get_stream_id_for_event(prev_event.event_id)
return await self.handle_new_client_event(
requester=requester, event=event, context=context, ratelimit=ratelimit
@@ -685,40 +693,61 @@ class EventCreationHandler(object):
async def deduplicate_state_event(
self, event: EventBase, context: EventContext
- ) -> None:
+ ) -> Optional[EventBase]:
"""
Checks whether event is in the latest resolved state in context.
- If so, returns the version of the event in context.
- Otherwise, returns None.
+ Args:
+ event: The event to check for duplication.
+ context: The event context.
+
+ Returns:
+ The previous verion of the event is returned, if it is found in the
+ event context. Otherwise, None is returned.
"""
prev_state_ids = await context.get_prev_state_ids()
prev_event_id = prev_state_ids.get((event.type, event.state_key))
if not prev_event_id:
- return
+ return None
prev_event = await self.store.get_event(prev_event_id, allow_none=True)
if not prev_event:
- return
+ return None
if prev_event and event.user_id == prev_event.user_id:
prev_content = encode_canonical_json(prev_event.content)
next_content = encode_canonical_json(event.content)
if prev_content == next_content:
return prev_event
- return
+ return None
async def create_and_send_nonmember_event(
self,
requester: Requester,
- event_dict: EventBase,
+ event_dict: dict,
ratelimit: bool = True,
txn_id: Optional[str] = None,
+ ignore_shadow_ban: bool = False,
) -> Tuple[EventBase, int]:
"""
Creates an event, then sends it.
See self.create_event and self.send_nonmember_event.
+
+ Args:
+ requester: The requester sending the event.
+ event_dict: An entire event.
+ ratelimit: Whether to rate limit this send.
+ txn_id: The transaction ID.
+ ignore_shadow_ban: True if shadow-banned users should be allowed to
+ send this event.
+
+ Raises:
+ ShadowBanError if the requester has been shadow-banned.
"""
+ if not ignore_shadow_ban and requester.shadow_banned:
+ # We randomly sleep a bit just to annoy the requester.
+ await self.clock.sleep(random.randint(1, 10))
+ raise ShadowBanError()
# We limit the number of concurrent event sends in a room so that we
# don't fork the DAG too much. If we don't limit then we can end up in
@@ -737,7 +766,11 @@ class EventCreationHandler(object):
raise SynapseError(403, spam_error, Codes.FORBIDDEN)
stream_id = await self.send_nonmember_event(
- requester, event, context, ratelimit=ratelimit
+ requester,
+ event,
+ context,
+ ratelimit=ratelimit,
+ ignore_shadow_ban=ignore_shadow_ban,
)
return event, stream_id
@@ -746,7 +779,7 @@ class EventCreationHandler(object):
self,
builder: EventBuilder,
requester: Optional[Requester] = None,
- prev_event_ids: Optional[Collection[str]] = None,
+ prev_event_ids: Optional[List[str]] = None,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client
@@ -771,6 +804,15 @@ class EventCreationHandler(object):
else:
prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)
+ # we now ought to have some prev_events (unless it's a create event).
+ #
+ # do a quick sanity check here, rather than waiting until we've created the
+ # event and then try to auth it (which fails with a somewhat confusing "No
+ # create event in auth events")
+ assert (
+ builder.type == EventTypes.Create or len(prev_event_ids) > 0
+ ), "Attempting to create an event with no prev_events"
+
event = await builder.build(prev_event_ids=prev_event_ids)
context = await self.state.compute_event_context(event)
if requester:
@@ -853,7 +895,7 @@ class EventCreationHandler(object):
# Ensure that we can round trip before trying to persist in db
try:
dump = frozendict_json_encoder.encode(event.content)
- json.loads(dump)
+ json_decoder.decode(dump)
except Exception:
logger.exception("Failed to encode content: %r", event.content)
raise
@@ -862,9 +904,10 @@ class EventCreationHandler(object):
try:
# If we're a worker we need to hit out to the master.
- if not self._is_event_writer:
+ writer_instance = self._events_shard_config.get_instance(event.room_id)
+ if writer_instance != self._instance_name:
result = await self.send_event(
- instance_name=self.config.worker.writers.events,
+ instance_name=writer_instance,
event_id=event.event_id,
store=self.store,
requester=requester,
@@ -885,9 +928,7 @@ class EventCreationHandler(object):
except Exception:
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
- run_in_background(
- self.store.remove_push_actions_from_staging, event.event_id
- )
+ await self.store.remove_push_actions_from_staging(event.event_id)
raise
async def _validate_canonical_alias(
@@ -934,7 +975,10 @@ class EventCreationHandler(object):
This should only be run on the instance in charge of persisting events.
"""
- assert self._is_event_writer
+ assert self.storage.persistence is not None
+ assert self._events_shard_config.should_handle(
+ self._instance_name, event.room_id
+ )
if ratelimit:
# We check if this is a room admin redacting an event so that we
@@ -951,7 +995,7 @@ class EventCreationHandler(object):
allow_none=True,
)
- is_admin_redaction = (
+ is_admin_redaction = bool(
original_event and event.sender != original_event.sender
)
@@ -965,7 +1009,7 @@ class EventCreationHandler(object):
# Validate a newly added alias or newly added alt_aliases.
original_alias = None
- original_alt_aliases = set()
+ original_alt_aliases = [] # type: List[str]
original_event_id = event.unsigned.get("replaces_state")
if original_event_id:
@@ -1013,6 +1057,10 @@ class EventCreationHandler(object):
current_state_ids = await context.get_current_state_ids()
+ # We know this event is not an outlier, so this must be
+ # non-None.
+ assert current_state_ids is not None
+
state_to_include_ids = [
e_id
for k, e_id in current_state_ids.items()
@@ -1064,11 +1112,11 @@ class EventCreationHandler(object):
raise SynapseError(400, "Cannot redact event from a different room")
prev_state_ids = await context.get_prev_state_ids()
- auth_events_ids = await self.auth.compute_auth_events(
+ auth_events_ids = self.auth.compute_auth_events(
event, prev_state_ids, for_verification=True
)
- auth_events = await self.store.get_events(auth_events_ids)
- auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
+ auth_events_map = await self.store.get_events(auth_events_ids)
+ auth_events = {(e.type, e.state_key): e for e in auth_events_map.values()}
room_version = await self.store.get_room_version_id(event.room_id)
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
@@ -1093,7 +1141,7 @@ class EventCreationHandler(object):
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")
- event_stream_id, max_stream_id = await self.storage.persistence.persist_event(
+ event_pos, max_stream_token = await self.storage.persistence.persist_event(
event, context=context
)
@@ -1101,12 +1149,10 @@ class EventCreationHandler(object):
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)
- await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
-
def _notify():
try:
self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
+ event, event_pos, max_stream_token, extra_users=extra_users
)
except Exception:
logger.exception("Error notifying about new room event")
@@ -1118,7 +1164,7 @@ class EventCreationHandler(object):
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)
- return event_stream_id
+ return event_pos.stream
async def _bump_active_time(self, user: UserID) -> None:
try:
@@ -1139,48 +1185,7 @@ class EventCreationHandler(object):
)
for room_id in room_ids:
- # For each room we need to find a joined member we can use to send
- # the dummy event with.
-
- latest_event_ids = await self.store.get_prev_events_for_room(room_id)
-
- members = await self.state.get_current_users_in_room(
- room_id, latest_event_ids=latest_event_ids
- )
- dummy_event_sent = False
- for user_id in members:
- if not self.hs.is_mine_id(user_id):
- continue
- requester = create_requester(user_id)
- try:
- event, context = await self.create_event(
- requester,
- {
- "type": "org.matrix.dummy_event",
- "content": {},
- "room_id": room_id,
- "sender": user_id,
- },
- prev_event_ids=latest_event_ids,
- )
-
- event.internal_metadata.proactively_send = False
-
- await self.send_nonmember_event(
- requester, event, context, ratelimit=False
- )
- dummy_event_sent = True
- break
- except ConsentNotGivenError:
- logger.info(
- "Failed to send dummy event into room %s for user %s due to "
- "lack of consent. Will try another user" % (room_id, user_id)
- )
- except AuthError:
- logger.info(
- "Failed to send dummy event into room %s for user %s due to "
- "lack of power. Will try another user" % (room_id, user_id)
- )
+ dummy_event_sent = await self._send_dummy_event_for_room(room_id)
if not dummy_event_sent:
# Did not find a valid user in the room, so remove from future attempts
@@ -1193,6 +1198,59 @@ class EventCreationHandler(object):
now = self.clock.time_msec()
self._rooms_to_exclude_from_dummy_event_insertion[room_id] = now
+ async def _send_dummy_event_for_room(self, room_id: str) -> bool:
+ """Attempt to send a dummy event for the given room.
+
+ Args:
+ room_id: room to try to send an event from
+
+ Returns:
+ True if a dummy event was successfully sent. False if no user was able
+ to send an event.
+ """
+
+ # For each room we need to find a joined member we can use to send
+ # the dummy event with.
+ latest_event_ids = await self.store.get_prev_events_for_room(room_id)
+ members = await self.state.get_current_users_in_room(
+ room_id, latest_event_ids=latest_event_ids
+ )
+ for user_id in members:
+ if not self.hs.is_mine_id(user_id):
+ continue
+ requester = create_requester(user_id)
+ try:
+ event, context = await self.create_event(
+ requester,
+ {
+ "type": "org.matrix.dummy_event",
+ "content": {},
+ "room_id": room_id,
+ "sender": user_id,
+ },
+ prev_event_ids=latest_event_ids,
+ )
+
+ event.internal_metadata.proactively_send = False
+
+ # Since this is a dummy-event it is OK if it is sent by a
+ # shadow-banned user.
+ await self.send_nonmember_event(
+ requester, event, context, ratelimit=False, ignore_shadow_ban=True,
+ )
+ return True
+ except ConsentNotGivenError:
+ logger.info(
+ "Failed to send dummy event into room %s for user %s due to "
+ "lack of consent. Will try another user" % (room_id, user_id)
+ )
+ except AuthError:
+ logger.info(
+ "Failed to send dummy event into room %s for user %s due to "
+ "lack of power. Will try another user" % (room_id, user_id)
+ )
+ return False
+
def _expire_rooms_to_exclude_from_dummy_event_insertion(self):
expire_before = self.clock.time_msec() - _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
to_expire = set()
|