diff options
author | Will Hunt <will@half-shot.uk> | 2020-10-08 15:05:09 +0100 |
---|---|---|
committer | Will Hunt <will@half-shot.uk> | 2020-10-08 15:05:09 +0100 |
commit | d0dd953c272510e7487bc020177817b7f1f7c691 (patch) | |
tree | 302a2eca49699249a2f00d31e272be99a0957c05 | |
parent | Linkify MSC (diff) | |
download | synapse-d0dd953c272510e7487bc020177817b7f1f7c691.tar.xz |
Fix types / add docstrings
-rw-r--r-- | synapse/appservice/__init__.py | 67 | ||||
-rw-r--r-- | synapse/appservice/api.py | 4 | ||||
-rw-r--r-- | synapse/appservice/scheduler.py | 26 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 21 | ||||
-rw-r--r-- | synapse/handlers/receipts.py | 14 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 1 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 15 | ||||
-rw-r--r-- | synapse/storage/databases/main/appservice.py | 13 |
8 files changed, 95 insertions, 66 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 6798a5d60c..0807086eb5 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -14,14 +14,15 @@ # limitations under the License. import logging import re -from typing import TYPE_CHECKING, List +from typing import TYPE_CHECKING, List, Optional from synapse.api.constants import EventTypes from synapse.events import EventBase -from synapse.types import GroupID, UserID, get_domain_from_id -from synapse.util.caches.descriptors import _CacheContext, cached +from synapse.types import GroupID, JsonDict, RoomAlias, UserID, get_domain_from_id +from synapse.util.caches.descriptors import cached if TYPE_CHECKING: + from synapse.appservice.api import ApplicationServiceApi from synapse.storage.databases.main import DataStore logger = logging.getLogger(__name__) @@ -131,19 +132,19 @@ class ApplicationService: raise ValueError("Expected string for 'regex' in ns '%s'" % ns) return namespaces - def _matches_regex(self, test_string, namespace_key): + def _matches_regex(self, test_string: str, namespace_key: str): for regex_obj in self.namespaces[namespace_key]: if regex_obj["regex"].match(test_string): return regex_obj return None - def _is_exclusive(self, ns_key, test_string): + def _is_exclusive(self, ns_key: str, test_string: str): regex_obj = self._matches_regex(test_string, ns_key) if regex_obj: return regex_obj["exclusive"] return False - async def _matches_user(self, event, store): + async def _matches_user(self, event, store: "DataStore"): if not event: return False @@ -161,10 +162,16 @@ class ApplicationService: does_match = await self.matches_user_in_member_list(event.room_id, store) return does_match - @cached(num_args=1, cache_context=True) - async def matches_user_in_member_list( - self, room_id: str, store, cache_context: _CacheContext - ): + @cached(num_args=1) + async def matches_user_in_member_list(self, room_id: str, store: "DataStore"): + """Check if this service is interested a room based upon it's membership + + Args: + room_id(RoomId): The room to check. + store(DataStore) + Returns: + True if this service would like to know about this room. + """ member_list = await store.get_users_in_room(room_id) # check joined member events @@ -178,7 +185,7 @@ class ApplicationService: return self.is_interested_in_room(event.room_id) return False - async def _matches_aliases(self, event, store): + async def _matches_aliases(self, event, store: "DataStore"): if not store or not event: return False @@ -188,7 +195,7 @@ class ApplicationService: return True return False - async def is_interested(self, event, store=None) -> bool: + async def is_interested(self, event, store: "DataStore") -> bool: """Check if this service is interested in this event. Args: @@ -209,10 +216,16 @@ class ApplicationService: return False - @cached(num_args=1, cache_context=True) - async def is_interested_in_presence( - self, user_id: UserID, store, cache_context: _CacheContext - ): + @cached(num_args=1) + async def is_interested_in_presence(self, user_id: UserID, store: "DataStore"): + """Check if this service is interested a user's presence + + Args: + user_id(UserID): The user to check. + store(DataStore) + Returns: + True if this service would like to know about presence for this user. + """ # Find all the rooms the sender is in if self.is_interested_in_user(user_id.to_string()): return True @@ -224,31 +237,31 @@ class ApplicationService: return True return False - def is_interested_in_user(self, user_id): + def is_interested_in_user(self, user_id: UserID): return ( self._matches_regex(user_id, ApplicationService.NS_USERS) or user_id == self.sender ) - def is_interested_in_alias(self, alias): + def is_interested_in_alias(self, alias: RoomAlias): return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES)) - def is_interested_in_room(self, room_id): + def is_interested_in_room(self, room_id: UserID): return bool(self._matches_regex(room_id, ApplicationService.NS_ROOMS)) - def is_exclusive_user(self, user_id): + def is_exclusive_user(self, user_id: UserID): return ( self._is_exclusive(ApplicationService.NS_USERS, user_id) or user_id == self.sender ) - def is_interested_in_protocol(self, protocol): + def is_interested_in_protocol(self, protocol: str): return protocol in self.protocols - def is_exclusive_alias(self, alias): + def is_exclusive_alias(self, alias: str): return self._is_exclusive(ApplicationService.NS_ALIASES, alias) - def is_exclusive_room(self, room_id): + def is_exclusive_room(self, room_id: str): return self._is_exclusive(ApplicationService.NS_ROOMS, room_id) def get_exclusive_user_regexes(self): @@ -261,7 +274,7 @@ class ApplicationService: if regex_obj["exclusive"] ] - def get_groups_for_user(self, user_id): + def get_groups_for_user(self, user_id: str): """Get the groups that this user is associated with by this AS Args: @@ -295,18 +308,18 @@ class AppServiceTransaction: service: ApplicationService, id: int, events: List[EventBase], - ephemeral=None, + ephemeral: Optional[List[JsonDict]] = None, ): self.service = service self.id = id self.events = events self.ephemeral = ephemeral - async def send(self, as_api) -> bool: + async def send(self, as_api: "ApplicationServiceApi") -> bool: """Sends this transaction using the provided AS API interface. Args: - as_api: The API to use to send. + as_api(ApplicationServiceApi): The API to use to send. Returns: True if the transaction was sent. """ diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index edeb4c47dd..fa736a97b4 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -14,7 +14,7 @@ # limitations under the License. import logging import urllib -from typing import TYPE_CHECKING, Any, List, Optional +from typing import TYPE_CHECKING, List, Optional from prometheus_client import Counter @@ -206,7 +206,7 @@ class ApplicationServiceApi(SimpleHttpClient): self, service: "ApplicationService", events: List[EventBase], - ephemeral: Optional[Any] = None, + ephemeral: Optional[JsonDict] = None, txn_id: Optional[int] = None, ): if service.url is None: diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index e17d468208..2f0729a534 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -55,6 +55,7 @@ from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.events import EventBase from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import JsonDict logger = logging.getLogger(__name__) @@ -110,11 +111,8 @@ class _ServiceQueuer: self.txn_ctrl = txn_ctrl self.clock = clock - def enqueue(self, service, event): - self.queued_events.setdefault(service.id, []).append(event) - + def _start_background_request(self, service): # start a sender for this appservice if we don't already have one - if service.id in self.requests_in_flight: return @@ -122,17 +120,13 @@ class _ServiceQueuer: "as-sender-%s" % (service.id,), self._send_request, service ) + def enqueue(self, service, event): + self.queued_events.setdefault(service.id, []).append(event) + self._start_background_request(service) + def enqueue_ephemeral(self, service: ApplicationService, events: List[Any]): self.queued_ephemeral.setdefault(service.id, []).extend(events) - - # start a sender for this appservice if we don't already have one - - if service.id in self.requests_in_flight: - return - - run_as_background_process( - "as-sender-%s" % (service.id,), self._send_request, service - ) + self._start_background_request(service) async def _send_request(self, service: ApplicationService): # sanity-check: we shouldn't get here if this service already has a sender @@ -183,13 +177,13 @@ class _TransactionController: self, service: ApplicationService, events: List[EventBase], - ephemeral: Optional[Any] = None, + ephemeral: Optional[JsonDict] = None, ): try: txn = await self.store.create_appservice_txn( service=service, events=events, ephemeral=ephemeral ) - service_is_up = await self.is_service_up(service) + service_is_up = await self._is_service_up(service) if service_is_up: sent = await txn.send(self.as_api) if sent: @@ -232,7 +226,7 @@ class _TransactionController: recoverer.recover() logger.info("Now %i active recoverers", len(self.recoverers)) - async def is_service_up(self, service: ApplicationService): + async def _is_service_up(self, service: ApplicationService) -> bool: state = await self.store.get_appservice_state(service) return state == ApplicationServiceState.UP or state is None diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 3d8f82b365..9e88e5e591 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -52,14 +52,14 @@ class ApplicationServicesHandler: self.current_max = 0 self.is_processing = False - async def notify_interested_services(self, current_id): + async def notify_interested_services(self, current_id: int): """Notifies (pushes) all application services interested in this event. Pushing is done asynchronously, so this method won't block for any prolonged length of time. Args: - current_id(int): The current maximum ID. + current_id: The current maximum ID. """ services = self.store.get_app_services() if not services or not self.notify_appservices: @@ -169,6 +169,17 @@ class ApplicationServicesHandler: new_token: Union[int, RoomStreamToken], users: Collection[UserID] = [], ): + """This is called by the notifier in the background + when a ephemeral event handled by the homeserver. + + This will determine which appservices + are interested in the event, and submit them. + + Args: + stream_key: The stream the event came from. + new_token: The latest stream token + users: The user(s) involved with the event. + """ services = [ service for service in self.store.get_app_services() @@ -192,7 +203,7 @@ class ApplicationServicesHandler: service, "read_receipt", new_token ) elif stream_key == "presence_key": - events = await self._handle_as_presence(service, users) + events = await self._handle_presence(service, users) if events: self.scheduler.submit_ephemeral_events_for_as(service, events) await self.store.set_type_stream_id_for_appservice( @@ -211,7 +222,7 @@ class ApplicationServicesHandler: ) return typing - async def _handle_receipts(self, service: ApplicationService, token: int): + async def _handle_receipts(self, service: ApplicationService): from_key = await self.store.get_type_stream_id_for_appservice( service, "read_receipt" ) @@ -221,7 +232,7 @@ class ApplicationServicesHandler: ) return receipts - async def _handle_as_presence(self, service: ApplicationService, users: List[str]): + async def _handle_presence(self, service: ApplicationService, users: List[str]): events = [] presence_source = self.event_sources.sources["presence"] from_key = await self.store.get_type_stream_id_for_appservice( diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index a6db85c888..c242c409cf 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -13,10 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import List, Tuple from synapse.appservice import ApplicationService from synapse.handlers._base import BaseHandler -from synapse.types import ReadReceipt, get_domain_from_id +from synapse.types import JsonDict, ReadReceipt, get_domain_from_id from synapse.util.async_helpers import maybe_awaitable logger = logging.getLogger(__name__) @@ -142,8 +143,15 @@ class ReceiptEventSource: return (events, to_key) async def get_new_events_as( - self, from_key: int, service: ApplicationService, **kwargs - ): + self, from_key: int, service: ApplicationService + ) -> Tuple[List[JsonDict], int]: + """Returns a set of new receipt events that an appservice + may be interested in. + + Args: + from_key: the stream position at which events should be fetched from + service: The appservice which may be interested + """ from_key = int(from_key) to_key = self.get_current_key() diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index e948efef2e..423eab666d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -13,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import itertools import logging from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 8a8f480777..d3692842e3 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import random from collections import namedtuple @@ -22,7 +21,7 @@ from synapse.api.errors import AuthError, ShadowBanError, SynapseError from synapse.appservice import ApplicationService from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.streams import TypingStream -from synapse.types import UserID, get_domain_from_id +from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer @@ -432,8 +431,15 @@ class TypingNotificationEventSource: } async def get_new_events_as( - self, from_key: int, service: ApplicationService, **kwargs - ): + self, from_key: int, service: ApplicationService + ) -> Tuple[List[JsonDict], int]: + """Returns a set of new typing events that an appservice + may be interested in. + + Args: + from_key: the stream position at which events should be fetched from + service: The appservice which may be interested + """ with Measure(self.clock, "typing.get_new_events_as"): from_key = int(from_key) handler = self.get_typing_handler() @@ -441,7 +447,6 @@ class TypingNotificationEventSource: events = [] for room_id in handler._room_serials.keys(): if handler._room_serials[room_id] <= from_key: - print("Key too old") continue if not await service.matches_user_in_member_list( room_id, handler.store diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index a1d3f4be16..af3b8be943 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -15,7 +15,7 @@ # limitations under the License. import logging import re -from typing import Any, List, Optional +from typing import List, Optional from synapse.appservice import ApplicationService, AppServiceTransaction from synapse.config.appservice import load_appservices @@ -23,6 +23,7 @@ from synapse.events import EventBase from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool from synapse.storage.databases.main.events_worker import EventsWorkerStore +from synapse.types import JsonDict from synapse.util import json_encoder logger = logging.getLogger(__name__) @@ -178,16 +179,14 @@ class ApplicationServiceTransactionWorkerStore( self, service: ApplicationService, events: List[EventBase], - ephemeral: Optional[Any] = None, - ): + ephemeral: Optional[JsonDict] = None, + ) -> AppServiceTransaction: """Atomically creates a new transaction for this application service with the given list of events. Args: - service(ApplicationService): The service who the transaction is for. - events(list<Event>): A list of events to put in the transaction. - Returns: - AppServiceTransaction: A new transaction. + service: The service who the transaction is for. + events: A list of events to put in the transaction. """ def _create_appservice_txn(txn): |