diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 4ed4a2c253..2b5465417f 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -56,7 +56,6 @@ from synapse.http.server import (
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
-from synapse.module_api import ModuleApi
from synapse.python_dependencies import check_requirements
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
@@ -106,7 +105,7 @@ class SynapseHomeServer(HomeServer):
additional_resources = listener_config.http_options.additional_resources
logger.debug("Configuring additional resources: %r", additional_resources)
- module_api = ModuleApi(self, self.get_auth_handler())
+ module_api = self.get_module_api()
for path, resmodule in additional_resources.items():
handler_cls, config = load_module(resmodule)
handler = handler_cls(config, module_api)
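The listener's additional_resources option loads each configured module and hands it the shared ModuleApi. A minimal sketch of what such a handler module could look like, matching the handler_cls(config, module_api) call above (the class name is hypothetical, not part of this change):

    # hypothetical additional_resources handler module (sketch only)
    from synapse.module_api import ModuleApi

    class ExampleResourceHandler:
        def __init__(self, config: dict, module_api: ModuleApi):
            # receives its parsed config block plus the ModuleApi returned by
            # hs.get_module_api(), now shared with spam checkers etc.
            self.config = config
            self.module_api = module_api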
diff --git a/synapse/config/server.py b/synapse/config/server.py
index ef6d70e3f8..85aa49c02d 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -39,7 +39,7 @@ logger = logging.Logger(__name__)
# in the list.
DEFAULT_BIND_ADDRESSES = ["::", "0.0.0.0"]
-DEFAULT_ROOM_VERSION = "5"
+DEFAULT_ROOM_VERSION = "6"
ROOM_COMPLEXITY_TOO_GREAT = (
"Your homeserver is unable to join rooms this large or complex. "
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index 8c907ad596..56f8dc9caf 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -446,6 +446,8 @@ def check_redaction(
if room_version_obj.event_format == EventFormatVersions.V1:
redacter_domain = get_domain_from_id(event.event_id)
+ if not isinstance(event.redacts, str):
+ return False
redactee_domain = get_domain_from_id(event.redacts)
if redacter_domain == redactee_domain:
return True
diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py
index b0fc859a47..bad18f7fdf 100644
--- a/synapse/events/spamcheck.py
+++ b/synapse/events/spamcheck.py
@@ -17,24 +17,25 @@
import inspect
from typing import Any, Dict, List, Optional, Tuple
-from synapse.spam_checker_api import RegistrationBehaviour, SpamCheckerApi
+from synapse.spam_checker_api import RegistrationBehaviour
from synapse.types import Collection
MYPY = False
if MYPY:
+ import synapse.events
import synapse.server
class SpamChecker:
def __init__(self, hs: "synapse.server.HomeServer"):
self.spam_checkers = [] # type: List[Any]
+ api = hs.get_module_api()
for module, config in hs.config.spam_checkers:
# Older spam checkers don't accept the `api` argument, so we
# try and detect support.
spam_args = inspect.getfullargspec(module)
if "api" in spam_args.args:
- api = SpamCheckerApi(hs)
self.spam_checkers.append(module(config=config, api=api))
else:
self.spam_checkers.append(module(config=config))
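With this change, every spam checker that declares an api parameter receives the same ModuleApi instance instead of a per-checker SpamCheckerApi. A rough sketch of a module written against the new argument (the class and its logic are illustrative, not part of this diff):

    # hypothetical spam checker module (sketch only)
    class ExampleSpamChecker:
        def __init__(self, config, api):
            # `api` is the shared synapse.module_api.ModuleApi instance
            self.config = config
            self.api = api

        def check_event_for_spam(self, event) -> bool:
            # returning True marks the event as spam; this example allows everything
            return False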
diff --git a/synapse/events/third_party_rules.py b/synapse/events/third_party_rules.py
index fed459198a..1535cc5339 100644
--- a/synapse/events/third_party_rules.py
+++ b/synapse/events/third_party_rules.py
@@ -16,7 +16,6 @@ from typing import Callable
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
-from synapse.module_api import ModuleApi
from synapse.types import Requester, StateMap
@@ -40,7 +39,7 @@ class ThirdPartyEventRules:
if module is not None:
self.third_party_rules = module(
- config=config, module_api=ModuleApi(hs, hs.get_auth_handler()),
+ config=config, module_api=hs.get_module_api(),
)
async def check_event_allowed(
@@ -61,12 +60,14 @@ class ThirdPartyEventRules:
prev_state_ids = await context.get_prev_state_ids()
# Retrieve the state events from the database.
- state_events = {}
- for key, event_id in prev_state_ids.items():
- state_events[key] = await self.store.get_event(event_id, allow_none=True)
+ events = await self.store.get_events(prev_state_ids.values())
+ state_events = {(ev.type, ev.state_key): ev for ev in events.values()}
- ret = await self.third_party_rules.check_event_allowed(event, state_events)
- return ret
+ # The module can modify the event slightly if it wants, but caution should be
+ # exercised, and it's likely to go very wrong if applied to events received over
+ # federation.
+
+ return await self.third_party_rules.check_event_allowed(event, state_events)
async def on_create_room(
self, requester: Requester, config: dict, is_requester_admin: bool
@@ -131,7 +132,9 @@ class ThirdPartyEventRules:
if self.third_party_rules is None:
return True
- check_func = getattr(self.third_party_rules, "check_visibility_can_be_modified")
+ check_func = getattr(
+ self.third_party_rules, "check_visibility_can_be_modified", None
+ )
if not check_func or not isinstance(check_func, Callable):
return True
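Third party event rules modules are constructed with the same shared ModuleApi, and check_visibility_can_be_modified is now genuinely optional. A sketch of a compatible module (names and behaviour are illustrative):

    # hypothetical third party event rules module (sketch only)
    class ExampleThirdPartyRules:
        def __init__(self, config, module_api):
            self.config = config
            self.module_api = module_api

        async def check_event_allowed(self, event, state_events):
            # state_events maps (event type, state key) -> event, built from the
            # room's current state as shown above; return False to reject
            return True

        # check_visibility_can_be_modified may be omitted entirely; the handler
        # now falls back to allowing the change when the hook is missing.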
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 7c4b716b28..f6d17c53b1 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -164,7 +164,14 @@ class AuthHandler(BaseHandler):
self.bcrypt_rounds = hs.config.bcrypt_rounds
+ # we can't use hs.get_module_api() here, because to do so will create an
+ # import loop.
+ #
+ # TODO: refactor this class to separate the lower-level stuff that
+ # ModuleApi can use from the higher-level stuff that uses ModuleApi, as
+        # a better way to break the loop
account_handler = ModuleApi(hs, self)
+
self.password_providers = [
module(config=config, account_handler=account_handler)
for module, config in hs.config.password_providers
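Password providers keep receiving the ModuleApi as their account_handler argument; as the comment above notes, it has to be built directly here to avoid the import loop. A minimal sketch of a provider constructor (the provider itself is hypothetical):

    # hypothetical password provider module (sketch only)
    class ExamplePasswordProvider:
        def __init__(self, config, account_handler):
            # account_handler is a synapse.module_api.ModuleApi instance
            self.account_handler = account_handler

        async def check_password(self, user_id: str, password: str) -> bool:
            # a real provider would consult an external backend; reject by default
            return False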
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index dd40fd1299..611742ae72 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -496,6 +496,22 @@ class E2eKeysHandler:
log_kv(
{"message": "Did not update one_time_keys", "reason": "no keys given"}
)
+ fallback_keys = keys.get("org.matrix.msc2732.fallback_keys", None)
+ if fallback_keys and isinstance(fallback_keys, dict):
+ log_kv(
+ {
+ "message": "Updating fallback_keys for device.",
+ "user_id": user_id,
+ "device_id": device_id,
+ }
+ )
+ await self.store.set_e2e_fallback_keys(user_id, device_id, fallback_keys)
+ elif fallback_keys:
+ log_kv({"message": "Did not update fallback_keys", "reason": "not a dict"})
+ else:
+ log_kv(
+ {"message": "Did not update fallback_keys", "reason": "no keys given"}
+ )
# the device should have been registered already, but it may have been
# deleted due to a race with a DELETE request. Or we may be using an
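The new branch above stores fallback keys uploaded under the MSC2732 field alongside ordinary one-time keys. Roughly, the keys dict handled here would look like the following (key IDs and key data are placeholders):

    # sketch of the `keys` body handled above (values are placeholders)
    keys = {
        "one_time_keys": {
            "signed_curve25519:AAAAHQ": {"key": "...", "signatures": {}},
        },
        "org.matrix.msc2732.fallback_keys": {
            # at most one fallback key per algorithm; stored via set_e2e_fallback_keys
            "signed_curve25519:AAAAGj": {"key": "...", "signatures": {}},
        },
    }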
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index a998e6b7f6..dd1f90e359 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -201,6 +201,8 @@ class SyncResult:
device_lists: List of user_ids whose devices have changed
device_one_time_keys_count: Dict of algorithm to count for one time keys
for this device
+ device_unused_fallback_key_types: List of key types that have an unused fallback
+ key
groups: Group updates, if any
"""
@@ -213,6 +215,7 @@ class SyncResult:
to_device = attr.ib(type=List[JsonDict])
device_lists = attr.ib(type=DeviceLists)
device_one_time_keys_count = attr.ib(type=JsonDict)
+ device_unused_fallback_key_types = attr.ib(type=List[str])
groups = attr.ib(type=Optional[GroupsSyncResult])
def __bool__(self) -> bool:
@@ -1014,10 +1017,14 @@ class SyncHandler:
logger.debug("Fetching OTK data")
device_id = sync_config.device_id
one_time_key_counts = {} # type: JsonDict
+ unused_fallback_key_types = [] # type: List[str]
if device_id:
one_time_key_counts = await self.store.count_e2e_one_time_keys(
user_id, device_id
)
+ unused_fallback_key_types = await self.store.get_e2e_unused_fallback_key_types(
+ user_id, device_id
+ )
logger.debug("Fetching group data")
await self._generate_sync_entry_for_groups(sync_result_builder)
@@ -1041,6 +1048,7 @@ class SyncHandler:
device_lists=device_lists,
groups=sync_result_builder.groups,
device_one_time_keys_count=one_time_key_counts,
+ device_unused_fallback_key_types=unused_fallback_key_types,
next_batch=sync_result_builder.now_token,
)
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 09ed74f6ce..00b98af3d4 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -651,6 +651,11 @@ def respond_with_json_bytes(
Returns:
twisted.web.server.NOT_DONE_YET if the request is still active.
"""
+ if request._disconnected:
+ logger.warning(
+ "Not sending response to request %s, already disconnected.", request
+ )
+ return
request.setResponseCode(code)
request.setHeader(b"Content-Type", b"application/json")
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 646f09d2bc..b410e3ad9c 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -14,13 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Iterable, Optional, Tuple
from twisted.internet import defer
from synapse.http.client import SimpleHttpClient
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.storage.state import StateFilter
from synapse.types import UserID
if TYPE_CHECKING:
@@ -293,6 +294,32 @@ class ModuleApi:
registered_user_id, request, client_redirect_url,
)
+ @defer.inlineCallbacks
+ def get_state_events_in_room(
+ self, room_id: str, types: Iterable[Tuple[str, Optional[str]]]
+ ) -> defer.Deferred:
+ """Gets current state events for the given room.
+
+ (This is exposed for compatibility with the old SpamCheckerApi. We should
+ probably deprecate it and replace it with an async method in a subclass.)
+
+ Args:
+ room_id: The room ID to get state events in.
+ types: The event type and state key (using None
+ to represent 'any') of the room state to acquire.
+
+ Returns:
+ twisted.internet.defer.Deferred[list(synapse.events.FrozenEvent)]:
+ The filtered state events in the room.
+ """
+ state_ids = yield defer.ensureDeferred(
+ self._store.get_filtered_current_state_ids(
+ room_id=room_id, state_filter=StateFilter.from_types(types)
+ )
+ )
+ state = yield defer.ensureDeferred(self._store.get_events(state_ids.values()))
+ return state.values()
+
class PublicRoomListManager:
"""Contains methods for adding to, removing from and querying whether a room
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 6779df952f..2b84eb89c0 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -236,6 +236,7 @@ class SyncRestServlet(RestServlet):
"leave": sync_result.groups.leave,
},
"device_one_time_keys_count": sync_result.device_one_time_keys_count,
+ "org.matrix.msc2732.device_unused_fallback_key_types": sync_result.device_unused_fallback_key_types,
"next_batch": await sync_result.next_batch.to_string(self.store),
}
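On the wire, clients see the unused fallback key types alongside the existing one-time key counts. A trimmed-down view of the relevant part of a /sync response (the values are illustrative):

    # sketch of the relevant fragment of a /sync response body
    sync_fragment = {
        "device_one_time_keys_count": {"signed_curve25519": 50},
        "org.matrix.msc2732.device_unused_fallback_key_types": ["signed_curve25519"],
        "next_batch": "s72595_4483_1934",
    }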
diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py
index 6568e61829..67aa993f19 100644
--- a/synapse/rest/media/v1/_base.py
+++ b/synapse/rest/media/v1/_base.py
@@ -213,6 +213,12 @@ async def respond_with_responder(
file_size (int|None): Size in bytes of the media. If not known it should be None
upload_name (str|None): The name of the requested file, if any.
"""
+ if request._disconnected:
+ logger.warning(
+ "Not sending response to request %s, already disconnected.", request
+ )
+ return
+
if not responder:
respond_404(request)
return
diff --git a/synapse/server.py b/synapse/server.py
index aa2273955c..f83dd6148c 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -91,6 +91,7 @@ from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler
from synapse.handlers.user_directory import UserDirectoryHandler
from synapse.http.client import InsecureInterceptableContextFactory, SimpleHttpClient
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
+from synapse.module_api import ModuleApi
from synapse.notifier import Notifier
from synapse.push.action_generator import ActionGenerator
from synapse.push.pusherpool import PusherPool
@@ -656,6 +657,10 @@ class HomeServer(metaclass=abc.ABCMeta):
def get_federation_ratelimiter(self) -> FederationRateLimiter:
return FederationRateLimiter(self.clock, config=self.config.rc_federation)
+ @cache_in_self
+ def get_module_api(self) -> ModuleApi:
+ return ModuleApi(self, self.get_auth_handler())
+
async def remove_pusher(self, app_id: str, push_key: str, user_id: str):
return await self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
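@cache_in_self memoises the result on the HomeServer instance, so every caller of get_module_api() shares one ModuleApi. A simplified illustration of that memoisation pattern (this is not Synapse's actual decorator):

    import functools

    def cache_on_instance(builder):
        # cache the built object on the instance under a private attribute
        attr = "_cached_" + builder.__name__

        @functools.wraps(builder)
        def wrapper(self):
            if not hasattr(self, attr):
                setattr(self, attr, builder(self))
            return getattr(self, attr)

        return wrapper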
diff --git a/synapse/spam_checker_api/__init__.py b/synapse/spam_checker_api/__init__.py
index 395ac5ab02..3ce25bb012 100644
--- a/synapse/spam_checker_api/__init__.py
+++ b/synapse/spam_checker_api/__init__.py
@@ -12,19 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
from enum import Enum
-from twisted.internet import defer
-
-from synapse.storage.state import StateFilter
-
-MYPY = False
-if MYPY:
- import synapse.server
-
-logger = logging.getLogger(__name__)
-
class RegistrationBehaviour(Enum):
"""
@@ -34,35 +23,3 @@ class RegistrationBehaviour(Enum):
ALLOW = "allow"
SHADOW_BAN = "shadow_ban"
DENY = "deny"
-
-
-class SpamCheckerApi:
- """A proxy object that gets passed to spam checkers so they can get
- access to rooms and other relevant information.
- """
-
- def __init__(self, hs: "synapse.server.HomeServer"):
- self.hs = hs
-
- self._store = hs.get_datastore()
-
- @defer.inlineCallbacks
- def get_state_events_in_room(self, room_id: str, types: tuple) -> defer.Deferred:
- """Gets state events for the given room.
-
- Args:
- room_id: The room ID to get state events in.
- types: The event type and state key (using None
- to represent 'any') of the room state to acquire.
-
- Returns:
- twisted.internet.defer.Deferred[list(synapse.events.FrozenEvent)]:
- The filtered state events in the room.
- """
- state_ids = yield defer.ensureDeferred(
- self._store.get_filtered_current_state_ids(
- room_id=room_id, state_filter=StateFilter.from_types(types)
- )
- )
- state = yield defer.ensureDeferred(self._store.get_events(state_ids.values()))
- return state.values()
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 22e1ed15d0..8c97f2af5c 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -367,6 +367,57 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
"count_e2e_one_time_keys", _count_e2e_one_time_keys
)
+ async def set_e2e_fallback_keys(
+ self, user_id: str, device_id: str, fallback_keys: JsonDict
+ ) -> None:
+ """Set the user's e2e fallback keys.
+
+ Args:
+ user_id: the user whose keys are being set
+ device_id: the device whose keys are being set
+ fallback_keys: the keys to set. This is a map from key ID (which is
+ of the form "algorithm:id") to key data.
+ """
+ # fallback_keys will usually only have one item in it, so using a for
+ # loop (as opposed to calling simple_upsert_many_txn) won't be too bad
+ # FIXME: make sure that only one key per algorithm is uploaded
+ for key_id, fallback_key in fallback_keys.items():
+ algorithm, key_id = key_id.split(":", 1)
+ await self.db_pool.simple_upsert(
+ "e2e_fallback_keys_json",
+ keyvalues={
+ "user_id": user_id,
+ "device_id": device_id,
+ "algorithm": algorithm,
+ },
+ values={
+ "key_id": key_id,
+ "key_json": json_encoder.encode(fallback_key),
+ "used": False,
+ },
+ desc="set_e2e_fallback_key",
+ )
+
+ @cached(max_entries=10000)
+ async def get_e2e_unused_fallback_key_types(
+ self, user_id: str, device_id: str
+ ) -> List[str]:
+ """Returns the fallback key types that have an unused key.
+
+ Args:
+ user_id: the user whose keys are being queried
+ device_id: the device whose keys are being queried
+
+ Returns:
+ a list of key types
+ """
+ return await self.db_pool.simple_select_onecol(
+ "e2e_fallback_keys_json",
+ keyvalues={"user_id": user_id, "device_id": device_id, "used": False},
+ retcol="algorithm",
+ desc="get_e2e_unused_fallback_key_types",
+ )
+
async def get_e2e_cross_signing_key(
self, user_id: str, key_type: str, from_user_id: Optional[str] = None
) -> Optional[dict]:
@@ -701,15 +752,37 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
" WHERE user_id = ? AND device_id = ? AND algorithm = ?"
" LIMIT 1"
)
+ fallback_sql = (
+ "SELECT key_id, key_json, used FROM e2e_fallback_keys_json"
+ " WHERE user_id = ? AND device_id = ? AND algorithm = ?"
+ " LIMIT 1"
+ )
result = {}
delete = []
+ used_fallbacks = []
for user_id, device_id, algorithm in query_list:
user_result = result.setdefault(user_id, {})
device_result = user_result.setdefault(device_id, {})
txn.execute(sql, (user_id, device_id, algorithm))
- for key_id, key_json in txn:
+ otk_row = txn.fetchone()
+ if otk_row is not None:
+ key_id, key_json = otk_row
device_result[algorithm + ":" + key_id] = key_json
delete.append((user_id, device_id, algorithm, key_id))
+ else:
+ # no one-time key available, so see if there's a fallback
+ # key
+ txn.execute(fallback_sql, (user_id, device_id, algorithm))
+ fallback_row = txn.fetchone()
+ if fallback_row is not None:
+ key_id, key_json, used = fallback_row
+ device_result[algorithm + ":" + key_id] = key_json
+ if not used:
+ used_fallbacks.append(
+ (user_id, device_id, algorithm, key_id)
+ )
+
+ # drop any one-time keys that were claimed
sql = (
"DELETE FROM e2e_one_time_keys_json"
" WHERE user_id = ? AND device_id = ? AND algorithm = ?"
@@ -726,6 +799,23 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id)
)
+ # mark fallback keys as used
+ for user_id, device_id, algorithm, key_id in used_fallbacks:
+ self.db_pool.simple_update_txn(
+ txn,
+ "e2e_fallback_keys_json",
+ {
+ "user_id": user_id,
+ "device_id": device_id,
+ "algorithm": algorithm,
+ "key_id": key_id,
+ },
+ {"used": True},
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_e2e_unused_fallback_key_types, (user_id, device_id)
+ )
+
return result
return await self.db_pool.runInteraction(
@@ -754,6 +844,14 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id)
)
+ self.db_pool.simple_delete_txn(
+ txn,
+ table="e2e_fallback_keys_json",
+ keyvalues={"user_id": user_id, "device_id": device_id},
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_e2e_unused_fallback_key_types, (user_id, device_id)
+ )
await self.db_pool.runInteraction(
"delete_e2e_keys_by_device", delete_e2e_keys_by_device_txn
diff --git a/synapse/storage/databases/main/schema/delta/58/11fallback.sql b/synapse/storage/databases/main/schema/delta/58/11fallback.sql
new file mode 100644
index 0000000000..4ed981dbf8
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/11fallback.sql
@@ -0,0 +1,24 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS e2e_fallback_keys_json (
+ user_id TEXT NOT NULL, -- The user this fallback key is for.
+ device_id TEXT NOT NULL, -- The device this fallback key is for.
+ algorithm TEXT NOT NULL, -- Which algorithm this fallback key is for.
+ key_id TEXT NOT NULL, -- An id for suppressing duplicate uploads.
+ key_json TEXT NOT NULL, -- The key as a JSON blob.
+ used BOOLEAN NOT NULL DEFAULT FALSE, -- Whether the key has been used or not.
+ CONSTRAINT e2e_fallback_keys_json_uniqueness UNIQUE (user_id, device_id, algorithm)
+);