diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 6798a5d60c..0807086eb5 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -14,14 +14,15 @@
# limitations under the License.
import logging
import re
-from typing import TYPE_CHECKING, List
+from typing import TYPE_CHECKING, List, Optional
from synapse.api.constants import EventTypes
from synapse.events import EventBase
-from synapse.types import GroupID, UserID, get_domain_from_id
-from synapse.util.caches.descriptors import _CacheContext, cached
+from synapse.types import GroupID, JsonDict, RoomAlias, UserID, get_domain_from_id
+from synapse.util.caches.descriptors import cached
if TYPE_CHECKING:
+ from synapse.appservice.api import ApplicationServiceApi
from synapse.storage.databases.main import DataStore
logger = logging.getLogger(__name__)
@@ -131,19 +132,19 @@ class ApplicationService:
raise ValueError("Expected string for 'regex' in ns '%s'" % ns)
return namespaces
- def _matches_regex(self, test_string, namespace_key):
+ def _matches_regex(self, test_string: str, namespace_key: str):
for regex_obj in self.namespaces[namespace_key]:
if regex_obj["regex"].match(test_string):
return regex_obj
return None
- def _is_exclusive(self, ns_key, test_string):
+ def _is_exclusive(self, ns_key: str, test_string: str):
regex_obj = self._matches_regex(test_string, ns_key)
if regex_obj:
return regex_obj["exclusive"]
return False
- async def _matches_user(self, event, store):
+ async def _matches_user(self, event, store: "DataStore"):
if not event:
return False
@@ -161,10 +162,16 @@ class ApplicationService:
does_match = await self.matches_user_in_member_list(event.room_id, store)
return does_match
- @cached(num_args=1, cache_context=True)
- async def matches_user_in_member_list(
- self, room_id: str, store, cache_context: _CacheContext
- ):
+ @cached(num_args=1)
+ async def matches_user_in_member_list(self, room_id: str, store: "DataStore"):
+ """Check if this service is interested a room based upon it's membership
+
+ Args:
+ room_id(RoomId): The room to check.
+ store(DataStore)
+ Returns:
+ True if this service would like to know about this room.
+ """
member_list = await store.get_users_in_room(room_id)
# check joined member events
@@ -178,7 +185,7 @@ class ApplicationService:
return self.is_interested_in_room(event.room_id)
return False
- async def _matches_aliases(self, event, store):
+ async def _matches_aliases(self, event, store: "DataStore"):
if not store or not event:
return False
@@ -188,7 +195,7 @@ class ApplicationService:
return True
return False
- async def is_interested(self, event, store=None) -> bool:
+ async def is_interested(self, event, store: "DataStore") -> bool:
"""Check if this service is interested in this event.
Args:
@@ -209,10 +216,16 @@ class ApplicationService:
return False
- @cached(num_args=1, cache_context=True)
- async def is_interested_in_presence(
- self, user_id: UserID, store, cache_context: _CacheContext
- ):
+ @cached(num_args=1)
+ async def is_interested_in_presence(self, user_id: UserID, store: "DataStore"):
+ """Check if this service is interested a user's presence
+
+ Args:
+ user_id(UserID): The user to check.
+ store(DataStore)
+ Returns:
+ True if this service would like to know about presence for this user.
+ """
# Find all the rooms the sender is in
if self.is_interested_in_user(user_id.to_string()):
return True
@@ -224,31 +237,31 @@ class ApplicationService:
return True
return False
- def is_interested_in_user(self, user_id):
+ def is_interested_in_user(self, user_id: UserID):
return (
self._matches_regex(user_id, ApplicationService.NS_USERS)
or user_id == self.sender
)
- def is_interested_in_alias(self, alias):
+ def is_interested_in_alias(self, alias: RoomAlias):
return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES))
- def is_interested_in_room(self, room_id):
+ def is_interested_in_room(self, room_id: UserID):
return bool(self._matches_regex(room_id, ApplicationService.NS_ROOMS))
- def is_exclusive_user(self, user_id):
+ def is_exclusive_user(self, user_id: UserID):
return (
self._is_exclusive(ApplicationService.NS_USERS, user_id)
or user_id == self.sender
)
- def is_interested_in_protocol(self, protocol):
+ def is_interested_in_protocol(self, protocol: str):
return protocol in self.protocols
- def is_exclusive_alias(self, alias):
+ def is_exclusive_alias(self, alias: str):
return self._is_exclusive(ApplicationService.NS_ALIASES, alias)
- def is_exclusive_room(self, room_id):
+ def is_exclusive_room(self, room_id: str):
return self._is_exclusive(ApplicationService.NS_ROOMS, room_id)
def get_exclusive_user_regexes(self):
@@ -261,7 +274,7 @@ class ApplicationService:
if regex_obj["exclusive"]
]
- def get_groups_for_user(self, user_id):
+ def get_groups_for_user(self, user_id: str):
"""Get the groups that this user is associated with by this AS
Args:
@@ -295,18 +308,18 @@ class AppServiceTransaction:
service: ApplicationService,
id: int,
events: List[EventBase],
- ephemeral=None,
+ ephemeral: Optional[List[JsonDict]] = None,
):
self.service = service
self.id = id
self.events = events
self.ephemeral = ephemeral
- async def send(self, as_api) -> bool:
+ async def send(self, as_api: "ApplicationServiceApi") -> bool:
"""Sends this transaction using the provided AS API interface.
Args:
- as_api: The API to use to send.
+ as_api(ApplicationServiceApi): The API to use to send.
Returns:
True if the transaction was sent.
"""
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index edeb4c47dd..fa736a97b4 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -14,7 +14,7 @@
# limitations under the License.
import logging
import urllib
-from typing import TYPE_CHECKING, Any, List, Optional
+from typing import TYPE_CHECKING, List, Optional
from prometheus_client import Counter
@@ -206,7 +206,7 @@ class ApplicationServiceApi(SimpleHttpClient):
self,
service: "ApplicationService",
events: List[EventBase],
- ephemeral: Optional[Any] = None,
+ ephemeral: Optional[JsonDict] = None,
txn_id: Optional[int] = None,
):
if service.url is None:
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index e17d468208..2f0729a534 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -55,6 +55,7 @@ from synapse.appservice import ApplicationService, ApplicationServiceState
from synapse.events import EventBase
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@@ -110,11 +111,8 @@ class _ServiceQueuer:
self.txn_ctrl = txn_ctrl
self.clock = clock
- def enqueue(self, service, event):
- self.queued_events.setdefault(service.id, []).append(event)
-
+ def _start_background_request(self, service):
# start a sender for this appservice if we don't already have one
-
if service.id in self.requests_in_flight:
return
@@ -122,17 +120,13 @@ class _ServiceQueuer:
"as-sender-%s" % (service.id,), self._send_request, service
)
+ def enqueue(self, service, event):
+ self.queued_events.setdefault(service.id, []).append(event)
+ self._start_background_request(service)
+
def enqueue_ephemeral(self, service: ApplicationService, events: List[Any]):
self.queued_ephemeral.setdefault(service.id, []).extend(events)
-
- # start a sender for this appservice if we don't already have one
-
- if service.id in self.requests_in_flight:
- return
-
- run_as_background_process(
- "as-sender-%s" % (service.id,), self._send_request, service
- )
+ self._start_background_request(service)
async def _send_request(self, service: ApplicationService):
# sanity-check: we shouldn't get here if this service already has a sender
@@ -183,13 +177,13 @@ class _TransactionController:
self,
service: ApplicationService,
events: List[EventBase],
- ephemeral: Optional[Any] = None,
+ ephemeral: Optional[JsonDict] = None,
):
try:
txn = await self.store.create_appservice_txn(
service=service, events=events, ephemeral=ephemeral
)
- service_is_up = await self.is_service_up(service)
+ service_is_up = await self._is_service_up(service)
if service_is_up:
sent = await txn.send(self.as_api)
if sent:
@@ -232,7 +226,7 @@ class _TransactionController:
recoverer.recover()
logger.info("Now %i active recoverers", len(self.recoverers))
- async def is_service_up(self, service: ApplicationService):
+ async def _is_service_up(self, service: ApplicationService) -> bool:
state = await self.store.get_appservice_state(service)
return state == ApplicationServiceState.UP or state is None
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 3d8f82b365..9e88e5e591 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -52,14 +52,14 @@ class ApplicationServicesHandler:
self.current_max = 0
self.is_processing = False
- async def notify_interested_services(self, current_id):
+ async def notify_interested_services(self, current_id: int):
"""Notifies (pushes) all application services interested in this event.
Pushing is done asynchronously, so this method won't block for any
prolonged length of time.
Args:
- current_id(int): The current maximum ID.
+ current_id: The current maximum ID.
"""
services = self.store.get_app_services()
if not services or not self.notify_appservices:
@@ -169,6 +169,17 @@ class ApplicationServicesHandler:
new_token: Union[int, RoomStreamToken],
users: Collection[UserID] = [],
):
+ """This is called by the notifier in the background
+ when a ephemeral event handled by the homeserver.
+
+ This will determine which appservices
+ are interested in the event, and submit them.
+
+ Args:
+ stream_key: The stream the event came from.
+ new_token: The latest stream token
+ users: The user(s) involved with the event.
+ """
services = [
service
for service in self.store.get_app_services()
@@ -192,7 +203,7 @@ class ApplicationServicesHandler:
service, "read_receipt", new_token
)
elif stream_key == "presence_key":
- events = await self._handle_as_presence(service, users)
+ events = await self._handle_presence(service, users)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
await self.store.set_type_stream_id_for_appservice(
@@ -211,7 +222,7 @@ class ApplicationServicesHandler:
)
return typing
- async def _handle_receipts(self, service: ApplicationService, token: int):
+ async def _handle_receipts(self, service: ApplicationService):
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
@@ -221,7 +232,7 @@ class ApplicationServicesHandler:
)
return receipts
- async def _handle_as_presence(self, service: ApplicationService, users: List[str]):
+ async def _handle_presence(self, service: ApplicationService, users: List[str]):
events = []
presence_source = self.event_sources.sources["presence"]
from_key = await self.store.get_type_stream_id_for_appservice(
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a6db85c888..c242c409cf 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -13,10 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+from typing import List, Tuple
from synapse.appservice import ApplicationService
from synapse.handlers._base import BaseHandler
-from synapse.types import ReadReceipt, get_domain_from_id
+from synapse.types import JsonDict, ReadReceipt, get_domain_from_id
from synapse.util.async_helpers import maybe_awaitable
logger = logging.getLogger(__name__)
@@ -142,8 +143,15 @@ class ReceiptEventSource:
return (events, to_key)
async def get_new_events_as(
- self, from_key: int, service: ApplicationService, **kwargs
- ):
+ self, from_key: int, service: ApplicationService
+ ) -> Tuple[List[JsonDict], int]:
+ """Returns a set of new receipt events that an appservice
+ may be interested in.
+
+ Args:
+ from_key: the stream position at which events should be fetched from
+ service: The appservice which may be interested
+ """
from_key = int(from_key)
to_key = self.get_current_key()
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index e948efef2e..423eab666d 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import itertools
import logging
from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 8a8f480777..d3692842e3 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
import random
from collections import namedtuple
@@ -22,7 +21,7 @@ from synapse.api.errors import AuthError, ShadowBanError, SynapseError
from synapse.appservice import ApplicationService
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.streams import TypingStream
-from synapse.types import UserID, get_domain_from_id
+from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@@ -432,8 +431,15 @@ class TypingNotificationEventSource:
}
async def get_new_events_as(
- self, from_key: int, service: ApplicationService, **kwargs
- ):
+ self, from_key: int, service: ApplicationService
+ ) -> Tuple[List[JsonDict], int]:
+ """Returns a set of new typing events that an appservice
+ may be interested in.
+
+ Args:
+ from_key: the stream position at which events should be fetched from
+ service: The appservice which may be interested
+ """
with Measure(self.clock, "typing.get_new_events_as"):
from_key = int(from_key)
handler = self.get_typing_handler()
@@ -441,7 +447,6 @@ class TypingNotificationEventSource:
events = []
for room_id in handler._room_serials.keys():
if handler._room_serials[room_id] <= from_key:
- print("Key too old")
continue
if not await service.matches_user_in_member_list(
room_id, handler.store
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index a1d3f4be16..af3b8be943 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -15,7 +15,7 @@
# limitations under the License.
import logging
import re
-from typing import Any, List, Optional
+from typing import List, Optional
from synapse.appservice import ApplicationService, AppServiceTransaction
from synapse.config.appservice import load_appservices
@@ -23,6 +23,7 @@ from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.events_worker import EventsWorkerStore
+from synapse.types import JsonDict
from synapse.util import json_encoder
logger = logging.getLogger(__name__)
@@ -178,16 +179,14 @@ class ApplicationServiceTransactionWorkerStore(
self,
service: ApplicationService,
events: List[EventBase],
- ephemeral: Optional[Any] = None,
- ):
+ ephemeral: Optional[JsonDict] = None,
+ ) -> AppServiceTransaction:
"""Atomically creates a new transaction for this application service
with the given list of events.
Args:
- service(ApplicationService): The service who the transaction is for.
- events(list<Event>): A list of events to put in the transaction.
- Returns:
- AppServiceTransaction: A new transaction.
+ service: The service who the transaction is for.
+ events: A list of events to put in the transaction.
"""
def _create_appservice_txn(txn):
|