diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index a7b5a4e9c9..b7213b67a5 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Union
+from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Union
from prometheus_client import Counter
@@ -58,7 +58,7 @@ class ApplicationServicesHandler:
self.current_max = 0
self.is_processing = False
- def notify_interested_services(self, max_token: RoomStreamToken):
+ def notify_interested_services(self, max_token: RoomStreamToken) -> None:
"""Notifies (pushes) all application services interested in this event.
Pushing is done asynchronously, so this method won't block for any
@@ -82,7 +82,7 @@ class ApplicationServicesHandler:
self._notify_interested_services(max_token)
@wrap_as_background_process("notify_interested_services")
- async def _notify_interested_services(self, max_token: RoomStreamToken):
+ async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
with Measure(self.clock, "notify_interested_services"):
self.is_processing = True
try:
@@ -100,7 +100,7 @@ class ApplicationServicesHandler:
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
- async def handle_event(event):
+ async def handle_event(event: EventBase) -> None:
# Gather interested services
services = await self._get_services_for_event(event)
if len(services) == 0:
@@ -116,9 +116,9 @@ class ApplicationServicesHandler:
if not self.started_scheduler:
- async def start_scheduler():
+ async def start_scheduler() -> None:
try:
- return await self.scheduler.start()
+ await self.scheduler.start()
except Exception:
logger.error("Application Services Failure")
@@ -137,7 +137,7 @@ class ApplicationServicesHandler:
"appservice_sender"
).observe((now - ts) / 1000)
- async def handle_room_events(events):
+ async def handle_room_events(events: Iterable[EventBase]) -> None:
for event in events:
await handle_event(event)
@@ -184,7 +184,7 @@ class ApplicationServicesHandler:
stream_key: str,
new_token: Optional[int],
users: Optional[Collection[Union[str, UserID]]] = None,
- ):
+ ) -> None:
"""This is called by the notifier in the background
when a ephemeral event handled by the homeserver.
@@ -226,7 +226,7 @@ class ApplicationServicesHandler:
stream_key: str,
new_token: Optional[int],
users: Collection[Union[str, UserID]],
- ):
+ ) -> None:
logger.debug("Checking interested services for %s" % (stream_key))
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
@@ -254,7 +254,7 @@ class ApplicationServicesHandler:
async def _handle_typing(
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
- typing_source = self.event_sources.sources["typing"]
+ typing_source = self.event_sources.sources.typing
# Get the typing events from just before current
typing, _ = await typing_source.get_new_events_as(
service=service,
@@ -269,7 +269,7 @@ class ApplicationServicesHandler:
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
- receipts_source = self.event_sources.sources["receipt"]
+ receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key
)
@@ -279,7 +279,7 @@ class ApplicationServicesHandler:
self, service: ApplicationService, users: Collection[Union[str, UserID]]
) -> List[JsonDict]:
events: List[JsonDict] = []
- presence_source = self.event_sources.sources["presence"]
+ presence_source = self.event_sources.sources.presence
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
)
|