diff --git a/synapse/handlers/acme.py b/synapse/handlers/acme.py
index a2d7959abe..7666d3abcd 100644
--- a/synapse/handlers/acme.py
+++ b/synapse/handlers/acme.py
@@ -17,7 +17,6 @@ import logging
import twisted
import twisted.internet.error
-from twisted.internet import defer
from twisted.web import server, static
from twisted.web.resource import Resource
@@ -41,8 +40,7 @@ class AcmeHandler(object):
self.reactor = hs.get_reactor()
self._acme_domain = hs.config.acme_domain
- @defer.inlineCallbacks
- def start_listening(self):
+ async def start_listening(self):
from synapse.handlers import acme_issuing_service
# Configure logging for txacme, if you need to debug
@@ -82,18 +80,17 @@ class AcmeHandler(object):
self._issuer._registered = False
try:
- yield self._issuer._ensure_registered()
+ await self._issuer._ensure_registered()
except Exception:
logger.error(ACME_REGISTER_FAIL_ERROR)
raise
- @defer.inlineCallbacks
- def provision_certificate(self):
+ async def provision_certificate(self):
logger.warning("Reprovisioning %s", self._acme_domain)
try:
- yield self._issuer.issue_cert(self._acme_domain)
+ await self._issuer.issue_cert(self._acme_domain)
except Exception:
logger.exception("Fail!")
raise
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0d7d1adcea..b3764dedae 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -71,7 +71,7 @@ from synapse.replication.http.federation import (
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import StateResolutionStore, resolve_events_with_store
-from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
+from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import JsonDict, StateMap, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.distributor import user_joined_room
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 0bd2c3e37a..92b7404706 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -22,14 +22,10 @@ import urllib.parse
from typing import Awaitable, Callable, Dict, List, Optional, Tuple
from canonicaljson import json
-from signedjson.key import decode_verify_key_bytes
-from signedjson.sign import verify_signed_json
-from unpaddedbase64 import decode_base64
from twisted.internet.error import TimeoutError
from synapse.api.errors import (
- AuthError,
CodeMessageException,
Codes,
HttpResponseException,
@@ -628,9 +624,9 @@ class IdentityHandler(BaseHandler):
)
if "mxid" in data:
- if "signatures" not in data:
- raise AuthError(401, "No signatures on 3pid binding")
- await self._verify_any_signature(data, id_server)
+ # note: we used to verify the identity server's signature here, but no longer
+ # require or validate it. See the following for context:
+ # https://github.com/matrix-org/synapse/issues/5253#issuecomment-666246950
return data["mxid"]
except TimeoutError:
raise SynapseError(500, "Timed out contacting identity server")
@@ -751,30 +747,6 @@ class IdentityHandler(BaseHandler):
mxid = lookup_results["mappings"].get(lookup_value)
return mxid
- async def _verify_any_signature(self, data, server_hostname):
- if server_hostname not in data["signatures"]:
- raise AuthError(401, "No signature from server %s" % (server_hostname,))
- for key_name, signature in data["signatures"][server_hostname].items():
- try:
- key_data = await self.blacklisting_http_client.get_json(
- "%s%s/_matrix/identity/api/v1/pubkey/%s"
- % (id_server_scheme, server_hostname, key_name)
- )
- except TimeoutError:
- raise SynapseError(500, "Timed out contacting identity server")
- if "public_key" not in key_data:
- raise AuthError(
- 401, "No public key named %s from %s" % (key_name, server_hostname)
- )
- verify_signed_json(
- data,
- server_hostname,
- decode_verify_key_bytes(
- key_name, decode_base64(key_data["public_key"])
- ),
- )
- return
-
async def ask_id_server_for_third_party_invite(
self,
requester: Requester,
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index f88bad5f25..ae6bd1d352 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -109,7 +109,7 @@ class InitialSyncHandler(BaseHandler):
rooms_ret = []
- now_token = await self.hs.get_event_sources().get_current_token()
+ now_token = self.hs.get_event_sources().get_current_token()
presence_stream = self.hs.get_event_sources().sources["presence"]
pagination_config = PaginationConfig(from_token=now_token)
@@ -360,7 +360,7 @@ class InitialSyncHandler(BaseHandler):
current_state.values(), time_now
)
- now_token = await self.hs.get_event_sources().get_current_token()
+ now_token = self.hs.get_event_sources().get_current_token()
limit = pagin_config.limit if pagin_config else None
if limit is None:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e451d6dc86..43901d0934 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -45,7 +45,7 @@ from synapse.events.validator import EventValidator
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
-from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
+from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import (
Collection,
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index da06582d4b..487420bb5d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -309,7 +309,7 @@ class PaginationHandler(object):
room_token = pagin_config.from_token.room_key
else:
pagin_config.from_token = (
- await self.hs.get_event_sources().get_current_token_for_pagination()
+ self.hs.get_event_sources().get_current_token_for_pagination()
)
room_token = pagin_config.from_token.room_key
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index b3a3bb8c3f..5387b3724f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -38,7 +38,7 @@ from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateHandler
-from synapse.storage.data_stores.main import DataStore
+from synapse.storage.databases.main import DataStore
from synapse.storage.presence import UserPresenceState
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
@@ -319,7 +319,7 @@ class PresenceHandler(BasePresenceHandler):
is some spurious presence changes that will self-correct.
"""
# If the DB pool has already terminated, don't try updating
- if not self.store.db.is_running():
+ if not self.store.db_pool.is_running():
return
logger.info(
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 501f0fe795..c94209ab3d 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -548,7 +548,7 @@ class RegistrationHandler(BaseHandler):
address (str|None): the IP address used to perform the registration.
Returns:
- Deferred
+ Awaitable
"""
if self.hs.config.worker_app:
return self._register_client(
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 0c5b99234d..a8545255b1 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -22,7 +22,7 @@ import logging
import math
import string
from collections import OrderedDict
-from typing import Optional, Tuple
+from typing import Awaitable, Optional, Tuple
from synapse.api.constants import (
EventTypes,
@@ -1041,7 +1041,7 @@ class RoomEventSource(object):
):
# We just ignore the key for now.
- to_key = await self.get_current_key()
+ to_key = self.get_current_key()
from_token = RoomStreamToken.parse(from_key)
if from_token.topological:
@@ -1081,10 +1081,10 @@ class RoomEventSource(object):
return (events, end_key)
- def get_current_key(self):
- return self.store.get_room_events_max_id()
+ def get_current_key(self) -> str:
+ return "s%d" % (self.store.get_room_max_stream_ordering(),)
- def get_current_key_for_room(self, room_id):
+ def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
return self.store.get_room_events_max_id(room_id)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 78586a0a1e..8e409f24e8 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -22,7 +22,8 @@ from unpaddedbase64 import encode_base64
from synapse import types
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.api.errors import AuthError, Codes, LimitExceededError, SynapseError
+from synapse.api.ratelimiting import Ratelimiter
from synapse.api.room_versions import EventFormatVersions
from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.events import EventBase
@@ -77,6 +78,17 @@ class RoomMemberHandler(object):
if self._is_on_event_persistence_instance:
self.persist_event_storage = hs.get_storage().persistence
+ self._join_rate_limiter_local = Ratelimiter(
+ clock=self.clock,
+ rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
+ burst_count=hs.config.ratelimiting.rc_joins_local.burst_count,
+ )
+ self._join_rate_limiter_remote = Ratelimiter(
+ clock=self.clock,
+ rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second,
+ burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count,
+ )
+
# This is only used to get at ratelimit function, and
# maybe_kick_guest_users. It's fine there are multiple of these as
# it doesn't store state.
@@ -441,7 +453,28 @@ class RoomMemberHandler(object):
# so don't really fit into the general auth process.
raise AuthError(403, "Guest access not allowed")
- if not is_host_in_room:
+ if is_host_in_room:
+ time_now_s = self.clock.time()
+ allowed, time_allowed = self._join_rate_limiter_local.can_do_action(
+ requester.user.to_string(),
+ )
+
+ if not allowed:
+ raise LimitExceededError(
+ retry_after_ms=int(1000 * (time_allowed - time_now_s))
+ )
+
+ else:
+ time_now_s = self.clock.time()
+ allowed, time_allowed = self._join_rate_limiter_remote.can_do_action(
+ requester.user.to_string(),
+ )
+
+ if not allowed:
+ raise LimitExceededError(
+ retry_after_ms=int(1000 * (time_allowed - time_now_s))
+ )
+
inviter = await self._get_inviter(target.to_string(), room_id)
if inviter and not self.hs.is_mine(inviter):
remote_room_hosts.append(inviter.domain)
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 9b312a1558..d58f9788c5 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -340,7 +340,7 @@ class SearchHandler(BaseHandler):
# If client has asked for "context" for each event (i.e. some surrounding
# events and state), fetch that
if event_context is not None:
- now_token = await self.hs.get_event_sources().get_current_token()
+ now_token = self.hs.get_event_sources().get_current_token()
contexts = {}
for event in allowed_events:
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 149f861239..249ffe2a55 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -232,7 +232,7 @@ class StatsHandler:
if membership == prev_membership:
pass # noop
- if membership == Membership.JOIN:
+ elif membership == Membership.JOIN:
room_stats_delta["joined_members"] += 1
elif membership == Membership.INVITE:
room_stats_delta["invited_members"] += 1
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index eaa4eeadf7..5a19bac929 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -961,7 +961,7 @@ class SyncHandler(object):
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
- now_token = await self.event_sources.get_current_token()
+ now_token = self.event_sources.get_current_token()
logger.debug(
"Calculating sync response for %r between %s and %s",
|