diff --git a/synapse/__init__.py b/synapse/__init__.py
index f2d3ac68eb..99fb675748 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -48,7 +48,7 @@ try:
except ImportError:
pass
-__version__ = "1.24.0"
+__version__ = "1.25.0rc1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index e850e45e46..a9abdf42e0 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -13,17 +13,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import List, Tuple
+from typing import TYPE_CHECKING, List, Optional, Tuple
from synapse.appservice import ApplicationService
from synapse.handlers._base import BaseHandler
from synapse.types import JsonDict, ReadReceipt, get_domain_from_id
+if TYPE_CHECKING:
+ from synapse.app.homeserver import HomeServer
+
logger = logging.getLogger(__name__)
class ReceiptsHandler(BaseHandler):
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.server_name = hs.config.server_name
@@ -36,7 +39,7 @@ class ReceiptsHandler(BaseHandler):
self.clock = self.hs.get_clock()
self.state = hs.get_state_handler()
- async def _received_remote_receipt(self, origin, content):
+ async def _received_remote_receipt(self, origin: str, content: JsonDict) -> None:
"""Called when we receive an EDU of type m.receipt from a remote HS.
"""
receipts = []
@@ -63,11 +66,11 @@ class ReceiptsHandler(BaseHandler):
await self._handle_new_receipts(receipts)
- async def _handle_new_receipts(self, receipts):
+ async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool:
"""Takes a list of receipts, stores them and informs the notifier.
"""
- min_batch_id = None
- max_batch_id = None
+ min_batch_id = None # type: Optional[int]
+ max_batch_id = None # type: Optional[int]
for receipt in receipts:
res = await self.store.insert_receipt(
@@ -89,7 +92,8 @@ class ReceiptsHandler(BaseHandler):
if max_batch_id is None or max_persisted_id > max_batch_id:
max_batch_id = max_persisted_id
- if min_batch_id is None:
+ # Either both of these should be None or neither.
+ if min_batch_id is None or max_batch_id is None:
# no new receipts
return False
@@ -103,7 +107,9 @@ class ReceiptsHandler(BaseHandler):
return True
- async def received_client_receipt(self, room_id, receipt_type, user_id, event_id):
+ async def received_client_receipt(
+ self, room_id: str, receipt_type: str, user_id: str, event_id: str
+ ) -> None:
"""Called when a client tells us a local user has read up to the given
event_id in the room.
"""
@@ -123,10 +129,12 @@ class ReceiptsHandler(BaseHandler):
class ReceiptEventSource:
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
- async def get_new_events(self, from_key, room_ids, **kwargs):
+ async def get_new_events(
+ self, from_key: int, room_ids: List[str], **kwargs
+ ) -> Tuple[List[JsonDict], int]:
from_key = int(from_key)
to_key = self.get_current_key()
@@ -171,5 +179,5 @@ class ReceiptEventSource:
return (events, to_key)
- def get_current_key(self, direction="f"):
+ def get_current_key(self, direction: str = "f") -> int:
return self.store.get_max_receipt_stream_id()
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 7c4eeaaa5e..d4651c8348 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -14,14 +14,19 @@
# limitations under the License.
import logging
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
import synapse.metrics
from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Membership
from synapse.handlers.state_deltas import StateDeltasHandler
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.roommember import ProfileInfo
+from synapse.types import JsonDict
from synapse.util.metrics import Measure
+if TYPE_CHECKING:
+ from synapse.app.homeserver import HomeServer
+
logger = logging.getLogger(__name__)
@@ -36,7 +41,7 @@ class UserDirectoryHandler(StateDeltasHandler):
be in the directory or not when necessary.
"""
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.store = hs.get_datastore()
@@ -49,7 +54,7 @@ class UserDirectoryHandler(StateDeltasHandler):
self.search_all_users = hs.config.user_directory_search_all_users
self.spam_checker = hs.get_spam_checker()
# The current position in the current_state_delta stream
- self.pos = None
+ self.pos = None # type: Optional[int]
# Guard to ensure we only process deltas one at a time
self._is_processing = False
@@ -61,7 +66,9 @@ class UserDirectoryHandler(StateDeltasHandler):
# we start populating the user directory
self.clock.call_later(0, self.notify_new_event)
- async def search_users(self, user_id, search_term, limit):
+ async def search_users(
+ self, user_id: str, search_term: str, limit: int
+ ) -> JsonDict:
"""Searches for users in directory
Returns:
@@ -89,7 +96,7 @@ class UserDirectoryHandler(StateDeltasHandler):
return results
- def notify_new_event(self):
+ def notify_new_event(self) -> None:
"""Called when there may be more deltas to process
"""
if not self.update_user_directory:
@@ -107,7 +114,9 @@ class UserDirectoryHandler(StateDeltasHandler):
self._is_processing = True
run_as_background_process("user_directory.notify_new_event", process)
- async def handle_local_profile_change(self, user_id, profile):
+ async def handle_local_profile_change(
+ self, user_id: str, profile: ProfileInfo
+ ) -> None:
"""Called to update index of our local user profiles when they change
irrespective of any rooms the user may be in.
"""
@@ -124,14 +133,14 @@ class UserDirectoryHandler(StateDeltasHandler):
user_id, profile.display_name, profile.avatar_url
)
- async def handle_user_deactivated(self, user_id):
+ async def handle_user_deactivated(self, user_id: str) -> None:
"""Called when a user ID is deactivated
"""
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.
await self.store.remove_from_user_dir(user_id)
- async def _unsafe_process(self):
+ async def _unsafe_process(self) -> None:
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
self.pos = await self.store.get_user_directory_stream_pos()
@@ -166,7 +175,7 @@ class UserDirectoryHandler(StateDeltasHandler):
await self.store.update_user_directory_stream_pos(max_pos)
- async def _handle_deltas(self, deltas):
+ async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None:
"""Called with the state deltas to process
"""
for delta in deltas:
@@ -236,16 +245,20 @@ class UserDirectoryHandler(StateDeltasHandler):
logger.debug("Ignoring irrelevant type: %r", typ)
async def _handle_room_publicity_change(
- self, room_id, prev_event_id, event_id, typ
- ):
+ self,
+ room_id: str,
+ prev_event_id: Optional[str],
+ event_id: Optional[str],
+ typ: str,
+ ) -> None:
"""Handle a room having potentially changed from/to world_readable/publicly
joinable.
Args:
- room_id (str)
- prev_event_id (str|None): The previous event before the state change
- event_id (str|None): The new event after the state change
- typ (str): Type of the event
+ room_id: The ID of the room which changed.
+ prev_event_id: The previous event before the state change
+ event_id: The new event after the state change
+ typ: Type of the event
"""
logger.debug("Handling change for %s: %s", typ, room_id)
@@ -303,12 +316,14 @@ class UserDirectoryHandler(StateDeltasHandler):
for user_id, profile in users_with_profile.items():
await self._handle_new_user(room_id, user_id, profile)
- async def _handle_new_user(self, room_id, user_id, profile):
+ async def _handle_new_user(
+ self, room_id: str, user_id: str, profile: ProfileInfo
+ ) -> None:
"""Called when we might need to add user to directory
Args:
- room_id (str): room_id that user joined or started being public
- user_id (str)
+        room_id: The room ID that the user joined or started being public in
+        user_id: The ID of the user to add to the directory
"""
logger.debug("Adding new user to dir, %r", user_id)
@@ -356,12 +371,12 @@ class UserDirectoryHandler(StateDeltasHandler):
if to_insert:
await self.store.add_users_who_share_private_room(room_id, to_insert)
- async def _handle_remove_user(self, room_id, user_id):
+ async def _handle_remove_user(self, room_id: str, user_id: str) -> None:
"""Called when we might need to remove user from directory
Args:
- room_id (str): room_id that user left or stopped being public that
- user_id (str)
+        room_id: The room ID that the user left or stopped being public in
+        user_id: The ID of the user to remove from the directory
"""
logger.debug("Removing user %r", user_id)
@@ -374,7 +389,13 @@ class UserDirectoryHandler(StateDeltasHandler):
if len(rooms_user_is_in) == 0:
await self.store.remove_from_user_dir(user_id)
- async def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id):
+ async def _handle_profile_change(
+ self,
+ user_id: str,
+ room_id: str,
+ prev_event_id: Optional[str],
+ event_id: Optional[str],
+ ) -> None:
"""Check member event changes for any profile changes and update the
database if there are.
"""
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 9e7ac149a1..f4f7ec96f8 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -40,7 +40,7 @@ class PusherConfig:
ts = attr.ib(type=int)
lang = attr.ib(type=Optional[str])
data = attr.ib(type=Optional[JsonDict])
- last_stream_ordering = attr.ib(type=Optional[int])
+ last_stream_ordering = attr.ib(type=int)
last_success = attr.ib(type=Optional[int])
failing_since = attr.ib(type=Optional[int])
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index d2eff75a58..4ac1b31748 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -157,7 +157,6 @@ class EmailPusher(Pusher):
being run.
"""
start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
- assert start is not None
unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email(
self.user_id, start, self.max_stream_ordering
)
@@ -220,12 +219,8 @@ class EmailPusher(Pusher):
)
async def save_last_stream_ordering_and_success(
- self, last_stream_ordering: Optional[int]
+ self, last_stream_ordering: int
) -> None:
- if last_stream_ordering is None:
- # This happens if we haven't yet processed anything
- return
-
self.last_stream_ordering = last_stream_ordering
pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 417fe0f1f5..e048b0d59e 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -176,7 +176,6 @@ class HttpPusher(Pusher):
Never call this directly: use _process which will only allow this to
run once per pusher.
"""
- assert self.last_stream_ordering is not None
unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http(
self.user_id, self.last_stream_ordering, self.max_stream_ordering
)
@@ -205,7 +204,6 @@ class HttpPusher(Pusher):
http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"]
- assert self.last_stream_ordering is not None
pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.pushkey,
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 37f3193917..3e843c97fe 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -106,6 +106,10 @@ class PusherPool:
time_now_msec = self.clock.time_msec()
+ # create the pusher setting last_stream_ordering to the current maximum
+ # stream ordering, so it will process pushes from this point onwards.
+ last_stream_ordering = self.store.get_room_max_stream_ordering()
+
# we try to create the pusher just to validate the config: it
# will then get pulled out of the database,
# recreated, added and started: this means we have only one
@@ -124,16 +128,12 @@ class PusherPool:
ts=time_now_msec,
lang=lang,
data=data,
- last_stream_ordering=None,
+ last_stream_ordering=last_stream_ordering,
last_success=None,
failing_since=None,
)
)
- # create the pusher setting last_stream_ordering to the current maximum
- # stream ordering, so it will process pushes from this point onwards.
- last_stream_ordering = self.store.get_room_max_stream_ordering()
-
await self.store.add_pusher(
user_id=user_id,
access_token=access_token,
|