diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index c8d5e58035..07240d3a14 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -14,6 +14,7 @@
# limitations under the License.
import logging
+from typing import Dict, List, Optional
from prometheus_client import Counter
@@ -21,13 +22,16 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
+from synapse.appservice import ApplicationService
+from synapse.events import EventBase
+from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
event_processing_loop_counter,
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import RoomStreamToken
+from synapse.types import Collection, JsonDict, RoomStreamToken, UserID
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -44,6 +48,7 @@ class ApplicationServicesHandler:
self.started_scheduler = False
self.clock = hs.get_clock()
self.notify_appservices = hs.config.notify_appservices
+ self.event_sources = hs.get_event_sources()
self.current_max = 0
self.is_processing = False
@@ -82,7 +87,7 @@ class ApplicationServicesHandler:
if not events:
break
- events_by_room = {}
+ events_by_room = {} # type: Dict[str, List[EventBase]]
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
@@ -161,6 +166,104 @@ class ApplicationServicesHandler:
finally:
self.is_processing = False
+ async def notify_interested_services_ephemeral(
+ self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [],
+ ):
+ """This is called by the notifier in the background
+ when a ephemeral event handled by the homeserver.
+
+ This will determine which appservices
+ are interested in the event, and submit them.
+
+ Events will only be pushed to appservices
+ that have opted into ephemeral events
+
+ Args:
+ stream_key: The stream the event came from.
+ new_token: The latest stream token
+ users: The user(s) involved with the event.
+ """
+ services = [
+ service
+ for service in self.store.get_app_services()
+ if service.supports_ephemeral
+ ]
+ if not services or not self.notify_appservices:
+ return
+ logger.info("Checking interested services for %s" % (stream_key))
+ with Measure(self.clock, "notify_interested_services_ephemeral"):
+ for service in services:
+ # Only handle typing if we have the latest token
+ if stream_key == "typing_key" and new_token is not None:
+ events = await self._handle_typing(service, new_token)
+ if events:
+ self.scheduler.submit_ephemeral_events_for_as(service, events)
+ # We don't persist the token for typing_key for performance reasons
+ elif stream_key == "receipt_key":
+ events = await self._handle_receipts(service)
+ if events:
+ self.scheduler.submit_ephemeral_events_for_as(service, events)
+ await self.store.set_type_stream_id_for_appservice(
+ service, "read_receipt", new_token
+ )
+ elif stream_key == "presence_key":
+ events = await self._handle_presence(service, users)
+ if events:
+ self.scheduler.submit_ephemeral_events_for_as(service, events)
+ await self.store.set_type_stream_id_for_appservice(
+ service, "presence", new_token
+ )
+
+ async def _handle_typing(self, service: ApplicationService, new_token: int):
+ typing_source = self.event_sources.sources["typing"]
+ # Get the typing events from just before current
+ typing, _ = await typing_source.get_new_events_as(
+ service=service,
+ # For performance reasons, we don't persist the previous
+ # token in the DB and instead fetch the latest typing information
+ # for appservices.
+ from_key=new_token - 1,
+ )
+ return typing
+
+ async def _handle_receipts(self, service: ApplicationService):
+ from_key = await self.store.get_type_stream_id_for_appservice(
+ service, "read_receipt"
+ )
+ receipts_source = self.event_sources.sources["receipt"]
+ receipts, _ = await receipts_source.get_new_events_as(
+ service=service, from_key=from_key
+ )
+ return receipts
+
+ async def _handle_presence(
+ self, service: ApplicationService, users: Collection[UserID]
+ ):
+ events = [] # type: List[JsonDict]
+ presence_source = self.event_sources.sources["presence"]
+ from_key = await self.store.get_type_stream_id_for_appservice(
+ service, "presence"
+ )
+ for user in users:
+ interested = await service.is_interested_in_presence(user, self.store)
+ if not interested:
+ continue
+ presence_events, _ = await presence_source.get_new_events(
+ user=user, service=service, from_key=from_key,
+ )
+ time_now = self.clock.time_msec()
+ presence_events = [
+ {
+ "type": "m.presence",
+ "sender": event.user_id,
+ "content": format_user_presence_state(
+ event, time_now, include_user_id=False
+ ),
+ }
+ for event in presence_events
+ ]
+ events = events + presence_events
+
async def query_user_exists(self, user_id):
"""Check if any application service knows this user_id exists.
@@ -223,7 +326,7 @@ class ApplicationServicesHandler:
async def get_3pe_protocols(self, only_protocol=None):
services = self.store.get_app_services()
- protocols = {}
+ protocols = {} # type: Dict[str, List[JsonDict]]
# Collect up all the individual protocol responses out of the ASes
for s in services:
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 7225923757..c242c409cf 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -13,9 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+from typing import List, Tuple
+from synapse.appservice import ApplicationService
from synapse.handlers._base import BaseHandler
-from synapse.types import ReadReceipt, get_domain_from_id
+from synapse.types import JsonDict, ReadReceipt, get_domain_from_id
from synapse.util.async_helpers import maybe_awaitable
logger = logging.getLogger(__name__)
@@ -140,5 +142,36 @@ class ReceiptEventSource:
return (events, to_key)
+ async def get_new_events_as(
+ self, from_key: int, service: ApplicationService
+ ) -> Tuple[List[JsonDict], int]:
+ """Returns a set of new receipt events that an appservice
+ may be interested in.
+
+ Args:
+ from_key: the stream position at which events should be fetched from
+ service: The appservice which may be interested
+ """
+ from_key = int(from_key)
+ to_key = self.get_current_key()
+
+ if from_key == to_key:
+ return [], to_key
+
+ # We first need to fetch all new receipts
+ rooms_to_events = await self.store.get_linearized_receipts_for_all_rooms(
+ from_key=from_key, to_key=to_key
+ )
+
+ # Then filter down to rooms that the AS can read
+ events = []
+ for room_id, event in rooms_to_events.items():
+ if not await service.matches_user_in_member_list(room_id, self.store):
+ continue
+
+ events.append(event)
+
+ return (events, to_key)
+
def get_current_key(self, direction="f"):
return self.store.get_max_receipt_stream_id()
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index a306631094..b527724bc4 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import itertools
import logging
from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 3cbfc2d780..d3692842e3 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -12,16 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
import random
from collections import namedtuple
from typing import TYPE_CHECKING, List, Set, Tuple
from synapse.api.errors import AuthError, ShadowBanError, SynapseError
+from synapse.appservice import ApplicationService
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.streams import TypingStream
-from synapse.types import UserID, get_domain_from_id
+from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@@ -430,6 +430,33 @@ class TypingNotificationEventSource:
"content": {"user_ids": list(typing)},
}
+ async def get_new_events_as(
+ self, from_key: int, service: ApplicationService
+ ) -> Tuple[List[JsonDict], int]:
+ """Returns a set of new typing events that an appservice
+ may be interested in.
+
+ Args:
+ from_key: the stream position at which events should be fetched from
+ service: The appservice which may be interested
+ """
+ with Measure(self.clock, "typing.get_new_events_as"):
+ from_key = int(from_key)
+ handler = self.get_typing_handler()
+
+ events = []
+ for room_id in handler._room_serials.keys():
+ if handler._room_serials[room_id] <= from_key:
+ continue
+ if not await service.matches_user_in_member_list(
+ room_id, handler.store
+ ):
+ continue
+
+ events.append(self._make_event_for(room_id))
+
+ return (events, handler._latest_room_serial)
+
async def get_new_events(self, from_key, room_ids, **kwargs):
with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)
|