diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 2643438e84..8a7b4916cd 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,9 +15,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+import random
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
-from canonicaljson import encode_canonical_json, json
+from canonicaljson import encode_canonical_json
from twisted.internet.interfaces import IDelayedCall
@@ -34,6 +35,7 @@ from synapse.api.errors import (
Codes,
ConsentNotGivenError,
NotFoundError,
+ ShadowBanError,
SynapseError,
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
@@ -47,14 +49,8 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
-from synapse.types import (
- Collection,
- Requester,
- RoomAlias,
- StreamToken,
- UserID,
- create_requester,
-)
+from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
+from synapse.util import json_decoder
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.metrics import measure_func
@@ -68,7 +64,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
-class MessageHandler(object):
+class MessageHandler:
"""Contains some read only APIs to get state about a room
"""
@@ -92,12 +88,7 @@ class MessageHandler(object):
)
async def get_room_data(
- self,
- user_id: str,
- room_id: str,
- event_type: str,
- state_key: str,
- is_guest: bool,
+ self, user_id: str, room_id: str, event_type: str, state_key: str,
) -> dict:
""" Get data from a room.
@@ -106,11 +97,10 @@ class MessageHandler(object):
room_id
event_type
state_key
- is_guest
Returns:
The path data content.
Raises:
- SynapseError if something went wrong.
+ SynapseError or AuthError if the user is not in the room
"""
(
membership,
@@ -127,6 +117,16 @@ class MessageHandler(object):
[membership_event_id], StateFilter.from_types([key])
)
data = room_state[membership_event_id].get(key)
+ else:
+ # check_user_in_room_or_world_readable, if it doesn't raise an AuthError, should
+ # only ever return a Membership.JOIN/LEAVE object
+ #
+ # Safeguard in case it returned something else
+ logger.error(
+ "Attempted to retrieve data from a room for a user that has never been in it. "
+ "This should not have happened."
+ )
+ raise SynapseError(403, "User not in room", errcode=Codes.FORBIDDEN)
return data
@@ -361,7 +361,7 @@ class MessageHandler(object):
_DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
-class EventCreationHandler(object):
+class EventCreationHandler:
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
@@ -439,7 +439,7 @@ class EventCreationHandler(object):
event_dict: dict,
token_id: Optional[str] = None,
txn_id: Optional[str] = None,
- prev_event_ids: Optional[Collection[str]] = None,
+ prev_event_ids: Optional[List[str]] = None,
require_consent: bool = True,
) -> Tuple[EventBase, EventContext]:
"""
@@ -644,37 +644,48 @@ class EventCreationHandler(object):
event: EventBase,
context: EventContext,
ratelimit: bool = True,
+ ignore_shadow_ban: bool = False,
) -> int:
"""
Persists and notifies local clients and federation of an event.
Args:
- requester
- event the event to send.
- context: the context of the event.
+ requester: The requester sending the event.
+ event: The event to send.
+ context: The context of the event.
ratelimit: Whether to rate limit this send.
+ ignore_shadow_ban: True if shadow-banned users should be allowed to
+ send this event.
Return:
The stream_id of the persisted event.
+
+ Raises:
+ ShadowBanError if the requester has been shadow-banned.
"""
if event.type == EventTypes.Member:
raise SynapseError(
500, "Tried to send member event through non-member codepath"
)
+ if not ignore_shadow_ban and requester.shadow_banned:
+ # We randomly sleep a bit just to annoy the requester.
+ await self.clock.sleep(random.randint(1, 10))
+ raise ShadowBanError()
+
user = UserID.from_string(event.sender)
assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
if event.is_state():
- prev_state = await self.deduplicate_state_event(event, context)
- if prev_state is not None:
+ prev_event = await self.deduplicate_state_event(event, context)
+ if prev_event is not None:
logger.info(
"Not bothering to persist state event %s duplicated by %s",
event.event_id,
- prev_state.event_id,
+ prev_event.event_id,
)
- return prev_state
+ return await self.store.get_stream_id_for_event(prev_event.event_id)
return await self.handle_new_client_event(
requester=requester, event=event, context=context, ratelimit=ratelimit
@@ -682,27 +693,32 @@ class EventCreationHandler(object):
async def deduplicate_state_event(
self, event: EventBase, context: EventContext
- ) -> None:
+ ) -> Optional[EventBase]:
"""
Checks whether event is in the latest resolved state in context.
- If so, returns the version of the event in context.
- Otherwise, returns None.
+ Args:
+ event: The event to check for duplication.
+ context: The event context.
+
+ Returns:
+ The previous verion of the event is returned, if it is found in the
+ event context. Otherwise, None is returned.
"""
prev_state_ids = await context.get_prev_state_ids()
prev_event_id = prev_state_ids.get((event.type, event.state_key))
if not prev_event_id:
- return
+ return None
prev_event = await self.store.get_event(prev_event_id, allow_none=True)
if not prev_event:
- return
+ return None
if prev_event and event.user_id == prev_event.user_id:
prev_content = encode_canonical_json(prev_event.content)
next_content = encode_canonical_json(event.content)
if prev_content == next_content:
return prev_event
- return
+ return None
async def create_and_send_nonmember_event(
self,
@@ -710,12 +726,28 @@ class EventCreationHandler(object):
event_dict: dict,
ratelimit: bool = True,
txn_id: Optional[str] = None,
+ ignore_shadow_ban: bool = False,
) -> Tuple[EventBase, int]:
"""
Creates an event, then sends it.
See self.create_event and self.send_nonmember_event.
+
+ Args:
+ requester: The requester sending the event.
+ event_dict: An entire event.
+ ratelimit: Whether to rate limit this send.
+ txn_id: The transaction ID.
+ ignore_shadow_ban: True if shadow-banned users should be allowed to
+ send this event.
+
+ Raises:
+ ShadowBanError if the requester has been shadow-banned.
"""
+ if not ignore_shadow_ban and requester.shadow_banned:
+ # We randomly sleep a bit just to annoy the requester.
+ await self.clock.sleep(random.randint(1, 10))
+ raise ShadowBanError()
# We limit the number of concurrent event sends in a room so that we
# don't fork the DAG too much. If we don't limit then we can end up in
@@ -734,7 +766,11 @@ class EventCreationHandler(object):
raise SynapseError(403, spam_error, Codes.FORBIDDEN)
stream_id = await self.send_nonmember_event(
- requester, event, context, ratelimit=ratelimit
+ requester,
+ event,
+ context,
+ ratelimit=ratelimit,
+ ignore_shadow_ban=ignore_shadow_ban,
)
return event, stream_id
@@ -743,7 +779,7 @@ class EventCreationHandler(object):
self,
builder: EventBuilder,
requester: Optional[Requester] = None,
- prev_event_ids: Optional[Collection[str]] = None,
+ prev_event_ids: Optional[List[str]] = None,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client
@@ -859,7 +895,7 @@ class EventCreationHandler(object):
# Ensure that we can round trip before trying to persist in db
try:
dump = frozendict_json_encoder.encode(event.content)
- json.loads(dump)
+ json_decoder.decode(dump)
except Exception:
logger.exception("Failed to encode content: %r", event.content)
raise
@@ -891,9 +927,7 @@ class EventCreationHandler(object):
except Exception:
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
- run_in_background(
- self.store.remove_push_actions_from_staging, event.event_id
- )
+ await self.store.remove_push_actions_from_staging(event.event_id)
raise
async def _validate_canonical_alias(
@@ -957,7 +991,7 @@ class EventCreationHandler(object):
allow_none=True,
)
- is_admin_redaction = (
+ is_admin_redaction = bool(
original_event and event.sender != original_event.sender
)
@@ -1077,8 +1111,8 @@ class EventCreationHandler(object):
auth_events_ids = self.auth.compute_auth_events(
event, prev_state_ids, for_verification=True
)
- auth_events = await self.store.get_events(auth_events_ids)
- auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
+ auth_events_map = await self.store.get_events(auth_events_ids)
+ auth_events = {(e.type, e.state_key): e for e in auth_events_map.values()}
room_version = await self.store.get_room_version_id(event.room_id)
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
@@ -1176,8 +1210,14 @@ class EventCreationHandler(object):
event.internal_metadata.proactively_send = False
+ # Since this is a dummy-event it is OK if it is sent by a
+ # shadow-banned user.
await self.send_nonmember_event(
- requester, event, context, ratelimit=False
+ requester,
+ event,
+ context,
+ ratelimit=False,
+ ignore_shadow_ban=True,
)
dummy_event_sent = True
break
|