diff --git a/changelog.d/7372.misc b/changelog.d/7372.misc
new file mode 100644
index 0000000000..67a39f0471
--- /dev/null
+++ b/changelog.d/7372.misc
@@ -0,0 +1 @@
+Reduce the amount of whitespace in JSON stored and sent in responses. Contributed by David Vo.
diff --git a/changelog.d/7736.feature b/changelog.d/7736.feature
deleted file mode 100644
index feb02be234..0000000000
--- a/changelog.d/7736.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add unread messages count to sync responses, as specified in [MSC2654](https://github.com/matrix-org/matrix-doc/pull/2654).
diff --git a/changelog.d/8009.misc b/changelog.d/8009.misc
new file mode 100644
index 0000000000..3d58a11313
--- /dev/null
+++ b/changelog.d/8009.misc
@@ -0,0 +1 @@
+Improve the performance of the register endpoint.
diff --git a/changelog.d/8010.doc b/changelog.d/8010.doc
new file mode 100644
index 0000000000..fc8b3f0c3d
--- /dev/null
+++ b/changelog.d/8010.doc
@@ -0,0 +1 @@
+Add documentation for how to undo a room shutdown.
diff --git a/changelog.d/8031.misc b/changelog.d/8031.misc
new file mode 100644
index 0000000000..dfe4c03171
--- /dev/null
+++ b/changelog.d/8031.misc
@@ -0,0 +1 @@
+Convert various parts of the codebase to async/await.
diff --git a/changelog.d/8032.misc b/changelog.d/8032.misc
new file mode 100644
index 0000000000..dfe4c03171
--- /dev/null
+++ b/changelog.d/8032.misc
@@ -0,0 +1 @@
+Convert various parts of the codebase to async/await.
diff --git a/changelog.d/8035.misc b/changelog.d/8035.misc
new file mode 100644
index 0000000000..dfe4c03171
--- /dev/null
+++ b/changelog.d/8035.misc
@@ -0,0 +1 @@
+Convert various parts of the codebase to async/await.
diff --git a/changelog.d/8039.misc b/changelog.d/8039.misc
new file mode 100644
index 0000000000..599933c80e
--- /dev/null
+++ b/changelog.d/8039.misc
@@ -0,0 +1 @@
+Revert MSC2654 implementation because of performance issues. Please delete this line when processing the 1.19 changelog.
diff --git a/changelog.d/8042.misc b/changelog.d/8042.misc
new file mode 100644
index 0000000000..dfe4c03171
--- /dev/null
+++ b/changelog.d/8042.misc
@@ -0,0 +1 @@
+Convert various parts of the codebase to async/await.
diff --git a/changelog.d/8043.misc b/changelog.d/8043.misc
new file mode 100644
index 0000000000..683d553666
--- /dev/null
+++ b/changelog.d/8043.misc
@@ -0,0 +1 @@
+Add a comment to `ServerContextFactory` about the use of `SSLv23_METHOD`.
diff --git a/changelog.d/8044.misc b/changelog.d/8044.misc
new file mode 100644
index 0000000000..dfe4c03171
--- /dev/null
+++ b/changelog.d/8044.misc
@@ -0,0 +1 @@
+Convert various parts of the codebase to async/await.
diff --git a/changelog.d/8045.misc b/changelog.d/8045.misc
new file mode 100644
index 0000000000..dfe4c03171
--- /dev/null
+++ b/changelog.d/8045.misc
@@ -0,0 +1 @@
+Convert various parts of the codebase to async/await.
diff --git a/changelog.d/8048.feature b/changelog.d/8048.feature
new file mode 100644
index 0000000000..8521d1920e
--- /dev/null
+++ b/changelog.d/8048.feature
@@ -0,0 +1 @@
+Add a `/health` endpoint to every configured HTTP listener that can be used as a health check endpoint by load balancers.
diff --git a/changelog.d/8049.misc b/changelog.d/8049.misc
new file mode 100644
index 0000000000..7fce36215d
--- /dev/null
+++ b/changelog.d/8049.misc
@@ -0,0 +1 @@
+Log `OPTIONS` requests at `DEBUG` rather than `INFO` level to reduce the amount logged at `INFO`.
diff --git a/docs/admin_api/shutdown_room.md b/docs/admin_api/shutdown_room.md
index 2ff552bcb3..9b1cb1c184 100644
--- a/docs/admin_api/shutdown_room.md
+++ b/docs/admin_api/shutdown_room.md
@@ -79,13 +79,20 @@ Response:
the structure can and does change without notice.
First, it's important to understand that a room shutdown is very destructive. Undoing a shutdown is not as simple as pretending it
-never happened - work has to be done to move forward instead of resetting the past.
+never happened - work has to be done to move forward instead of resetting the past. In fact, in some cases it might not be possible
+to recover at all:
-1. For safety reasons, it is recommended to shut down Synapse prior to continuing.
+* If the room was invite-only, your users will need to be re-invited.
+* If the room no longer has any members at all, it'll be impossible to rejoin.
+* The first user to rejoin will have to do so via an alias on a different server.
+
+With all that being said, if you still want to try to recover the room:
+
+1. For safety reasons, shut down Synapse.
2. In the database, run `DELETE FROM blocked_rooms WHERE room_id = '!example:example.org';`
* For caution: it's recommended to run this in a transaction: `BEGIN; DELETE ...;`, verify you got 1 result, then `COMMIT;`.
* The room ID is the same one supplied to the shutdown room API, not the Content Violation room.
-3. Restart Synapse (required).
+3. Restart Synapse.
You will have to manually handle, if you so choose, the following:
diff --git a/docs/reverse_proxy.md b/docs/reverse_proxy.md
index 7bfb96eff6..fd48ba0874 100644
--- a/docs/reverse_proxy.md
+++ b/docs/reverse_proxy.md
@@ -139,3 +139,10 @@ client IP addresses are recorded correctly.
Having done so, you can then use `https://matrix.example.com` (instead
of `https://matrix.example.com:8448`) as the "Custom server" when
connecting to Synapse from a client.
+
+
+## Health check endpoint
+
+Synapse exposes a health check endpoint for use by reverse proxies.
+Each configured HTTP listener has a `/health` endpoint which always returns
+200 OK (and doesn't get logged).
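A quick way to see what a load balancer's probe gets back (a sketch; the host and port are assumptions — substitute whatever the listener is bound to):

```python
import http.client

# The /health resource added in this PR returns 200 with a text/plain body
# of "OK" on every configured HTTP listener (localhost:8008 assumed here).
conn = http.client.HTTPConnection("localhost", 8008)
conn.request("GET", "/health")
resp = conn.getresponse()
assert resp.status == 200
assert resp.read() == b"OK"
```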
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index ae5e1810fc..a34bdf1830 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -67,7 +67,7 @@ logger = logging.getLogger("synapse_port_db")
BOOLEAN_COLUMNS = {
- "events": ["processed", "outlier", "contains_url", "count_as_unread"],
+ "events": ["processed", "outlier", "contains_url"],
"rooms": ["is_public"],
"event_edges": ["is_state"],
"presence_list": ["accepted"],
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 2178e623da..d8190f92ab 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -13,12 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import Optional
+from typing import List, Optional, Tuple
import pymacaroons
from netaddr import IPAddress
-from twisted.internet import defer
from twisted.web.server import Request
import synapse.types
@@ -80,13 +79,14 @@ class Auth(object):
self._track_appservice_user_ips = hs.config.track_appservice_user_ips
self._macaroon_secret_key = hs.config.macaroon_secret_key
- @defer.inlineCallbacks
- def check_from_context(self, room_version: str, event, context, do_sig_check=True):
- prev_state_ids = yield defer.ensureDeferred(context.get_prev_state_ids())
- auth_events_ids = yield self.compute_auth_events(
+ async def check_from_context(
+ self, room_version: str, event, context, do_sig_check=True
+ ):
+ prev_state_ids = await context.get_prev_state_ids()
+ auth_events_ids = self.compute_auth_events(
event, prev_state_ids, for_verification=True
)
- auth_events = yield self.store.get_events(auth_events_ids)
+ auth_events = await self.store.get_events(auth_events_ids)
auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
@@ -94,14 +94,13 @@ class Auth(object):
room_version_obj, event, auth_events=auth_events, do_sig_check=do_sig_check
)
- @defer.inlineCallbacks
- def check_user_in_room(
+ async def check_user_in_room(
self,
room_id: str,
user_id: str,
current_state: Optional[StateMap[EventBase]] = None,
allow_departed_users: bool = False,
- ):
+ ) -> EventBase:
"""Check if the user is in the room, or was at some point.
Args:
room_id: The room to check.
@@ -119,37 +118,35 @@ class Auth(object):
Raises:
AuthError if the user is/was not in the room.
Returns:
- Deferred[Optional[EventBase]]:
- Membership event for the user if the user was in the
- room. This will be the join event if they are currently joined to
- the room. This will be the leave event if they have left the room.
+ Membership event for the user if the user was in the
+ room. This will be the join event if they are currently joined to
+ the room. This will be the leave event if they have left the room.
"""
if current_state:
member = current_state.get((EventTypes.Member, user_id), None)
else:
- member = yield defer.ensureDeferred(
- self.state.get_current_state(
- room_id=room_id, event_type=EventTypes.Member, state_key=user_id
- )
+ member = await self.state.get_current_state(
+ room_id=room_id, event_type=EventTypes.Member, state_key=user_id
)
- membership = member.membership if member else None
- if membership == Membership.JOIN:
- return member
+ if member:
+ membership = member.membership
- # XXX this looks totally bogus. Why do we not allow users who have been banned,
- # or those who were members previously and have been re-invited?
- if allow_departed_users and membership == Membership.LEAVE:
- forgot = yield self.store.did_forget(user_id, room_id)
- if not forgot:
+ if membership == Membership.JOIN:
return member
+ # XXX this looks totally bogus. Why do we not allow users who have been banned,
+ # or those who were members previously and have been re-invited?
+ if allow_departed_users and membership == Membership.LEAVE:
+ forgot = await self.store.did_forget(user_id, room_id)
+ if not forgot:
+ return member
+
raise AuthError(403, "User %s not in room %s" % (user_id, room_id))
- @defer.inlineCallbacks
- def check_host_in_room(self, room_id, host):
+ async def check_host_in_room(self, room_id, host):
with Measure(self.clock, "check_host_in_room"):
- latest_event_ids = yield self.store.is_host_joined(room_id, host)
+ latest_event_ids = await self.store.is_host_joined(room_id, host)
return latest_event_ids
def can_federate(self, event, auth_events):
@@ -160,14 +157,13 @@ class Auth(object):
def get_public_keys(self, invite_event):
return event_auth.get_public_keys(invite_event)
- @defer.inlineCallbacks
- def get_user_by_req(
+ async def get_user_by_req(
self,
request: Request,
allow_guest: bool = False,
rights: str = "access",
allow_expired: bool = False,
- ):
+ ) -> synapse.types.Requester:
""" Get a registered user's ID.
Args:
@@ -180,7 +176,7 @@ class Auth(object):
/login will deliver access tokens regardless of expiration.
Returns:
- defer.Deferred: resolves to a `synapse.types.Requester` object
+ Resolves to the requester
Raises:
InvalidClientCredentialsError if no user by that token exists or the token
is invalid.
@@ -194,14 +190,14 @@ class Auth(object):
access_token = self.get_access_token_from_request(request)
- user_id, app_service = yield self._get_appservice_user_id(request)
+ user_id, app_service = await self._get_appservice_user_id(request)
if user_id:
request.authenticated_entity = user_id
opentracing.set_tag("authenticated_entity", user_id)
opentracing.set_tag("appservice_id", app_service.id)
if ip_addr and self._track_appservice_user_ips:
- yield self.store.insert_client_ip(
+ await self.store.insert_client_ip(
user_id=user_id,
access_token=access_token,
ip=ip_addr,
@@ -211,7 +207,7 @@ class Auth(object):
return synapse.types.create_requester(user_id, app_service=app_service)
- user_info = yield self.get_user_by_access_token(
+ user_info = await self.get_user_by_access_token(
access_token, rights, allow_expired=allow_expired
)
user = user_info["user"]
@@ -221,7 +217,7 @@ class Auth(object):
# Deny the request if the user account has expired.
if self._account_validity.enabled and not allow_expired:
user_id = user.to_string()
- expiration_ts = yield self.store.get_expiration_ts_for_user(user_id)
+ expiration_ts = await self.store.get_expiration_ts_for_user(user_id)
if (
expiration_ts is not None
and self.clock.time_msec() >= expiration_ts
@@ -235,7 +231,7 @@ class Auth(object):
device_id = user_info.get("device_id")
if user and access_token and ip_addr:
- yield self.store.insert_client_ip(
+ await self.store.insert_client_ip(
user_id=user.to_string(),
access_token=access_token,
ip=ip_addr,
@@ -261,8 +257,7 @@ class Auth(object):
except KeyError:
raise MissingClientTokenError()
- @defer.inlineCallbacks
- def _get_appservice_user_id(self, request):
+ async def _get_appservice_user_id(self, request):
app_service = self.store.get_app_service_by_token(
self.get_access_token_from_request(request)
)
@@ -283,14 +278,13 @@ class Auth(object):
if not app_service.is_interested_in_user(user_id):
raise AuthError(403, "Application service cannot masquerade as this user.")
- if not (yield self.store.get_user_by_id(user_id)):
+ if not (await self.store.get_user_by_id(user_id)):
raise AuthError(403, "Application service has not registered this user")
return user_id, app_service
- @defer.inlineCallbacks
- def get_user_by_access_token(
+ async def get_user_by_access_token(
self, token: str, rights: str = "access", allow_expired: bool = False,
- ):
+ ) -> dict:
""" Validate access token and get user_id from it
Args:
@@ -300,7 +294,7 @@ class Auth(object):
allow_expired: If False, raises an InvalidClientTokenError
if the token is expired
Returns:
- Deferred[dict]: dict that includes:
+ dict that includes:
`user` (UserID)
`is_guest` (bool)
`token_id` (int|None): access token id. May be None if guest
@@ -314,7 +308,7 @@ class Auth(object):
if rights == "access":
# first look in the database
- r = yield self._look_up_user_by_access_token(token)
+ r = await self._look_up_user_by_access_token(token)
if r:
valid_until_ms = r["valid_until_ms"]
if (
@@ -352,7 +346,7 @@ class Auth(object):
# It would of course be much easier to store guest access
# tokens in the database as well, but that would break existing
# guest tokens.
- stored_user = yield self.store.get_user_by_id(user_id)
+ stored_user = await self.store.get_user_by_id(user_id)
if not stored_user:
raise InvalidClientTokenError("Unknown user_id %s" % user_id)
if not stored_user["is_guest"]:
@@ -482,9 +476,8 @@ class Auth(object):
now = self.hs.get_clock().time_msec()
return now < expiry
- @defer.inlineCallbacks
- def _look_up_user_by_access_token(self, token):
- ret = yield self.store.get_user_by_access_token(token)
+ async def _look_up_user_by_access_token(self, token):
+ ret = await self.store.get_user_by_access_token(token)
if not ret:
return None
@@ -507,7 +500,7 @@ class Auth(object):
logger.warning("Unrecognised appservice access token.")
raise InvalidClientTokenError()
request.authenticated_entity = service.sender
- return defer.succeed(service)
+ return service
async def is_server_admin(self, user: UserID) -> bool:
""" Check if the given user is a local server admin.
@@ -522,7 +515,7 @@ class Auth(object):
def compute_auth_events(
self, event, current_state_ids: StateMap[str], for_verification: bool = False,
- ):
+ ) -> List[str]:
"""Given an event and current state return the list of event IDs used
to auth an event.
@@ -530,11 +523,11 @@ class Auth(object):
should be added to the event's `auth_events`.
Returns:
- defer.Deferred(list[str]): List of event IDs.
+ List of event IDs.
"""
if event.type == EventTypes.Create:
- return defer.succeed([])
+ return []
# Currently we ignore the `for_verification` flag even though there are
# some situations where we can drop particular auth events when adding
@@ -553,7 +546,7 @@ class Auth(object):
if auth_ev_id:
auth_ids.append(auth_ev_id)
- return defer.succeed(auth_ids)
+ return auth_ids
async def check_can_change_room_list(self, room_id: str, user: UserID):
"""Determine whether the user is allowed to edit the room's entry in the
@@ -636,10 +629,9 @@ class Auth(object):
return query_params[0].decode("ascii")
- @defer.inlineCallbacks
- def check_user_in_room_or_world_readable(
+ async def check_user_in_room_or_world_readable(
self, room_id: str, user_id: str, allow_departed_users: bool = False
- ):
+ ) -> Tuple[str, Optional[str]]:
"""Checks that the user is or was in the room or the room is world
readable. If it isn't then an exception is raised.
@@ -650,10 +642,9 @@ class Auth(object):
members but have now departed
Returns:
- Deferred[tuple[str, str|None]]: Resolves to the current membership of
- the user in the room and the membership event ID of the user. If
- the user is not in the room and never has been, then
- `(Membership.JOIN, None)` is returned.
+ Resolves to the current membership of the user in the room and the
+ membership event ID of the user. If the user is not in the room and
+ never has been, then `(Membership.JOIN, None)` is returned.
"""
try:
@@ -662,15 +653,13 @@ class Auth(object):
# * The user is a non-guest user, and was ever in the room
# * The user is a guest user, and has joined the room
# else it will throw.
- member_event = yield self.check_user_in_room(
+ member_event = await self.check_user_in_room(
room_id, user_id, allow_departed_users=allow_departed_users
)
return member_event.membership, member_event.event_id
except AuthError:
- visibility = yield defer.ensureDeferred(
- self.state.get_current_state(
- room_id, EventTypes.RoomHistoryVisibility, ""
- )
+ visibility = await self.state.get_current_state(
+ room_id, EventTypes.RoomHistoryVisibility, ""
)
if (
visibility
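The conversions above all follow the same mechanical `inlineCallbacks`-to-coroutine pattern, which recurs in every file below; a minimal before/after sketch with a hypothetical `store`:

```python
from twisted.internet import defer

# Before: a Deferred-returning generator, stepping with `yield`.
@defer.inlineCallbacks
def get_events_old(store, event_ids):
    events = yield store.get_events(event_ids)
    return events

# After: a native coroutine. `await` replaces `yield`, and the decorator,
# defer.ensureDeferred wrappers and Deferred return-type docs all go away.
async def get_events_new(store, event_ids):
    events = await store.get_events(event_ids)
    return events
```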
diff --git a/synapse/api/auth_blocking.py b/synapse/api/auth_blocking.py
index 5c499b6b4e..49093bf181 100644
--- a/synapse/api/auth_blocking.py
+++ b/synapse/api/auth_blocking.py
@@ -15,8 +15,6 @@
import logging
-from twisted.internet import defer
-
from synapse.api.constants import LimitBlockingTypes, UserTypes
from synapse.api.errors import Codes, ResourceLimitError
from synapse.config.server import is_threepid_reserved
@@ -36,8 +34,7 @@ class AuthBlocking(object):
self._limit_usage_by_mau = hs.config.limit_usage_by_mau
self._mau_limits_reserved_threepids = hs.config.mau_limits_reserved_threepids
- @defer.inlineCallbacks
- def check_auth_blocking(self, user_id=None, threepid=None, user_type=None):
+ async def check_auth_blocking(self, user_id=None, threepid=None, user_type=None):
"""Checks if the user should be rejected for some external reason,
such as monthly active user limiting or global disable flag
@@ -60,7 +57,7 @@ class AuthBlocking(object):
if user_id is not None:
if user_id == self._server_notices_mxid:
return
- if (yield self.store.is_support_user(user_id)):
+ if await self.store.is_support_user(user_id):
return
if self._hs_disabled:
@@ -76,11 +73,11 @@ class AuthBlocking(object):
# If the user is already part of the MAU cohort or a trial user
if user_id:
- timestamp = yield self.store.user_last_seen_monthly_active(user_id)
+ timestamp = await self.store.user_last_seen_monthly_active(user_id)
if timestamp:
return
- is_trial = yield self.store.is_trial_user(user_id)
+ is_trial = await self.store.is_trial_user(user_id)
if is_trial:
return
elif threepid:
@@ -93,7 +90,7 @@ class AuthBlocking(object):
# allow registration. Support users are excluded from MAU checks.
return
# Else if there is no room in the MAU bucket, bail
- current_mau = yield self.store.get_monthly_active_count()
+ current_mau = await self.store.get_monthly_active_count()
if current_mau >= self._max_mau_value:
raise ResourceLimitError(
403,
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index b3bab1aa52..6e40630ab6 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -238,14 +238,16 @@ class InteractiveAuthIncompleteError(Exception):
(This indicates we should return a 401 with 'result' as the body)
Attributes:
+ session_id: The ID of the ongoing interactive auth session.
result: the server response to the request, which should be
passed back to the client
"""
- def __init__(self, result: "JsonDict"):
+ def __init__(self, session_id: str, result: "JsonDict"):
super(InteractiveAuthIncompleteError, self).__init__(
"Interactive auth not yet complete"
)
+ self.session_id = session_id
self.result = result
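The new `session_id` attribute is what lets callers stash state against an interrupted UI-auth session; condensed from the servlet hunks later in this diff:

```python
from synapse.api.errors import InteractiveAuthIncompleteError

async def run_ui_auth_step(auth_handler, flows, request, body, clientip, password_hash):
    try:
        return await auth_handler.check_ui_auth(
            flows, request, body, clientip, "register a new account"
        )
    except InteractiveAuthIncompleteError as e:
        # UI auth is mid-flight: record per-session state against the
        # interrupted session, then re-raise so the client sees the 401.
        if password_hash:
            await auth_handler.set_session_data(
                e.session_id, "password_hash", password_hash
            )
        raise
```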
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index f988f62a1e..7393d6cb74 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -21,8 +21,6 @@ import jsonschema
from canonicaljson import json
from jsonschema import FormatChecker
-from twisted.internet import defer
-
from synapse.api.constants import EventContentFields
from synapse.api.errors import SynapseError
from synapse.storage.presence import UserPresenceState
@@ -137,9 +135,8 @@ class Filtering(object):
super(Filtering, self).__init__()
self.store = hs.get_datastore()
- @defer.inlineCallbacks
- def get_user_filter(self, user_localpart, filter_id):
- result = yield self.store.get_user_filter(user_localpart, filter_id)
+ async def get_user_filter(self, user_localpart, filter_id):
+ result = await self.store.get_user_filter(user_localpart, filter_id)
return FilterCollection(result)
def add_user_filter(self, user_localpart, user_filter):
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 1a16d0b9f8..7957586d69 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -123,6 +123,7 @@ from synapse.rest.client.v2_alpha.account_data import (
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.versions import VersionsRestServlet
+from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer
from synapse.storage.databases.main.censor_events import CensorEventsStore
@@ -493,7 +494,10 @@ class GenericWorkerServer(HomeServer):
site_tag = listener_config.http_options.tag
if site_tag is None:
site_tag = port
- resources = {}
+
+ # We always include a health resource.
+ resources = {"/health": HealthResource()}
+
for res in listener_config.http_options.resources:
for name in res.names:
if name == "metrics":
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index d87a77718e..98d0d14a12 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -68,6 +68,7 @@ from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest import ClientRestResource
from synapse.rest.admin import AdminRestResource
+from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.rest.well_known import WellKnownResource
from synapse.server import HomeServer
@@ -98,7 +99,9 @@ class SynapseHomeServer(HomeServer):
if site_tag is None:
site_tag = port
- resources = {}
+ # We always include a health resource.
+ resources = {"/health": HealthResource()}
+
for res in listener_config.http_options.resources:
for name in res.names:
if name == "openid" and "federation" in res.names:
diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py
index a5a2a7815d..777c0f00b1 100644
--- a/synapse/crypto/context_factory.py
+++ b/synapse/crypto/context_factory.py
@@ -48,6 +48,14 @@ class ServerContextFactory(ContextFactory):
connections."""
def __init__(self, config):
+ # TODO: once pyOpenSSL exposes TLS_METHOD and SSL_CTX_set_min_proto_version,
+ # switch to those (see https://github.com/pyca/cryptography/issues/5379).
+ #
+ # note that, despite the confusing name, SSLv23_METHOD does *not* enforce SSLv2
+ # or v3, but is a synonym for TLS_METHOD, which allows the client and server
+ # to negotiate an appropriate version of TLS constrained by the version options
+ # set with context.set_options.
+ #
self._context = SSL.Context(SSL.SSLv23_METHOD)
self.configure_context(self._context, config)
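For reference, the `context.set_options` constraint the comment points at looks like this in pyOpenSSL (a sketch, not Synapse's exact configuration):

```python
from OpenSSL import SSL

# SSLv23_METHOD negotiates "any TLS"; the OP_NO_* options then prune which
# versions the handshake may settle on (TLS 1.2+ in this example).
context = SSL.Context(SSL.SSLv23_METHOD)
context.set_options(
    SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3 | SSL.OP_NO_TLSv1 | SSL.OP_NO_TLSv1_1
)
```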
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 69b53ca2bc..4e179d49b3 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -106,7 +106,7 @@ class EventBuilder(object):
state_ids = await self._state.get_current_state_ids(
self.room_id, prev_event_ids
)
- auth_ids = await self._auth.compute_auth_events(self, state_ids)
+ auth_ids = self._auth.compute_auth_events(self, state_ids)
format_version = self.room_version.event_format
if format_version == EventFormatVersions.V1:
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index fbc56c351b..c9044a5019 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -101,7 +101,7 @@ class ApplicationServicesHandler(object):
async def start_scheduler():
try:
- return self.scheduler.start()
+ return await self.scheduler.start()
except Exception:
logger.error("Application Services Failure")
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index c7d921c21a..c24e7bafe0 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -162,7 +162,7 @@ class AuthHandler(BaseHandler):
request_body: Dict[str, Any],
clientip: str,
description: str,
- ) -> dict:
+ ) -> Tuple[dict, str]:
"""
Checks that the user is who they claim to be, via a UI auth.
@@ -183,9 +183,14 @@ class AuthHandler(BaseHandler):
describes the operation happening on their account.
Returns:
- The parameters for this request (which may
+ A tuple of (params, session_id).
+
+ 'params' contains the parameters for this request (which may
have been given only in a previous call).
+ 'session_id' is the ID of this session, either passed in by the
+ client or assigned by this call
+
Raises:
InteractiveAuthIncompleteError if the client has not yet completed
any of the permitted login flows
@@ -207,7 +212,7 @@ class AuthHandler(BaseHandler):
flows = [[login_type] for login_type in self._supported_ui_auth_types]
try:
- result, params, _ = await self.check_auth(
+ result, params, session_id = await self.check_ui_auth(
flows, request, request_body, clientip, description
)
except LoginError:
@@ -230,7 +235,7 @@ class AuthHandler(BaseHandler):
if user_id != requester.user.to_string():
raise AuthError(403, "Invalid auth")
- return params
+ return params, session_id
def get_enabled_auth_types(self):
"""Return the enabled user-interactive authentication types
@@ -240,7 +245,7 @@ class AuthHandler(BaseHandler):
"""
return self.checkers.keys()
- async def check_auth(
+ async def check_ui_auth(
self,
flows: List[List[str]],
request: SynapseRequest,
@@ -363,7 +368,7 @@ class AuthHandler(BaseHandler):
if not authdict:
raise InteractiveAuthIncompleteError(
- self._auth_dict_for_flows(flows, session.session_id)
+ session.session_id, self._auth_dict_for_flows(flows, session.session_id)
)
# check auth type currently being presented
@@ -410,7 +415,7 @@ class AuthHandler(BaseHandler):
ret = self._auth_dict_for_flows(flows, session.session_id)
ret["completed"] = list(creds)
ret.update(errordict)
- raise InteractiveAuthIncompleteError(ret)
+ raise InteractiveAuthIncompleteError(session.session_id, ret)
async def add_oob_auth(
self, stagetype: str, authdict: Dict[str, Any], clientip: str
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b3764dedae..593932adb7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2064,7 +2064,7 @@ class FederationHandler(BaseHandler):
if not auth_events:
prev_state_ids = await context.get_prev_state_ids()
- auth_events_ids = await self.auth.compute_auth_events(
+ auth_events_ids = self.auth.compute_auth_events(
event, prev_state_ids, for_verification=True
)
auth_events_x = await self.store.get_events(auth_events_ids)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 43901d0934..708533d4d1 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1061,7 +1061,7 @@ class EventCreationHandler(object):
raise SynapseError(400, "Cannot redact event from a different room")
prev_state_ids = await context.get_prev_state_ids()
- auth_events_ids = await self.auth.compute_auth_events(
+ auth_events_ids = self.auth.compute_auth_events(
event, prev_state_ids, for_verification=True
)
auth_events = await self.store.get_events(auth_events_ids)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 5a19bac929..c42dac18f5 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -103,7 +103,6 @@ class JoinedSyncResult:
account_data = attr.ib(type=List[JsonDict])
unread_notifications = attr.ib(type=JsonDict)
summary = attr.ib(type=Optional[JsonDict])
- unread_count = attr.ib(type=int)
def __nonzero__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
@@ -1887,10 +1886,6 @@ class SyncHandler(object):
if room_builder.rtype == "joined":
unread_notifications = {} # type: Dict[str, str]
-
- unread_count = await self.store.get_unread_message_count_for_user(
- room_id, sync_config.user.to_string(),
- )
room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
@@ -1899,7 +1894,6 @@ class SyncHandler(object):
account_data=account_data_events,
unread_notifications=unread_notifications,
summary=summary,
- unread_count=unread_count,
)
if room_sync or always_include:
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 94ab29974a..ffe6cfa09e 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -25,7 +25,7 @@ from io import BytesIO
from typing import Any, Callable, Dict, Tuple, Union
import jinja2
-from canonicaljson import encode_canonical_json, encode_pretty_printed_json, json
+from canonicaljson import encode_canonical_json, encode_pretty_printed_json
from twisted.internet import defer
from twisted.python import failure
@@ -46,6 +46,7 @@ from synapse.api.errors import (
from synapse.http.site import SynapseRequest
from synapse.logging.context import preserve_fn
from synapse.logging.opentracing import trace_servlet
+from synapse.util import json_encoder
from synapse.util.caches import intern_dict
logger = logging.getLogger(__name__)
@@ -538,7 +539,7 @@ def respond_with_json(
# canonicaljson already encodes to bytes
json_bytes = encode_canonical_json(json_object)
else:
- json_bytes = json.dumps(json_object).encode("utf-8")
+ json_bytes = json_encoder.encode(json_object).encode("utf-8")
return respond_with_json_bytes(request, code, json_bytes, send_cors=send_cors)
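This `json_encoder` is the change behind changelog entry 7372; presumably it is a stdlib encoder configured with compact separators, roughly as below (an assumption — the real definition lives in `synapse/util/__init__.py`):

```python
import json

# Compact separators drop the space the default encoder emits after ","
# and ":", shaving bytes off every JSON blob stored or sent in a response.
json_encoder = json.JSONEncoder(separators=(",", ":"))

json.dumps({"a": 1, "b": 2})           # '{"a": 1, "b": 2}'
json_encoder.encode({"a": 1, "b": 2})  # '{"a":1,"b":2}'
```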
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 6f3b2258cc..6e79b47828 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -146,10 +146,9 @@ class SynapseRequest(Request):
Returns a context manager; the correct way to use this is:
- @defer.inlineCallbacks
- def handle_request(request):
+ async def handle_request(request):
with request.processing("FooServlet"):
- yield really_handle_the_request()
+ await really_handle_the_request()
Once the context manager is closed, the completion of the request will be logged,
and the various metrics will be updated.
@@ -287,7 +286,9 @@ class SynapseRequest(Request):
# the connection dropped)
code += "!"
- self.site.access_logger.info(
+ log_level = logging.INFO if self._should_log_request() else logging.DEBUG
+ self.site.access_logger.log(
+ log_level,
"%s - %s - {%s}"
" Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
' %sB %s "%s %s %s" "%s" [%d dbevts]',
@@ -315,6 +316,17 @@ class SynapseRequest(Request):
except Exception as e:
logger.warning("Failed to stop metrics: %r", e)
+ def _should_log_request(self) -> bool:
+ """Whether we should log at INFO that we processed the request.
+ """
+ if self.path == b"/health":
+ return False
+
+ if self.method == b"OPTIONS":
+ return False
+
+ return True
+
class XForwardedForRequest(SynapseRequest):
def __init__(self, *args, **kw):
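The switch from `access_logger.info(...)` to `access_logger.log(level, ...)` is the generic per-record level trick; a standalone sketch:

```python
import logging

access_logger = logging.getLogger("synapse.access")

def log_finished_request(method: bytes, path: bytes) -> None:
    # Health probes and OPTIONS preflights arrive constantly; demote them
    # to DEBUG so INFO stays readable, as _should_log_request decides above.
    noisy = path == b"/health" or method == b"OPTIONS"
    access_logger.log(
        logging.DEBUG if noisy else logging.INFO,
        "Processed request: %s %s",
        method,
        path,
    )
```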
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index a9269196b3..f766d16db6 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -13,16 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import inspect
import logging
import threading
-from asyncio import iscoroutine
from functools import wraps
from typing import TYPE_CHECKING, Dict, Optional, Set
from prometheus_client.core import REGISTRY, Counter, Gauge
from twisted.internet import defer
-from twisted.python.failure import Failure
from synapse.logging.context import LoggingContext, PreserveLoggingContext
@@ -167,7 +166,7 @@ class _BackgroundProcess(object):
)
-def run_as_background_process(desc, func, *args, **kwargs):
+def run_as_background_process(desc: str, func, *args, **kwargs):
"""Run the given function in its own logcontext, with resource metrics
This should be used to wrap processes which are fired off to run in the
@@ -179,7 +178,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
normal synapse inlineCallbacks function).
Args:
- desc (str): a description for this background process type
+ desc: a description for this background process type
func: a function, which may return a Deferred or a coroutine
args: positional args for func
kwargs: keyword args for func
@@ -188,8 +187,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
follow the synapse logcontext rules.
"""
- @defer.inlineCallbacks
- def run():
+ async def run():
with _bg_metrics_lock:
count = _background_process_counts.get(desc, 0)
_background_process_counts[desc] = count + 1
@@ -203,29 +201,21 @@ def run_as_background_process(desc, func, *args, **kwargs):
try:
result = func(*args, **kwargs)
- # We probably don't have an ensureDeferred in our call stack to handle
- # coroutine results, so we need to ensureDeferred here.
- #
- # But we need this check because ensureDeferred doesn't like being
- # called on immediate values (as opposed to Deferreds or coroutines).
- if iscoroutine(result):
- result = defer.ensureDeferred(result)
+ if inspect.isawaitable(result):
+ result = await result
- return (yield result)
+ return result
except Exception:
- # failure.Failure() fishes the original Failure out of our stack, and
- # thus gives us a sensible stack trace.
- f = Failure()
- logger.error(
- "Background process '%s' threw an exception",
- desc,
- exc_info=(f.type, f.value, f.getTracebackObject()),
+ logger.exception(
+ "Background process '%s' threw an exception", desc,
)
finally:
_background_process_in_flight_count.labels(desc).dec()
with PreserveLoggingContext():
- return run()
+ # Note that we return a Deferred here so that it can be used in a
+ # looping_call and other places that expect a Deferred.
+ return defer.ensureDeferred(run())
def wrap_as_background_process(desc):
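Call sites are unaffected by this rewrite, since a Deferred still comes back; a usage sketch (the task name and `delete_expired` are hypothetical):

```python
from synapse.metrics.background_process_metrics import run_as_background_process

async def _prune_expired_entries(store):
    await store.delete_expired()  # hypothetical maintenance query

def start_prune_job(store):
    # A coroutine function goes in, a Deferred comes back (courtesy of the
    # ensureDeferred above), so this still suits looping_call and friends.
    return run_as_background_process("prune_expired", _prune_expired_entries, store)
```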
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 8201849951..c2fb757d9a 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -194,12 +194,16 @@ class ModuleApi(object):
synapse.api.errors.AuthError: the access token is invalid
"""
# see if the access token corresponds to a device
- user_info = yield self._auth.get_user_by_access_token(access_token)
+ user_info = yield defer.ensureDeferred(
+ self._auth.get_user_by_access_token(access_token)
+ )
device_id = user_info.get("device_id")
user_id = user_info["user"].to_string()
if device_id:
# delete the device, which will also delete its access tokens
- yield self._hs.get_device_handler().delete_device(user_id, device_id)
+ yield defer.ensureDeferred(
+ self._hs.get_device_handler().delete_device(user_id, device_id)
+ )
else:
# no associated device. Just delete the access token.
yield defer.ensureDeferred(
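`ModuleApi` keeps its `inlineCallbacks` surface, so the now-async internals are bridged back explicitly; the general pattern in isolation:

```python
from twisted.internet import defer

@defer.inlineCallbacks
def legacy_caller(auth, token):
    # A generator-based caller cannot `yield` a bare coroutine: it must be
    # wrapped in ensureDeferred, which drives the coroutine as a Deferred.
    user_info = yield defer.ensureDeferred(auth.get_user_by_access_token(token))
    return user_info
```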
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 04b9d8ac82..e7fcee0e87 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -120,7 +120,7 @@ class BulkPushRuleEvaluator(object):
pl_event = await self.store.get_event(pl_event_id)
auth_events = {POWER_KEY: pl_event}
else:
- auth_events_ids = await self.auth.compute_auth_events(
+ auth_events_ids = self.auth.compute_auth_events(
event, prev_state_ids, for_verification=False
)
auth_events = await self.store.get_events(auth_events_ids)
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index bc8f71916b..d0145666bf 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -21,13 +21,22 @@ async def get_badge_count(store, user_id):
invites = await store.get_invited_rooms_for_local_user(user_id)
joins = await store.get_rooms_for_user(user_id)
+ my_receipts_by_room = await store.get_receipts_for_user(user_id, "m.read")
+
badge = len(invites)
for room_id in joins:
- unread_count = await store.get_unread_message_count_for_user(room_id, user_id)
- # return one badge count per conversation, as count per
- # message is so noisy as to be almost useless
- badge += 1 if unread_count else 0
+ if room_id in my_receipts_by_room:
+ last_unread_event_id = my_receipts_by_room[room_id]
+
+            notifs = await store.get_unread_event_push_actions_by_room_for_user(
+                room_id, user_id, last_unread_event_id
+            )
+ # return one badge count per conversation, as count per
+ # message is so noisy as to be almost useless
+ badge += 1 if notifs["notify_count"] else 0
return badge
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 60dd3f6701..a6fdedde63 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -28,7 +28,7 @@ class SlavedClientIpStore(BaseSlavedStore):
name="client_ip_last_seen", keylen=4, max_entries=50000
)
- def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):
+ async def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):
now = int(self._clock.time_msec())
key = (user_id, access_token, ip)
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index f33801f883..d853e4447e 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -18,11 +18,12 @@ The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
allowed to be sent by which side.
"""
import abc
-import json
import logging
from typing import Tuple, Type
-_json_encoder = json.JSONEncoder()
+from canonicaljson import json
+
+from synapse.util import json_encoder as _json_encoder
logger = logging.getLogger(__name__)
diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py
index 5934b1fe8b..b210015173 100644
--- a/synapse/rest/client/v1/directory.py
+++ b/synapse/rest/client/v1/directory.py
@@ -89,7 +89,7 @@ class ClientDirectoryServer(RestServlet):
dir_handler = self.handlers.directory_handler
try:
- service = await self.auth.get_appservice_by_req(request)
+ service = self.auth.get_appservice_by_req(request)
room_alias = RoomAlias.from_string(room_alias)
await dir_handler.delete_appservice_association(service, room_alias)
logger.info(
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 3767a809a4..fead85074b 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -18,7 +18,12 @@ import logging
from http import HTTPStatus
from synapse.api.constants import LoginType
-from synapse.api.errors import Codes, SynapseError, ThreepidValidationError
+from synapse.api.errors import (
+ Codes,
+ InteractiveAuthIncompleteError,
+ SynapseError,
+ ThreepidValidationError,
+)
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.http.server import finish_request, respond_with_html
from synapse.http.servlet import (
@@ -239,18 +244,12 @@ class PasswordRestServlet(RestServlet):
# we do basic sanity checks here because the auth layer will store these
# in sessions. Pull out the new password provided to us.
- if "new_password" in body:
- new_password = body.pop("new_password")
+ new_password = body.pop("new_password", None)
+ if new_password is not None:
if not isinstance(new_password, str) or len(new_password) > 512:
raise SynapseError(400, "Invalid password")
self.password_policy_handler.validate_password(new_password)
- # If the password is valid, hash it and store it back on the body.
- # This ensures that only the hashed password is handled everywhere.
- if "new_password_hash" in body:
- raise SynapseError(400, "Unexpected property: new_password_hash")
- body["new_password_hash"] = await self.auth_handler.hash(new_password)
-
# there are two possibilities here. Either the user does not have an
# access token, and needs to do a password reset; or they have one and
# need to validate their identity.
@@ -263,23 +262,49 @@ class PasswordRestServlet(RestServlet):
if self.auth.has_access_token(request):
requester = await self.auth.get_user_by_req(request)
- params = await self.auth_handler.validate_user_via_ui_auth(
- requester,
- request,
- body,
- self.hs.get_ip_from_request(request),
- "modify your account password",
- )
+ try:
+ params, session_id = await self.auth_handler.validate_user_via_ui_auth(
+ requester,
+ request,
+ body,
+ self.hs.get_ip_from_request(request),
+ "modify your account password",
+ )
+ except InteractiveAuthIncompleteError as e:
+ # The user needs to provide more steps to complete auth, but
+ # they're not required to provide the password again.
+ #
+ # If a password is available now, hash the provided password and
+ # store it for later.
+ if new_password:
+ password_hash = await self.auth_handler.hash(new_password)
+ await self.auth_handler.set_session_data(
+ e.session_id, "password_hash", password_hash
+ )
+ raise
user_id = requester.user.to_string()
else:
requester = None
- result, params, _ = await self.auth_handler.check_auth(
- [[LoginType.EMAIL_IDENTITY]],
- request,
- body,
- self.hs.get_ip_from_request(request),
- "modify your account password",
- )
+ try:
+ result, params, session_id = await self.auth_handler.check_ui_auth(
+ [[LoginType.EMAIL_IDENTITY]],
+ request,
+ body,
+ self.hs.get_ip_from_request(request),
+ "modify your account password",
+ )
+ except InteractiveAuthIncompleteError as e:
+ # The user needs to provide more steps to complete auth, but
+ # they're not required to provide the password again.
+ #
+ # If a password is available now, hash the provided password and
+ # store it for later.
+ if new_password:
+ password_hash = await self.auth_handler.hash(new_password)
+ await self.auth_handler.set_session_data(
+ e.session_id, "password_hash", password_hash
+ )
+ raise
if LoginType.EMAIL_IDENTITY in result:
threepid = result[LoginType.EMAIL_IDENTITY]
@@ -304,12 +329,21 @@ class PasswordRestServlet(RestServlet):
logger.error("Auth succeeded but no known type! %r", result.keys())
raise SynapseError(500, "", Codes.UNKNOWN)
- assert_params_in_dict(params, ["new_password_hash"])
- new_password_hash = params["new_password_hash"]
+ # If we have a password in this request, prefer it. Otherwise, there
+ # must be a password hash from an earlier request.
+ if new_password:
+ password_hash = await self.auth_handler.hash(new_password)
+ else:
+ password_hash = await self.auth_handler.get_session_data(
+ session_id, "password_hash", None
+ )
+ if not password_hash:
+ raise SynapseError(400, "Missing params: password", Codes.MISSING_PARAM)
+
logout_devices = params.get("logout_devices", True)
await self._set_password_handler.set_password(
- user_id, new_password_hash, logout_devices, requester
+ user_id, password_hash, logout_devices, requester
)
return 200, {}
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 370742ce59..f808175698 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -24,6 +24,7 @@ import synapse.types
from synapse.api.constants import LoginType
from synapse.api.errors import (
Codes,
+ InteractiveAuthIncompleteError,
SynapseError,
ThreepidValidationError,
UnrecognizedRequestError,
@@ -387,6 +388,7 @@ class RegisterRestServlet(RestServlet):
self.ratelimiter = hs.get_registration_ratelimiter()
self.password_policy_handler = hs.get_password_policy_handler()
self.clock = hs.get_clock()
+ self._registration_enabled = self.hs.config.enable_registration
self._registration_flows = _calculate_registration_flows(
hs.config, self.auth_handler
@@ -412,20 +414,8 @@ class RegisterRestServlet(RestServlet):
"Do not understand membership kind: %s" % (kind.decode("utf8"),)
)
- # we do basic sanity checks here because the auth layer will store these
- # in sessions. Pull out the username/password provided to us.
- if "password" in body:
- password = body.pop("password")
- if not isinstance(password, str) or len(password) > 512:
- raise SynapseError(400, "Invalid password")
- self.password_policy_handler.validate_password(password)
-
- # If the password is valid, hash it and store it back on the body.
- # This ensures that only the hashed password is handled everywhere.
- if "password_hash" in body:
- raise SynapseError(400, "Unexpected property: password_hash")
- body["password_hash"] = await self.auth_handler.hash(password)
-
+ # Pull out the provided username and do basic sanity checks early since
+ # the auth layer will store these in sessions.
desired_username = None
if "username" in body:
if not isinstance(body["username"], str) or len(body["username"]) > 512:
@@ -434,7 +424,7 @@ class RegisterRestServlet(RestServlet):
appservice = None
if self.auth.has_access_token(request):
- appservice = await self.auth.get_appservice_by_req(request)
+ appservice = self.auth.get_appservice_by_req(request)
# fork off as soon as possible for ASes which have completely
# different registration flows to normal users
@@ -459,22 +449,35 @@ class RegisterRestServlet(RestServlet):
)
return 200, result # we throw for non 200 responses
- # for regular registration, downcase the provided username before
- # attempting to register it. This should mean
- # that people who try to register with upper-case in their usernames
- # don't get a nasty surprise. (Note that we treat username
- # case-insenstively in login, so they are free to carry on imagining
- # that their username is CrAzYh4cKeR if that keeps them happy)
- if desired_username is not None:
- desired_username = desired_username.lower()
-
# == Normal User Registration == (everyone else)
- if not self.hs.config.enable_registration:
+ if not self._registration_enabled:
raise SynapseError(403, "Registration has been disabled")
+ # For regular registration, convert the provided username to lowercase
+ # before attempting to register it. This should mean that people who try
+ # to register with upper-case in their usernames don't get a nasty surprise.
+ #
+ # Note that we treat usernames case-insensitively in login, so they are
+ # free to carry on imagining that their username is CrAzYh4cKeR if that
+ # keeps them happy.
+ if desired_username is not None:
+ desired_username = desired_username.lower()
+
+ # Check if this account is upgrading from a guest account.
guest_access_token = body.get("guest_access_token", None)
- if "initial_device_display_name" in body and "password_hash" not in body:
+ # Pull out the provided password and do basic sanity checks early.
+ #
+ # Note that we remove the password from the body since the auth layer
+ # will store the body in the session and we don't want a plaintext
+ # password store there.
+ password = body.pop("password", None)
+ if password is not None:
+ if not isinstance(password, str) or len(password) > 512:
+ raise SynapseError(400, "Invalid password")
+ self.password_policy_handler.validate_password(password)
+
+ if "initial_device_display_name" in body and password is None:
# ignore 'initial_device_display_name' if sent without
# a password to work around a client bug where it sent
# the 'initial_device_display_name' param alone, wiping out
@@ -484,6 +487,7 @@ class RegisterRestServlet(RestServlet):
session_id = self.auth_handler.get_session_id(body)
registered_user_id = None
+ password_hash = None
if session_id:
# if we get a registered user id out of here, it means we previously
# registered a user for this session, so we could just return the
@@ -492,7 +496,12 @@ class RegisterRestServlet(RestServlet):
registered_user_id = await self.auth_handler.get_session_data(
session_id, "registered_user_id", None
)
+ # Extract the previously-hashed password from the session.
+ password_hash = await self.auth_handler.get_session_data(
+ session_id, "password_hash", None
+ )
+ # Ensure that the username is valid.
if desired_username is not None:
await self.registration_handler.check_username(
desired_username,
@@ -500,20 +509,38 @@ class RegisterRestServlet(RestServlet):
assigned_user_id=registered_user_id,
)
- auth_result, params, session_id = await self.auth_handler.check_auth(
- self._registration_flows,
- request,
- body,
- self.hs.get_ip_from_request(request),
- "register a new account",
- )
+ # Check if the user-interactive authentication flows are complete, if
+ # not this will raise a user-interactive auth error.
+ try:
+ auth_result, params, session_id = await self.auth_handler.check_ui_auth(
+ self._registration_flows,
+ request,
+ body,
+ self.hs.get_ip_from_request(request),
+ "register a new account",
+ )
+ except InteractiveAuthIncompleteError as e:
+ # The user needs to provide more steps to complete auth.
+ #
+ # Hash the password and store it with the session since the client
+ # is not required to provide the password again.
+ #
+ # If a password hash was previously stored we will not attempt to
+ # re-hash and store it for efficiency. This assumes the password
+ # does not change throughout the authentication flow, but this
+ # should be fine since the data is meant to be consistent.
+ if not password_hash and password:
+ password_hash = await self.auth_handler.hash(password)
+ await self.auth_handler.set_session_data(
+ e.session_id, "password_hash", password_hash
+ )
+ raise
# Check that we're not trying to register a denied 3pid.
#
# the user-facing checks will probably already have happened in
# /register/email/requestToken when we requested a 3pid, but that's not
# guaranteed.
-
if auth_result:
for login_type in [LoginType.EMAIL_IDENTITY, LoginType.MSISDN]:
if login_type in auth_result:
@@ -535,12 +562,15 @@ class RegisterRestServlet(RestServlet):
# don't re-register the threepids
registered = False
else:
- # NB: This may be from the auth handler and NOT from the POST
- assert_params_in_dict(params, ["password_hash"])
+ # If we have a password in this request, prefer it. Otherwise, there
+ # might be a password hash from an earlier request.
+ if password:
+ password_hash = await self.auth_handler.hash(password)
+ if not password_hash:
+ raise SynapseError(400, "Missing params: password", Codes.MISSING_PARAM)
desired_username = params.get("username", None)
guest_access_token = params.get("guest_access_token", None)
- new_password_hash = params.get("password_hash", None)
if desired_username is not None:
desired_username = desired_username.lower()
@@ -582,7 +612,7 @@ class RegisterRestServlet(RestServlet):
registered_user_id = await self.registration_handler.register_user(
localpart=desired_username,
- password_hash=new_password_hash,
+ password_hash=password_hash,
guest_access_token=guest_access_token,
threepid=threepid,
address=client_addr,
@@ -595,8 +625,8 @@ class RegisterRestServlet(RestServlet):
):
await self.store.upsert_monthly_active_user(registered_user_id)
- # remember that we've now registered that user account, and with
- # what user ID (since the user may not have specified)
+ # Remember that the user account has been registered (and the user
+ # ID it was registered with, since it might not have been specified).
await self.auth_handler.set_session_data(
session_id, "registered_user_id", registered_user_id
)
@@ -635,7 +665,7 @@ class RegisterRestServlet(RestServlet):
(object) params: registration parameters, from which we pull
device_id, initial_device_name and inhibit_login
Returns:
- defer.Deferred: (object) dictionary for response from /register
+            a dictionary for the response from /register
"""
result = {"user_id": user_id, "home_server": self.hs.hostname}
if not params.get("inhibit_login", False):
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 3f5bf75e59..a5c24fbd63 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -426,7 +426,6 @@ class SyncRestServlet(RestServlet):
result["ephemeral"] = {"events": ephemeral_events}
result["unread_notifications"] = room.unread_notifications
result["summary"] = room.summary
- result["org.matrix.msc2654.unread_count"] = room.unread_count
return result
diff --git a/synapse/rest/consent/consent_resource.py b/synapse/rest/consent/consent_resource.py
index 4386eb4e72..b3e4d5612e 100644
--- a/synapse/rest/consent/consent_resource.py
+++ b/synapse/rest/consent/consent_resource.py
@@ -22,8 +22,6 @@ from os import path
import jinja2
from jinja2 import TemplateNotFound
-from twisted.internet import defer
-
from synapse.api.errors import NotFoundError, StoreError, SynapseError
from synapse.config import ConfigError
from synapse.http.server import DirectServeHtmlResource, respond_with_html
@@ -135,7 +133,7 @@ class ConsentResource(DirectServeHtmlResource):
else:
qualified_user_id = UserID(username, self.hs.hostname).to_string()
- u = await defer.maybeDeferred(self.store.get_user_by_id, qualified_user_id)
+ u = await self.store.get_user_by_id(qualified_user_id)
if u is None:
raise NotFoundError("Unknown user")
diff --git a/synapse/rest/health.py b/synapse/rest/health.py
new file mode 100644
index 0000000000..0170950bf3
--- /dev/null
+++ b/synapse/rest/health.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.web.resource import Resource
+
+
+class HealthResource(Resource):
+ """A resource that does nothing except return a 200 with a body of `OK`,
+ which can be used as a health check.
+
+ Note: `SynapseRequest._should_log_request` ensures that requests to
+ `/health` do not get logged at INFO.
+ """
+
+ isLeaf = 1
+
+ def render_GET(self, request):
+ request.setHeader(b"Content-Type", b"text/plain")
+ return b"OK"
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index f4768a9e8b..cd8c246594 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -27,9 +27,7 @@ from typing import Dict, Optional
from urllib import parse as urlparse
import attr
-from canonicaljson import json
-from twisted.internet import defer
from twisted.internet.error import DNSLookupError
from synapse.api.errors import Codes, SynapseError
@@ -43,6 +41,7 @@ from synapse.http.servlet import parse_integer, parse_string
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.media.v1._base import get_filename_from_headers
+from synapse.util import json_encoder
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.stringutils import random_string
@@ -228,7 +227,7 @@ class PreviewUrlResource(DirectServeJsonResource):
else:
logger.info("Returning cached response")
- og = await make_deferred_yieldable(defer.maybeDeferred(observable.observe))
+ og = await make_deferred_yieldable(observable.observe())
respond_with_json_bytes(request, 200, og, send_cors=True)
async def _do_preview(self, url: str, user: str, ts: int) -> bytes:
@@ -355,7 +354,7 @@ class PreviewUrlResource(DirectServeJsonResource):
logger.debug("Calculated OG for %s as %s", url, og)
- jsonog = json.dumps(og)
+ jsonog = json_encoder.encode(og)
# store OG in history-aware DB cache
await self.store.store_url_cache(
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 2193d8fdc5..cf039e7f7d 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -18,13 +18,12 @@ import abc
import logging
from typing import List, Tuple
-from canonicaljson import json
-
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -327,7 +326,7 @@ class AccountDataStore(AccountDataWorkerStore):
Returns:
A deferred that completes once the account_data has been added.
"""
- content_json = json.dumps(content)
+ content_json = json_encoder.encode(content)
with self._account_data_id_gen.get_next() as next_id:
# no need to lock here as room_account_data has a unique constraint
@@ -373,7 +372,7 @@ class AccountDataStore(AccountDataWorkerStore):
Returns:
A deferred that completes once the account_data has been added.
"""
- content_json = json.dumps(content)
+ content_json = json_encoder.encode(content)
with self._account_data_id_gen.get_next() as next_id:
# no need to lock here as account_data has a unique constraint on
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 683afde52b..10de446065 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -172,7 +172,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self.get_latest_event_ids_in_room.invalidate((room_id,))
- self.get_unread_message_count_for_user.invalidate_many((room_id,))
self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
if not backfilled:
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 712c8d0264..216a5925fc 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -14,8 +14,7 @@
# limitations under the License.
import logging
-
-from twisted.internet import defer
+from typing import Dict, Optional, Tuple
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
@@ -82,21 +81,19 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
"devices_last_seen", self._devices_last_seen_update
)
- @defer.inlineCallbacks
- def _remove_user_ip_nonunique(self, progress, batch_size):
+ async def _remove_user_ip_nonunique(self, progress, batch_size):
def f(conn):
txn = conn.cursor()
txn.execute("DROP INDEX IF EXISTS user_ips_user_ip")
txn.close()
- yield self.db_pool.runWithConnection(f)
- yield self.db_pool.updates._end_background_update(
+ await self.db_pool.runWithConnection(f)
+ await self.db_pool.updates._end_background_update(
"user_ips_drop_nonunique_index"
)
return 1
- @defer.inlineCallbacks
- def _analyze_user_ip(self, progress, batch_size):
+ async def _analyze_user_ip(self, progress, batch_size):
# Background update to analyze user_ips table before we run the
# deduplication background update. The table may not have been analyzed
# for ages due to the table locks.
@@ -106,14 +103,13 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
def user_ips_analyze(txn):
txn.execute("ANALYZE user_ips")
- yield self.db_pool.runInteraction("user_ips_analyze", user_ips_analyze)
+ await self.db_pool.runInteraction("user_ips_analyze", user_ips_analyze)
- yield self.db_pool.updates._end_background_update("user_ips_analyze")
+ await self.db_pool.updates._end_background_update("user_ips_analyze")
return 1
- @defer.inlineCallbacks
- def _remove_user_ip_dupes(self, progress, batch_size):
+ async def _remove_user_ip_dupes(self, progress, batch_size):
# This function works by scanning the user_ips table in batches
# based on `last_seen`. For each row in a batch it searches the rest of
# the table to see if there are any duplicates; if there are, then they
@@ -140,7 +136,7 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
return None
# Get a last seen that has roughly `batch_size` since `begin_last_seen`
- end_last_seen = yield self.db_pool.runInteraction(
+ end_last_seen = await self.db_pool.runInteraction(
"user_ips_dups_get_last_seen", get_last_seen
)
@@ -275,15 +271,14 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
txn, "user_ips_remove_dupes", {"last_seen": end_last_seen}
)
- yield self.db_pool.runInteraction("user_ips_dups_remove", remove)
+ await self.db_pool.runInteraction("user_ips_dups_remove", remove)
if last:
- yield self.db_pool.updates._end_background_update("user_ips_remove_dupes")
+ await self.db_pool.updates._end_background_update("user_ips_remove_dupes")
return batch_size
- @defer.inlineCallbacks
- def _devices_last_seen_update(self, progress, batch_size):
+ async def _devices_last_seen_update(self, progress, batch_size):
"""Background update to insert last seen info into devices table
"""
@@ -346,12 +341,12 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
return len(rows)
- updated = yield self.db_pool.runInteraction(
+ updated = await self.db_pool.runInteraction(
"_devices_last_seen_update", _devices_last_seen_update_txn
)
if not updated:
- yield self.db_pool.updates._end_background_update("devices_last_seen")
+ await self.db_pool.updates._end_background_update("devices_last_seen")
return updated
@@ -380,8 +375,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
if self.user_ips_max_age:
self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)
- @defer.inlineCallbacks
- def insert_client_ip(
+ async def insert_client_ip(
self, user_id, access_token, ip, user_agent, device_id, now=None
):
if not now:
@@ -392,7 +386,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
last_seen = self.client_ip_last_seen.get(key)
except KeyError:
last_seen = None
- yield self.populate_monthly_active_users(user_id)
+ await self.populate_monthly_active_users(user_id)
# Rate-limited inserts
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
return
@@ -461,25 +455,25 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
# Failed to upsert, log and continue
logger.error("Failed to insert client IP %r: %r", entry, e)
- @defer.inlineCallbacks
- def get_last_client_ip_by_device(self, user_id, device_id):
+ async def get_last_client_ip_by_device(
+ self, user_id: str, device_id: Optional[str]
+ ) -> Dict[Tuple[str, str], dict]:
"""For each device_id listed, give the user_ip it was last seen on
Args:
- user_id (str)
- device_id (str): If None fetches all devices for the user
+ user_id: The user to fetch devices for.
+ device_id: If None fetches all devices for the user
Returns:
- defer.Deferred: resolves to a dict, where the keys
- are (user_id, device_id) tuples. The values are also dicts, with
- keys giving the column names
+ A dictionary mapping a tuple of (user_id, device_id) to dicts, with
+ keys giving the column names from the devices table.
"""
keyvalues = {"user_id": user_id}
if device_id is not None:
keyvalues["device_id"] = device_id
- res = yield self.db_pool.simple_select_list(
+ res = await self.db_pool.simple_select_list(
table="devices",
keyvalues=keyvalues,
retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
@@ -501,8 +495,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
}
return ret
- @defer.inlineCallbacks
- def get_user_ip_and_agents(self, user):
+ async def get_user_ip_and_agents(self, user):
user_id = user.to_string()
results = {}
@@ -512,7 +505,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
user_agent, _, last_seen = self._batch_row_update[key]
results[(access_token, ip)] = (user_agent, last_seen)
- rows = yield self.db_pool.simple_select_list(
+ rows = await self.db_pool.simple_select_list(
table="user_ips",
keyvalues={"user_id": user_id},
retcols=["access_token", "ip", "user_agent", "last_seen"],
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 874ecdf8d2..76ec954f44 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -16,13 +16,12 @@
import logging
from typing import List, Tuple
-from canonicaljson import json
-
from twisted.internet import defer
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
+from synapse.util import json_encoder
from synapse.util.caches.expiringcache import ExpiringCache
logger = logging.getLogger(__name__)
@@ -354,7 +353,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
)
rows = []
for destination, edu in remote_messages_by_destination.items():
- edu_json = json.dumps(edu)
+ edu_json = json_encoder.encode(edu)
rows.append((destination, stream_id, now_ms, edu_json))
txn.executemany(sql, rows)
@@ -432,7 +431,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
# Handle wildcard device_ids.
sql = "SELECT device_id FROM devices WHERE user_id = ?"
txn.execute(sql, (user_id,))
- message_json = json.dumps(messages_by_device["*"])
+ message_json = json_encoder.encode(messages_by_device["*"])
for row in txn:
# Add the message for all devices for this user on this
# server.
@@ -454,7 +453,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
# Only insert into the local inbox if the device exists on
# this server
device = row[0]
- message_json = json.dumps(messages_by_device[device])
+ message_json = json_encoder.encode(messages_by_device[device])
messages_json_for_user[device] = message_json
if messages_json_for_user:
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 88a7aadfc6..7a5f0bab05 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -17,8 +17,6 @@
import logging
from typing import List, Optional, Set, Tuple
-from canonicaljson import json
-
from twisted.internet import defer
from synapse.api.errors import Codes, StoreError
@@ -36,6 +34,7 @@ from synapse.storage.database import (
make_tuple_comparison_clause,
)
from synapse.types import Collection, get_verify_key_from_cross_signing_key
+from synapse.util import json_encoder
from synapse.util.caches.descriptors import (
Cache,
cached,
@@ -137,7 +136,9 @@ class DeviceWorkerStore(SQLBaseStore):
master_key_by_user = {}
self_signing_key_by_user = {}
for user in users:
- cross_signing_key = yield self.get_e2e_cross_signing_key(user, "master")
+ cross_signing_key = yield defer.ensureDeferred(
+ self.get_e2e_cross_signing_key(user, "master")
+ )
if cross_signing_key:
key_id, verify_key = get_verify_key_from_cross_signing_key(
cross_signing_key
@@ -150,8 +151,8 @@ class DeviceWorkerStore(SQLBaseStore):
"device_id": verify_key.version,
}
- cross_signing_key = yield self.get_e2e_cross_signing_key(
- user, "self_signing"
+ cross_signing_key = yield defer.ensureDeferred(
+ self.get_e2e_cross_signing_key(user, "self_signing")
)
if cross_signing_key:
key_id, verify_key = get_verify_key_from_cross_signing_key(
@@ -247,7 +248,7 @@ class DeviceWorkerStore(SQLBaseStore):
destination (str): The host the device updates are intended for
from_stream_id (int): The minimum stream_id to filter updates by, exclusive
query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping
- user_id/device_id to update stream_id and the relevent json-encoded
+ user_id/device_id to update stream_id and the relevant json-encoded
opentracing context
Returns:
@@ -397,7 +398,7 @@ class DeviceWorkerStore(SQLBaseStore):
values={
"stream_id": stream_id,
"from_user_id": from_user_id,
- "user_ids": json.dumps(user_ids),
+ "user_ids": json_encoder.encode(user_ids),
},
)
@@ -600,7 +601,7 @@ class DeviceWorkerStore(SQLBaseStore):
between the requested tokens due to the limit.
The token returned can be used in a subsequent call to this
- function to get further updatees.
+ function to get further updates.
The updates are a list of 2-tuples of stream ID and the row data
"""
@@ -1032,7 +1033,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
txn,
table="device_lists_remote_cache",
keyvalues={"user_id": user_id, "device_id": device_id},
- values={"content": json.dumps(content)},
+ values={"content": json_encoder.encode(content)},
# we don't need to lock, because we assume we are the only thread
# updating this user's devices.
lock=False,
@@ -1088,7 +1089,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
{
"user_id": user_id,
"device_id": content["device_id"],
- "content": json.dumps(content),
+ "content": json_encoder.encode(content),
}
for content in devices
],
@@ -1209,7 +1210,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
"device_id": device_id,
"sent": False,
"ts": now,
- "opentracing_context": json.dumps(context)
+ "opentracing_context": json_encoder.encode(context)
if whitelisted_homeserver(destination)
else "{}",
}
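Reviewer note: the `yield defer.ensureDeferred(...)` wrappers added above are the bridge in the opposite direction, letting a not-yet-converted `@defer.inlineCallbacks` generator call a now-async method. A sketch (the names are illustrative, not Synapse APIs):

```python
from twisted.internet import defer


async def get_key(user):  # stand-in for a now-async store method
    return {"user": user, "usage": ["master"]}


@defer.inlineCallbacks
def legacy_caller(user):
    # inlineCallbacks generators can only yield Deferreds, so the native
    # coroutine must be wrapped before it is yielded.
    key = yield defer.ensureDeferred(get_key(user))
    return key


legacy_caller("@alice:example.org").addCallback(print)
```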
diff --git a/synapse/storage/databases/main/directory.py b/synapse/storage/databases/main/directory.py
index 7819bfcbb3..037e02603c 100644
--- a/synapse/storage/databases/main/directory.py
+++ b/synapse/storage/databases/main/directory.py
@@ -14,30 +14,29 @@
# limitations under the License.
from collections import namedtuple
-from typing import Optional
-
-from twisted.internet import defer
+from typing import Iterable, Optional
from synapse.api.errors import SynapseError
from synapse.storage._base import SQLBaseStore
+from synapse.types import RoomAlias
from synapse.util.caches.descriptors import cached
RoomAliasMapping = namedtuple("RoomAliasMapping", ("room_id", "room_alias", "servers"))
class DirectoryWorkerStore(SQLBaseStore):
- @defer.inlineCallbacks
- def get_association_from_room_alias(self, room_alias):
- """ Get's the room_id and server list for a given room_alias
+ async def get_association_from_room_alias(
+ self, room_alias: RoomAlias
+ ) -> Optional[RoomAliasMapping]:
+ """Gets the room_id and server list for a given room_alias
Args:
- room_alias (RoomAlias)
+ room_alias: The alias to translate to an ID.
Returns:
- Deferred: results in namedtuple with keys "room_id" and
- "servers" or None if no association can be found
+ The room alias mapping or None if no association can be found.
"""
- room_id = yield self.db_pool.simple_select_one_onecol(
+ room_id = await self.db_pool.simple_select_one_onecol(
"room_aliases",
{"room_alias": room_alias.to_string()},
"room_id",
@@ -48,7 +47,7 @@ class DirectoryWorkerStore(SQLBaseStore):
if not room_id:
return None
- servers = yield self.db_pool.simple_select_onecol(
+ servers = await self.db_pool.simple_select_onecol(
"room_alias_servers",
{"room_alias": room_alias.to_string()},
"server",
@@ -79,18 +78,20 @@ class DirectoryWorkerStore(SQLBaseStore):
class DirectoryStore(DirectoryWorkerStore):
- @defer.inlineCallbacks
- def create_room_alias_association(self, room_alias, room_id, servers, creator=None):
+ async def create_room_alias_association(
+ self,
+ room_alias: RoomAlias,
+ room_id: str,
+ servers: Iterable[str],
+ creator: Optional[str] = None,
+ ) -> None:
""" Creates an association between a room alias and room_id/servers
Args:
- room_alias (RoomAlias)
- room_id (str)
- servers (list)
- creator (str): Optional user_id of creator.
-
- Returns:
- Deferred
+ room_alias: The alias to create.
+ room_id: The target of the alias.
+ servers: A list of servers through which it may be possible to join the room.
+ creator: Optional user_id of creator.
"""
def alias_txn(txn):
@@ -118,24 +119,22 @@ class DirectoryStore(DirectoryWorkerStore):
)
try:
- ret = yield self.db_pool.runInteraction(
+ await self.db_pool.runInteraction(
"create_room_alias_association", alias_txn
)
except self.database_engine.module.IntegrityError:
raise SynapseError(
409, "Room alias %s already exists" % room_alias.to_string()
)
- return ret
- @defer.inlineCallbacks
- def delete_room_alias(self, room_alias):
- room_id = yield self.db_pool.runInteraction(
+ async def delete_room_alias(self, room_alias: RoomAlias) -> str:
+ room_id = await self.db_pool.runInteraction(
"delete_room_alias", self._delete_room_alias_txn, room_alias
)
return room_id
- def _delete_room_alias_txn(self, txn, room_alias):
+ def _delete_room_alias_txn(self, txn, room_alias: RoomAlias) -> str:
txn.execute(
"SELECT room_id FROM room_aliases WHERE room_alias = ?",
(room_alias.to_string(),),
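Reviewer note: the new annotations lean on `synapse.types.RoomAlias` rather than documenting types in prose. A rough shim showing the `from_string`/`to_string` round trip the store relies on (this is an illustrative stand-in, not the real class):

```python
from collections import namedtuple


class RoomAlias(namedtuple("RoomAlias", ["localpart", "domain"])):
    """Illustrative shim mirroring the parse/render pair used above."""

    @classmethod
    def from_string(cls, s):
        localpart, domain = s[1:].split(":", 1)  # strip the leading '#'
        return cls(localpart, domain)

    def to_string(self):
        return "#%s:%s" % (self.localpart, self.domain)


alias = RoomAlias.from_string("#synapse:matrix.org")
assert alias.to_string() == "#synapse:matrix.org"
```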
diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py
index 90152edc3c..2eeb9f97dc 100644
--- a/synapse/storage/databases/main/e2e_room_keys.py
+++ b/synapse/storage/databases/main/e2e_room_keys.py
@@ -14,18 +14,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from canonicaljson import json
-
-from twisted.internet import defer
-
from synapse.api.errors import StoreError
from synapse.logging.opentracing import log_kv, trace
from synapse.storage._base import SQLBaseStore, db_to_json
+from synapse.util import json_encoder
class EndToEndRoomKeyStore(SQLBaseStore):
- @defer.inlineCallbacks
- def update_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
+ async def update_e2e_room_key(
+ self, user_id, version, room_id, session_id, room_key
+ ):
"""Replaces the encrypted E2E room key for a given session in a given backup
Args:
@@ -38,7 +36,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
StoreError
"""
- yield self.db_pool.simple_update_one(
+ await self.db_pool.simple_update_one(
table="e2e_room_keys",
keyvalues={
"user_id": user_id,
@@ -50,13 +48,12 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"first_message_index": room_key["first_message_index"],
"forwarded_count": room_key["forwarded_count"],
"is_verified": room_key["is_verified"],
- "session_data": json.dumps(room_key["session_data"]),
+ "session_data": json_encoder.encode(room_key["session_data"]),
},
desc="update_e2e_room_key",
)
- @defer.inlineCallbacks
- def add_e2e_room_keys(self, user_id, version, room_keys):
+ async def add_e2e_room_keys(self, user_id, version, room_keys):
"""Bulk add room keys to a given backup.
Args:
@@ -77,7 +74,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"first_message_index": room_key["first_message_index"],
"forwarded_count": room_key["forwarded_count"],
"is_verified": room_key["is_verified"],
- "session_data": json.dumps(room_key["session_data"]),
+ "session_data": json_encoder.encode(room_key["session_data"]),
}
)
log_kv(
@@ -89,13 +86,12 @@ class EndToEndRoomKeyStore(SQLBaseStore):
}
)
- yield self.db_pool.simple_insert_many(
+ await self.db_pool.simple_insert_many(
table="e2e_room_keys", values=values, desc="add_e2e_room_keys"
)
@trace
- @defer.inlineCallbacks
- def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
+ async def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
room, or a given session.
@@ -110,7 +106,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
the backup (or for the specified room)
Returns:
- A deferred list of dicts giving the session_data and message metadata for
+ A list of dicts giving the session_data and message metadata for
these room keys.
"""
@@ -125,7 +121,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
if session_id:
keyvalues["session_id"] = session_id
- rows = yield self.db_pool.simple_select_list(
+ rows = await self.db_pool.simple_select_list(
table="e2e_room_keys",
keyvalues=keyvalues,
retcols=(
@@ -243,8 +239,9 @@ class EndToEndRoomKeyStore(SQLBaseStore):
)
@trace
- @defer.inlineCallbacks
- def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
+ async def delete_e2e_room_keys(
+ self, user_id, version, room_id=None, session_id=None
+ ):
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
room or a given session.
@@ -259,7 +256,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
the backup (or for the specified room)
Returns:
- A deferred of the deletion transaction
+ The deletion transaction
"""
keyvalues = {"user_id": user_id, "version": int(version)}
@@ -268,7 +265,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
if session_id:
keyvalues["session_id"] = session_id
- yield self.db_pool.simple_delete(
+ await self.db_pool.simple_delete(
table="e2e_room_keys", keyvalues=keyvalues, desc="delete_e2e_room_keys"
)
@@ -360,7 +357,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"user_id": user_id,
"version": new_version,
"algorithm": info["algorithm"],
- "auth_data": json.dumps(info["auth_data"]),
+ "auth_data": json_encoder.encode(info["auth_data"]),
},
)
@@ -387,7 +384,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
updatevalues = {}
if info is not None and "auth_data" in info:
- updatevalues["auth_data"] = json.dumps(info["auth_data"])
+ updatevalues["auth_data"] = json_encoder.encode(info["auth_data"])
if version_etag is not None:
updatevalues["etag"] = version_etag
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 40354b8304..f93e0d320d 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -14,24 +14,23 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Dict, List, Tuple
+from typing import Dict, Iterable, List, Optional, Tuple
-from canonicaljson import encode_canonical_json, json
+from canonicaljson import encode_canonical_json
from twisted.enterprise.adbapi import Connection
-from twisted.internet import defer
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import make_in_list_sql_clause
+from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.iterutils import batch_iter
class EndToEndKeyWorkerStore(SQLBaseStore):
@trace
- @defer.inlineCallbacks
- def get_e2e_device_keys(
+ async def get_e2e_device_keys(
self, query_list, include_all_devices=False, include_deleted_devices=False
):
"""Fetch a list of device keys.
@@ -51,7 +50,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
if not query_list:
return {}
- results = yield self.db_pool.runInteraction(
+ results = await self.db_pool.runInteraction(
"get_e2e_device_keys",
self._get_e2e_device_keys_txn,
query_list,
@@ -174,8 +173,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
log_kv(result)
return result
- @defer.inlineCallbacks
- def get_e2e_one_time_keys(self, user_id, device_id, key_ids):
+ async def get_e2e_one_time_keys(
+ self, user_id: str, device_id: str, key_ids: List[str]
+ ) -> Dict[Tuple[str, str], str]:
"""Retrieve a number of one-time keys for a user
Args:
@@ -185,11 +185,10 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
retrieve
Returns:
- deferred resolving to Dict[(str, str), str]: map from (algorithm,
- key_id) to json string for key
+ A map from (algorithm, key_id) to json string for key
"""
- rows = yield self.db_pool.simple_select_many_batch(
+ rows = await self.db_pool.simple_select_many_batch(
table="e2e_one_time_keys_json",
column="key_id",
iterable=key_ids,
@@ -201,17 +200,21 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
log_kv({"message": "Fetched one time keys for user", "one_time_keys": result})
return result
- @defer.inlineCallbacks
- def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
+ async def add_e2e_one_time_keys(
+ self,
+ user_id: str,
+ device_id: str,
+ time_now: int,
+ new_keys: Iterable[Tuple[str, str, str]],
+ ) -> None:
"""Insert some new one time keys for a device. Errors if any of the
keys already exist.
Args:
- user_id(str): id of user to get keys for
- device_id(str): id of device to get keys for
- time_now(long): insertion time to record (ms since epoch)
- new_keys(iterable[(str, str, str)]: keys to add - each a tuple of
- (algorithm, key_id, key json)
+ user_id: id of user to get keys for
+ device_id: id of device to get keys for
+ time_now: insertion time to record (ms since epoch)
+ new_keys: keys to add - each a tuple of (algorithm, key_id, key json)
"""
def _add_e2e_one_time_keys(txn):
@@ -241,7 +244,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
txn, self.count_e2e_one_time_keys, (user_id, device_id)
)
- yield self.db_pool.runInteraction(
+ await self.db_pool.runInteraction(
"add_e2e_one_time_keys_insert", _add_e2e_one_time_keys
)
@@ -268,22 +271,23 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
"count_e2e_one_time_keys", _count_e2e_one_time_keys
)
- @defer.inlineCallbacks
- def get_e2e_cross_signing_key(self, user_id, key_type, from_user_id=None):
+ async def get_e2e_cross_signing_key(
+ self, user_id: str, key_type: str, from_user_id: Optional[str] = None
+ ) -> Optional[dict]:
"""Returns a user's cross-signing key.
Args:
- user_id (str): the user whose key is being requested
- key_type (str): the type of key that is being requested: either 'master'
+ user_id: the user whose key is being requested
+ key_type: the type of key that is being requested: either 'master'
for a master key, 'self_signing' for a self-signing key, or
'user_signing' for a user-signing key
- from_user_id (str): if specified, signatures made by this user on
+ from_user_id: if specified, signatures made by this user on
the self-signing key will be included in the result
Returns:
dict of the key data or None if not found
"""
- res = yield self.get_e2e_cross_signing_keys_bulk([user_id], from_user_id)
+ res = await self.get_e2e_cross_signing_keys_bulk([user_id], from_user_id)
user_keys = res.get(user_id)
if not user_keys:
return None
@@ -449,28 +453,26 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
return keys
- @defer.inlineCallbacks
- def get_e2e_cross_signing_keys_bulk(
- self, user_ids: List[str], from_user_id: str = None
- ) -> defer.Deferred:
+ async def get_e2e_cross_signing_keys_bulk(
+ self, user_ids: List[str], from_user_id: Optional[str] = None
+ ) -> Dict[str, Dict[str, dict]]:
"""Returns the cross-signing keys for a set of users.
Args:
- user_ids (list[str]): the users whose keys are being requested
- from_user_id (str): if specified, signatures made by this user on
+ user_ids: the users whose keys are being requested
+ from_user_id: if specified, signatures made by this user on
the self-signing keys will be included in the result
Returns:
- Deferred[dict[str, dict[str, dict]]]: map of user ID to key type to
- key data. If a user's cross-signing keys were not found, either
- their user ID will not be in the dict, or their user ID will map
- to None.
+ A map of user ID to key type to key data. If a user's cross-signing
+ keys were not found, either their user ID will not be in the dict,
+ or their user ID will map to None.
"""
- result = yield self._get_bare_e2e_cross_signing_keys_bulk(user_ids)
+ result = await self._get_bare_e2e_cross_signing_keys_bulk(user_ids)
if from_user_id:
- result = yield self.db_pool.runInteraction(
+ result = await self.db_pool.runInteraction(
"get_e2e_cross_signing_signatures",
self._get_e2e_cross_signing_signatures_txn,
result,
@@ -700,7 +702,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
values={
"user_id": user_id,
"keytype": key_type,
- "keydata": json.dumps(key),
+ "keydata": json_encoder.encode(key),
"stream_id": stream_id,
},
)
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index b8cefb4d5e..7c246d3e4c 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -17,11 +17,10 @@
import logging
from typing import List
-from canonicaljson import json
-
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import LoggingTransaction, SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
+from synapse.util import json_encoder
from synapse.util.caches.descriptors import cachedInlineCallbacks
logger = logging.getLogger(__name__)
@@ -50,7 +49,7 @@ def _serialize_action(actions, is_highlight):
else:
if actions == DEFAULT_NOTIF_ACTION:
return ""
- return json.dumps(actions)
+ return json_encoder.encode(actions)
def _deserialize_action(actions, is_highlight):
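Reviewer note: `_serialize_action` stores the overwhelmingly common default action list as an empty string rather than as JSON. A sketch of that round trip (the `DEFAULT_NOTIF_ACTION` value and the elided `is_highlight` handling are assumptions):

```python
import json

json_encoder = json.JSONEncoder(separators=(",", ":"))

# Assumed default; the real constant lives alongside _serialize_action.
DEFAULT_NOTIF_ACTION = ["notify", {"set_tweak": "highlight", "value": False}]


def serialize_action(actions):
    if actions == DEFAULT_NOTIF_ACTION:
        return ""  # common case: store nothing rather than its JSON
    return json_encoder.encode(actions)


def deserialize_action(stored):
    return json.loads(stored) if stored else DEFAULT_NOTIF_ACTION


assert deserialize_action(serialize_action(DEFAULT_NOTIF_ACTION)) == DEFAULT_NOTIF_ACTION
assert deserialize_action(serialize_action(["dont_notify"])) == ["dont_notify"]
```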
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 4d8a24ce4b..1a68bf32cb 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -53,47 +53,6 @@ event_counter = Counter(
["type", "origin_type", "origin_entity"],
)
-STATE_EVENT_TYPES_TO_MARK_UNREAD = {
- EventTypes.Topic,
- EventTypes.Name,
- EventTypes.RoomAvatar,
- EventTypes.Tombstone,
-}
-
-
-def should_count_as_unread(event: EventBase, context: EventContext) -> bool:
- # Exclude rejected and soft-failed events.
- if context.rejected or event.internal_metadata.is_soft_failed():
- return False
-
- # Exclude notices.
- if (
- not event.is_state()
- and event.type == EventTypes.Message
- and event.content.get("msgtype") == "m.notice"
- ):
- return False
-
- # Exclude edits.
- relates_to = event.content.get("m.relates_to", {})
- if relates_to.get("rel_type") == RelationTypes.REPLACE:
- return False
-
- # Mark events that have a non-empty string body as unread.
- body = event.content.get("body")
- if isinstance(body, str) and body:
- return True
-
- # Mark some state events as unread.
- if event.is_state() and event.type in STATE_EVENT_TYPES_TO_MARK_UNREAD:
- return True
-
- # Mark encrypted events as unread.
- if not event.is_state() and event.type == EventTypes.Encrypted:
- return True
-
- return False
-
def encode_json(json_object):
"""
@@ -239,10 +198,6 @@ class PersistEventsStore:
event_counter.labels(event.type, origin_type, origin_entity).inc()
- self.store.get_unread_message_count_for_user.invalidate_many(
- (event.room_id,),
- )
-
for room_id, new_state in current_state_for_room.items():
self.store.get_current_state_ids.prefill((room_id,), new_state)
@@ -864,9 +819,8 @@ class PersistEventsStore:
"contains_url": (
"url" in event.content and isinstance(event.content["url"], str)
),
- "count_as_unread": should_count_as_unread(event, context),
}
- for event, context in events_and_contexts
+ for event, _ in events_and_contexts
],
)
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index a7b7393f6e..755b7a2a85 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -41,15 +41,9 @@ from synapse.replication.tcp.streams import BackfillStream
from synapse.replication.tcp.streams.events import EventsStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
-from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import get_domain_from_id
-from synapse.util.caches.descriptors import (
- Cache,
- _CacheContext,
- cached,
- cachedInlineCallbacks,
-)
+from synapse.util.caches.descriptors import Cache, cached, cachedInlineCallbacks
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure
@@ -1364,84 +1358,6 @@ class EventsWorkerStore(SQLBaseStore):
desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
)
- @cached(tree=True, cache_context=True)
- async def get_unread_message_count_for_user(
- self, room_id: str, user_id: str, cache_context: _CacheContext,
- ) -> int:
- """Retrieve the count of unread messages for the given room and user.
-
- Args:
- room_id: The ID of the room to count unread messages in.
- user_id: The ID of the user to count unread messages for.
-
- Returns:
- The number of unread messages for the given user in the given room.
- """
- with Measure(self._clock, "get_unread_message_count_for_user"):
- last_read_event_id = await self.get_last_receipt_event_id_for_user(
- user_id=user_id,
- room_id=room_id,
- receipt_type="m.read",
- on_invalidate=cache_context.invalidate,
- )
-
- return await self.db_pool.runInteraction(
- "get_unread_message_count_for_user",
- self._get_unread_message_count_for_user_txn,
- user_id,
- room_id,
- last_read_event_id,
- )
-
- def _get_unread_message_count_for_user_txn(
- self,
- txn: Cursor,
- user_id: str,
- room_id: str,
- last_read_event_id: Optional[str],
- ) -> int:
- if last_read_event_id:
- # Get the stream ordering for the last read event.
- stream_ordering = self.db_pool.simple_select_one_onecol_txn(
- txn=txn,
- table="events",
- keyvalues={"room_id": room_id, "event_id": last_read_event_id},
- retcol="stream_ordering",
- )
- else:
- # If there's no read receipt for that room, it probably means the user hasn't
- # opened it yet, in which case use the stream ID of their join event.
- # We can't just set it to 0 otherwise messages from other local users from
- # before this user joined will be counted as well.
- txn.execute(
- """
- SELECT stream_ordering FROM local_current_membership
- LEFT JOIN events USING (event_id, room_id)
- WHERE membership = 'join'
- AND user_id = ?
- AND room_id = ?
- """,
- (user_id, room_id),
- )
- row = txn.fetchone()
-
- if row is None:
- return 0
-
- stream_ordering = row[0]
-
- # Count the messages that qualify as unread after the stream ordering we've just
- # retrieved.
- sql = """
- SELECT COUNT(*) FROM events
- WHERE sender != ? AND room_id = ? AND stream_ordering > ? AND count_as_unread
- """
-
- txn.execute(sql, (user_id, room_id, stream_ordering))
- row = txn.fetchone()
-
- return row[0] if row else 0
-
AllNewEventsResult = namedtuple(
"AllNewEventsResult",
diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py
index a98181f445..75ea6d4b2f 100644
--- a/synapse/storage/databases/main/group_server.py
+++ b/synapse/storage/databases/main/group_server.py
@@ -16,12 +16,11 @@
from typing import List, Tuple
-from canonicaljson import json
-
from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.storage._base import SQLBaseStore, db_to_json
+from synapse.util import json_encoder
# The category ID for the "default" category. We don't store as null in the
# database to avoid the fun of null != null
@@ -752,7 +751,7 @@ class GroupServerStore(GroupServerWorkerStore):
if profile is None:
insertion_values["profile"] = "{}"
else:
- update_values["profile"] = json.dumps(profile)
+ update_values["profile"] = json_encoder.encode(profile)
if is_public is None:
insertion_values["is_public"] = True
@@ -783,7 +782,7 @@ class GroupServerStore(GroupServerWorkerStore):
if profile is None:
insertion_values["profile"] = "{}"
else:
- update_values["profile"] = json.dumps(profile)
+ update_values["profile"] = json_encoder.encode(profile)
if is_public is None:
insertion_values["is_public"] = True
@@ -1007,7 +1006,7 @@ class GroupServerStore(GroupServerWorkerStore):
"group_id": group_id,
"user_id": user_id,
"valid_until_ms": remote_attestation["valid_until_ms"],
- "attestation_json": json.dumps(remote_attestation),
+ "attestation_json": json_encoder.encode(remote_attestation),
},
)
@@ -1131,7 +1130,7 @@ class GroupServerStore(GroupServerWorkerStore):
"is_admin": is_admin,
"membership": membership,
"is_publicised": is_publicised,
- "content": json.dumps(content),
+ "content": json_encoder.encode(content),
},
)
@@ -1143,7 +1142,7 @@ class GroupServerStore(GroupServerWorkerStore):
"group_id": group_id,
"user_id": user_id,
"type": "membership",
- "content": json.dumps(
+ "content": json_encoder.encode(
{"membership": membership, "content": content}
),
},
@@ -1171,7 +1170,7 @@ class GroupServerStore(GroupServerWorkerStore):
"group_id": group_id,
"user_id": user_id,
"valid_until_ms": remote_attestation["valid_until_ms"],
- "attestation_json": json.dumps(remote_attestation),
+ "attestation_json": json_encoder.encode(remote_attestation),
},
)
else:
@@ -1240,7 +1239,7 @@ class GroupServerStore(GroupServerWorkerStore):
keyvalues={"group_id": group_id, "user_id": user_id},
updatevalues={
"valid_until_ms": attestation["valid_until_ms"],
- "attestation_json": json.dumps(attestation),
+ "attestation_json": json_encoder.encode(attestation),
},
desc="update_remote_attestion",
)
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index 02b01d9619..e71cdd2cb4 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -15,8 +15,6 @@
import logging
from typing import List
-from twisted.internet import defer
-
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, make_in_list_sql_clause
from synapse.util.caches.descriptors import cached
@@ -252,16 +250,12 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
"reap_monthly_active_users", _reap_users, reserved_users
)
- @defer.inlineCallbacks
- def upsert_monthly_active_user(self, user_id):
+ async def upsert_monthly_active_user(self, user_id: str) -> None:
"""Updates or inserts the user into the monthly active user table, which
is used to track the current MAU usage of the server
Args:
- user_id (str): user to add/update
-
- Returns:
- Deferred
+ user_id: user to add/update
"""
# Support user never to be included in MAU stats. Note I can't easily call this
# from upsert_monthly_active_user_txn because then I need a _txn form of
@@ -271,11 +265,11 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
# _initialise_reserved_users reasoning that it would be very strange to
# include a support user in this context.
- is_support = yield self.is_support_user(user_id)
+ is_support = await self.is_support_user(user_id)
if is_support:
return
- yield self.db_pool.runInteraction(
+ await self.db_pool.runInteraction(
"upsert_monthly_active_user", self.upsert_monthly_active_user_txn, user_id
)
@@ -322,8 +316,7 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
return is_insert
- @defer.inlineCallbacks
- def populate_monthly_active_users(self, user_id):
+ async def populate_monthly_active_users(self, user_id):
"""Checks on the state of monthly active user limits and optionally
adds the user to the monthly active tables
@@ -332,14 +325,14 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
"""
if self._limit_usage_by_mau or self._mau_stats_only:
# Trial users and guests should not be included as part of MAU group
- is_guest = yield self.is_guest(user_id)
+ is_guest = await self.is_guest(user_id)
if is_guest:
return
- is_trial = yield self.is_trial_user(user_id)
+ is_trial = await self.is_trial_user(user_id)
if is_trial:
return
- last_seen_timestamp = yield self.user_last_seen_monthly_active(user_id)
+ last_seen_timestamp = await self.user_last_seen_monthly_active(user_id)
now = self.hs.get_clock().time_msec()
# We want to reduce to the total number of db writes, and are happy
@@ -352,10 +345,10 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
# False, there is no point in checking get_monthly_active_count - it
# adds no value and will break the logic if max_mau_value is exceeded.
if not self._limit_usage_by_mau:
- yield self.upsert_monthly_active_user(user_id)
+ await self.upsert_monthly_active_user(user_id)
else:
- count = yield self.get_monthly_active_count()
+ count = await self.get_monthly_active_count()
if count < self._max_mau_value:
- yield self.upsert_monthly_active_user(user_id)
+ await self.upsert_monthly_active_user(user_id)
elif now - last_seen_timestamp > LAST_SEEN_GRANULARITY:
- yield self.upsert_monthly_active_user(user_id)
+ await self.upsert_monthly_active_user(user_id)
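Reviewer note: the comments above describe a three-way write-avoidance ladder; a runnable distillation (`FakeStore`, the granularity value, and driving it with asyncio are all stand-ins — Synapse runs this on Twisted's reactor):

```python
import asyncio

LAST_SEEN_GRANULARITY = 120 * 1000  # assumed ms value for the sketch


class FakeStore:  # illustrative stand-in, not Synapse's store
    def __init__(self):
        self.active = set()

    async def count(self):
        return len(self.active)

    async def upsert(self, user_id):
        self.active.add(user_id)


async def populate(store, user_id, now, last_seen, limit_by_mau, max_mau):
    if not limit_by_mau:
        await store.upsert(user_id)          # always track when unlimited
    elif await store.count() < max_mau:
        await store.upsert(user_id)          # under the cap: write
    elif now - last_seen > LAST_SEEN_GRANULARITY:
        await store.upsert(user_id)          # over the cap but stale: refresh


store = FakeStore()
asyncio.run(populate(store, "@a:hs", 1_000_000, 0, True, 10))
assert "@a:hs" in store.active
```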
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 5fd899326a..19a0211a03 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -18,8 +18,6 @@ import abc
import logging
from typing import List, Tuple, Union
-from canonicaljson import json
-
from twisted.internet import defer
from synapse.push.baserules import list_with_base_rules
@@ -33,6 +31,7 @@ from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
from synapse.storage.util.id_generators import ChainedIdGenerator
+from synapse.util import json_encoder
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -419,8 +418,8 @@ class PushRuleStore(PushRulesWorkerStore):
before=None,
after=None,
):
- conditions_json = json.dumps(conditions)
- actions_json = json.dumps(actions)
+ conditions_json = json_encoder.encode(conditions)
+ actions_json = json_encoder.encode(actions)
with self._push_rules_stream_id_gen.get_next() as ids:
stream_id, event_stream_ordering = ids
if before or after:
@@ -689,7 +688,7 @@ class PushRuleStore(PushRulesWorkerStore):
@defer.inlineCallbacks
def set_push_rule_actions(self, user_id, rule_id, actions, is_default_rule):
- actions_json = json.dumps(actions)
+ actions_json = json_encoder.encode(actions)
def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering):
if is_default_rule:
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 6255977c92..1920a8a152 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -18,13 +18,12 @@ import abc
import logging
from typing import List, Tuple
-from canonicaljson import json
-
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.util import json_encoder
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -459,7 +458,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
values={
"stream_id": stream_id,
"event_id": event_id,
- "data": json.dumps(data),
+ "data": json_encoder.encode(data),
},
# receipts_linearized has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
@@ -585,7 +584,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
- "event_ids": json.dumps(event_ids),
- "data": json.dumps(data),
+ "event_ids": json_encoder.encode(event_ids),
+ "data": json_encoder.encode(data),
},
)
diff --git a/synapse/storage/databases/main/schema/delta/58/12unread_messages.sql b/synapse/storage/databases/main/schema/delta/58/12unread_messages.sql
deleted file mode 100644
index 531b532c73..0000000000
--- a/synapse/storage/databases/main/schema/delta/58/12unread_messages.sql
+++ /dev/null
@@ -1,18 +0,0 @@
-/* Copyright 2020 The Matrix.org Foundation C.I.C
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
--- Store a boolean value in the events table for whether the event should be counted in
--- the unread_count property of sync responses.
-ALTER TABLE events ADD COLUMN count_as_unread BOOLEAN;
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index 2162d0712d..7f8d1880e5 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -16,8 +16,7 @@
import logging
import re
from collections import namedtuple
-
-from twisted.internet import defer
+from typing import List, Optional
from synapse.api.errors import SynapseError
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -114,8 +113,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search
)
- @defer.inlineCallbacks
- def _background_reindex_search(self, progress, batch_size):
+ async def _background_reindex_search(self, progress, batch_size):
# we work through the events table from highest stream id to lowest
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
@@ -206,19 +204,18 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
return len(event_search_rows)
- result = yield self.db_pool.runInteraction(
+ result = await self.db_pool.runInteraction(
self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn
)
if not result:
- yield self.db_pool.updates._end_background_update(
+ await self.db_pool.updates._end_background_update(
self.EVENT_SEARCH_UPDATE_NAME
)
return result
- @defer.inlineCallbacks
- def _background_reindex_gin_search(self, progress, batch_size):
+ async def _background_reindex_gin_search(self, progress, batch_size):
"""This handles old synapses which used GIST indexes, if any;
converting them back to be GIN as per the actual schema.
"""
@@ -255,15 +252,14 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
conn.set_session(autocommit=False)
if isinstance(self.database_engine, PostgresEngine):
- yield self.db_pool.runWithConnection(create_index)
+ await self.db_pool.runWithConnection(create_index)
- yield self.db_pool.updates._end_background_update(
+ await self.db_pool.updates._end_background_update(
self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME
)
return 1
- @defer.inlineCallbacks
- def _background_reindex_search_order(self, progress, batch_size):
+ async def _background_reindex_search_order(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)
@@ -288,12 +284,12 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
)
conn.set_session(autocommit=False)
- yield self.db_pool.runWithConnection(create_index)
+ await self.db_pool.runWithConnection(create_index)
pg = dict(progress)
pg["have_added_indexes"] = True
- yield self.db_pool.runInteraction(
+ await self.db_pool.runInteraction(
self.EVENT_SEARCH_ORDER_UPDATE_NAME,
self.db_pool.updates._background_update_progress_txn,
self.EVENT_SEARCH_ORDER_UPDATE_NAME,
@@ -331,12 +327,12 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
return len(rows), True
- num_rows, finished = yield self.db_pool.runInteraction(
+ num_rows, finished = await self.db_pool.runInteraction(
self.EVENT_SEARCH_ORDER_UPDATE_NAME, reindex_search_txn
)
if not finished:
- yield self.db_pool.updates._end_background_update(
+ await self.db_pool.updates._end_background_update(
self.EVENT_SEARCH_ORDER_UPDATE_NAME
)
@@ -347,8 +343,7 @@ class SearchStore(SearchBackgroundUpdateStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super(SearchStore, self).__init__(database, db_conn, hs)
- @defer.inlineCallbacks
- def search_msgs(self, room_ids, search_term, keys):
+ async def search_msgs(self, room_ids, search_term, keys):
"""Performs a full text search over events with given keys.
Args:
@@ -425,7 +420,7 @@ class SearchStore(SearchBackgroundUpdateStore):
# entire table from the database.
sql += " ORDER BY rank DESC LIMIT 500"
- results = yield self.db_pool.execute(
+ results = await self.db_pool.execute(
"search_msgs", self.db_pool.cursor_to_dict, sql, *args
)
@@ -433,7 +428,7 @@ class SearchStore(SearchBackgroundUpdateStore):
# We set redact_behaviour to BLOCK here to prevent redacted events being returned in
# search results (which is a data leak)
- events = yield self.get_events_as_list(
+ events = await self.get_events_as_list(
[r["event_id"] for r in results],
redact_behaviour=EventRedactBehaviour.BLOCK,
)
@@ -442,11 +437,11 @@ class SearchStore(SearchBackgroundUpdateStore):
highlights = None
if isinstance(self.database_engine, PostgresEngine):
- highlights = yield self._find_highlights_in_postgres(search_query, events)
+ highlights = await self._find_highlights_in_postgres(search_query, events)
count_sql += " GROUP BY room_id"
- count_results = yield self.db_pool.execute(
+ count_results = await self.db_pool.execute(
"search_rooms_count", self.db_pool.cursor_to_dict, count_sql, *count_args
)
@@ -462,19 +457,25 @@ class SearchStore(SearchBackgroundUpdateStore):
"count": count,
}
- @defer.inlineCallbacks
- def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None):
+ async def search_rooms(
+ self,
+ room_ids: List[str],
+ search_term: str,
+ keys: List[str],
+ limit,
+ pagination_token: Optional[str] = None,
+ ) -> List[dict]:
"""Performs a full text search over events with given keys.
Args:
- room_id (list): The room_ids to search in
- search_term (str): Search term to search for
- keys (list): List of keys to search in, currently supports
- "content.body", "content.name", "content.topic"
- pagination_token (str): A pagination token previously returned
+ room_ids: The room_ids to search in
+ search_term: Search term to search for
+ keys: List of keys to search in, currently supports "content.body",
+ "content.name", "content.topic"
+ pagination_token: A pagination token previously returned
Returns:
- list of dicts
+ Each match as a dictionary.
"""
clauses = []
@@ -577,7 +578,7 @@ class SearchStore(SearchBackgroundUpdateStore):
args.append(limit)
- results = yield self.db_pool.execute(
+ results = await self.db_pool.execute(
"search_rooms", self.db_pool.cursor_to_dict, sql, *args
)
@@ -585,7 +586,7 @@ class SearchStore(SearchBackgroundUpdateStore):
# We set redact_behaviour to BLOCK here to prevent redacted events being returned in
# search results (which is a data leak)
- events = yield self.get_events_as_list(
+ events = await self.get_events_as_list(
[r["event_id"] for r in results],
redact_behaviour=EventRedactBehaviour.BLOCK,
)
@@ -594,11 +595,11 @@ class SearchStore(SearchBackgroundUpdateStore):
highlights = None
if isinstance(self.database_engine, PostgresEngine):
- highlights = yield self._find_highlights_in_postgres(search_query, events)
+ highlights = await self._find_highlights_in_postgres(search_query, events)
count_sql += " GROUP BY room_id"
- count_results = yield self.db_pool.execute(
+ count_results = await self.db_pool.execute(
"search_rooms_count", self.db_pool.cursor_to_dict, count_sql, *count_args
)
diff --git a/synapse/storage/databases/main/signatures.py b/synapse/storage/databases/main/signatures.py
index dae8e8bd29..be191dd870 100644
--- a/synapse/storage/databases/main/signatures.py
+++ b/synapse/storage/databases/main/signatures.py
@@ -15,8 +15,6 @@
from unpaddedbase64 import encode_base64
-from twisted.internet import defer
-
from synapse.storage._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedList
@@ -40,9 +38,8 @@ class SignatureWorkerStore(SQLBaseStore):
return self.db_pool.runInteraction("get_event_reference_hashes", f)
- @defer.inlineCallbacks
- def add_event_hashes(self, event_ids):
- hashes = yield self.get_event_reference_hashes(event_ids)
+ async def add_event_hashes(self, event_ids):
+ hashes = await self.get_event_reference_hashes(event_ids)
hashes = {
e_id: {k: encode_base64(v) for k, v in h.items() if k == "sha256"}
for e_id, h in hashes.items()
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index d73a8e8ab9..af21fe457a 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -16,8 +16,6 @@
import logging
import re
-from twisted.internet import defer
-
from synapse.api.constants import EventTypes, JoinRules
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.state import StateFilter
@@ -59,8 +57,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
"populate_user_directory_cleanup", self._populate_user_directory_cleanup
)
- @defer.inlineCallbacks
- def _populate_user_directory_createtables(self, progress, batch_size):
+ async def _populate_user_directory_createtables(self, progress, batch_size):
# Get all the rooms that we want to process.
def _make_staging_area(txn):
@@ -102,45 +99,43 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_users", users)
- new_pos = yield self.get_max_stream_id_in_current_state_deltas()
- yield self.db_pool.runInteraction(
+ new_pos = await self.get_max_stream_id_in_current_state_deltas()
+ await self.db_pool.runInteraction(
"populate_user_directory_temp_build", _make_staging_area
)
- yield self.db_pool.simple_insert(
+ await self.db_pool.simple_insert(
TEMP_TABLE + "_position", {"position": new_pos}
)
- yield self.db_pool.updates._end_background_update(
+ await self.db_pool.updates._end_background_update(
"populate_user_directory_createtables"
)
return 1
- @defer.inlineCallbacks
- def _populate_user_directory_cleanup(self, progress, batch_size):
+ async def _populate_user_directory_cleanup(self, progress, batch_size):
"""
Update the user directory stream position, then clean up the old tables.
"""
- position = yield self.db_pool.simple_select_one_onecol(
+ position = await self.db_pool.simple_select_one_onecol(
TEMP_TABLE + "_position", None, "position"
)
- yield self.update_user_directory_stream_pos(position)
+ await self.update_user_directory_stream_pos(position)
def _delete_staging_area(txn):
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users")
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
- yield self.db_pool.runInteraction(
+ await self.db_pool.runInteraction(
"populate_user_directory_cleanup", _delete_staging_area
)
- yield self.db_pool.updates._end_background_update(
+ await self.db_pool.updates._end_background_update(
"populate_user_directory_cleanup"
)
return 1
- @defer.inlineCallbacks
- def _populate_user_directory_process_rooms(self, progress, batch_size):
+ async def _populate_user_directory_process_rooms(self, progress, batch_size):
"""
Args:
progress (dict)
@@ -151,7 +146,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
# If we don't have any progress recorded, delete everything.
if not progress:
- yield self.delete_all_from_user_dir()
+ await self.delete_all_from_user_dir()
def _get_next_batch(txn):
# Only fetch 250 rooms, so we don't fetch too many at once, even
@@ -176,13 +171,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
return rooms_to_work_on
- rooms_to_work_on = yield self.db_pool.runInteraction(
+ rooms_to_work_on = await self.db_pool.runInteraction(
"populate_user_directory_temp_read", _get_next_batch
)
# No more rooms -- complete the transaction.
if not rooms_to_work_on:
- yield self.db_pool.updates._end_background_update(
+ await self.db_pool.updates._end_background_update(
"populate_user_directory_process_rooms"
)
return 1
@@ -195,21 +190,19 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
processed_event_count = 0
for room_id, event_count in rooms_to_work_on:
- is_in_room = yield self.is_host_joined(room_id, self.server_name)
+ is_in_room = await self.is_host_joined(room_id, self.server_name)
if is_in_room:
- is_public = yield self.is_room_world_readable_or_publicly_joinable(
+ is_public = await self.is_room_world_readable_or_publicly_joinable(
room_id
)
- users_with_profile = yield defer.ensureDeferred(
- state.get_current_users_in_room(room_id)
- )
+ users_with_profile = await state.get_current_users_in_room(room_id)
user_ids = set(users_with_profile)
# Update each user in the user directory.
for user_id, profile in users_with_profile.items():
- yield self.update_profile_in_user_dir(
+ await self.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)
@@ -223,7 +216,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
to_insert.add(user_id)
if to_insert:
- yield self.add_users_in_public_rooms(room_id, to_insert)
+ await self.add_users_in_public_rooms(room_id, to_insert)
to_insert.clear()
else:
for user_id in user_ids:
@@ -243,22 +236,22 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
# If it gets too big, stop and write to the database
# to prevent storing too much in RAM.
if len(to_insert) >= self.SHARE_PRIVATE_WORKING_SET:
- yield self.add_users_who_share_private_room(
+ await self.add_users_who_share_private_room(
room_id, to_insert
)
to_insert.clear()
if to_insert:
- yield self.add_users_who_share_private_room(room_id, to_insert)
+ await self.add_users_who_share_private_room(room_id, to_insert)
to_insert.clear()
# We've finished a room. Delete it from the table.
- yield self.db_pool.simple_delete_one(
+ await self.db_pool.simple_delete_one(
TEMP_TABLE + "_rooms", {"room_id": room_id}
)
# Update the remaining counter.
progress["remaining"] -= 1
- yield self.db_pool.runInteraction(
+ await self.db_pool.runInteraction(
"populate_user_directory",
self.db_pool.updates._background_update_progress_txn,
"populate_user_directory_process_rooms",
@@ -273,13 +266,12 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
return processed_event_count
- @defer.inlineCallbacks
- def _populate_user_directory_process_users(self, progress, batch_size):
+ async def _populate_user_directory_process_users(self, progress, batch_size):
"""
If search_all_users is enabled, add all of the users to the user directory.
"""
if not self.hs.config.user_directory_search_all_users:
- yield self.db_pool.updates._end_background_update(
+ await self.db_pool.updates._end_background_update(
"populate_user_directory_process_users"
)
return 1
@@ -305,13 +297,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
return users_to_work_on
- users_to_work_on = yield self.db_pool.runInteraction(
+ users_to_work_on = await self.db_pool.runInteraction(
"populate_user_directory_temp_read", _get_next_batch
)
# No more users -- complete the transaction.
if not users_to_work_on:
- yield self.db_pool.updates._end_background_update(
+ await self.db_pool.updates._end_background_update(
"populate_user_directory_process_users"
)
return 1
@@ -322,18 +314,18 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
)
for user_id in users_to_work_on:
- profile = yield self.get_profileinfo(get_localpart_from_id(user_id))
- yield self.update_profile_in_user_dir(
+ profile = await self.get_profileinfo(get_localpart_from_id(user_id))
+ await self.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)
# We've finished processing a user. Delete it from the table.
- yield self.db_pool.simple_delete_one(
+ await self.db_pool.simple_delete_one(
TEMP_TABLE + "_users", {"user_id": user_id}
)
# Update the remaining counter.
progress["remaining"] -= 1
- yield self.db_pool.runInteraction(
+ await self.db_pool.runInteraction(
"populate_user_directory",
self.db_pool.updates._background_update_progress_txn,
"populate_user_directory_process_users",
@@ -342,8 +334,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
return len(users_to_work_on)
- @defer.inlineCallbacks
- def is_room_world_readable_or_publicly_joinable(self, room_id):
+ async def is_room_world_readable_or_publicly_joinable(self, room_id):
"""Check if the room is either world_readable or publically joinable
"""
@@ -353,20 +344,20 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
(EventTypes.RoomHistoryVisibility, ""),
)
- current_state_ids = yield self.get_filtered_current_state_ids(
+ current_state_ids = await self.get_filtered_current_state_ids(
room_id, StateFilter.from_types(types_to_filter)
)
join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
if join_rules_id:
- join_rule_ev = yield self.get_event(join_rules_id, allow_none=True)
+ join_rule_ev = await self.get_event(join_rules_id, allow_none=True)
if join_rule_ev:
if join_rule_ev.content.get("join_rule") == JoinRules.PUBLIC:
return True
hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, ""))
if hist_vis_id:
- hist_vis_ev = yield self.get_event(hist_vis_id, allow_none=True)
+ hist_vis_ev = await self.get_event(hist_vis_id, allow_none=True)
if hist_vis_ev:
if hist_vis_ev.content.get("history_visibility") == "world_readable":
return True
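
The mechanical shape of these conversions, side by side (a schematic, not any specific Synapse method):

```python
from twisted.internet import defer

# Before: a Deferred-returning generator.
@defer.inlineCallbacks
def fetch_old(store):
    row = yield store.simple_select_one_onecol("table", None, "col")
    return row

# After: a native coroutine; callers await it (or wrap it with
# defer.ensureDeferred when a Deferred is still required).
async def fetch_new(store):
    return await store.simple_select_one_onecol("table", None, "col")
```
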
@@ -590,19 +581,18 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
"remove_from_user_dir", _remove_from_user_dir_txn
)
- @defer.inlineCallbacks
- def get_users_in_dir_due_to_room(self, room_id):
+ async def get_users_in_dir_due_to_room(self, room_id):
"""Get all user_ids that are in the room directory because they're
in the given room_id
"""
- user_ids_share_pub = yield self.db_pool.simple_select_onecol(
+ user_ids_share_pub = await self.db_pool.simple_select_onecol(
table="users_in_public_rooms",
keyvalues={"room_id": room_id},
retcol="user_id",
desc="get_users_in_dir_due_to_room",
)
- user_ids_share_priv = yield self.db_pool.simple_select_onecol(
+ user_ids_share_priv = await self.db_pool.simple_select_onecol(
table="users_who_share_private_rooms",
keyvalues={"room_id": room_id},
retcol="other_user_id",
@@ -645,8 +635,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
"remove_user_who_share_room", _remove_user_who_share_room_txn
)
- @defer.inlineCallbacks
- def get_user_dir_rooms_user_is_in(self, user_id):
+ async def get_user_dir_rooms_user_is_in(self, user_id):
"""
Returns the rooms that a user is in.
@@ -656,14 +645,14 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
Returns:
list: room_id
"""
- rows = yield self.db_pool.simple_select_onecol(
+ rows = await self.db_pool.simple_select_onecol(
table="users_who_share_private_rooms",
keyvalues={"user_id": user_id},
retcol="room_id",
desc="get_rooms_user_is_in",
)
- pub_rows = yield self.db_pool.simple_select_onecol(
+ pub_rows = await self.db_pool.simple_select_onecol(
table="users_in_public_rooms",
keyvalues={"user_id": user_id},
retcol="room_id",
@@ -674,32 +663,6 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
users.update(rows)
return list(users)
- @defer.inlineCallbacks
- def get_rooms_in_common_for_users(self, user_id, other_user_id):
- """Given two user_ids find out the list of rooms they share.
- """
- sql = """
- SELECT room_id FROM (
- SELECT c.room_id FROM current_state_events AS c
- INNER JOIN room_memberships AS m USING (event_id)
- WHERE type = 'm.room.member'
- AND m.membership = 'join'
- AND state_key = ?
- ) AS f1 INNER JOIN (
- SELECT c.room_id FROM current_state_events AS c
- INNER JOIN room_memberships AS m USING (event_id)
- WHERE type = 'm.room.member'
- AND m.membership = 'join'
- AND state_key = ?
- ) f2 USING (room_id)
- """
-
- rows = yield self.db_pool.execute(
- "get_rooms_in_common_for_users", None, sql, user_id, other_user_id
- )
-
- return [room_id for room_id, in rows]
-
def get_user_directory_stream_pos(self):
return self.db_pool.simple_select_one_onecol(
table="user_directory_stream_pos",
@@ -708,8 +671,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
desc="get_user_directory_stream_pos",
)
- @defer.inlineCallbacks
- def search_user_dir(self, user_id, search_term, limit):
+ async def search_user_dir(self, user_id, search_term, limit):
"""Searches for users in directory
Returns:
@@ -806,7 +768,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
# This should be unreachable.
raise Exception("Unrecognized database engine")
- results = yield self.db_pool.execute(
+ results = await self.db_pool.execute(
"search_user_dir", self.db_pool.cursor_to_dict, sql, *args
)
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index c63256d3bd..b3f76428b6 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -17,6 +17,7 @@ import logging
import re
import attr
+from canonicaljson import json
from twisted.internet import defer, task
@@ -24,6 +25,9 @@ from synapse.logging import context
logger = logging.getLogger(__name__)
+# Create a custom encoder to reduce the whitespace produced by JSON encoding.
+json_encoder = json.JSONEncoder(separators=(",", ":"))
+
def unwrapFirstError(failure):
# defer.gatherResults and DeferredLists wrap failures.
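
For reference, a minimal sketch of what the compact separators buy (stdlib `json` here; the module above gets the same interface from `canonicaljson`):

```python
import json

payload = {"event_id": "$abc", "content": {"body": "hi"}}

# The default separators are (", ", ": "), padding every delimiter.
default = json.dumps(payload)
# (",", ":") drops the padding, shrinking JSON stored and sent in responses.
compact = json.dumps(payload, separators=(",", ":"))

assert len(compact) < len(default)
```
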
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 9b09c08b89..c2d72a82cf 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -192,7 +192,7 @@ class Cache(object):
callbacks = [callback] if callback else []
self.check_thread()
observable = ObservableDeferred(value, consumeErrors=True)
- observer = defer.maybeDeferred(observable.observe)
+ observer = observable.observe()
entry = CacheEntry(deferred=observable, callbacks=callbacks)
existing_entry = self._pending_deferred_cache.pop(key, None)
diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py
index eab78dd256..0e445e01d7 100644
--- a/synapse/util/frozenutils.py
+++ b/synapse/util/frozenutils.py
@@ -63,5 +63,8 @@ def _handle_frozendict(obj):
)
-# A JSONEncoder which is capable of encoding frozendicts without barfing
-frozendict_json_encoder = json.JSONEncoder(default=_handle_frozendict)
+# A JSONEncoder which is capable of encoding frozendicts without barfing.
+# Additionally reduce the whitespace produced by JSON encoding.
+frozendict_json_encoder = json.JSONEncoder(
+ default=_handle_frozendict, separators=(",", ":"),
+)
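
The `default=` hook is what lets the encoder cope with frozendicts: it is called only for objects the encoder cannot serialise natively. A sketch, with `MappingProxyType` standing in for frozendict (an assumption for illustration):

```python
import json
from types import MappingProxyType

def _handle_frozen(obj):
    # Invoked by JSONEncoder only for objects it cannot encode itself.
    if isinstance(obj, MappingProxyType):
        return dict(obj)
    raise TypeError("Cannot serialise %r" % type(obj))

encoder = json.JSONEncoder(default=_handle_frozen, separators=(",", ":"))
print(encoder.encode({"nested": MappingProxyType({"a": 1})}))
# -> {"nested":{"a":1}}
```
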
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index ec61e14423..a805f51df1 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -13,14 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import inspect
import logging
from functools import wraps
from prometheus_client import Counter
-from twisted.internet import defer
-
from synapse.logging.context import LoggingContext, current_context
from synapse.metrics import InFlightGauge
@@ -62,25 +59,31 @@ in_flight = InFlightGauge(
def measure_func(name=None):
- def wrapper(func):
- block_name = func.__name__ if name is None else name
+ """
+ Used to decorate an async function with a `Measure` context manager.
+
+ Usage:
- if inspect.iscoroutinefunction(func):
+ @measure_func()
+ async def foo(...):
+ ...
- @wraps(func)
- async def measured_func(self, *args, **kwargs):
- with Measure(self.clock, block_name):
- r = await func(self, *args, **kwargs)
- return r
+ Which is analogous to:
- else:
+ async def foo(...):
+ with Measure(...):
+ ...
+
+ """
+
+ def wrapper(func):
+ block_name = func.__name__ if name is None else name
- @wraps(func)
- @defer.inlineCallbacks
- def measured_func(self, *args, **kwargs):
- with Measure(self.clock, block_name):
- r = yield func(self, *args, **kwargs)
- return r
+ @wraps(func)
+ async def measured_func(self, *args, **kwargs):
+ with Measure(self.clock, block_name):
+ r = await func(self, *args, **kwargs)
+ return r
return measured_func
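
The simplified decorator keeps only the async branch. Its shape, sketched with a plain monotonic timer in place of `Measure` (the timer is illustrative, not Synapse's metrics machinery):

```python
import time
from functools import wraps

def timed(name=None):
    def wrapper(func):
        block_name = func.__name__ if name is None else name

        @wraps(func)
        async def measured(*args, **kwargs):
            start = time.monotonic()
            try:
                return await func(*args, **kwargs)
            finally:
                print("%s took %.3fs" % (block_name, time.monotonic() - start))

        return measured

    return wrapper
```
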
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 8794317caa..919988d3bc 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -15,8 +15,6 @@
import logging
import random
-from twisted.internet import defer
-
import synapse.logging.context
from synapse.api.errors import CodeMessageException
@@ -54,8 +52,7 @@ class NotRetryingDestination(Exception):
self.destination = destination
-@defer.inlineCallbacks
-def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs):
+async def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs):
"""For a given destination check if we have previously failed to
send a request there and are waiting before retrying the destination.
If we are not ready to retry the destination, this will raise a
@@ -73,9 +70,9 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs)
Example usage:
try:
- limiter = yield get_retry_limiter(destination, clock, store)
+ limiter = await get_retry_limiter(destination, clock, store)
with limiter:
- response = yield do_request()
+ response = await do_request()
except NotRetryingDestination:
# We aren't ready to retry that destination.
raise
@@ -83,7 +80,7 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs)
failure_ts = None
retry_last_ts, retry_interval = (0, 0)
- retry_timings = yield store.get_destination_retry_timings(destination)
+ retry_timings = await store.get_destination_retry_timings(destination)
if retry_timings:
failure_ts = retry_timings["failure_ts"]
@@ -222,10 +219,9 @@ class RetryDestinationLimiter(object):
if self.failure_ts is None:
self.failure_ts = retry_last_ts
- @defer.inlineCallbacks
- def store_retry_timings():
+ async def store_retry_timings():
try:
- yield self.store.set_destination_retry_timings(
+ await self.store.set_destination_retry_timings(
self.destination,
self.failure_ts,
retry_last_ts,
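
Putting the converted pieces together, a caller now drives the limiter roughly like this (`do_request` is a hypothetical stand-in for the actual federation call):

```python
async def send_with_backoff(destination, clock, store):
    try:
        limiter = await get_retry_limiter(destination, clock, store)
    except NotRetryingDestination:
        return None  # still backing off; try again later

    with limiter:
        # On failure, the limiter's exit handler kicks off
        # store_retry_timings() in the background.
        return await do_request()
```
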
diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py
index 0bfb86bf1f..5d45689c8c 100644
--- a/tests/api/test_auth.py
+++ b/tests/api/test_auth.py
@@ -62,12 +62,15 @@ class AuthTestCase(unittest.TestCase):
# this is overridden for the appservice tests
self.store.get_app_service_by_token = Mock(return_value=None)
+ self.store.insert_client_ip = Mock(return_value=defer.succeed(None))
self.store.is_support_user = Mock(return_value=defer.succeed(False))
@defer.inlineCallbacks
def test_get_user_by_req_user_valid_token(self):
user_info = {"name": self.test_user, "token_id": "ditto", "device_id": "device"}
- self.store.get_user_by_access_token = Mock(return_value=user_info)
+ self.store.get_user_by_access_token = Mock(
+ return_value=defer.succeed(user_info)
+ )
request = Mock(args={})
request.args[b"access_token"] = [self.test_token]
@@ -76,23 +79,25 @@ class AuthTestCase(unittest.TestCase):
self.assertEquals(requester.user.to_string(), self.test_user)
def test_get_user_by_req_user_bad_token(self):
- self.store.get_user_by_access_token = Mock(return_value=None)
+ self.store.get_user_by_access_token = Mock(return_value=defer.succeed(None))
request = Mock(args={})
request.args[b"access_token"] = [self.test_token]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
- d = self.auth.get_user_by_req(request)
+ d = defer.ensureDeferred(self.auth.get_user_by_req(request))
f = self.failureResultOf(d, InvalidClientTokenError).value
self.assertEqual(f.code, 401)
self.assertEqual(f.errcode, "M_UNKNOWN_TOKEN")
def test_get_user_by_req_user_missing_token(self):
user_info = {"name": self.test_user, "token_id": "ditto"}
- self.store.get_user_by_access_token = Mock(return_value=user_info)
+ self.store.get_user_by_access_token = Mock(
+ return_value=defer.succeed(user_info)
+ )
request = Mock(args={})
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
- d = self.auth.get_user_by_req(request)
+ d = defer.ensureDeferred(self.auth.get_user_by_req(request))
f = self.failureResultOf(d, MissingClientTokenError).value
self.assertEqual(f.code, 401)
self.assertEqual(f.errcode, "M_MISSING_TOKEN")
@@ -103,7 +108,7 @@ class AuthTestCase(unittest.TestCase):
token="foobar", url="a_url", sender=self.test_user, ip_range_whitelist=None
)
self.store.get_app_service_by_token = Mock(return_value=app_service)
- self.store.get_user_by_access_token = Mock(return_value=None)
+ self.store.get_user_by_access_token = Mock(return_value=defer.succeed(None))
request = Mock(args={})
request.getClientIP.return_value = "127.0.0.1"
@@ -123,7 +128,7 @@ class AuthTestCase(unittest.TestCase):
ip_range_whitelist=IPSet(["192.168/16"]),
)
self.store.get_app_service_by_token = Mock(return_value=app_service)
- self.store.get_user_by_access_token = Mock(return_value=None)
+ self.store.get_user_by_access_token = Mock(return_value=defer.succeed(None))
request = Mock(args={})
request.getClientIP.return_value = "192.168.10.10"
@@ -142,25 +147,25 @@ class AuthTestCase(unittest.TestCase):
ip_range_whitelist=IPSet(["192.168/16"]),
)
self.store.get_app_service_by_token = Mock(return_value=app_service)
- self.store.get_user_by_access_token = Mock(return_value=None)
+ self.store.get_user_by_access_token = Mock(return_value=defer.succeed(None))
request = Mock(args={})
request.getClientIP.return_value = "131.111.8.42"
request.args[b"access_token"] = [self.test_token]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
- d = self.auth.get_user_by_req(request)
+ d = defer.ensureDeferred(self.auth.get_user_by_req(request))
f = self.failureResultOf(d, InvalidClientTokenError).value
self.assertEqual(f.code, 401)
self.assertEqual(f.errcode, "M_UNKNOWN_TOKEN")
def test_get_user_by_req_appservice_bad_token(self):
self.store.get_app_service_by_token = Mock(return_value=None)
- self.store.get_user_by_access_token = Mock(return_value=None)
+ self.store.get_user_by_access_token = Mock(return_value=defer.succeed(None))
request = Mock(args={})
request.args[b"access_token"] = [self.test_token]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
- d = self.auth.get_user_by_req(request)
+ d = defer.ensureDeferred(self.auth.get_user_by_req(request))
f = self.failureResultOf(d, InvalidClientTokenError).value
self.assertEqual(f.code, 401)
self.assertEqual(f.errcode, "M_UNKNOWN_TOKEN")
@@ -168,11 +173,11 @@ class AuthTestCase(unittest.TestCase):
def test_get_user_by_req_appservice_missing_token(self):
app_service = Mock(token="foobar", url="a_url", sender=self.test_user)
self.store.get_app_service_by_token = Mock(return_value=app_service)
- self.store.get_user_by_access_token = Mock(return_value=None)
+ self.store.get_user_by_access_token = Mock(return_value=defer.succeed(None))
request = Mock(args={})
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
- d = self.auth.get_user_by_req(request)
+ d = defer.ensureDeferred(self.auth.get_user_by_req(request))
f = self.failureResultOf(d, MissingClientTokenError).value
self.assertEqual(f.code, 401)
self.assertEqual(f.errcode, "M_MISSING_TOKEN")
@@ -185,7 +190,11 @@ class AuthTestCase(unittest.TestCase):
)
app_service.is_interested_in_user = Mock(return_value=True)
self.store.get_app_service_by_token = Mock(return_value=app_service)
- self.store.get_user_by_access_token = Mock(return_value=None)
+ # This just needs to return a truthy value.
+ self.store.get_user_by_id = Mock(
+ return_value=defer.succeed({"is_guest": False})
+ )
+ self.store.get_user_by_access_token = Mock(return_value=defer.succeed(None))
request = Mock(args={})
request.getClientIP.return_value = "127.0.0.1"
@@ -204,20 +213,22 @@ class AuthTestCase(unittest.TestCase):
)
app_service.is_interested_in_user = Mock(return_value=False)
self.store.get_app_service_by_token = Mock(return_value=app_service)
- self.store.get_user_by_access_token = Mock(return_value=None)
+ self.store.get_user_by_access_token = Mock(return_value=defer.succeed(None))
request = Mock(args={})
request.getClientIP.return_value = "127.0.0.1"
request.args[b"access_token"] = [self.test_token]
request.args[b"user_id"] = [masquerading_user_id]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
- d = self.auth.get_user_by_req(request)
+ d = defer.ensureDeferred(self.auth.get_user_by_req(request))
self.failureResultOf(d, AuthError)
@defer.inlineCallbacks
def test_get_user_from_macaroon(self):
self.store.get_user_by_access_token = Mock(
- return_value={"name": "@baldrick:matrix.org", "device_id": "device"}
+ return_value=defer.succeed(
+ {"name": "@baldrick:matrix.org", "device_id": "device"}
+ )
)
user_id = "@baldrick:matrix.org"
@@ -241,8 +252,8 @@ class AuthTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_get_guest_user_from_macaroon(self):
- self.store.get_user_by_id = Mock(return_value={"is_guest": True})
- self.store.get_user_by_access_token = Mock(return_value=None)
+ self.store.get_user_by_id = Mock(return_value=defer.succeed({"is_guest": True}))
+ self.store.get_user_by_access_token = Mock(return_value=defer.succeed(None))
user_id = "@baldrick:matrix.org"
macaroon = pymacaroons.Macaroon(
@@ -282,16 +293,20 @@ class AuthTestCase(unittest.TestCase):
def get_user(tok):
if token != tok:
- return None
- return {
- "name": USER_ID,
- "is_guest": False,
- "token_id": 1234,
- "device_id": "DEVICE",
- }
+ return defer.succeed(None)
+ return defer.succeed(
+ {
+ "name": USER_ID,
+ "is_guest": False,
+ "token_id": 1234,
+ "device_id": "DEVICE",
+ }
+ )
self.store.get_user_by_access_token = get_user
- self.store.get_user_by_id = Mock(return_value={"is_guest": False})
+ self.store.get_user_by_id = Mock(
+ return_value=defer.succeed({"is_guest": False})
+ )
# check the token works
request = Mock(args={})
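
The pattern behind most of these test changes: once the code under test `await`s a store method, its `Mock` must return something awaitable, so bare values are wrapped in `defer.succeed()`. A sketch under that assumption:

```python
from unittest.mock import Mock
from twisted.internet import defer

store = Mock()
# A bare dict would raise "object dict can't be used in 'await'
# expression"; a fired Deferred is awaitable, so the mock keeps working.
store.get_user_by_access_token = Mock(
    return_value=defer.succeed({"name": "@user:test", "token_id": 1})
)

async def code_under_test():
    return await store.get_user_by_access_token("some_token")
```
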
diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py
index 4e67503cf0..1fab1d6b69 100644
--- a/tests/api/test_filtering.py
+++ b/tests/api/test_filtering.py
@@ -375,8 +375,10 @@ class FilteringTestCase(unittest.TestCase):
event = MockEvent(sender="@foo:bar", type="m.profile")
events = [event]
- user_filter = yield self.filtering.get_user_filter(
- user_localpart=user_localpart, filter_id=filter_id
+ user_filter = yield defer.ensureDeferred(
+ self.filtering.get_user_filter(
+ user_localpart=user_localpart, filter_id=filter_id
+ )
)
results = user_filter.filter_presence(events=events)
@@ -396,8 +398,10 @@ class FilteringTestCase(unittest.TestCase):
)
events = [event]
- user_filter = yield self.filtering.get_user_filter(
- user_localpart=user_localpart + "2", filter_id=filter_id
+ user_filter = yield defer.ensureDeferred(
+ self.filtering.get_user_filter(
+ user_localpart=user_localpart + "2", filter_id=filter_id
+ )
)
results = user_filter.filter_presence(events=events)
@@ -412,8 +416,10 @@ class FilteringTestCase(unittest.TestCase):
event = MockEvent(sender="@foo:bar", type="m.room.topic", room_id="!foo:bar")
events = [event]
- user_filter = yield self.filtering.get_user_filter(
- user_localpart=user_localpart, filter_id=filter_id
+ user_filter = yield defer.ensureDeferred(
+ self.filtering.get_user_filter(
+ user_localpart=user_localpart, filter_id=filter_id
+ )
)
results = user_filter.filter_room_state(events=events)
@@ -430,8 +436,10 @@ class FilteringTestCase(unittest.TestCase):
)
events = [event]
- user_filter = yield self.filtering.get_user_filter(
- user_localpart=user_localpart, filter_id=filter_id
+ user_filter = yield defer.ensureDeferred(
+ self.filtering.get_user_filter(
+ user_localpart=user_localpart, filter_id=filter_id
+ )
)
results = user_filter.filter_room_state(events)
@@ -465,8 +473,10 @@ class FilteringTestCase(unittest.TestCase):
self.assertEquals(
user_filter_json,
(
- yield self.datastore.get_user_filter(
- user_localpart=user_localpart, filter_id=0
+ yield defer.ensureDeferred(
+ self.datastore.get_user_filter(
+ user_localpart=user_localpart, filter_id=0
+ )
)
),
)
@@ -479,8 +489,10 @@ class FilteringTestCase(unittest.TestCase):
user_localpart=user_localpart, user_filter=user_filter_json
)
- filter = yield self.filtering.get_user_filter(
- user_localpart=user_localpart, filter_id=filter_id
+ filter = yield defer.ensureDeferred(
+ self.filtering.get_user_filter(
+ user_localpart=user_localpart, filter_id=filter_id
+ )
)
self.assertEquals(filter.get_filter_json(), user_filter_json)
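
These tests face the opposite problem: an `@defer.inlineCallbacks` generator can only `yield` Deferreds, so each now-`async` method is bridged with `defer.ensureDeferred`, which wraps a coroutine in a Deferred:

```python
from twisted.internet import defer

async def new_style_method():
    return 42

@defer.inlineCallbacks
def old_style_test():
    # Yielding the raw coroutine would fail; ensureDeferred adapts it.
    result = yield defer.ensureDeferred(new_style_method())
    assert result == 42
```
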
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index 628f7d8db0..2a0b7c1b56 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -120,7 +120,7 @@ class AppServiceHandlerTestCase(unittest.TestCase):
self.mock_as_api.query_alias.return_value = make_awaitable(True)
self.mock_store.get_app_services.return_value = services
- self.mock_store.get_association_from_room_alias.return_value = defer.succeed(
+ self.mock_store.get_association_from_room_alias.return_value = make_awaitable(
Mock(room_id=room_id, servers=servers)
)
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index 5878f74175..b7d0adb10e 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -126,10 +126,10 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
self.room_members = []
- def check_user_in_room(room_id, user_id):
+ async def check_user_in_room(room_id, user_id):
if user_id not in [u.to_string() for u in self.room_members]:
raise AuthError(401, "User is not in the room")
- return defer.succeed(None)
+ return None
hs.get_auth().check_user_in_room = check_user_in_room
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index f16eef15f7..17d0aae2e9 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -20,6 +20,8 @@ import urllib.parse
from mock import Mock
+from twisted.internet import defer
+
import synapse.rest.admin
from synapse.api.constants import UserTypes
from synapse.api.errors import HttpResponseException, ResourceLimitError
@@ -335,7 +337,9 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
store = self.hs.get_datastore()
# Set monthly active users to the limit
- store.get_monthly_active_count = Mock(return_value=self.hs.config.max_mau_value)
+ store.get_monthly_active_count = Mock(
+ return_value=defer.succeed(self.hs.config.max_mau_value)
+ )
# Check that the blocking of monthly active users is working as expected
# The registration of a new user fails due to the limit
self.get_failure(
@@ -588,7 +592,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
# Set monthly active users to the limit
self.store.get_monthly_active_count = Mock(
- return_value=self.hs.config.max_mau_value
+ return_value=defer.succeed(self.hs.config.max_mau_value)
)
# Check that the blocking of monthly active users is working as expected
# The registration of a new user fails due to the limit
@@ -628,7 +632,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
# Set monthly active users to the limit
self.store.get_monthly_active_count = Mock(
- return_value=self.hs.config.max_mau_value
+ return_value=defer.succeed(self.hs.config.max_mau_value)
)
# Check that the blocking of monthly active users is working as expected
# The registration of a new user fails due to the limit
diff --git a/tests/rest/client/v1/test_profile.py b/tests/rest/client/v1/test_profile.py
index 8df58b4a63..ace0a3c08d 100644
--- a/tests/rest/client/v1/test_profile.py
+++ b/tests/rest/client/v1/test_profile.py
@@ -70,8 +70,8 @@ class MockHandlerProfileTestCase(unittest.TestCase):
profile_handler=self.mock_handler,
)
- def _get_user_by_req(request=None, allow_guest=False):
- return defer.succeed(synapse.types.create_requester(myid))
+ async def _get_user_by_req(request=None, allow_guest=False):
+ return synapse.types.create_requester(myid)
hs.get_auth().get_user_by_req = _get_user_by_req
diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py
index 5ccda8b2bd..ef6b775ed2 100644
--- a/tests/rest/client/v1/test_rooms.py
+++ b/tests/rest/client/v1/test_rooms.py
@@ -23,8 +23,6 @@ from urllib import parse as urlparse
from mock import Mock
-from twisted.internet import defer
-
import synapse.rest.admin
from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.handlers.pagination import PurgeStatus
@@ -51,8 +49,8 @@ class RoomBase(unittest.HomeserverTestCase):
self.hs.get_federation_handler = Mock(return_value=Mock())
- def _insert_client_ip(*args, **kwargs):
- return defer.succeed(None)
+ async def _insert_client_ip(*args, **kwargs):
+ return None
self.hs.get_datastore().insert_client_ip = _insert_client_ip
diff --git a/tests/rest/client/v1/test_typing.py b/tests/rest/client/v1/test_typing.py
index 18260bb90e..94d2bf2eb1 100644
--- a/tests/rest/client/v1/test_typing.py
+++ b/tests/rest/client/v1/test_typing.py
@@ -46,7 +46,7 @@ class RoomTypingTestCase(unittest.HomeserverTestCase):
hs.get_handlers().federation_handler = Mock()
- def get_user_by_access_token(token=None, allow_guest=False):
+ async def get_user_by_access_token(token=None, allow_guest=False):
return {
"user": UserID.from_string(self.auth_user_id),
"token_id": 1,
@@ -55,8 +55,8 @@ class RoomTypingTestCase(unittest.HomeserverTestCase):
hs.get_auth().get_user_by_access_token = get_user_by_access_token
- def _insert_client_ip(*args, **kwargs):
- return defer.succeed(None)
+ async def _insert_client_ip(*args, **kwargs):
+ return None
hs.get_datastore().insert_client_ip = _insert_client_ip
diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py
index 51941f99f9..8933b560d2 100644
--- a/tests/rest/client/v1/utils.py
+++ b/tests/rest/client/v1/utils.py
@@ -165,26 +165,6 @@ class RestHelper(object):
return channel.json_body
- def redact(self, room_id, event_id, txn_id=None, tok=None, expect_code=200):
- if txn_id is None:
- txn_id = "m%s" % (str(time.time()))
-
- path = "/_matrix/client/r0/rooms/%s/redact/%s/%s" % (room_id, event_id, txn_id)
- if tok:
- path = path + "?access_token=%s" % tok
-
- request, channel = make_request(
- self.hs.get_reactor(), "PUT", path, json.dumps({}).encode("utf8")
- )
- render(request, self.resource, self.hs.get_reactor())
-
- assert int(channel.result["code"]) == expect_code, (
- "Expected: %d, got: %d, resp: %r"
- % (expect_code, int(channel.result["code"]), channel.result["body"])
- )
-
- return channel.json_body
-
def _read_write_state(
self,
room_id: str,
diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py
index 7deaf5b24a..53a43038f0 100644
--- a/tests/rest/client/v2_alpha/test_register.py
+++ b/tests/rest/client/v2_alpha/test_register.py
@@ -116,8 +116,8 @@ class RegisterRestServletTestCase(unittest.HomeserverTestCase):
self.assertEquals(channel.result["code"], b"200", channel.result)
self.assertDictContainsSubset(det_data, channel.json_body)
+ @override_config({"enable_registration": False})
def test_POST_disabled_registration(self):
- self.hs.config.enable_registration = False
request_data = json.dumps({"username": "kermit", "password": "monkey"})
self.auth_result = (None, {"username": "kermit", "password": "monkey"}, None)
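
`override_config` applies the setting before the homeserver under test is built, rather than mutating `self.hs.config` after the fact. A sketch of the decorator pattern (the real helper lives in `tests.unittest`; this body is an approximation):

```python
def override_config(extra_config):
    """Stash overrides on the test method; the test harness merges
    them into the config it builds the homeserver from."""
    def decorator(func):
        func._extra_config = extra_config
        return func
    return decorator
```
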
diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py
index a31e44c97e..fa3a3ec1bd 100644
--- a/tests/rest/client/v2_alpha/test_sync.py
+++ b/tests/rest/client/v2_alpha/test_sync.py
@@ -16,9 +16,9 @@
import json
import synapse.rest.admin
-from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
+from synapse.api.constants import EventContentFields, EventTypes
from synapse.rest.client.v1 import login, room
-from synapse.rest.client.v2_alpha import read_marker, sync
+from synapse.rest.client.v2_alpha import sync
from tests import unittest
from tests.server import TimedOutException
@@ -324,156 +324,3 @@ class SyncTypingTests(unittest.HomeserverTestCase):
"GET", sync_url % (access_token, next_batch)
)
self.assertRaises(TimedOutException, self.render, request)
-
-
-class UnreadMessagesTestCase(unittest.HomeserverTestCase):
- servlets = [
- synapse.rest.admin.register_servlets,
- login.register_servlets,
- read_marker.register_servlets,
- room.register_servlets,
- sync.register_servlets,
- ]
-
- def prepare(self, reactor, clock, hs):
- self.url = "/sync?since=%s"
- self.next_batch = "s0"
-
- # Register the first user (used to check the unread counts).
- self.user_id = self.register_user("kermit", "monkey")
- self.tok = self.login("kermit", "monkey")
-
- # Create the room we'll check unread counts for.
- self.room_id = self.helper.create_room_as(self.user_id, tok=self.tok)
-
- # Register the second user (used to send events to the room).
- self.user2 = self.register_user("kermit2", "monkey")
- self.tok2 = self.login("kermit2", "monkey")
-
- # Change the power levels of the room so that the second user can send state
- # events.
- self.helper.send_state(
- self.room_id,
- EventTypes.PowerLevels,
- {
- "users": {self.user_id: 100, self.user2: 100},
- "users_default": 0,
- "events": {
- "m.room.name": 50,
- "m.room.power_levels": 100,
- "m.room.history_visibility": 100,
- "m.room.canonical_alias": 50,
- "m.room.avatar": 50,
- "m.room.tombstone": 100,
- "m.room.server_acl": 100,
- "m.room.encryption": 100,
- },
- "events_default": 0,
- "state_default": 50,
- "ban": 50,
- "kick": 50,
- "redact": 50,
- "invite": 0,
- },
- tok=self.tok,
- )
-
- def test_unread_counts(self):
- """Tests that /sync returns the right value for the unread count (MSC2654)."""
-
- # Check that our own messages don't increase the unread count.
- self.helper.send(self.room_id, "hello", tok=self.tok)
- self._check_unread_count(0)
-
- # Join the new user and check that this doesn't increase the unread count.
- self.helper.join(room=self.room_id, user=self.user2, tok=self.tok2)
- self._check_unread_count(0)
-
- # Check that the new user sending a message increases our unread count.
- res = self.helper.send(self.room_id, "hello", tok=self.tok2)
- self._check_unread_count(1)
-
- # Send a read receipt to tell the server we've read the latest event.
- body = json.dumps({"m.read": res["event_id"]}).encode("utf8")
- request, channel = self.make_request(
- "POST",
- "/rooms/%s/read_markers" % self.room_id,
- body,
- access_token=self.tok,
- )
- self.render(request)
- self.assertEqual(channel.code, 200, channel.json_body)
-
- # Check that the unread counter is back to 0.
- self._check_unread_count(0)
-
- # Check that room name changes increase the unread counter.
- self.helper.send_state(
- self.room_id, "m.room.name", {"name": "my super room"}, tok=self.tok2,
- )
- self._check_unread_count(1)
-
- # Check that room topic changes increase the unread counter.
- self.helper.send_state(
- self.room_id, "m.room.topic", {"topic": "welcome!!!"}, tok=self.tok2,
- )
- self._check_unread_count(2)
-
- # Check that encrypted messages increase the unread counter.
- self.helper.send_event(self.room_id, EventTypes.Encrypted, {}, tok=self.tok2)
- self._check_unread_count(3)
-
- # Check that custom events with a body increase the unread counter.
- self.helper.send_event(
- self.room_id, "org.matrix.custom_type", {"body": "hello"}, tok=self.tok2,
- )
- self._check_unread_count(4)
-
- # Check that edits don't increase the unread counter.
- self.helper.send_event(
- room_id=self.room_id,
- type=EventTypes.Message,
- content={
- "body": "hello",
- "msgtype": "m.text",
- "m.relates_to": {"rel_type": RelationTypes.REPLACE},
- },
- tok=self.tok2,
- )
- self._check_unread_count(4)
-
- # Check that notices don't increase the unread counter.
- self.helper.send_event(
- room_id=self.room_id,
- type=EventTypes.Message,
- content={"body": "hello", "msgtype": "m.notice"},
- tok=self.tok2,
- )
- self._check_unread_count(4)
-
- # Check that tombstone events changes increase the unread counter.
- self.helper.send_state(
- self.room_id,
- EventTypes.Tombstone,
- {"replacement_room": "!someroom:test"},
- tok=self.tok2,
- )
- self._check_unread_count(5)
-
- def _check_unread_count(self, expected_count: True):
- """Syncs and compares the unread count with the expected value."""
-
- request, channel = self.make_request(
- "GET", self.url % self.next_batch, access_token=self.tok,
- )
- self.render(request)
-
- self.assertEqual(channel.code, 200, channel.json_body)
-
- room_entry = channel.json_body["rooms"]["join"][self.room_id]
- self.assertEqual(
- room_entry["org.matrix.msc2654.unread_count"], expected_count, room_entry,
- )
-
- # Store the next batch for the next request.
- self.next_batch = channel.json_body["next_batch"]
diff --git a/tests/rest/test_health.py b/tests/rest/test_health.py
new file mode 100644
index 0000000000..2d021f6565
--- /dev/null
+++ b/tests/rest/test_health.py
@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from synapse.rest.health import HealthResource
+
+from tests import unittest
+
+
+class HealthCheckTests(unittest.HomeserverTestCase):
+ def setUp(self):
+ super().setUp()
+
+ # replace the JsonResource with a HealthResource.
+ self.resource = HealthResource()
+
+ def test_health(self):
+ request, channel = self.make_request("GET", "/health", shorthand=False)
+ self.render(request)
+
+ self.assertEqual(request.code, 200)
+ self.assertEqual(channel.result["body"], b"OK")
diff --git a/tests/server_notices/test_resource_limits_server_notices.py b/tests/server_notices/test_resource_limits_server_notices.py
index 7f70353b0d..3f88abe3d2 100644
--- a/tests/server_notices/test_resource_limits_server_notices.py
+++ b/tests/server_notices/test_resource_limits_server_notices.py
@@ -258,7 +258,7 @@ class TestResourceLimitsServerNoticesWithRealRooms(unittest.HomeserverTestCase):
self.user_id = "@user_id:test"
def test_server_notice_only_sent_once(self):
- self.store.get_monthly_active_count = Mock(return_value=1000)
+ self.store.get_monthly_active_count = Mock(return_value=defer.succeed(1000))
self.store.user_last_seen_monthly_active = Mock(
return_value=defer.succeed(1000)
diff --git a/tests/storage/test_directory.py b/tests/storage/test_directory.py
index 4e128e1047..daac947cb2 100644
--- a/tests/storage/test_directory.py
+++ b/tests/storage/test_directory.py
@@ -34,8 +34,10 @@ class DirectoryStoreTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_room_to_alias(self):
- yield self.store.create_room_alias_association(
- room_alias=self.alias, room_id=self.room.to_string(), servers=["test"]
+ yield defer.ensureDeferred(
+ self.store.create_room_alias_association(
+ room_alias=self.alias, room_id=self.room.to_string(), servers=["test"]
+ )
)
self.assertEquals(
@@ -45,24 +47,36 @@ class DirectoryStoreTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_alias_to_room(self):
- yield self.store.create_room_alias_association(
- room_alias=self.alias, room_id=self.room.to_string(), servers=["test"]
+ yield defer.ensureDeferred(
+ self.store.create_room_alias_association(
+ room_alias=self.alias, room_id=self.room.to_string(), servers=["test"]
+ )
)
self.assertObjectHasAttributes(
{"room_id": self.room.to_string(), "servers": ["test"]},
- (yield self.store.get_association_from_room_alias(self.alias)),
+ (
+ yield defer.ensureDeferred(
+ self.store.get_association_from_room_alias(self.alias)
+ )
+ ),
)
@defer.inlineCallbacks
def test_delete_alias(self):
- yield self.store.create_room_alias_association(
- room_alias=self.alias, room_id=self.room.to_string(), servers=["test"]
+ yield defer.ensureDeferred(
+ self.store.create_room_alias_association(
+ room_alias=self.alias, room_id=self.room.to_string(), servers=["test"]
+ )
)
- room_id = yield self.store.delete_room_alias(self.alias)
+ room_id = yield defer.ensureDeferred(self.store.delete_room_alias(self.alias))
self.assertEqual(self.room.to_string(), room_id)
self.assertIsNone(
- (yield self.store.get_association_from_room_alias(self.alias))
+ (
+ yield defer.ensureDeferred(
+ self.store.get_association_from_room_alias(self.alias)
+ )
+ )
)
diff --git a/tests/storage/test_end_to_end_keys.py b/tests/storage/test_end_to_end_keys.py
index 398d546280..9f8d30373b 100644
--- a/tests/storage/test_end_to_end_keys.py
+++ b/tests/storage/test_end_to_end_keys.py
@@ -34,7 +34,9 @@ class EndToEndKeyStoreTestCase(tests.unittest.TestCase):
yield self.store.set_e2e_device_keys("user", "device", now, json)
- res = yield self.store.get_e2e_device_keys((("user", "device"),))
+ res = yield defer.ensureDeferred(
+ self.store.get_e2e_device_keys((("user", "device"),))
+ )
self.assertIn("user", res)
self.assertIn("device", res["user"])
dev = res["user"]["device"]
@@ -63,7 +65,9 @@ class EndToEndKeyStoreTestCase(tests.unittest.TestCase):
yield self.store.set_e2e_device_keys("user", "device", now, json)
yield self.store.store_device("user", "device", "display_name")
- res = yield self.store.get_e2e_device_keys((("user", "device"),))
+ res = yield defer.ensureDeferred(
+ self.store.get_e2e_device_keys((("user", "device"),))
+ )
self.assertIn("user", res)
self.assertIn("device", res["user"])
dev = res["user"]["device"]
@@ -85,8 +89,8 @@ class EndToEndKeyStoreTestCase(tests.unittest.TestCase):
yield self.store.set_e2e_device_keys("user2", "device1", now, {"key": "json21"})
yield self.store.set_e2e_device_keys("user2", "device2", now, {"key": "json22"})
- res = yield self.store.get_e2e_device_keys(
- (("user1", "device1"), ("user2", "device2"))
+ res = yield defer.ensureDeferred(
+ self.store.get_e2e_device_keys((("user1", "device1"), ("user2", "device2")))
)
self.assertIn("user1", res)
self.assertIn("device1", res["user1"])
diff --git a/tests/storage/test_monthly_active_users.py b/tests/storage/test_monthly_active_users.py
index 259f2215f1..e793781a26 100644
--- a/tests/storage/test_monthly_active_users.py
+++ b/tests/storage/test_monthly_active_users.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
from synapse.api.constants import UserTypes
from tests import unittest
+from tests.test_utils import make_awaitable
from tests.unittest import default_config, override_config
FORTY_DAYS = 40 * 24 * 60 * 60
@@ -230,7 +231,9 @@ class MonthlyActiveUsersTestCase(unittest.HomeserverTestCase):
)
self.get_success(d)
- self.store.upsert_monthly_active_user = Mock()
+ self.store.upsert_monthly_active_user = Mock(
+ side_effect=lambda user_id: make_awaitable(None)
+ )
d = self.store.populate_monthly_active_users(user_id)
self.get_success(d)
@@ -238,7 +241,9 @@ class MonthlyActiveUsersTestCase(unittest.HomeserverTestCase):
self.store.upsert_monthly_active_user.assert_not_called()
def test_populate_monthly_users_should_update(self):
- self.store.upsert_monthly_active_user = Mock()
+ self.store.upsert_monthly_active_user = Mock(
+ side_effect=lambda user_id: make_awaitable(None)
+ )
self.store.is_trial_user = Mock(return_value=defer.succeed(False))
@@ -251,7 +256,9 @@ class MonthlyActiveUsersTestCase(unittest.HomeserverTestCase):
self.store.upsert_monthly_active_user.assert_called_once()
def test_populate_monthly_users_should_not_update(self):
- self.store.upsert_monthly_active_user = Mock()
+ self.store.upsert_monthly_active_user = Mock(
+ side_effect=lambda user_id: make_awaitable(None)
+ )
self.store.is_trial_user = Mock(return_value=defer.succeed(False))
self.store.user_last_seen_monthly_active = Mock(
@@ -333,7 +340,9 @@ class MonthlyActiveUsersTestCase(unittest.HomeserverTestCase):
@override_config({"limit_usage_by_mau": False, "mau_stats_only": False})
def test_no_users_when_not_tracking(self):
- self.store.upsert_monthly_active_user = Mock()
+ self.store.upsert_monthly_active_user = Mock(
+ side_effect=lambda user_id: make_awaitable(None)
+ )
self.get_success(self.store.populate_monthly_active_users("@user:sever"))
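
The `side_effect` lambda is deliberate: each call gets a fresh awaitable. If `make_awaitable(...)` yields an object that can only be awaited once (like a coroutine), a shared `return_value` would break on the second call. A sketch under that assumption:

```python
from unittest.mock import Mock

async def make_awaitable(result):
    # One possible shape of the helper: each call produces a new
    # coroutine, which the caller may await exactly once.
    return result

store = Mock()
store.upsert_monthly_active_user = Mock(
    side_effect=lambda user_id: make_awaitable(None)
)

async def touch_twice():
    await store.upsert_monthly_active_user("@a:test")
    await store.upsert_monthly_active_user("@b:test")  # fresh awaitable each time
```
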
diff --git a/tests/storage/test_user_directory.py b/tests/storage/test_user_directory.py
index 6a545d2eb0..ecfafe68a9 100644
--- a/tests/storage/test_user_directory.py
+++ b/tests/storage/test_user_directory.py
@@ -40,7 +40,7 @@ class UserDirectoryStoreTestCase(unittest.TestCase):
def test_search_user_dir(self):
# normally when alice searches the directory she should just find
# bob because bobby doesn't share a room with her.
- r = yield self.store.search_user_dir(ALICE, "bob", 10)
+ r = yield defer.ensureDeferred(self.store.search_user_dir(ALICE, "bob", 10))
self.assertFalse(r["limited"])
self.assertEqual(1, len(r["results"]))
self.assertDictEqual(
@@ -51,7 +51,7 @@ class UserDirectoryStoreTestCase(unittest.TestCase):
def test_search_user_dir_all_users(self):
self.hs.config.user_directory_search_all_users = True
try:
- r = yield self.store.search_user_dir(ALICE, "bob", 10)
+ r = yield defer.ensureDeferred(self.store.search_user_dir(ALICE, "bob", 10))
self.assertFalse(r["limited"])
self.assertEqual(2, len(r["results"]))
self.assertDictEqual(
diff --git a/tests/unittest.py b/tests/unittest.py
index 2152c693f2..d0bba3ddef 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -241,20 +241,16 @@ class HomeserverTestCase(TestCase):
if hasattr(self, "user_id"):
if self.hijack_auth:
- def get_user_by_access_token(token=None, allow_guest=False):
- return succeed(
- {
- "user": UserID.from_string(self.helper.auth_user_id),
- "token_id": 1,
- "is_guest": False,
- }
- )
-
- def get_user_by_req(request, allow_guest=False, rights="access"):
- return succeed(
- create_requester(
- UserID.from_string(self.helper.auth_user_id), 1, False, None
- )
+ async def get_user_by_access_token(token=None, allow_guest=False):
+ return {
+ "user": UserID.from_string(self.helper.auth_user_id),
+ "token_id": 1,
+ "is_guest": False,
+ }
+
+ async def get_user_by_req(request, allow_guest=False, rights="access"):
+ return create_requester(
+ UserID.from_string(self.helper.auth_user_id), 1, False, None
)
self.hs.get_auth().get_user_by_req = get_user_by_req
diff --git a/tests/util/test_retryutils.py b/tests/util/test_retryutils.py
index 9e348694ad..bc42ffce88 100644
--- a/tests/util/test_retryutils.py
+++ b/tests/util/test_retryutils.py
@@ -26,9 +26,7 @@ class RetryLimiterTestCase(HomeserverTestCase):
def test_new_destination(self):
"""A happy-path case with a new destination and a successful operation"""
store = self.hs.get_datastore()
- d = get_retry_limiter("test_dest", self.clock, store)
- self.pump()
- limiter = self.successResultOf(d)
+ limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
# advance the clock a bit before making the request
self.pump(1)
@@ -36,18 +34,14 @@ class RetryLimiterTestCase(HomeserverTestCase):
with limiter:
pass
- d = store.get_destination_retry_timings("test_dest")
- self.pump()
- new_timings = self.successResultOf(d)
+ new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
self.assertIsNone(new_timings)
def test_limiter(self):
"""General test case which walks through the process of a failing request"""
store = self.hs.get_datastore()
- d = get_retry_limiter("test_dest", self.clock, store)
- self.pump()
- limiter = self.successResultOf(d)
+ limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
self.pump(1)
try:
@@ -58,29 +52,22 @@ class RetryLimiterTestCase(HomeserverTestCase):
except AssertionError:
pass
- # wait for the update to land
- self.pump()
-
- d = store.get_destination_retry_timings("test_dest")
- self.pump()
- new_timings = self.successResultOf(d)
+ new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
self.assertEqual(new_timings["failure_ts"], failure_ts)
self.assertEqual(new_timings["retry_last_ts"], failure_ts)
self.assertEqual(new_timings["retry_interval"], MIN_RETRY_INTERVAL)
# now if we try again we should get a failure
- d = get_retry_limiter("test_dest", self.clock, store)
- self.pump()
- self.failureResultOf(d, NotRetryingDestination)
+ self.get_failure(
+ get_retry_limiter("test_dest", self.clock, store), NotRetryingDestination
+ )
#
# advance the clock and try again
#
self.pump(MIN_RETRY_INTERVAL)
- d = get_retry_limiter("test_dest", self.clock, store)
- self.pump()
- limiter = self.successResultOf(d)
+ limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
self.pump(1)
try:
@@ -91,12 +78,7 @@ class RetryLimiterTestCase(HomeserverTestCase):
except AssertionError:
pass
- # wait for the update to land
- self.pump()
-
- d = store.get_destination_retry_timings("test_dest")
- self.pump()
- new_timings = self.successResultOf(d)
+ new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
self.assertEqual(new_timings["failure_ts"], failure_ts)
self.assertEqual(new_timings["retry_last_ts"], retry_ts)
self.assertGreaterEqual(
@@ -110,9 +92,7 @@ class RetryLimiterTestCase(HomeserverTestCase):
# one more go, with success
#
self.pump(MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 2.0)
- d = get_retry_limiter("test_dest", self.clock, store)
- self.pump()
- limiter = self.successResultOf(d)
+ limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
self.pump(1)
with limiter:
@@ -121,7 +101,5 @@ class RetryLimiterTestCase(HomeserverTestCase):
# wait for the update to land
self.pump()
- d = store.get_destination_retry_timings("test_dest")
- self.pump()
- new_timings = self.successResultOf(d)
+ new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
self.assertIsNone(new_timings)
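
`get_success` folds the pump/successResultOf dance into a single helper, roughly (an approximation, not the exact `HomeserverTestCase` body):

```python
def get_success(self, d):
    # Run the reactor until the Deferred has fired, then unwrap it.
    self.pump()
    return self.successResultOf(d)
```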