diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index c8d5e58035..07240d3a14 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -14,6 +14,7 @@
# limitations under the License.
import logging
+from typing import Dict, List, Optional
from prometheus_client import Counter
@@ -21,13 +22,16 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
+from synapse.appservice import ApplicationService
+from synapse.events import EventBase
+from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
event_processing_loop_counter,
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import RoomStreamToken
+from synapse.types import Collection, JsonDict, RoomStreamToken, UserID
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -44,6 +48,7 @@ class ApplicationServicesHandler:
self.started_scheduler = False
self.clock = hs.get_clock()
self.notify_appservices = hs.config.notify_appservices
+ self.event_sources = hs.get_event_sources()
self.current_max = 0
self.is_processing = False
@@ -82,7 +87,7 @@ class ApplicationServicesHandler:
if not events:
break
- events_by_room = {}
+ events_by_room = {} # type: Dict[str, List[EventBase]]
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
@@ -161,6 +166,104 @@ class ApplicationServicesHandler:
finally:
self.is_processing = False
+ async def notify_interested_services_ephemeral(
+ self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [],
+ ):
+ """This is called by the notifier in the background
+ when a ephemeral event handled by the homeserver.
+
+ This will determine which appservices
+ are interested in the event, and submit them.
+
+ Events will only be pushed to appservices
+ that have opted into ephemeral events
+
+ Args:
+ stream_key: The stream the event came from.
+ new_token: The latest stream token
+ users: The user(s) involved with the event.
+ """
+ services = [
+ service
+ for service in self.store.get_app_services()
+ if service.supports_ephemeral
+ ]
+ if not services or not self.notify_appservices:
+ return
+ logger.info("Checking interested services for %s" % (stream_key))
+ with Measure(self.clock, "notify_interested_services_ephemeral"):
+ for service in services:
+ # Only handle typing if we have the latest token
+ if stream_key == "typing_key" and new_token is not None:
+ events = await self._handle_typing(service, new_token)
+ if events:
+ self.scheduler.submit_ephemeral_events_for_as(service, events)
+ # We don't persist the token for typing_key for performance reasons
+ elif stream_key == "receipt_key":
+ events = await self._handle_receipts(service)
+ if events:
+ self.scheduler.submit_ephemeral_events_for_as(service, events)
+ await self.store.set_type_stream_id_for_appservice(
+ service, "read_receipt", new_token
+ )
+ elif stream_key == "presence_key":
+ events = await self._handle_presence(service, users)
+ if events:
+ self.scheduler.submit_ephemeral_events_for_as(service, events)
+ await self.store.set_type_stream_id_for_appservice(
+ service, "presence", new_token
+ )
+
+ async def _handle_typing(self, service: ApplicationService, new_token: int):
+ typing_source = self.event_sources.sources["typing"]
+ # Get the typing events from just before current
+ typing, _ = await typing_source.get_new_events_as(
+ service=service,
+ # For performance reasons, we don't persist the previous
+ # token in the DB and instead fetch the latest typing information
+ # for appservices.
+ from_key=new_token - 1,
+ )
+ return typing
+
+ async def _handle_receipts(self, service: ApplicationService):
+ from_key = await self.store.get_type_stream_id_for_appservice(
+ service, "read_receipt"
+ )
+ receipts_source = self.event_sources.sources["receipt"]
+ receipts, _ = await receipts_source.get_new_events_as(
+ service=service, from_key=from_key
+ )
+ return receipts
+
+ async def _handle_presence(
+ self, service: ApplicationService, users: Collection[UserID]
+ ):
+ events = [] # type: List[JsonDict]
+ presence_source = self.event_sources.sources["presence"]
+ from_key = await self.store.get_type_stream_id_for_appservice(
+ service, "presence"
+ )
+ for user in users:
+ interested = await service.is_interested_in_presence(user, self.store)
+ if not interested:
+ continue
+ presence_events, _ = await presence_source.get_new_events(
+ user=user, service=service, from_key=from_key,
+ )
+ time_now = self.clock.time_msec()
+ presence_events = [
+ {
+ "type": "m.presence",
+ "sender": event.user_id,
+ "content": format_user_presence_state(
+ event, time_now, include_user_id=False
+ ),
+ }
+ for event in presence_events
+ ]
+ events = events + presence_events
+
async def query_user_exists(self, user_id):
"""Check if any application service knows this user_id exists.
@@ -223,7 +326,7 @@ class ApplicationServicesHandler:
async def get_3pe_protocols(self, only_protocol=None):
services = self.store.get_app_services()
- protocols = {}
+ protocols = {} # type: Dict[str, List[JsonDict]]
# Collect up all the individual protocol responses out of the ASes
for s in services:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 455acd7669..fde8f00531 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1507,18 +1507,9 @@ class FederationHandler(BaseHandler):
event, context = await self.event_creation_handler.create_new_client_event(
builder=builder
)
- except AuthError as e:
+ except SynapseError as e:
logger.warning("Failed to create join to %s because %s", room_id, e)
- raise e
-
- event_allowed = await self.third_party_event_rules.check_event_allowed(
- event, context
- )
- if not event_allowed:
- logger.info("Creation of join %s forbidden by third-party rules", event)
- raise SynapseError(
- 403, "This event is not allowed in this context", Codes.FORBIDDEN
- )
+ raise
# The remote hasn't signed it yet, obviously. We'll do the full checks
# when we get the event back in `on_send_join_request`
@@ -1567,15 +1558,6 @@ class FederationHandler(BaseHandler):
context = await self._handle_new_event(origin, event)
- event_allowed = await self.third_party_event_rules.check_event_allowed(
- event, context
- )
- if not event_allowed:
- logger.info("Sending of join %s forbidden by third-party rules", event)
- raise SynapseError(
- 403, "This event is not allowed in this context", Codes.FORBIDDEN
- )
-
logger.debug(
"on_send_join_request: After _handle_new_event: %s, sigs: %s",
event.event_id,
@@ -1748,15 +1730,6 @@ class FederationHandler(BaseHandler):
builder=builder
)
- event_allowed = await self.third_party_event_rules.check_event_allowed(
- event, context
- )
- if not event_allowed:
- logger.warning("Creation of leave %s forbidden by third-party rules", event)
- raise SynapseError(
- 403, "This event is not allowed in this context", Codes.FORBIDDEN
- )
-
try:
# The remote hasn't signed it yet, obviously. We'll do the full checks
# when we get the event back in `on_send_leave_request`
@@ -1789,16 +1762,7 @@ class FederationHandler(BaseHandler):
event.internal_metadata.outlier = False
- context = await self._handle_new_event(origin, event)
-
- event_allowed = await self.third_party_event_rules.check_event_allowed(
- event, context
- )
- if not event_allowed:
- logger.info("Sending of leave %s forbidden by third-party rules", event)
- raise SynapseError(
- 403, "This event is not allowed in this context", Codes.FORBIDDEN
- )
+ await self._handle_new_event(origin, event)
logger.debug(
"on_send_leave_request: After _handle_new_event: %s, sigs: %s",
@@ -2694,18 +2658,6 @@ class FederationHandler(BaseHandler):
builder=builder
)
- event_allowed = await self.third_party_event_rules.check_event_allowed(
- event, context
- )
- if not event_allowed:
- logger.info(
- "Creation of threepid invite %s forbidden by third-party rules",
- event,
- )
- raise SynapseError(
- 403, "This event is not allowed in this context", Codes.FORBIDDEN
- )
-
event, context = await self.add_display_name_to_third_party_invite(
room_version, event_dict, event, context
)
@@ -2756,18 +2708,6 @@ class FederationHandler(BaseHandler):
event, context = await self.event_creation_handler.create_new_client_event(
builder=builder
)
-
- event_allowed = await self.third_party_event_rules.check_event_allowed(
- event, context
- )
- if not event_allowed:
- logger.warning(
- "Exchange of threepid invite %s forbidden by third-party rules", event
- )
- raise SynapseError(
- 403, "This event is not allowed in this context", Codes.FORBIDDEN
- )
-
event, context = await self.add_display_name_to_third_party_invite(
room_version, event_dict, event, context
)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 415c0935ed..e37bca3d12 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -437,9 +437,9 @@ class EventCreationHandler:
self,
requester: Requester,
event_dict: dict,
- token_id: Optional[str] = None,
txn_id: Optional[str] = None,
prev_event_ids: Optional[List[str]] = None,
+ auth_event_ids: Optional[List[str]] = None,
require_consent: bool = True,
) -> Tuple[EventBase, EventContext]:
"""
@@ -453,13 +453,18 @@ class EventCreationHandler:
Args:
requester
event_dict: An entire event
- token_id
txn_id
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
If None, they will be requested from the database.
+
+ auth_event_ids:
+ The event ids to use as the auth_events for the new event.
+ Should normally be left as None, which will cause them to be calculated
+ based on the room state at the prev_events.
+
require_consent: Whether to check if the requester has
consented to the privacy policy.
Raises:
@@ -511,14 +516,17 @@ class EventCreationHandler:
if require_consent and not is_exempt:
await self.assert_accepted_privacy_policy(requester)
- if token_id is not None:
- builder.internal_metadata.token_id = token_id
+ if requester.access_token_id is not None:
+ builder.internal_metadata.token_id = requester.access_token_id
if txn_id is not None:
builder.internal_metadata.txn_id = txn_id
event, context = await self.create_new_client_event(
- builder=builder, requester=requester, prev_event_ids=prev_event_ids,
+ builder=builder,
+ requester=requester,
+ prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
)
# In an ideal world we wouldn't need the second part of this condition. However,
@@ -726,7 +734,7 @@ class EventCreationHandler:
return event, event.internal_metadata.stream_ordering
event, context = await self.create_event(
- requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
+ requester, event_dict, txn_id=txn_id
)
assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
@@ -757,6 +765,7 @@ class EventCreationHandler:
builder: EventBuilder,
requester: Optional[Requester] = None,
prev_event_ids: Optional[List[str]] = None,
+ auth_event_ids: Optional[List[str]] = None,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client
@@ -769,6 +778,11 @@ class EventCreationHandler:
If None, they will be requested from the database.
+ auth_event_ids:
+ The event ids to use as the auth_events for the new event.
+ Should normally be left as None, which will cause them to be calculated
+ based on the room state at the prev_events.
+
Returns:
Tuple of created event, context
"""
@@ -790,11 +804,30 @@ class EventCreationHandler:
builder.type == EventTypes.Create or len(prev_event_ids) > 0
), "Attempting to create an event with no prev_events"
- event = await builder.build(prev_event_ids=prev_event_ids)
+ event = await builder.build(
+ prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids
+ )
context = await self.state.compute_event_context(event)
if requester:
context.app_service = requester.app_service
+ third_party_result = await self.third_party_event_rules.check_event_allowed(
+ event, context
+ )
+ if not third_party_result:
+ logger.info(
+ "Event %s forbidden by third-party rules", event,
+ )
+ raise SynapseError(
+ 403, "This event is not allowed in this context", Codes.FORBIDDEN
+ )
+ elif isinstance(third_party_result, dict):
+ # the third-party rules want to replace the event. We'll need to build a new
+ # event.
+ event, context = await self._rebuild_event_after_third_party_rules(
+ third_party_result, event
+ )
+
self.validator.validate_new(event, self.config)
# If this event is an annotation then we check that that the sender
@@ -881,14 +914,6 @@ class EventCreationHandler:
else:
room_version = await self.store.get_room_version_id(event.room_id)
- event_allowed = await self.third_party_event_rules.check_event_allowed(
- event, context
- )
- if not event_allowed:
- raise SynapseError(
- 403, "This event is not allowed in this context", Codes.FORBIDDEN
- )
-
if event.internal_metadata.is_out_of_band_membership():
# the only sort of out-of-band-membership events we expect to see here
# are invite rejections we have generated ourselves.
@@ -1291,3 +1316,57 @@ class EventCreationHandler:
room_id,
)
del self._rooms_to_exclude_from_dummy_event_insertion[room_id]
+
+ async def _rebuild_event_after_third_party_rules(
+ self, third_party_result: dict, original_event: EventBase
+ ) -> Tuple[EventBase, EventContext]:
+ # the third_party_event_rules want to replace the event.
+ # we do some basic checks, and then return the replacement event and context.
+
+ # Construct a new EventBuilder and validate it, which helps with the
+ # rest of these checks.
+ try:
+ builder = self.event_builder_factory.for_room_version(
+ original_event.room_version, third_party_result
+ )
+ self.validator.validate_builder(builder)
+ except SynapseError as e:
+ raise Exception(
+ "Third party rules module created an invalid event: " + e.msg,
+ )
+
+ immutable_fields = [
+ # changing the room is going to break things: we've already checked that the
+ # room exists, and are holding a concurrency limiter token for that room.
+ # Also, we might need to use a different room version.
+ "room_id",
+ # changing the type or state key might work, but we'd need to check that the
+ # calling functions aren't making assumptions about them.
+ "type",
+ "state_key",
+ ]
+
+ for k in immutable_fields:
+ if getattr(builder, k, None) != original_event.get(k):
+ raise Exception(
+ "Third party rules module created an invalid event: "
+ "cannot change field " + k
+ )
+
+ # check that the new sender belongs to this HS
+ if not self.hs.is_mine_id(builder.sender):
+ raise Exception(
+ "Third party rules module created an invalid event: "
+ "invalid sender " + builder.sender
+ )
+
+ # copy over the original internal metadata
+ for k, v in original_event.internal_metadata.get_dict().items():
+ setattr(builder.internal_metadata, k, v)
+
+ event = await builder.build(prev_event_ids=original_event.prev_event_ids())
+
+ # we rebuild the event context, to be on the safe side. If nothing else,
+ # delta_ids might need an update.
+ context = await self.state.compute_event_context(event)
+ return event, context
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 7225923757..c242c409cf 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -13,9 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+from typing import List, Tuple
+from synapse.appservice import ApplicationService
from synapse.handlers._base import BaseHandler
-from synapse.types import ReadReceipt, get_domain_from_id
+from synapse.types import JsonDict, ReadReceipt, get_domain_from_id
from synapse.util.async_helpers import maybe_awaitable
logger = logging.getLogger(__name__)
@@ -140,5 +142,36 @@ class ReceiptEventSource:
return (events, to_key)
+ async def get_new_events_as(
+ self, from_key: int, service: ApplicationService
+ ) -> Tuple[List[JsonDict], int]:
+ """Returns a set of new receipt events that an appservice
+ may be interested in.
+
+ Args:
+ from_key: the stream position at which events should be fetched from
+ service: The appservice which may be interested
+ """
+ from_key = int(from_key)
+ to_key = self.get_current_key()
+
+ if from_key == to_key:
+ return [], to_key
+
+ # We first need to fetch all new receipts
+ rooms_to_events = await self.store.get_linearized_receipts_for_all_rooms(
+ from_key=from_key, to_key=to_key
+ )
+
+ # Then filter down to rooms that the AS can read
+ events = []
+ for room_id, event in rooms_to_events.items():
+ if not await service.matches_user_in_member_list(room_id, self.store):
+ continue
+
+ events.append(event)
+
+ return (events, to_key)
+
def get_current_key(self, direction="f"):
return self.store.get_max_receipt_stream_id()
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 93ed51063a..ec300d8877 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -214,7 +214,6 @@ class RoomCreationHandler(BaseHandler):
"replacement_room": new_room_id,
},
},
- token_id=requester.access_token_id,
)
old_room_version = await self.store.get_room_version_id(old_room_id)
await self.auth.check_from_context(
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index bc4fe7890a..0268288600 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -17,12 +17,10 @@ import abc
import logging
import random
from http import HTTPStatus
-from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, Union
-
-from unpaddedbase64 import encode_base64
+from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
from synapse import types
-from synapse.api.constants import MAX_DEPTH, AccountDataTypes, EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.api.errors import (
AuthError,
Codes,
@@ -31,12 +29,8 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.api.ratelimiting import Ratelimiter
-from synapse.api.room_versions import EventFormatVersions
-from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.events import EventBase
-from synapse.events.builder import create_local_event_from_event_dict
from synapse.events.snapshot import EventContext
-from synapse.events.validator import EventValidator
from synapse.storage.roommember import RoomsForUser
from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
from synapse.util.async_helpers import Linearizer
@@ -194,7 +188,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# For backwards compatibility:
"membership": membership,
},
- token_id=requester.access_token_id,
txn_id=txn_id,
prev_event_ids=prev_event_ids,
require_consent=require_consent,
@@ -1153,31 +1146,10 @@ class RoomMemberMasterHandler(RoomMemberHandler):
room_id = invite_event.room_id
target_user = invite_event.state_key
- room_version = await self.store.get_room_version(room_id)
content["membership"] = Membership.LEAVE
- # the auth events for the new event are the same as that of the invite, plus
- # the invite itself.
- #
- # the prev_events are just the invite.
- invite_hash = invite_event.event_id # type: Union[str, Tuple]
- if room_version.event_format == EventFormatVersions.V1:
- alg, h = compute_event_reference_hash(invite_event)
- invite_hash = (invite_event.event_id, {alg: encode_base64(h)})
-
- auth_events = tuple(invite_event.auth_events) + (invite_hash,)
- prev_events = (invite_hash,)
-
- # we cap depth of generated events, to ensure that they are not
- # rejected by other servers (and so that they can be persisted in
- # the db)
- depth = min(invite_event.depth + 1, MAX_DEPTH)
-
event_dict = {
- "depth": depth,
- "auth_events": auth_events,
- "prev_events": prev_events,
"type": EventTypes.Member,
"room_id": room_id,
"sender": target_user,
@@ -1185,24 +1157,23 @@ class RoomMemberMasterHandler(RoomMemberHandler):
"state_key": target_user,
}
- event = create_local_event_from_event_dict(
- clock=self.clock,
- hostname=self.hs.hostname,
- signing_key=self.hs.signing_key,
- room_version=room_version,
- event_dict=event_dict,
+ # the auth events for the new event are the same as that of the invite, plus
+ # the invite itself.
+ #
+ # the prev_events are just the invite.
+ prev_event_ids = [invite_event.event_id]
+ auth_event_ids = invite_event.auth_event_ids() + prev_event_ids
+
+ event, context = await self.event_creation_handler.create_event(
+ requester,
+ event_dict,
+ txn_id=txn_id,
+ prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
)
event.internal_metadata.outlier = True
event.internal_metadata.out_of_band_membership = True
- if txn_id is not None:
- event.internal_metadata.txn_id = txn_id
- if requester.access_token_id is not None:
- event.internal_metadata.token_id = requester.access_token_id
-
- EventValidator().validate_new(event, self.config)
- context = await self.state_handler.compute_event_context(event)
- context.app_service = requester.app_service
result_event = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[UserID.from_string(target_user)],
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 01b3bc27e9..c4a025da34 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import itertools
import logging
from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 3cbfc2d780..d3692842e3 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -12,16 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
import random
from collections import namedtuple
from typing import TYPE_CHECKING, List, Set, Tuple
from synapse.api.errors import AuthError, ShadowBanError, SynapseError
+from synapse.appservice import ApplicationService
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.streams import TypingStream
-from synapse.types import UserID, get_domain_from_id
+from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@@ -430,6 +430,33 @@ class TypingNotificationEventSource:
"content": {"user_ids": list(typing)},
}
+ async def get_new_events_as(
+ self, from_key: int, service: ApplicationService
+ ) -> Tuple[List[JsonDict], int]:
+ """Returns a set of new typing events that an appservice
+ may be interested in.
+
+ Args:
+ from_key: the stream position at which events should be fetched from
+ service: The appservice which may be interested
+ """
+ with Measure(self.clock, "typing.get_new_events_as"):
+ from_key = int(from_key)
+ handler = self.get_typing_handler()
+
+ events = []
+ for room_id in handler._room_serials.keys():
+ if handler._room_serials[room_id] <= from_key:
+ continue
+ if not await service.matches_user_in_member_list(
+ room_id, handler.store
+ ):
+ continue
+
+ events.append(self._make_event_for(room_id))
+
+ return (events, handler._latest_room_serial)
+
async def get_new_events(self, from_key, room_ids, **kwargs):
with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)
|