From 4ff0201e6235b8b2efc5ce5a7dc3c479ea96df53 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 1 Oct 2020 08:09:18 -0400 Subject: Enable mypy checking for unreachable code and fix instances. (#8432) --- mypy.ini | 1 + 1 file changed, 1 insertion(+) (limited to 'mypy.ini') diff --git a/mypy.ini b/mypy.ini index 7986781432..c283f15b21 100644 --- a/mypy.ini +++ b/mypy.ini @@ -6,6 +6,7 @@ check_untyped_defs = True show_error_codes = True show_traceback = True mypy_path = stubs +warn_unreachable = True files = synapse/api, synapse/appservice, -- cgit 1.5.1 From 6c5d5e507e629cf57ae8c1034879e8ffaef33e9f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 2 Oct 2020 09:57:12 +0100 Subject: Add unit test for event persister sharding (#8433) --- changelog.d/8433.misc | 1 + mypy.ini | 3 + stubs/txredisapi.pyi | 20 +- synapse/replication/tcp/handler.py | 6 +- synapse/replication/tcp/redis.py | 40 +++- tests/replication/_base.py | 224 ++++++++++++++++++++-- tests/replication/test_sharded_event_persister.py | 102 ++++++++++ tests/unittest.py | 2 +- 8 files changed, 371 insertions(+), 27 deletions(-) create mode 100644 changelog.d/8433.misc create mode 100644 tests/replication/test_sharded_event_persister.py (limited to 'mypy.ini') diff --git a/changelog.d/8433.misc b/changelog.d/8433.misc new file mode 100644 index 0000000000..05f8b5bbf4 --- /dev/null +++ b/changelog.d/8433.misc @@ -0,0 +1 @@ +Add unit test for event persister sharding. diff --git a/mypy.ini b/mypy.ini index c283f15b21..e84ad04e41 100644 --- a/mypy.ini +++ b/mypy.ini @@ -143,3 +143,6 @@ ignore_missing_imports = True [mypy-nacl.*] ignore_missing_imports = True + +[mypy-hiredis] +ignore_missing_imports = True diff --git a/stubs/txredisapi.pyi b/stubs/txredisapi.pyi index c66413f003..522244bb57 100644 --- a/stubs/txredisapi.pyi +++ b/stubs/txredisapi.pyi @@ -16,7 +16,7 @@ """Contains *incomplete* type hints for txredisapi. """ -from typing import List, Optional, Union +from typing import List, Optional, Union, Type class RedisProtocol: def publish(self, channel: str, message: bytes): ... @@ -42,3 +42,21 @@ def lazyConnection( class SubscriberFactory: def buildProtocol(self, addr): ... + +class ConnectionHandler: ... + +class RedisFactory: + continueTrying: bool + handler: RedisProtocol + def __init__( + self, + uuid: str, + dbid: Optional[int], + poolsize: int, + isLazy: bool = False, + handler: Type = ConnectionHandler, + charset: str = "utf-8", + password: Optional[str] = None, + replyTimeout: Optional[int] = None, + convertNumbers: Optional[int] = True, + ): ... diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index b323841f73..e92da7b263 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -251,10 +251,9 @@ class ReplicationCommandHandler: using TCP. """ if hs.config.redis.redis_enabled: - import txredisapi - from synapse.replication.tcp.redis import ( RedisDirectTcpReplicationClientFactory, + lazyConnection, ) logger.info( @@ -271,7 +270,8 @@ class ReplicationCommandHandler: # connection after SUBSCRIBE is called). # First create the connection for sending commands. 
- outbound_redis_connection = txredisapi.lazyConnection( + outbound_redis_connection = lazyConnection( + reactor=hs.get_reactor(), host=hs.config.redis_host, port=hs.config.redis_port, password=hs.config.redis.redis_password, diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index f225e533de..de19705c1f 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -15,7 +15,7 @@ import logging from inspect import isawaitable -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional import txredisapi @@ -228,3 +228,41 @@ class RedisDirectTcpReplicationClientFactory(txredisapi.SubscriberFactory): p.password = self.password return p + + +def lazyConnection( + reactor, + host: str = "localhost", + port: int = 6379, + dbid: Optional[int] = None, + reconnect: bool = True, + charset: str = "utf-8", + password: Optional[str] = None, + connectTimeout: Optional[int] = None, + replyTimeout: Optional[int] = None, + convertNumbers: bool = True, +) -> txredisapi.RedisProtocol: + """Equivalent to `txredisapi.lazyConnection`, except allows specifying a + reactor. + """ + + isLazy = True + poolsize = 1 + + uuid = "%s:%d" % (host, port) + factory = txredisapi.RedisFactory( + uuid, + dbid, + poolsize, + isLazy, + txredisapi.ConnectionHandler, + charset, + password, + replyTimeout, + convertNumbers, + ) + factory.continueTrying = reconnect + for x in range(poolsize): + reactor.connectTCP(host, port, factory, connectTimeout) + + return factory.handler diff --git a/tests/replication/_base.py b/tests/replication/_base.py index ae60874ec3..81ea985b9f 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -12,13 +12,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from typing import Any, Callable, List, Optional, Tuple import attr +import hiredis from twisted.internet.interfaces import IConsumer, IPullProducer, IReactorTime +from twisted.internet.protocol import Protocol from twisted.internet.task import LoopingCall from twisted.web.http import HTTPChannel @@ -27,7 +28,7 @@ from synapse.app.generic_worker import ( GenericWorkerServer, ) from synapse.http.server import JsonResource -from synapse.http.site import SynapseRequest +from synapse.http.site import SynapseRequest, SynapseSite from synapse.replication.http import ReplicationRestResource, streams from synapse.replication.tcp.handler import ReplicationCommandHandler from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol @@ -197,19 +198,37 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): self.server_factory = ReplicationStreamProtocolFactory(self.hs) self.streamer = self.hs.get_replication_streamer() + # Fake in memory Redis server that servers can connect to. + self._redis_server = FakeRedisPubSubServer() + store = self.hs.get_datastore() self.database_pool = store.db_pool self.reactor.lookups["testserv"] = "1.2.3.4" + self.reactor.lookups["localhost"] = "127.0.0.1" + + # A map from a HS instance to the associated HTTP Site to use for + # handling inbound HTTP requests to that instance. + self._hs_to_site = {self.hs: self.site} + + if self.hs.config.redis.redis_enabled: + # Handle attempts to connect to fake redis server. 
+ self.reactor.add_tcp_client_callback( + "localhost", 6379, self.connect_any_redis_attempts, + ) - self._worker_hs_to_resource = {} + self.hs.get_tcp_replication().start_replication(self.hs) # When we see a connection attempt to the master replication listener we # automatically set up the connection. This is so that tests don't # manually have to go and explicitly set it up each time (plus sometimes # it is impossible to write the handling explicitly in the tests). + # + # Register the master replication listener: self.reactor.add_tcp_client_callback( - "1.2.3.4", 8765, self._handle_http_replication_attempt + "1.2.3.4", + 8765, + lambda: self._handle_http_replication_attempt(self.hs, 8765), ) def create_test_json_resource(self): @@ -253,28 +272,63 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): **kwargs ) + # If the instance is in the `instance_map` config then workers may try + # and send HTTP requests to it, so we register it with + # `_handle_http_replication_attempt` like we do with the master HS. + instance_name = worker_hs.get_instance_name() + instance_loc = worker_hs.config.worker.instance_map.get(instance_name) + if instance_loc: + # Ensure the host is one that has a fake DNS entry. + if instance_loc.host not in self.reactor.lookups: + raise Exception( + "Host does not have an IP for instance_map[%r].host = %r" + % (instance_name, instance_loc.host,) + ) + + self.reactor.add_tcp_client_callback( + self.reactor.lookups[instance_loc.host], + instance_loc.port, + lambda: self._handle_http_replication_attempt( + worker_hs, instance_loc.port + ), + ) + store = worker_hs.get_datastore() store.db_pool._db_pool = self.database_pool._db_pool - repl_handler = ReplicationCommandHandler(worker_hs) - client = ClientReplicationStreamProtocol( - worker_hs, "client", "test", self.clock, repl_handler, - ) - server = self.server_factory.buildProtocol(None) + # Set up TCP replication between master and the new worker if we don't + # have Redis support enabled. 
+        if not worker_hs.config.redis_enabled:
+            repl_handler = ReplicationCommandHandler(worker_hs)
+            client = ClientReplicationStreamProtocol(
+                worker_hs, "client", "test", self.clock, repl_handler,
+            )
+            server = self.server_factory.buildProtocol(None)
 
-        client_transport = FakeTransport(server, self.reactor)
-        client.makeConnection(client_transport)
+            client_transport = FakeTransport(server, self.reactor)
+            client.makeConnection(client_transport)
 
-        server_transport = FakeTransport(client, self.reactor)
-        server.makeConnection(server_transport)
+            server_transport = FakeTransport(client, self.reactor)
+            server.makeConnection(server_transport)
 
         # Set up a resource for the worker
-        resource = ReplicationRestResource(self.hs)
+        resource = ReplicationRestResource(worker_hs)
 
         for servlet in self.servlets:
             servlet(worker_hs, resource)
 
-        self._worker_hs_to_resource[worker_hs] = resource
+        self._hs_to_site[worker_hs] = SynapseSite(
+            logger_name="synapse.access.http.fake",
+            site_tag="{}-{}".format(
+                worker_hs.config.server.server_name, worker_hs.get_instance_name()
+            ),
+            config=worker_hs.config.server.listeners[0],
+            resource=resource,
+            server_version_string="1",
+        )
+
+        if worker_hs.config.redis.redis_enabled:
+            worker_hs.get_tcp_replication().start_replication(worker_hs)
 
         return worker_hs
 
@@ -285,7 +339,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
         return config
 
     def render_on_worker(self, worker_hs: HomeServer, request: SynapseRequest):
-        render(request, self._worker_hs_to_resource[worker_hs], self.reactor)
+        render(request, self._hs_to_site[worker_hs].resource, self.reactor)
 
     def replicate(self):
         """Tell the master side of replication that something has happened, and then
@@ -294,9 +348,9 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
         self.streamer.on_notifier_poke()
         self.pump()
 
-    def _handle_http_replication_attempt(self):
-        """Handles a connection attempt to the master replication HTTP
-        listener.
+    def _handle_http_replication_attempt(self, hs, repl_port):
+        """Handles a connection attempt to the given HS replication HTTP
+        listener on the given port.
         """
 
         # We should have at least one outbound connection attempt, where the
@@ -305,7 +359,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
         self.assertGreaterEqual(len(clients), 1)
         (host, port, client_factory, _timeout, _bindAddress) = clients.pop()
         self.assertEqual(host, "1.2.3.4")
-        self.assertEqual(port, 8765)
+        self.assertEqual(port, repl_port)
 
         # Set up client side protocol
         client_protocol = client_factory.buildProtocol(None)
@@ -315,7 +369,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
         # Set up the server side protocol
         channel = _PushHTTPChannel(self.reactor)
         channel.requestFactory = request_factory
-        channel.site = self.site
+        channel.site = self._hs_to_site[hs]
 
         # Connect client to server and vice versa.
         client_to_server_transport = FakeTransport(
@@ -333,6 +387,32 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
         # inside `connectTCP` before the connection has been passed back to the
         # code that requested the TCP connection.
 
+    def connect_any_redis_attempts(self):
+        """If redis is enabled we need to deal with workers connecting to a
+        redis server. We don't want to use a real Redis server so we use a
+        fake one.
+        """
+        clients = self.reactor.tcpClients
+        self.assertEqual(len(clients), 1)
+        (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
+        self.assertEqual(host, "localhost")
+        self.assertEqual(port, 6379)
+
+        client_protocol = client_factory.buildProtocol(None)
+        server_protocol = self._redis_server.buildProtocol(None)
+
+        client_to_server_transport = FakeTransport(
+            server_protocol, self.reactor, client_protocol
+        )
+        client_protocol.makeConnection(client_to_server_transport)
+
+        server_to_client_transport = FakeTransport(
+            client_protocol, self.reactor, server_protocol
+        )
+        server_protocol.makeConnection(server_to_client_transport)
+
+        return client_to_server_transport, server_to_client_transport
+
 
 class TestReplicationDataHandler(GenericWorkerReplicationHandler):
     """Drop-in for ReplicationDataHandler which just collects RDATA rows"""
@@ -467,3 +547,105 @@ class _PullToPushProducer:
             pass
 
         self.stopProducing()
+
+
+class FakeRedisPubSubServer:
+    """A fake Redis server for pub/sub.
+    """
+
+    def __init__(self):
+        self._subscribers = set()
+
+    def add_subscriber(self, conn):
+        """A connection has called SUBSCRIBE.
+        """
+        self._subscribers.add(conn)
+
+    def remove_subscriber(self, conn):
+        """A connection has called UNSUBSCRIBE.
+        """
+        self._subscribers.discard(conn)
+
+    def publish(self, conn, channel, msg) -> int:
+        """A connection wants to publish a message to subscribers.
+        """
+        for sub in self._subscribers:
+            sub.send(["message", channel, msg])
+
+        return len(self._subscribers)
+
+    def buildProtocol(self, addr):
+        return FakeRedisPubSubProtocol(self)
+
+
+class FakeRedisPubSubProtocol(Protocol):
+    """A connection from a client talking to the fake Redis server.
+    """
+
+    def __init__(self, server: FakeRedisPubSubServer):
+        self._server = server
+        self._reader = hiredis.Reader()
+
+    def dataReceived(self, data):
+        self._reader.feed(data)
+
+        # We might get multiple messages in one packet.
+        while True:
+            msg = self._reader.gets()
+
+            if msg is False:
+                # No more messages.
+                return
+
+            if not isinstance(msg, list):
+                # Inbound commands should always be a list
+                raise Exception("Expected redis list")
+
+            self.handle_command(msg[0], *msg[1:])
+
+    def handle_command(self, command, *args):
+        """Received a Redis command from the client.
+        """
+
+        # We currently only support pub/sub.
+        if command == b"PUBLISH":
+            channel, message = args
+            num_subscribers = self._server.publish(self, channel, message)
+            self.send(num_subscribers)
+        elif command == b"SUBSCRIBE":
+            (channel,) = args
+            self._server.add_subscriber(self)
+            self.send(["subscribe", channel, 1])
+        else:
+            raise Exception("Unknown command")
+
+    def send(self, msg):
+        """Send a message back to the client.
+        """
+        raw = self.encode(msg).encode("utf-8")
+
+        self.transport.write(raw)
+        self.transport.flush()
+
+    def encode(self, obj):
+        """Encode an object to its Redis format.
+
+        Supports: strings/bytes, integers and lists/tuples.
+        """
+
+        if isinstance(obj, bytes):
+            # We assume bytes are just unicode strings.
+            obj = obj.decode("utf-8")
+
+        if isinstance(obj, str):
+            return "${len}\r\n{str}\r\n".format(len=len(obj), str=obj)
+        if isinstance(obj, int):
+            return ":{val}\r\n".format(val=obj)
+        if isinstance(obj, (list, tuple)):
+            items = "".join(self.encode(a) for a in obj)
+            return "*{len}\r\n{items}".format(len=len(obj), items=items)
+
+        raise Exception("Unrecognized type for encoding redis: %r: %r" % (type(obj), obj))
+
+    def connectionLost(self, reason):
+        self._server.remove_subscriber(self)
diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py
new file mode 100644
index 0000000000..6068d14905
--- /dev/null
+++ b/tests/replication/test_sharded_event_persister.py
@@ -0,0 +1,102 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from synapse.rest import admin
+from synapse.rest.client.v1 import login, room
+
+from tests.replication._base import BaseMultiWorkerStreamTestCase
+from tests.utils import USE_POSTGRES_FOR_TESTS
+
+logger = logging.getLogger(__name__)
+
+
+class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
+    """Checks that event persister sharding works.
+    """
+
+    # Event persister sharding requires postgres (due to needing
+    # `MultiWriterIdGenerator`).
+    if not USE_POSTGRES_FOR_TESTS:
+        skip = "Requires Postgres"
+
+    servlets = [
+        admin.register_servlets_for_client_rest_resource,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def prepare(self, reactor, clock, hs):
+        # Register a user who sends a message that we'll get notified about
+        self.other_user_id = self.register_user("otheruser", "pass")
+        self.other_access_token = self.login("otheruser", "pass")
+
+    def default_config(self):
+        conf = super().default_config()
+        conf["redis"] = {"enabled": "true"}
+        conf["stream_writers"] = {"events": ["worker1", "worker2"]}
+        conf["instance_map"] = {
+            "worker1": {"host": "testserv", "port": 1001},
+            "worker2": {"host": "testserv", "port": 1002},
+        }
+        return conf
+
+    def test_basic(self):
+        """Simple test to ensure that multiple rooms can be created and joined,
+        and that different rooms get handled by different instances.
+        """
+
+        self.make_worker_hs(
+            "synapse.app.generic_worker", {"worker_name": "worker1"},
+        )
+
+        self.make_worker_hs(
+            "synapse.app.generic_worker", {"worker_name": "worker2"},
+        )
+
+        persisted_on_1 = False
+        persisted_on_2 = False
+
+        store = self.hs.get_datastore()
+
+        user_id = self.register_user("user", "pass")
+        access_token = self.login("user", "pass")
+
+        # Keep making new rooms until we see rooms being persisted on both
+        # workers.
+        for _ in range(10):
+            # Create a room
+            room = self.helper.create_room_as(user_id, tok=access_token)
+
+            # The other user joins
+            self.helper.join(
+                room=room, user=self.other_user_id, tok=self.other_access_token
+            )
+
+            # The other user sends some messages
+            response = self.helper.send(room, body="Hi!", tok=self.other_access_token)
+            event_id = response["event_id"]
+
+            # The event position includes which instance persisted the event.
+            pos = self.get_success(store.get_position_for_event(event_id))
+
+            persisted_on_1 |= pos.instance_name == "worker1"
+            persisted_on_2 |= pos.instance_name == "worker2"
+
+            if persisted_on_1 and persisted_on_2:
+                break
+
+        self.assertTrue(persisted_on_1)
+        self.assertTrue(persisted_on_2)
diff --git a/tests/unittest.py b/tests/unittest.py
index e654c0442d..82ede9de34 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -241,7 +241,7 @@ class HomeserverTestCase(TestCase):
         # create a site to wrap the resource.
         self.site = SynapseSite(
             logger_name="synapse.access.http.fake",
-            site_tag="test",
+            site_tag=self.hs.config.server.server_name,
             config=self.hs.config.server.listeners[0],
             resource=self.resource,
             server_version_string="1",
-- 
cgit 1.5.1


From b460a088c647a6d3ea0e5a9f4f80d86bb9e303b3 Mon Sep 17 00:00:00 2001
From: Patrick Cloke
Date: Wed, 7 Oct 2020 08:58:21 -0400
Subject: Add typing information to the device handler. (#8407)

---
 changelog.d/8407.misc                     |  1 +
 mypy.ini                                  |  1 +
 synapse/handlers/device.py                | 89 +++++++++++++++++++------------
 synapse/storage/databases/main/devices.py |  6 +--
 4 files changed, 59 insertions(+), 38 deletions(-)
 create mode 100644 changelog.d/8407.misc
(limited to 'mypy.ini')

diff --git a/changelog.d/8407.misc b/changelog.d/8407.misc
new file mode 100644
index 0000000000..d37002d75b
--- /dev/null
+++ b/changelog.d/8407.misc
@@ -0,0 +1 @@
+Add typing information to the device handler.
diff --git a/mypy.ini b/mypy.ini
index e84ad04e41..a7ffb81ef1 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -17,6 +17,7 @@ files =
   synapse/federation,
   synapse/handlers/auth.py,
   synapse/handlers/cas_handler.py,
+  synapse/handlers/device.py,
   synapse/handlers/directory.py,
   synapse/handlers/events.py,
   synapse/handlers/federation.py,
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index e883ed1e37..debb1b4f29 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -15,7 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
import logging -from typing import Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple from synapse.api import errors from synapse.api.constants import EventTypes @@ -29,8 +29,10 @@ from synapse.api.errors import ( from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import ( + Collection, JsonDict, StreamToken, + UserID, get_domain_from_id, get_verify_key_from_cross_signing_key, ) @@ -42,13 +44,16 @@ from synapse.util.retryutils import NotRetryingDestination from ._base import BaseHandler +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + logger = logging.getLogger(__name__) MAX_DEVICE_DISPLAY_NAME_LEN = 100 class DeviceWorkerHandler(BaseHandler): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.hs = hs @@ -106,7 +111,9 @@ class DeviceWorkerHandler(BaseHandler): @trace @measure_func("device.get_user_ids_changed") - async def get_user_ids_changed(self, user_id: str, from_token: StreamToken): + async def get_user_ids_changed( + self, user_id: str, from_token: StreamToken + ) -> JsonDict: """Get list of users that have had the devices updated, or have newly joined a room, that `user_id` may be interested in. """ @@ -222,8 +229,8 @@ class DeviceWorkerHandler(BaseHandler): possibly_joined = possibly_changed & users_who_share_room possibly_left = (possibly_changed | possibly_left) - users_who_share_room else: - possibly_joined = [] - possibly_left = [] + possibly_joined = set() + possibly_left = set() result = {"changed": list(possibly_joined), "left": list(possibly_left)} @@ -231,7 +238,7 @@ class DeviceWorkerHandler(BaseHandler): return result - async def on_federation_query_user_devices(self, user_id): + async def on_federation_query_user_devices(self, user_id: str) -> JsonDict: stream_id, devices = await self.store.get_e2e_device_keys_for_federation_query( user_id ) @@ -250,7 +257,7 @@ class DeviceWorkerHandler(BaseHandler): class DeviceHandler(DeviceWorkerHandler): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.federation_sender = hs.get_federation_sender() @@ -265,7 +272,7 @@ class DeviceHandler(DeviceWorkerHandler): hs.get_distributor().observe("user_left_room", self.user_left_room) - def _check_device_name_length(self, name: str): + def _check_device_name_length(self, name: Optional[str]): """ Checks whether a device name is longer than the maximum allowed length. @@ -284,8 +291,11 @@ class DeviceHandler(DeviceWorkerHandler): ) async def check_device_registered( - self, user_id, device_id, initial_device_display_name=None - ): + self, + user_id: str, + device_id: Optional[str], + initial_device_display_name: Optional[str] = None, + ) -> str: """ If the given device has not been registered, register it with the supplied display name. @@ -293,12 +303,11 @@ class DeviceHandler(DeviceWorkerHandler): If no device_id is supplied, we make one up. 
Args: - user_id (str): @user:id - device_id (str | None): device id supplied by client - initial_device_display_name (str | None): device display name from - client + user_id: @user:id + device_id: device id supplied by client + initial_device_display_name: device display name from client Returns: - str: device id (generated if none was supplied) + device id (generated if none was supplied) """ self._check_device_name_length(initial_device_display_name) @@ -317,15 +326,15 @@ class DeviceHandler(DeviceWorkerHandler): # times in case of a clash. attempts = 0 while attempts < 5: - device_id = stringutils.random_string(10).upper() + new_device_id = stringutils.random_string(10).upper() new_device = await self.store.store_device( user_id=user_id, - device_id=device_id, + device_id=new_device_id, initial_device_display_name=initial_device_display_name, ) if new_device: - await self.notify_device_update(user_id, [device_id]) - return device_id + await self.notify_device_update(user_id, [new_device_id]) + return new_device_id attempts += 1 raise errors.StoreError(500, "Couldn't generate a device ID.") @@ -434,7 +443,9 @@ class DeviceHandler(DeviceWorkerHandler): @trace @measure_func("notify_device_update") - async def notify_device_update(self, user_id, device_ids): + async def notify_device_update( + self, user_id: str, device_ids: Collection[str] + ) -> None: """Notify that a user's device(s) has changed. Pokes the notifier, and remote servers if the user is local. """ @@ -446,7 +457,7 @@ class DeviceHandler(DeviceWorkerHandler): user_id ) - hosts = set() + hosts = set() # type: Set[str] if self.hs.is_mine_id(user_id): hosts.update(get_domain_from_id(u) for u in users_who_share_room) hosts.discard(self.server_name) @@ -498,7 +509,7 @@ class DeviceHandler(DeviceWorkerHandler): self.notifier.on_new_event("device_list_key", position, users=[from_user_id]) - async def user_left_room(self, user, room_id): + async def user_left_room(self, user: UserID, room_id: str) -> None: user_id = user.to_string() room_ids = await self.store.get_rooms_for_user(user_id) if not room_ids: @@ -586,7 +597,9 @@ class DeviceHandler(DeviceWorkerHandler): return {"success": True} -def _update_device_from_client_ips(device, client_ips): +def _update_device_from_client_ips( + device: Dict[str, Any], client_ips: Dict[Tuple[str, str], Dict[str, Any]] +) -> None: ip = client_ips.get((device["user_id"], device["device_id"]), {}) device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")}) @@ -594,7 +607,7 @@ def _update_device_from_client_ips(device, client_ips): class DeviceListUpdater: "Handles incoming device list updates from federation and updates the DB" - def __init__(self, hs, device_handler): + def __init__(self, hs: "HomeServer", device_handler: DeviceHandler): self.store = hs.get_datastore() self.federation = hs.get_federation_client() self.clock = hs.get_clock() @@ -603,7 +616,9 @@ class DeviceListUpdater: self._remote_edu_linearizer = Linearizer(name="remote_device_list") # user_id -> list of updates waiting to be handled. - self._pending_updates = {} + self._pending_updates = ( + {} + ) # type: Dict[str, List[Tuple[str, str, Iterable[str], JsonDict]]] # Recently seen stream ids. 
We don't bother keeping these in the DB, # but they're useful to have them about to reduce the number of spurious @@ -626,7 +641,9 @@ class DeviceListUpdater: ) @trace - async def incoming_device_list_update(self, origin, edu_content): + async def incoming_device_list_update( + self, origin: str, edu_content: JsonDict + ) -> None: """Called on incoming device list update from federation. Responsible for parsing the EDU and adding to pending updates list. """ @@ -687,7 +704,7 @@ class DeviceListUpdater: await self._handle_device_updates(user_id) @measure_func("_incoming_device_list_update") - async def _handle_device_updates(self, user_id): + async def _handle_device_updates(self, user_id: str) -> None: "Actually handle pending updates." with (await self._remote_edu_linearizer.queue(user_id)): @@ -735,7 +752,9 @@ class DeviceListUpdater: stream_id for _, stream_id, _, _ in pending_updates ) - async def _need_to_do_resync(self, user_id, updates): + async def _need_to_do_resync( + self, user_id: str, updates: Iterable[Tuple[str, str, Iterable[str], JsonDict]] + ) -> bool: """Given a list of updates for a user figure out if we need to do a full resync, or whether we have enough data that we can just apply the delta. """ @@ -766,7 +785,7 @@ class DeviceListUpdater: return False @trace - async def _maybe_retry_device_resync(self): + async def _maybe_retry_device_resync(self) -> None: """Retry to resync device lists that are out of sync, except if another retry is in progress. """ @@ -809,7 +828,7 @@ class DeviceListUpdater: async def user_device_resync( self, user_id: str, mark_failed_as_stale: bool = True - ) -> Optional[dict]: + ) -> Optional[JsonDict]: """Fetches all devices for a user and updates the device cache with them. Args: @@ -833,7 +852,7 @@ class DeviceListUpdater: # it later. await self.store.mark_remote_user_device_cache_as_stale(user_id) - return + return None except (RequestSendFailed, HttpResponseException) as e: logger.warning( "Failed to handle device list update for %s: %s", user_id, e, @@ -850,12 +869,12 @@ class DeviceListUpdater: # next time we get a device list update for this user_id. # This makes it more likely that the device lists will # eventually become consistent. - return + return None except FederationDeniedError as e: set_tag("error", True) log_kv({"reason": "FederationDeniedError"}) logger.info(e) - return + return None except Exception as e: set_tag("error", True) log_kv( @@ -868,7 +887,7 @@ class DeviceListUpdater: # it later. await self.store.mark_remote_user_device_cache_as_stale(user_id) - return + return None log_kv({"result": result}) stream_id = result["stream_id"] devices = result["devices"] @@ -929,7 +948,7 @@ class DeviceListUpdater: user_id: str, master_key: Optional[Dict[str, Any]], self_signing_key: Optional[Dict[str, Any]], - ) -> list: + ) -> List[str]: """Process the given new master and self-signing key for the given remote user. 
Args: diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 317d6cde95..2d0a6408b5 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -911,7 +911,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000) async def store_device( - self, user_id: str, device_id: str, initial_device_display_name: str + self, user_id: str, device_id: str, initial_device_display_name: Optional[str] ) -> bool: """Ensure the given device is known; add it to the store if not @@ -1029,7 +1029,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): ) async def update_remote_device_list_cache_entry( - self, user_id: str, device_id: str, content: JsonDict, stream_id: int + self, user_id: str, device_id: str, content: JsonDict, stream_id: str ) -> None: """Updates a single device in the cache of a remote user's devicelist. @@ -1057,7 +1057,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): user_id: str, device_id: str, content: JsonDict, - stream_id: int, + stream_id: str, ) -> None: if content.get("deleted"): self.db_pool.simple_delete_txn( -- cgit 1.5.1 From a93f3121f8fd1c2b77e003d8e43ce881635bb098 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 9 Oct 2020 07:20:51 -0400 Subject: Add type hints to some handlers (#8505) --- changelog.d/8505.misc | 1 + mypy.ini | 5 +++++ synapse/federation/federation_server.py | 2 +- synapse/handlers/account_data.py | 14 +++++++++++--- synapse/handlers/deactivate_account.py | 9 ++++++--- synapse/handlers/devicemessage.py | 25 +++++++++++++++++-------- synapse/handlers/password_policy.py | 10 +++++++--- synapse/handlers/read_marker.py | 10 ++++++++-- synapse/notifier.py | 2 +- synapse/storage/databases/main/registration.py | 4 +++- 10 files changed, 60 insertions(+), 22 deletions(-) create mode 100644 changelog.d/8505.misc (limited to 'mypy.ini') diff --git a/changelog.d/8505.misc b/changelog.d/8505.misc new file mode 100644 index 0000000000..5aa5c113bd --- /dev/null +++ b/changelog.d/8505.misc @@ -0,0 +1 @@ +Add type hints to various parts of the code base. 
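A pattern repeated throughout the handler diffs in this commit: `HomeServer` is imported only under `typing.TYPE_CHECKING`, and the constructor argument is annotated with a quoted forward reference. A minimal sketch of the idiom (`ExampleHandler` is an illustrative name, not part of the patch):

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # Only evaluated by mypy, never executed at runtime, which avoids a
        # circular import between the handler modules and HomeServer.
        from synapse.app.homeserver import HomeServer


    class ExampleHandler:
        def __init__(self, hs: "HomeServer"):
            # The quoted annotation is a forward reference that mypy resolves
            # against the TYPE_CHECKING import above.
            self.store = hs.get_datastore()

The same shape appears in each of the handler files touched below.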
diff --git a/mypy.ini b/mypy.ini index a7ffb81ef1..19b60f7534 100644 --- a/mypy.ini +++ b/mypy.ini @@ -15,9 +15,12 @@ files = synapse/events/builder.py, synapse/events/spamcheck.py, synapse/federation, + synapse/handlers/account_data.py, synapse/handlers/auth.py, synapse/handlers/cas_handler.py, + synapse/handlers/deactivate_account.py, synapse/handlers/device.py, + synapse/handlers/devicemessage.py, synapse/handlers/directory.py, synapse/handlers/events.py, synapse/handlers/federation.py, @@ -26,7 +29,9 @@ files = synapse/handlers/message.py, synapse/handlers/oidc_handler.py, synapse/handlers/pagination.py, + synapse/handlers/password_policy.py, synapse/handlers/presence.py, + synapse/handlers/read_marker.py, synapse/handlers/room.py, synapse/handlers/room_member.py, synapse/handlers/room_member_worker.py, diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 02f11e1209..1c7ea886c9 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -861,7 +861,7 @@ class FederationHandlerRegistry: self._edu_type_to_instance = {} # type: Dict[str, str] def register_edu_handler( - self, edu_type: str, handler: Callable[[str, dict], Awaitable[None]] + self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]] ): """Sets the handler callable that will be used to handle an incoming federation EDU of the given type. diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 9112a0ab86..341135822e 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -12,16 +12,24 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING, List, Tuple + +from synapse.types import JsonDict, UserID + +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer class AccountDataEventSource: - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() - def get_current_key(self, direction="f"): + def get_current_key(self, direction: str = "f") -> int: return self.store.get_max_account_data_stream_id() - async def get_new_events(self, user, from_key, **kwargs): + async def get_new_events( + self, user: UserID, from_key: int, **kwargs + ) -> Tuple[List[JsonDict], int]: user_id = user.to_string() last_stream_id = from_key diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 0635ad5708..72a5831531 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import logging -from typing import Optional +from typing import TYPE_CHECKING, Optional from synapse.api.errors import SynapseError from synapse.metrics.background_process_metrics import run_as_background_process @@ -22,13 +22,16 @@ from synapse.types import UserID, create_requester from ._base import BaseHandler +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + logger = logging.getLogger(__name__) class DeactivateAccountHandler(BaseHandler): """Handler which deals with deactivating user accounts.""" - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.hs = hs self._auth_handler = hs.get_auth_handler() @@ -137,7 +140,7 @@ class DeactivateAccountHandler(BaseHandler): return identity_server_supports_unbinding - async def _reject_pending_invites_for_user(self, user_id: str): + async def _reject_pending_invites_for_user(self, user_id: str) -> None: """Reject pending invites addressed to a given user ID. Args: diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 64ef7f63ab..9cac5a8463 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -14,7 +14,7 @@ # limitations under the License. import logging -from typing import Any, Dict +from typing import TYPE_CHECKING, Any, Dict from synapse.api.errors import SynapseError from synapse.logging.context import run_in_background @@ -24,18 +24,22 @@ from synapse.logging.opentracing import ( set_tag, start_active_span, ) -from synapse.types import UserID, get_domain_from_id +from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util import json_encoder from synapse.util.stringutils import random_string +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + + logger = logging.getLogger(__name__) class DeviceMessageHandler: - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): """ Args: - hs (synapse.server.HomeServer): server + hs: server """ self.store = hs.get_datastore() self.notifier = hs.get_notifier() @@ -48,7 +52,7 @@ class DeviceMessageHandler: self._device_list_updater = hs.get_device_handler().device_list_updater - async def on_direct_to_device_edu(self, origin, content): + async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None: local_messages = {} sender_user_id = content["sender"] if origin != get_domain_from_id(sender_user_id): @@ -95,7 +99,7 @@ class DeviceMessageHandler: message_type: str, sender_user_id: str, by_device: Dict[str, Dict[str, Any]], - ): + ) -> None: """Checks inbound device messages for unknown remote devices, and if found marks the remote cache for the user as stale. 
""" @@ -138,11 +142,16 @@ class DeviceMessageHandler: self._device_list_updater.user_device_resync, sender_user_id ) - async def send_device_message(self, sender_user_id, message_type, messages): + async def send_device_message( + self, + sender_user_id: str, + message_type: str, + messages: Dict[str, Dict[str, JsonDict]], + ) -> None: set_tag("number_of_messages", len(messages)) set_tag("sender", sender_user_id) local_messages = {} - remote_messages = {} + remote_messages = {} # type: Dict[str, Dict[str, Dict[str, JsonDict]]] for user_id, by_device in messages.items(): # we use UserID.from_string to catch invalid user ids if self.is_mine(UserID.from_string(user_id)): diff --git a/synapse/handlers/password_policy.py b/synapse/handlers/password_policy.py index 88e2f87200..6c635cc31b 100644 --- a/synapse/handlers/password_policy.py +++ b/synapse/handlers/password_policy.py @@ -16,14 +16,18 @@ import logging import re +from typing import TYPE_CHECKING from synapse.api.errors import Codes, PasswordRefusedError +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + logger = logging.getLogger(__name__) class PasswordPolicyHandler: - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.policy = hs.config.password_policy self.enabled = hs.config.password_policy_enabled @@ -33,11 +37,11 @@ class PasswordPolicyHandler: self.regexp_uppercase = re.compile("[A-Z]") self.regexp_lowercase = re.compile("[a-z]") - def validate_password(self, password): + def validate_password(self, password: str) -> None: """Checks whether a given password complies with the server's policy. Args: - password (str): The password to check against the server's policy. + password: The password to check against the server's policy. Raises: PasswordRefusedError: The password doesn't comply with the server's policy. diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index c32f314a1c..a7550806e6 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -14,23 +14,29 @@ # limitations under the License. import logging +from typing import TYPE_CHECKING from synapse.util.async_helpers import Linearizer from ._base import BaseHandler +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + logger = logging.getLogger(__name__) class ReadMarkerHandler(BaseHandler): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.server_name = hs.config.server_name self.store = hs.get_datastore() self.read_marker_linearizer = Linearizer(name="read_marker") self.notifier = hs.get_notifier() - async def received_client_read_marker(self, room_id, user_id, event_id): + async def received_client_read_marker( + self, room_id: str, user_id: str, event_id: str + ) -> None: """Updates the read marker for a given user in a given room if the event ID given is ahead in the stream relative to the current read marker. diff --git a/synapse/notifier.py b/synapse/notifier.py index 59415f6f88..13adeed01e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -339,7 +339,7 @@ class Notifier: self, stream_key: str, new_token: Union[int, RoomStreamToken], - users: Collection[UserID] = [], + users: Collection[Union[str, UserID]] = [], rooms: Collection[str] = [], ): """ Used to inform listeners that something has happened event wise. 
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index a85867936f..7fd7b0b952 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -1220,7 +1220,9 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): desc="record_user_external_id", ) - async def user_set_password_hash(self, user_id: str, password_hash: str) -> None: + async def user_set_password_hash( + self, user_id: str, password_hash: Optional[str] + ) -> None: """ NB. This does *not* evict any cache because the one use for this removes most of the entries subsequently anyway so it would be -- cgit 1.5.1 From 1781bbe319ce24e8e468f0422519dc5823d8d420 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 9 Oct 2020 11:35:11 -0400 Subject: Add type hints to response cache. (#8507) --- changelog.d/8507.misc | 1 + mypy.ini | 1 + synapse/appservice/api.py | 4 +-- synapse/federation/federation_server.py | 8 ++++-- synapse/handlers/initial_sync.py | 10 ++++--- synapse/handlers/room.py | 2 +- synapse/handlers/sync.py | 4 ++- synapse/replication/http/_base.py | 2 +- synapse/util/caches/response_cache.py | 50 ++++++++++++++++++--------------- 9 files changed, 48 insertions(+), 34 deletions(-) create mode 100644 changelog.d/8507.misc (limited to 'mypy.ini') diff --git a/changelog.d/8507.misc b/changelog.d/8507.misc new file mode 100644 index 0000000000..724da8a996 --- /dev/null +++ b/changelog.d/8507.misc @@ -0,0 +1 @@ + Add type hints to various parts of the code base. diff --git a/mypy.ini b/mypy.ini index 19b60f7534..f08fe992a4 100644 --- a/mypy.ini +++ b/mypy.ini @@ -65,6 +65,7 @@ files = synapse/types.py, synapse/util/async_helpers.py, synapse/util/caches/descriptors.py, + synapse/util/caches/response_cache.py, synapse/util/caches/stream_change_cache.py, synapse/util/metrics.py, tests/replication, diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index c526c28b93..e8f0793795 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -14,7 +14,7 @@ # limitations under the License. import logging import urllib -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Tuple from prometheus_client import Counter @@ -93,7 +93,7 @@ class ApplicationServiceApi(SimpleHttpClient): self.protocol_meta_cache = ResponseCache( hs, "as_protocol_meta", timeout_ms=HOUR_IN_MS - ) + ) # type: ResponseCache[Tuple[str, str]] async def query_user(self, service, user_id): if service.url is None: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index e8039e244c..23278e36b7 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -116,7 +116,7 @@ class FederationServer(FederationBase): # We cache results for transaction with the same ID self._transaction_resp_cache = ResponseCache( hs, "fed_txn_handler", timeout_ms=30000 - ) + ) # type: ResponseCache[Tuple[str, str]] self.transaction_actions = TransactionActions(self.store) @@ -124,10 +124,12 @@ class FederationServer(FederationBase): # We cache responses to state queries, as they take a while and often # come in waves. 
- self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000) + self._state_resp_cache = ResponseCache( + hs, "state_resp", timeout_ms=30000 + ) # type: ResponseCache[Tuple[str, str]] self._state_ids_resp_cache = ResponseCache( hs, "state_ids_resp", timeout_ms=30000 - ) + ) # type: ResponseCache[Tuple[str, str]] self._federation_metrics_domains = ( hs.get_config().federation.federation_metrics_domains diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 39a85801c1..98075f48d2 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -14,7 +14,7 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional, Tuple from twisted.internet import defer @@ -47,12 +47,14 @@ class InitialSyncHandler(BaseHandler): self.state = hs.get_state_handler() self.clock = hs.get_clock() self.validator = EventValidator() - self.snapshot_cache = ResponseCache(hs, "initial_sync_cache") + self.snapshot_cache = ResponseCache( + hs, "initial_sync_cache" + ) # type: ResponseCache[Tuple[str, Optional[StreamToken], Optional[StreamToken], str, Optional[int], bool, bool]] self._event_serializer = hs.get_event_client_serializer() self.storage = hs.get_storage() self.state_store = self.storage.state - def snapshot_all_rooms( + async def snapshot_all_rooms( self, user_id: str, pagin_config: PaginationConfig, @@ -84,7 +86,7 @@ class InitialSyncHandler(BaseHandler): include_archived, ) - return self.snapshot_cache.wrap( + return await self.snapshot_cache.wrap( key, self._snapshot_all_rooms, user_id, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 1d04d41e98..93ed51063a 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -120,7 +120,7 @@ class RoomCreationHandler(BaseHandler): # subsequent requests self._upgrade_response_cache = ResponseCache( hs, "room_upgrade", timeout_ms=FIVE_MINUTES_IN_MS - ) + ) # type: ResponseCache[Tuple[str, str]] self._server_notices_mxid = hs.config.server_notices_mxid self.third_party_event_rules = hs.get_third_party_event_rules() diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6fb8332f93..a306631094 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -243,7 +243,9 @@ class SyncHandler: self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() - self.response_cache = ResponseCache(hs, "sync") + self.response_cache = ResponseCache( + hs, "sync" + ) # type: ResponseCache[Tuple[Any, ...]] self.state = hs.get_state_handler() self.auth = hs.get_auth() self.storage = hs.get_storage() diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 64edadb624..2b3972cb14 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -92,7 +92,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): if self.CACHE: self.response_cache = ResponseCache( hs, "repl." + self.NAME, timeout_ms=30 * 60 * 1000 - ) + ) # type: ResponseCache[str] # We reserve `instance_name` as a parameter to sending requests, so we # assert here that sub classes don't try and use the name. 
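Each `ResponseCache` above now carries a `# type: ResponseCache[...]` comment that parametrises the cache over its key type (comment-style annotations, presumably kept for compatibility with the older Python versions Synapse still supported at the time). The payoff is that mypy can reject a wrongly-shaped key. A hypothetical illustration, not part of the patch, assuming `hs` is a `HomeServer` and `handle_txn` an async callable:

    from typing import Tuple

    # A federation transaction cache keyed on (origin, transaction_id).
    txn_cache = ResponseCache(
        hs, "fed_txn_handler", timeout_ms=30000
    )  # type: ResponseCache[Tuple[str, str]]

    txn_cache.wrap(("remote.example.org", "txn_1"), handle_txn)  # accepted
    txn_cache.wrap("txn_1", handle_txn)  # mypy error: "str" is not "Tuple[str, str]"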
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index df1a721add..32228f42ee 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, TypeVar from twisted.internet import defer @@ -20,10 +21,15 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches import register_cache +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + logger = logging.getLogger(__name__) +T = TypeVar("T") + -class ResponseCache: +class ResponseCache(Generic[T]): """ This caches a deferred response. Until the deferred completes it will be returned from the cache. This means that if the client retries the request @@ -31,8 +37,9 @@ class ResponseCache: used rather than trying to compute a new response. """ - def __init__(self, hs, name, timeout_ms=0): - self.pending_result_cache = {} # Requests that haven't finished yet. + def __init__(self, hs: "HomeServer", name: str, timeout_ms: float = 0): + # Requests that haven't finished yet. + self.pending_result_cache = {} # type: Dict[T, ObservableDeferred] self.clock = hs.get_clock() self.timeout_sec = timeout_ms / 1000.0 @@ -40,13 +47,13 @@ class ResponseCache: self._name = name self._metrics = register_cache("response_cache", name, self, resizable=False) - def size(self): + def size(self) -> int: return len(self.pending_result_cache) - def __len__(self): + def __len__(self) -> int: return self.size() - def get(self, key): + def get(self, key: T) -> Optional[defer.Deferred]: """Look up the given key. Can return either a new Deferred (which also doesn't follow the synapse @@ -58,12 +65,11 @@ class ResponseCache: from an absent cache entry. Args: - key (hashable): + key: key to get/set in the cache Returns: - twisted.internet.defer.Deferred|None|E: None if there is no entry - for this key; otherwise either a deferred result or the result - itself. + None if there is no entry for this key; otherwise a deferred which + resolves to the result. """ result = self.pending_result_cache.get(key) if result is not None: @@ -73,7 +79,7 @@ class ResponseCache: self._metrics.inc_misses() return None - def set(self, key, deferred): + def set(self, key: T, deferred: defer.Deferred) -> defer.Deferred: """Set the entry for the given key to the given deferred. *deferred* should run its callbacks in the sentinel logcontext (ie, @@ -85,12 +91,11 @@ class ResponseCache: result. You will probably want to make_deferred_yieldable the result. Args: - key (hashable): - deferred (twisted.internet.defer.Deferred[T): + key: key to get/set in the cache + deferred: The deferred which resolves to the result. Returns: - twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual - result. + A new deferred which resolves to the actual result. 
""" result = ObservableDeferred(deferred, consumeErrors=True) self.pending_result_cache[key] = result @@ -107,7 +112,9 @@ class ResponseCache: result.addBoth(remove) return result.observe() - def wrap(self, key, callback, *args, **kwargs): + def wrap( + self, key: T, callback: "Callable[..., Any]", *args: Any, **kwargs: Any + ) -> defer.Deferred: """Wrap together a *get* and *set* call, taking care of logcontexts First looks up the key in the cache, and if it is present makes it @@ -118,21 +125,20 @@ class ResponseCache: Example usage: - @defer.inlineCallbacks - def handle_request(request): + async def handle_request(request): # etc return result - result = yield response_cache.wrap( + result = await response_cache.wrap( key, handle_request, request, ) Args: - key (hashable): key to get/set in the cache + key: key to get/set in the cache - callback (callable): function to call if the key is not found in + callback: function to call if the key is not found in the cache *args: positional parameters to pass to the callback, if it is used @@ -140,7 +146,7 @@ class ResponseCache: **kwargs: named parameters to pass to the callback, if it is used Returns: - twisted.internet.defer.Deferred: yieldable result + Deferred which resolves to the result """ result = self.get(key) if not result: -- cgit 1.5.1 From 8075504a600b47ac93faf9b605ade691ae0fbcd3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 15 Oct 2020 11:44:39 +0100 Subject: Enable mypy for synapse.util.caches (#8547) This seemed to entail dragging in a type stub for SortedList. --- changelog.d/8547.misc | 1 + mypy.ini | 4 +- stubs/sortedcontainers/__init__.pyi | 11 +-- stubs/sortedcontainers/sortedlist.pyi | 177 ++++++++++++++++++++++++++++++++++ synapse/util/caches/ttlcache.py | 2 +- 5 files changed, 185 insertions(+), 10 deletions(-) create mode 100644 changelog.d/8547.misc create mode 100644 stubs/sortedcontainers/sortedlist.pyi (limited to 'mypy.ini') diff --git a/changelog.d/8547.misc b/changelog.d/8547.misc new file mode 100644 index 0000000000..fafb1c8347 --- /dev/null +++ b/changelog.d/8547.misc @@ -0,0 +1 @@ +Enable mypy type checking for `synapse.util.caches`. diff --git a/mypy.ini b/mypy.ini index f08fe992a4..9748f6258c 100644 --- a/mypy.ini +++ b/mypy.ini @@ -64,9 +64,7 @@ files = synapse/streams, synapse/types.py, synapse/util/async_helpers.py, - synapse/util/caches/descriptors.py, - synapse/util/caches/response_cache.py, - synapse/util/caches/stream_change_cache.py, + synapse/util/caches, synapse/util/metrics.py, tests/replication, tests/test_utils, diff --git a/stubs/sortedcontainers/__init__.pyi b/stubs/sortedcontainers/__init__.pyi index 073b806d3c..fa307483fe 100644 --- a/stubs/sortedcontainers/__init__.pyi +++ b/stubs/sortedcontainers/__init__.pyi @@ -1,13 +1,12 @@ -from .sorteddict import ( - SortedDict, - SortedKeysView, - SortedItemsView, - SortedValuesView, -) +from .sorteddict import SortedDict, SortedItemsView, SortedKeysView, SortedValuesView +from .sortedlist import SortedKeyList, SortedList, SortedListWithKey __all__ = [ "SortedDict", "SortedKeysView", "SortedItemsView", "SortedValuesView", + "SortedKeyList", + "SortedList", + "SortedListWithKey", ] diff --git a/stubs/sortedcontainers/sortedlist.pyi b/stubs/sortedcontainers/sortedlist.pyi new file mode 100644 index 0000000000..8f6086b3ff --- /dev/null +++ b/stubs/sortedcontainers/sortedlist.pyi @@ -0,0 +1,177 @@ +# stub for SortedList. 
This is an exact copy of +# https://github.com/grantjenks/python-sortedcontainers/blob/a419ffbd2b1c935b09f11f0971696e537fd0c510/sortedcontainers/sortedlist.pyi +# (from https://github.com/grantjenks/python-sortedcontainers/pull/107) + +from typing import ( + Any, + Callable, + Generic, + Iterable, + Iterator, + List, + MutableSequence, + Optional, + Sequence, + Tuple, + Type, + TypeVar, + Union, + overload, +) + +_T = TypeVar("_T") +_SL = TypeVar("_SL", bound=SortedList) +_SKL = TypeVar("_SKL", bound=SortedKeyList) +_Key = Callable[[_T], Any] +_Repr = Callable[[], str] + +def recursive_repr(fillvalue: str = ...) -> Callable[[_Repr], _Repr]: ... + +class SortedList(MutableSequence[_T]): + + DEFAULT_LOAD_FACTOR: int = ... + def __init__( + self, iterable: Optional[Iterable[_T]] = ..., key: Optional[_Key[_T]] = ..., + ): ... + # NB: currently mypy does not honour return type, see mypy #3307 + @overload + def __new__(cls: Type[_SL], iterable: None, key: None) -> _SL: ... + @overload + def __new__(cls: Type[_SL], iterable: None, key: _Key[_T]) -> SortedKeyList[_T]: ... + @overload + def __new__(cls: Type[_SL], iterable: Iterable[_T], key: None) -> _SL: ... + @overload + def __new__(cls, iterable: Iterable[_T], key: _Key[_T]) -> SortedKeyList[_T]: ... + @property + def key(self) -> Optional[Callable[[_T], Any]]: ... + def _reset(self, load: int) -> None: ... + def clear(self) -> None: ... + def _clear(self) -> None: ... + def add(self, value: _T) -> None: ... + def _expand(self, pos: int) -> None: ... + def update(self, iterable: Iterable[_T]) -> None: ... + def _update(self, iterable: Iterable[_T]) -> None: ... + def discard(self, value: _T) -> None: ... + def remove(self, value: _T) -> None: ... + def _delete(self, pos: int, idx: int) -> None: ... + def _loc(self, pos: int, idx: int) -> int: ... + def _pos(self, idx: int) -> int: ... + def _build_index(self) -> None: ... + def __contains__(self, value: Any) -> bool: ... + def __delitem__(self, index: Union[int, slice]) -> None: ... + @overload + def __getitem__(self, index: int) -> _T: ... + @overload + def __getitem__(self, index: slice) -> List[_T]: ... + @overload + def _getitem(self, index: int) -> _T: ... + @overload + def _getitem(self, index: slice) -> List[_T]: ... + @overload + def __setitem__(self, index: int, value: _T) -> None: ... + @overload + def __setitem__(self, index: slice, value: Iterable[_T]) -> None: ... + def __iter__(self) -> Iterator[_T]: ... + def __reversed__(self) -> Iterator[_T]: ... + def __len__(self) -> int: ... + def reverse(self) -> None: ... + def islice( + self, start: Optional[int] = ..., stop: Optional[int] = ..., reverse=bool, + ) -> Iterator[_T]: ... + def _islice( + self, min_pos: int, min_idx: int, max_pos: int, max_idx: int, reverse: bool, + ) -> Iterator[_T]: ... + def irange( + self, + minimum: Optional[int] = ..., + maximum: Optional[int] = ..., + inclusive: Tuple[bool, bool] = ..., + reverse: bool = ..., + ) -> Iterator[_T]: ... + def bisect_left(self, value: _T) -> int: ... + def bisect_right(self, value: _T) -> int: ... + def bisect(self, value: _T) -> int: ... + def _bisect_right(self, value: _T) -> int: ... + def count(self, value: _T) -> int: ... + def copy(self: _SL) -> _SL: ... + def __copy__(self: _SL) -> _SL: ... + def append(self, value: _T) -> None: ... + def extend(self, values: Iterable[_T]) -> None: ... + def insert(self, index: int, value: _T) -> None: ... + def pop(self, index: int = ...) -> _T: ... 
+ def index( + self, value: _T, start: Optional[int] = ..., stop: Optional[int] = ... + ) -> int: ... + def __add__(self: _SL, other: Iterable[_T]) -> _SL: ... + def __radd__(self: _SL, other: Iterable[_T]) -> _SL: ... + def __iadd__(self: _SL, other: Iterable[_T]) -> _SL: ... + def __mul__(self: _SL, num: int) -> _SL: ... + def __rmul__(self: _SL, num: int) -> _SL: ... + def __imul__(self: _SL, num: int) -> _SL: ... + def __eq__(self, other: Any) -> bool: ... + def __ne__(self, other: Any) -> bool: ... + def __lt__(self, other: Sequence[_T]) -> bool: ... + def __gt__(self, other: Sequence[_T]) -> bool: ... + def __le__(self, other: Sequence[_T]) -> bool: ... + def __ge__(self, other: Sequence[_T]) -> bool: ... + def __repr__(self) -> str: ... + def _check(self) -> None: ... + +class SortedKeyList(SortedList[_T]): + def __init__( + self, iterable: Optional[Iterable[_T]] = ..., key: _Key[_T] = ... + ) -> None: ... + def __new__( + cls, iterable: Optional[Iterable[_T]] = ..., key: _Key[_T] = ... + ) -> SortedKeyList[_T]: ... + @property + def key(self) -> Callable[[_T], Any]: ... + def clear(self) -> None: ... + def _clear(self) -> None: ... + def add(self, value: _T) -> None: ... + def _expand(self, pos: int) -> None: ... + def update(self, iterable: Iterable[_T]) -> None: ... + def _update(self, iterable: Iterable[_T]) -> None: ... + # NB: Must be T to be safely passed to self.func, yet base class imposes Any + def __contains__(self, value: _T) -> bool: ... # type: ignore + def discard(self, value: _T) -> None: ... + def remove(self, value: _T) -> None: ... + def _delete(self, pos: int, idx: int) -> None: ... + def irange( + self, + minimum: Optional[int] = ..., + maximum: Optional[int] = ..., + inclusive: Tuple[bool, bool] = ..., + reverse: bool = ..., + ): ... + def irange_key( + self, + min_key: Optional[Any] = ..., + max_key: Optional[Any] = ..., + inclusive: Tuple[bool, bool] = ..., + reserve: bool = ..., + ): ... + def bisect_left(self, value: _T) -> int: ... + def bisect_right(self, value: _T) -> int: ... + def bisect(self, value: _T) -> int: ... + def bisect_key_left(self, key: Any) -> int: ... + def _bisect_key_left(self, key: Any) -> int: ... + def bisect_key_right(self, key: Any) -> int: ... + def _bisect_key_right(self, key: Any) -> int: ... + def bisect_key(self, key: Any) -> int: ... + def count(self, value: _T) -> int: ... + def copy(self: _SKL) -> _SKL: ... + def __copy__(self: _SKL) -> _SKL: ... + def index( + self, value: _T, start: Optional[int] = ..., stop: Optional[int] = ... + ) -> int: ... + def __add__(self: _SKL, other: Iterable[_T]) -> _SKL: ... + def __radd__(self: _SKL, other: Iterable[_T]) -> _SKL: ... + def __iadd__(self: _SKL, other: Iterable[_T]) -> _SKL: ... + def __mul__(self: _SKL, num: int) -> _SKL: ... + def __rmul__(self: _SKL, num: int) -> _SKL: ... + def __imul__(self: _SKL, num: int) -> _SKL: ... + def __repr__(self) -> str: ... + def _check(self) -> None: ... 
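+# (Illustrative note, not part of the upstream stub: callers annotate
+# instances with type comments, as the ttlcache change later in this
+# commit does, e.g.
+#     expiry_list = SortedList()  # type: SortedList[Tuple[float, str]]
+# where the element type here is a made-up example.)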
+ +SortedListWithKey = SortedKeyList diff --git a/synapse/util/caches/ttlcache.py b/synapse/util/caches/ttlcache.py index 3e180cafd3..6ce2a3d12b 100644 --- a/synapse/util/caches/ttlcache.py +++ b/synapse/util/caches/ttlcache.py @@ -34,7 +34,7 @@ class TTLCache: self._data = {} # the _CacheEntries, sorted by expiry time - self._expiry_list = SortedList() + self._expiry_list = SortedList() # type: SortedList[_CacheEntry] self._timer = timer -- cgit 1.5.1 From c276bd996916adce899410b9c4c891892f51b992 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Thu, 15 Oct 2020 17:33:28 +0100 Subject: Send some ephemeral events to appservices (#8437) Optionally sends typing, presence, and read receipt information to appservices. --- changelog.d/8437.feature | 1 + mypy.ini | 1 + synapse/appservice/__init__.py | 180 ++++++++++++++------- synapse/appservice/api.py | 27 +++- synapse/appservice/scheduler.py | 48 ++++-- synapse/config/appservice.py | 3 + synapse/handlers/appservice.py | 109 ++++++++++++- synapse/handlers/receipts.py | 35 +++- synapse/handlers/sync.py | 1 - synapse/handlers/typing.py | 31 +++- synapse/notifier.py | 25 +++ synapse/storage/databases/main/appservice.py | 66 ++++++-- synapse/storage/databases/main/receipts.py | 55 +++++++ .../main/schema/delta/59/19as_device_stream.sql | 18 +++ tests/appservice/test_scheduler.py | 77 ++++++--- tests/storage/test_appservice.py | 8 +- 16 files changed, 563 insertions(+), 122 deletions(-) create mode 100644 changelog.d/8437.feature create mode 100644 synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql (limited to 'mypy.ini') diff --git a/changelog.d/8437.feature b/changelog.d/8437.feature new file mode 100644 index 0000000000..4abcccb326 --- /dev/null +++ b/changelog.d/8437.feature @@ -0,0 +1 @@ +Implement [MSC2409](https://github.com/matrix-org/matrix-doc/pull/2409) to send typing, read receipts, and presence events to appservices. diff --git a/mypy.ini b/mypy.ini index 9748f6258c..b5db54ee3b 100644 --- a/mypy.ini +++ b/mypy.ini @@ -15,6 +15,7 @@ files = synapse/events/builder.py, synapse/events/spamcheck.py, synapse/federation, + synapse/handlers/appservice.py, synapse/handlers/account_data.py, synapse/handlers/auth.py, synapse/handlers/cas_handler.py, diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 13ec1f71a6..3862d9c08f 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -14,14 +14,15 @@ # limitations under the License. import logging import re -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Iterable, List, Match, Optional from synapse.api.constants import EventTypes -from synapse.appservice.api import ApplicationServiceApi -from synapse.types import GroupID, get_domain_from_id +from synapse.events import EventBase +from synapse.types import GroupID, JsonDict, UserID, get_domain_from_id from synapse.util.caches.descriptors import cached if TYPE_CHECKING: + from synapse.appservice.api import ApplicationServiceApi from synapse.storage.databases.main import DataStore logger = logging.getLogger(__name__) @@ -32,38 +33,6 @@ class ApplicationServiceState: UP = "up" -class AppServiceTransaction: - """Represents an application service transaction.""" - - def __init__(self, service, id, events): - self.service = service - self.id = id - self.events = events - - async def send(self, as_api: ApplicationServiceApi) -> bool: - """Sends this transaction using the provided AS API interface. - - Args: - as_api: The API to use to send. 
-        Returns:
-            True if the transaction was sent.
-        """
-        return await as_api.push_bulk(
-            service=self.service, events=self.events, txn_id=self.id
-        )
-
-    async def complete(self, store: "DataStore") -> None:
-        """Completes this transaction as successful.
-
-        Marks this transaction ID on the application service and removes the
-        transaction contents from the database.
-
-        Args:
-            store: The database store to operate on.
-        """
-        await store.complete_appservice_txn(service=self.service, txn_id=self.id)
-
-
 class ApplicationService:
     """Defines an application service. This definition is mostly what is
     provided to the /register AS API.
@@ -91,6 +60,7 @@ class ApplicationService:
         protocols=None,
         rate_limited=True,
         ip_range_whitelist=None,
+        supports_ephemeral=False,
     ):
         self.token = token
         self.url = (
@@ -102,6 +72,7 @@ class ApplicationService:
         self.namespaces = self._check_namespaces(namespaces)
         self.id = id
         self.ip_range_whitelist = ip_range_whitelist
+        self.supports_ephemeral = supports_ephemeral

         if "|" in self.id:
             raise Exception("application service ID cannot contain '|' character")
@@ -161,19 +132,21 @@ class ApplicationService:
                 raise ValueError("Expected string for 'regex' in ns '%s'" % ns)
         return namespaces

-    def _matches_regex(self, test_string, namespace_key):
+    def _matches_regex(self, test_string: str, namespace_key: str) -> Optional[Match]:
         for regex_obj in self.namespaces[namespace_key]:
             if regex_obj["regex"].match(test_string):
                 return regex_obj
         return None

-    def _is_exclusive(self, ns_key, test_string):
+    def _is_exclusive(self, ns_key: str, test_string: str) -> bool:
         regex_obj = self._matches_regex(test_string, ns_key)
         if regex_obj:
             return regex_obj["exclusive"]
         return False

-    async def _matches_user(self, event, store):
+    async def _matches_user(
+        self, event: Optional[EventBase], store: Optional["DataStore"] = None
+    ) -> bool:
         if not event:
             return False

@@ -188,14 +161,23 @@ class ApplicationService:
         if not store:
             return False

-        does_match = await self._matches_user_in_member_list(event.room_id, store)
+        does_match = await self.matches_user_in_member_list(event.room_id, store)
         return does_match

-    @cached(num_args=1, cache_context=True)
-    async def _matches_user_in_member_list(self, room_id, store, cache_context):
-        member_list = await store.get_users_in_room(
-            room_id, on_invalidate=cache_context.invalidate
-        )
+    @cached(num_args=1)
+    async def matches_user_in_member_list(
+        self, room_id: str, store: "DataStore"
+    ) -> bool:
+        """Check if this service is interested in a room based upon its membership
+
+        Args:
+            room_id: The room to check.
+            store: The datastore to query.
+
+        Returns:
+            True if this service would like to know about this room.
+        """
+        member_list = await store.get_users_in_room(room_id)

         # check joined member events
         for user_id in member_list:
             if self.is_interested_in_user(user_id):
                 return True
         return False

-    def _matches_room_id(self, event):
+    def _matches_room_id(self, event: EventBase) -> bool:
         if hasattr(event, "room_id"):
             return self.is_interested_in_room(event.room_id)
         return False

-    async def _matches_aliases(self, event, store):
+    async def _matches_aliases(
+        self, event: EventBase, store: Optional["DataStore"] = None
+    ) -> bool:
         if not store or not event:
             return False

@@ -218,12 +202,15 @@
                 return True
         return False

-    async def is_interested(self, event, store=None) -> bool:
+    async def is_interested(
+        self, event: EventBase, store: Optional["DataStore"] = None
+    ) -> bool:
         """Check if this service is interested in this event.

         Args:
-            event(Event): The event to check.
-            store(DataStore)
+            event: The event to check.
+            store: The datastore to query.
+
         Returns:
             True if this service would like to know about this event.
         """
@@ -231,39 +218,66 @@
         if self._matches_room_id(event):
             return True

+        # This will check the namespaces first before
+        # checking the store, so should be run before _matches_aliases
+        if await self._matches_user(event, store):
+            return True
+
+        # This will check the store, so should be run last
         if await self._matches_aliases(event, store):
             return True

-        if await self._matches_user(event, store):
-            return True
-
         return False

+    @cached(num_args=1)
+    async def is_interested_in_presence(
+        self, user_id: UserID, store: "DataStore"
+    ) -> bool:
+        """Check if this service is interested in a user's presence
+
+        Args:
+            user_id: The user to check.
+            store: The datastore to query.
+
+        Returns:
+            True if this service would like to know about presence for this user.
+ """ + # Find all the rooms the sender is in + if self.is_interested_in_user(user_id.to_string()): return True + room_ids = await store.get_rooms_for_user(user_id.to_string()) + # Then find out if the appservice is interested in any of those rooms + for room_id in room_ids: + if await self.matches_user_in_member_list(room_id, store): + return True return False - def is_interested_in_user(self, user_id): + def is_interested_in_user(self, user_id: str) -> bool: return ( - self._matches_regex(user_id, ApplicationService.NS_USERS) + bool(self._matches_regex(user_id, ApplicationService.NS_USERS)) or user_id == self.sender ) - def is_interested_in_alias(self, alias): + def is_interested_in_alias(self, alias: str) -> bool: return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES)) - def is_interested_in_room(self, room_id): + def is_interested_in_room(self, room_id: str) -> bool: return bool(self._matches_regex(room_id, ApplicationService.NS_ROOMS)) - def is_exclusive_user(self, user_id): + def is_exclusive_user(self, user_id: str) -> bool: return ( self._is_exclusive(ApplicationService.NS_USERS, user_id) or user_id == self.sender ) - def is_interested_in_protocol(self, protocol): + def is_interested_in_protocol(self, protocol: str) -> bool: return protocol in self.protocols - def is_exclusive_alias(self, alias): + def is_exclusive_alias(self, alias: str) -> bool: return self._is_exclusive(ApplicationService.NS_ALIASES, alias) - def is_exclusive_room(self, room_id): + def is_exclusive_room(self, room_id: str) -> bool: return self._is_exclusive(ApplicationService.NS_ROOMS, room_id) def get_exclusive_user_regexes(self): @@ -276,14 +290,14 @@ class ApplicationService: if regex_obj["exclusive"] ] - def get_groups_for_user(self, user_id): + def get_groups_for_user(self, user_id: str) -> Iterable[str]: """Get the groups that this user is associated with by this AS Args: - user_id (str): The ID of the user. + user_id: The ID of the user. Returns: - iterable[str]: an iterable that yields group_id strings. + An iterable that yields group_id strings. """ return ( regex_obj["group_id"] @@ -291,7 +305,7 @@ class ApplicationService: if "group_id" in regex_obj and regex_obj["regex"].match(user_id) ) - def is_rate_limited(self): + def is_rate_limited(self) -> bool: return self.rate_limited def __str__(self): @@ -300,3 +314,45 @@ class ApplicationService: dict_copy["token"] = "" dict_copy["hs_token"] = "" return "ApplicationService: %s" % (dict_copy,) + + +class AppServiceTransaction: + """Represents an application service transaction.""" + + def __init__( + self, + service: ApplicationService, + id: int, + events: List[EventBase], + ephemeral: List[JsonDict], + ): + self.service = service + self.id = id + self.events = events + self.ephemeral = ephemeral + + async def send(self, as_api: "ApplicationServiceApi") -> bool: + """Sends this transaction using the provided AS API interface. + + Args: + as_api: The API to use to send. + Returns: + True if the transaction was sent. + """ + return await as_api.push_bulk( + service=self.service, + events=self.events, + ephemeral=self.ephemeral, + txn_id=self.id, + ) + + async def complete(self, store: "DataStore") -> None: + """Completes this transaction as successful. + + Marks this transaction ID on the application service and removes the + transaction contents from the database. + + Args: + store: The database store to operate on. 
+ """ + await store.complete_appservice_txn(service=self.service, txn_id=self.id) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index e8f0793795..e366a982b8 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -14,12 +14,13 @@ # limitations under the License. import logging import urllib -from typing import TYPE_CHECKING, Optional, Tuple +from typing import TYPE_CHECKING, List, Optional, Tuple from prometheus_client import Counter from synapse.api.constants import EventTypes, ThirdPartyEntityKind from synapse.api.errors import CodeMessageException +from synapse.events import EventBase from synapse.events.utils import serialize_event from synapse.http.client import SimpleHttpClient from synapse.types import JsonDict, ThirdPartyInstanceID @@ -201,7 +202,13 @@ class ApplicationServiceApi(SimpleHttpClient): key = (service.id, protocol) return await self.protocol_meta_cache.wrap(key, _get) - async def push_bulk(self, service, events, txn_id=None): + async def push_bulk( + self, + service: "ApplicationService", + events: List[EventBase], + ephemeral: List[JsonDict], + txn_id: Optional[int] = None, + ): if service.url is None: return True @@ -211,15 +218,19 @@ class ApplicationServiceApi(SimpleHttpClient): logger.warning( "push_bulk: Missing txn ID sending events to %s", service.url ) - txn_id = str(0) - txn_id = str(txn_id) + txn_id = 0 + + uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id))) + + # Never send ephemeral events to appservices that do not support it + if service.supports_ephemeral: + body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral} + else: + body = {"events": events} - uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id)) try: await self.put_json( - uri=uri, - json_body={"events": events}, - args={"access_token": service.hs_token}, + uri=uri, json_body=body, args={"access_token": service.hs_token}, ) sent_transactions_counter.labels(service.id).inc() sent_events_counter.labels(service.id).inc(len(events)) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 8eb8c6f51c..ad3c408519 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -49,10 +49,13 @@ This is all tied together by the AppServiceScheduler which DIs the required components. 
""" import logging +from typing import List -from synapse.appservice import ApplicationServiceState +from synapse.appservice import ApplicationService, ApplicationServiceState +from synapse.events import EventBase from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import JsonDict logger = logging.getLogger(__name__) @@ -82,8 +85,13 @@ class ApplicationServiceScheduler: for service in services: self.txn_ctrl.start_recoverer(service) - def submit_event_for_as(self, service, event): - self.queuer.enqueue(service, event) + def submit_event_for_as(self, service: ApplicationService, event: EventBase): + self.queuer.enqueue_event(service, event) + + def submit_ephemeral_events_for_as( + self, service: ApplicationService, events: List[JsonDict] + ): + self.queuer.enqueue_ephemeral(service, events) class _ServiceQueuer: @@ -96,17 +104,15 @@ class _ServiceQueuer: def __init__(self, txn_ctrl, clock): self.queued_events = {} # dict of {service_id: [events]} + self.queued_ephemeral = {} # dict of {service_id: [events]} # the appservices which currently have a transaction in flight self.requests_in_flight = set() self.txn_ctrl = txn_ctrl self.clock = clock - def enqueue(self, service, event): - self.queued_events.setdefault(service.id, []).append(event) - + def _start_background_request(self, service): # start a sender for this appservice if we don't already have one - if service.id in self.requests_in_flight: return @@ -114,7 +120,15 @@ class _ServiceQueuer: "as-sender-%s" % (service.id,), self._send_request, service ) - async def _send_request(self, service): + def enqueue_event(self, service: ApplicationService, event: EventBase): + self.queued_events.setdefault(service.id, []).append(event) + self._start_background_request(service) + + def enqueue_ephemeral(self, service: ApplicationService, events: List[JsonDict]): + self.queued_ephemeral.setdefault(service.id, []).extend(events) + self._start_background_request(service) + + async def _send_request(self, service: ApplicationService): # sanity-check: we shouldn't get here if this service already has a sender # running. 
assert service.id not in self.requests_in_flight @@ -123,10 +137,11 @@ class _ServiceQueuer: try: while True: events = self.queued_events.pop(service.id, []) - if not events: + ephemeral = self.queued_ephemeral.pop(service.id, []) + if not events and not ephemeral: return try: - await self.txn_ctrl.send(service, events) + await self.txn_ctrl.send(service, events, ephemeral) except Exception: logger.exception("AS request failed") finally: @@ -158,9 +173,16 @@ class _TransactionController: # for UTs self.RECOVERER_CLASS = _Recoverer - async def send(self, service, events): + async def send( + self, + service: ApplicationService, + events: List[EventBase], + ephemeral: List[JsonDict] = [], + ): try: - txn = await self.store.create_appservice_txn(service=service, events=events) + txn = await self.store.create_appservice_txn( + service=service, events=events, ephemeral=ephemeral + ) service_is_up = await self._is_service_up(service) if service_is_up: sent = await txn.send(self.as_api) @@ -204,7 +226,7 @@ class _TransactionController: recoverer.recover() logger.info("Now %i active recoverers", len(self.recoverers)) - async def _is_service_up(self, service): + async def _is_service_up(self, service: ApplicationService) -> bool: state = await self.store.get_appservice_state(service) return state == ApplicationServiceState.UP or state is None diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index 8ed3e24258..746fc3cc02 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -160,6 +160,8 @@ def _load_appservice(hostname, as_info, config_filename): if as_info.get("ip_range_whitelist"): ip_range_whitelist = IPSet(as_info.get("ip_range_whitelist")) + supports_ephemeral = as_info.get("de.sorunome.msc2409.push_ephemeral", False) + return ApplicationService( token=as_info["as_token"], hostname=hostname, @@ -168,6 +170,7 @@ def _load_appservice(hostname, as_info, config_filename): hs_token=as_info["hs_token"], sender=user_id, id=as_info["id"], + supports_ephemeral=supports_ephemeral, protocols=protocols, rate_limited=rate_limited, ip_range_whitelist=ip_range_whitelist, diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index c8d5e58035..07240d3a14 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -14,6 +14,7 @@ # limitations under the License. 
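#
# (Illustrative sketch, not part of this commit: an appservice opts in to
# the new ephemeral stream through its registration file. Every value below
# is hypothetical except the MSC2409 flag, which is the key read by the
# config change above.
#
#     id: example-bridge
#     url: "https://bridge.example.com"
#     as_token: "..."
#     hs_token: "..."
#     sender_localpart: _example
#     namespaces:
#       users:
#         - exclusive: true
#           regex: "@_example_.*"
#     de.sorunome.msc2409.push_ephemeral: true
# )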
import logging
+from typing import Dict, List, Optional

 from prometheus_client import Counter

@@ -21,13 +22,16 @@
 from twisted.internet import defer

 import synapse
 from synapse.api.constants import EventTypes
+from synapse.appservice import ApplicationService
+from synapse.events import EventBase
+from synapse.handlers.presence import format_user_presence_state
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics import (
     event_processing_loop_counter,
     event_processing_loop_room_count,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import RoomStreamToken
+from synapse.types import Collection, JsonDict, RoomStreamToken, UserID
 from synapse.util.metrics import Measure

 logger = logging.getLogger(__name__)
@@ -44,6 +48,7 @@ class ApplicationServicesHandler:
         self.started_scheduler = False
         self.clock = hs.get_clock()
         self.notify_appservices = hs.config.notify_appservices
+        self.event_sources = hs.get_event_sources()

         self.current_max = 0
         self.is_processing = False
@@ -82,7 +87,7 @@ class ApplicationServicesHandler:
             if not events:
                 break

-            events_by_room = {}
+            events_by_room = {}  # type: Dict[str, List[EventBase]]
             for event in events:
                 events_by_room.setdefault(event.room_id, []).append(event)

@@ -161,6 +166,104 @@ class ApplicationServicesHandler:
         finally:
             self.is_processing = False

+    async def notify_interested_services_ephemeral(
+        self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [],
+    ):
+        """This is called by the notifier in the background
+        when an ephemeral event is handled by the homeserver.
+
+        This will determine which appservices are interested
+        in the event, and submit it to them.
+
+        Events will only be pushed to appservices
+        that have opted into ephemeral events.
+
+        Args:
+            stream_key: The stream the event came from.
+            new_token: The latest stream token.
+            users: The user(s) involved with the event.
+        """
+        services = [
+            service
+            for service in self.store.get_app_services()
+            if service.supports_ephemeral
+        ]
+        if not services or not self.notify_appservices:
+            return
+        logger.info("Checking interested services for %s", stream_key)
+        with Measure(self.clock, "notify_interested_services_ephemeral"):
+            for service in services:
+                # Only handle typing if we have the latest token
+                if stream_key == "typing_key" and new_token is not None:
+                    events = await self._handle_typing(service, new_token)
+                    if events:
+                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                    # We don't persist the token for typing_key for performance reasons
+                elif stream_key == "receipt_key":
+                    events = await self._handle_receipts(service)
+                    if events:
+                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                    await self.store.set_type_stream_id_for_appservice(
+                        service, "read_receipt", new_token
+                    )
+                elif stream_key == "presence_key":
+                    events = await self._handle_presence(service, users)
+                    if events:
+                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                    await self.store.set_type_stream_id_for_appservice(
+                        service, "presence", new_token
+                    )
+
+    async def _handle_typing(self, service: ApplicationService, new_token: int):
+        typing_source = self.event_sources.sources["typing"]
+        # Get the typing events from just before current
+        typing, _ = await typing_source.get_new_events_as(
+            service=service,
+            # For performance reasons, we don't persist the previous
+            # token in the DB and instead fetch the latest typing information
+            # for appservices.
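+            # Passing new_token - 1 therefore returns exactly the rooms whose
+            # typing state changed in the newest stream update.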
+            from_key=new_token - 1,
+        )
+        return typing
+
+    async def _handle_receipts(self, service: ApplicationService):
+        from_key = await self.store.get_type_stream_id_for_appservice(
+            service, "read_receipt"
+        )
+        receipts_source = self.event_sources.sources["receipt"]
+        receipts, _ = await receipts_source.get_new_events_as(
+            service=service, from_key=from_key
+        )
+        return receipts
+
+    async def _handle_presence(
+        self, service: ApplicationService, users: Collection[UserID]
+    ):
+        events = []  # type: List[JsonDict]
+        presence_source = self.event_sources.sources["presence"]
+        from_key = await self.store.get_type_stream_id_for_appservice(
+            service, "presence"
+        )
+        for user in users:
+            interested = await service.is_interested_in_presence(user, self.store)
+            if not interested:
+                continue
+            presence_events, _ = await presence_source.get_new_events(
+                user=user, service=service, from_key=from_key,
+            )
+            time_now = self.clock.time_msec()
+            presence_events = [
+                {
+                    "type": "m.presence",
+                    "sender": event.user_id,
+                    "content": format_user_presence_state(
+                        event, time_now, include_user_id=False
+                    ),
+                }
+                for event in presence_events
+            ]
+            events = events + presence_events
+
+        # The caller only schedules a push when this returns a non-empty
+        # list, so the accumulated events must be handed back.
+        return events
+
     async def query_user_exists(self, user_id):
         """Check if any application service knows this user_id exists.

@@ -223,7 +326,7 @@ class ApplicationServicesHandler:
     async def get_3pe_protocols(self, only_protocol=None):
         services = self.store.get_app_services()

-        protocols = {}
+        protocols = {}  # type: Dict[str, List[JsonDict]]

         # Collect up all the individual protocol responses out of the ASes
         for s in services:
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 7225923757..c242c409cf 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -13,9 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from typing import List, Tuple

+from synapse.appservice import ApplicationService
 from synapse.handlers._base import BaseHandler
-from synapse.types import ReadReceipt, get_domain_from_id
+from synapse.types import JsonDict, ReadReceipt, get_domain_from_id
 from synapse.util.async_helpers import maybe_awaitable

 logger = logging.getLogger(__name__)
@@ -140,5 +142,36 @@ class ReceiptEventSource:

         return (events, to_key)

+    async def get_new_events_as(
+        self, from_key: int, service: ApplicationService
+    ) -> Tuple[List[JsonDict], int]:
+        """Returns a set of new receipt events that an appservice
+        may be interested in.
+
+        Args:
+            from_key: the stream position at which events should be fetched from
+            service: The appservice which may be interested
+        """
+        from_key = int(from_key)
+        to_key = self.get_current_key()
+
+        if from_key == to_key:
+            return [], to_key
+
+        # We first need to fetch all new receipts
+        rooms_to_events = await self.store.get_linearized_receipts_for_all_rooms(
+            from_key=from_key, to_key=to_key
+        )
+
+        # Then filter down to rooms that the AS can read
+        events = []
+        for room_id, event in rooms_to_events.items():
+            if not await service.matches_user_in_member_list(room_id, self.store):
+                continue
+
+            events.append(event)
+
+        return (events, to_key)
+
     def get_current_key(self, direction="f"):
         return self.store.get_max_receipt_stream_id()
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index a306631094..b527724bc4 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -13,7 +13,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # limitations under the License. - import itertools import logging from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 3cbfc2d780..d3692842e3 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -12,16 +12,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import random from collections import namedtuple from typing import TYPE_CHECKING, List, Set, Tuple from synapse.api.errors import AuthError, ShadowBanError, SynapseError +from synapse.appservice import ApplicationService from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.streams import TypingStream -from synapse.types import UserID, get_domain_from_id +from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer @@ -430,6 +430,33 @@ class TypingNotificationEventSource: "content": {"user_ids": list(typing)}, } + async def get_new_events_as( + self, from_key: int, service: ApplicationService + ) -> Tuple[List[JsonDict], int]: + """Returns a set of new typing events that an appservice + may be interested in. + + Args: + from_key: the stream position at which events should be fetched from + service: The appservice which may be interested + """ + with Measure(self.clock, "typing.get_new_events_as"): + from_key = int(from_key) + handler = self.get_typing_handler() + + events = [] + for room_id in handler._room_serials.keys(): + if handler._room_serials[room_id] <= from_key: + continue + if not await service.matches_user_in_member_list( + room_id, handler.store + ): + continue + + events.append(self._make_event_for(room_id)) + + return (events, handler._latest_room_serial) + async def get_new_events(self, from_key, room_ids, **kwargs): with Measure(self.clock, "typing.get_new_events"): from_key = int(from_key) diff --git a/synapse/notifier.py b/synapse/notifier.py index 51c830c91e..2e993411b9 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -329,6 +329,22 @@ class Notifier: except Exception: logger.exception("Error notifying application services of event") + async def _notify_app_services_ephemeral( + self, + stream_key: str, + new_token: Union[int, RoomStreamToken], + users: Collection[UserID] = [], + ): + try: + stream_token = None + if isinstance(new_token, int): + stream_token = new_token + await self.appservice_handler.notify_interested_services_ephemeral( + stream_key, stream_token, users + ) + except Exception: + logger.exception("Error notifying application services of event") + async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): try: await self._pusher_pool.on_new_notifications(max_room_stream_token) @@ -367,6 +383,15 @@ class Notifier: self.notify_replication() + # Notify appservices + run_as_background_process( + "_notify_app_services_ephemeral", + self._notify_app_services_ephemeral, + stream_key, + new_token, + users, + ) + def on_new_replication_data(self) -> None: """Used to inform replication listeners that something has happend without waking up any of the normal user event streams""" diff --git 
a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 85f6b1e3fd..43bf0f649a 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -15,12 +15,15 @@
 # limitations under the License.
 import logging
 import re
+from typing import List

-from synapse.appservice import AppServiceTransaction
+from synapse.appservice import ApplicationService, AppServiceTransaction
 from synapse.config.appservice import load_appservices
+from synapse.events import EventBase
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
+from synapse.types import JsonDict
 from synapse.util import json_encoder

 logger = logging.getLogger(__name__)
@@ -172,15 +175,23 @@ class ApplicationServiceTransactionWorkerStore(
             "application_services_state", {"as_id": service.id}, {"state": state}
         )

-    async def create_appservice_txn(self, service, events):
+    async def create_appservice_txn(
+        self,
+        service: ApplicationService,
+        events: List[EventBase],
+        ephemeral: List[JsonDict],
+    ) -> AppServiceTransaction:
         """Atomically creates a new transaction for this application service
-        with the given list of events.
+        with the given list of events. Ephemeral events are NOT persisted to the
+        database and are not resent if a transaction is retried.

         Args:
-            service(ApplicationService): The service who the transaction is for.
-            events(list): A list of events to put in the transaction.
+            service: The service who the transaction is for.
+            events: A list of persistent events to put in the transaction.
+            ephemeral: A list of ephemeral events to put in the transaction.
+
         Returns:
-            AppServiceTransaction: A new transaction.
+            A new transaction.
         """

         def _create_appservice_txn(txn):
@@ -207,7 +218,9 @@
                 "VALUES(?,?,?)",
                 (service.id, new_txn_id, event_ids),
             )
-            return AppServiceTransaction(service=service, id=new_txn_id, events=events)
+            return AppServiceTransaction(
+                service=service, id=new_txn_id, events=events, ephemeral=ephemeral
+            )

         return await self.db_pool.runInteraction(
             "create_appservice_txn", _create_appservice_txn
@@ -296,7 +309,9 @@

         events = await self.get_events_as_list(event_ids)

-        return AppServiceTransaction(service=service, id=entry["txn_id"], events=events)
+        return AppServiceTransaction(
+            service=service, id=entry["txn_id"], events=events, ephemeral=[]
+        )

     def _get_last_txn(self, txn, service_id):
         txn.execute(
@@ -320,7 +335,7 @@
         )

     async def get_new_events_for_appservice(self, current_id, limit):
-        """Get all new evnets"""
+        """Get all new events for an appservice"""

         def get_new_events_for_appservice_txn(txn):
             sql = (
@@ -351,6 +366,39 @@

         return upper_bound, events

+    async def get_type_stream_id_for_appservice(
+        self, service: ApplicationService, type: str
+    ) -> int:
+        def get_type_stream_id_for_appservice_txn(txn):
+            stream_id_type = "%s_stream_id" % type
+            txn.execute(
+                # stream_id_type is a column name, so it cannot be bound as a
+                # SQL parameter; interpolate it instead (it is derived from a
+                # fixed set of types, never from user input).
+                "SELECT %s FROM application_services_state WHERE as_id=?"
+                % (stream_id_type,),
+                (service.id,),
+            )
+            last_stream_id = txn.fetchone()
+            if last_stream_id is None or last_stream_id[0] is None:  # no row exists
+                return 0
+            else:
+                return int(last_stream_id[0])
+
+        return await self.db_pool.runInteraction(
+            "get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn
+        )
+
+    async def set_type_stream_id_for_appservice(
+        self, service: ApplicationService, type: str, pos: int
+    ) -> None:
+        def set_type_stream_id_for_appservice_txn(txn):
+            stream_id_type = "%s_stream_id" % type
+            txn.execute(
+                # As above, the column name has to be interpolated; the table
+                # name is fixed, and the computed stream id column is the one
+                # being updated here.
+                "UPDATE application_services_state SET %s = ? WHERE as_id=?"
+                % (stream_id_type,),
+                (pos, service.id),
+            )
+
+        await self.db_pool.runInteraction(
+            "set_type_stream_id_for_appservice", set_type_stream_id_for_appservice_txn
+        )
+

 class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore):
     # This is currently empty due to there not being any AS storage functions
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index c79ddff680..5cdf16521c 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -23,6 +23,7 @@
 from twisted.internet import defer

 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool
 from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.caches.descriptors import cached, cachedList
@@ -274,6 +275,60 @@ class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
             }
             return results

+    @cached(num_args=2)
+    async def get_linearized_receipts_for_all_rooms(
+        self, to_key: int, from_key: Optional[int] = None
+    ) -> Dict[str, JsonDict]:
+        """Get receipts for all rooms between two stream_ids.
+
+        Args:
+            to_key: Max stream id to fetch receipts up to.
+            from_key: Min stream id to fetch receipts from. None fetches
+                from the start.
+
+        Returns:
+            A dictionary mapping each room ID to its batched receipt event.
+        """
+
+        def f(txn):
+            if from_key:
+                sql = """
+                    SELECT * FROM receipts_linearized WHERE
+                    stream_id > ? AND stream_id <= ?
+                """
+                txn.execute(sql, [from_key, to_key])
+            else:
+                sql = """
+                    SELECT * FROM receipts_linearized WHERE
+                    stream_id <= ?
+                """
+
+                txn.execute(sql, [to_key])
+
+            return self.db_pool.cursor_to_dict(txn)
+
+        txn_results = await self.db_pool.runInteraction(
+            "get_linearized_receipts_for_all_rooms", f
+        )
+
+        results = {}
+        for row in txn_results:
+            # We want a single event per room, since we want to batch the
+            # receipts by room, event and type.
+            room_event = results.setdefault(
+                row["room_id"],
+                {"type": "m.receipt", "room_id": row["room_id"], "content": {}},
+            )
+
+            # The content is of the form:
+            #   {"$foo:bar": { "read": { "@user:host": <receipt>, .. }, ..
} + event_entry = room_event["content"].setdefault(row["event_id"], {}) + receipt_type = event_entry.setdefault(row["receipt_type"], {}) + + receipt_type[row["user_id"]] = db_to_json(row["data"]) + + return results + async def get_users_sent_receipts_between( self, last_id: int, current_id: int ) -> List[str]: diff --git a/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql b/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql new file mode 100644 index 0000000000..20f5a95a24 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql @@ -0,0 +1,18 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE application_services_state + ADD COLUMN read_receipt_stream_id INT, + ADD COLUMN presence_stream_id INT; \ No newline at end of file diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 68a4caabbf..2acb8b7603 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -60,7 +60,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events))) self.store.create_appservice_txn.assert_called_once_with( - service=service, events=events # txn made and saved + service=service, events=events, ephemeral=[] # txn made and saved ) self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made txn.complete.assert_called_once_with(self.store) # txn completed @@ -81,7 +81,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events))) self.store.create_appservice_txn.assert_called_once_with( - service=service, events=events # txn made and saved + service=service, events=events, ephemeral=[] # txn made and saved ) self.assertEquals(0, txn.send.call_count) # txn not sent though self.assertEquals(0, txn.complete.call_count) # or completed @@ -106,7 +106,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events))) self.store.create_appservice_txn.assert_called_once_with( - service=service, events=events + service=service, events=events, ephemeral=[] ) self.assertEquals(1, self.recoverer_fn.call_count) # recoverer made self.assertEquals(1, self.recoverer.recover.call_count) # and invoked @@ -202,26 +202,28 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase): # Expect the event to be sent immediately. 
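        # (txn_ctrl.send now takes a third argument, the list of ephemeral
        # events for the transaction; it is empty for plain PDU sends, hence
        # the [] in the assertions below.)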
service = Mock(id=4) event = Mock() - self.queuer.enqueue(service, event) - self.txn_ctrl.send.assert_called_once_with(service, [event]) + self.queuer.enqueue_event(service, event) + self.txn_ctrl.send.assert_called_once_with(service, [event], []) def test_send_single_event_with_queue(self): d = defer.Deferred() - self.txn_ctrl.send = Mock(side_effect=lambda x, y: make_deferred_yieldable(d)) + self.txn_ctrl.send = Mock( + side_effect=lambda x, y, z: make_deferred_yieldable(d) + ) service = Mock(id=4) event = Mock(event_id="first") event2 = Mock(event_id="second") event3 = Mock(event_id="third") # Send an event and don't resolve it just yet. - self.queuer.enqueue(service, event) + self.queuer.enqueue_event(service, event) # Send more events: expect send() to NOT be called multiple times. - self.queuer.enqueue(service, event2) - self.queuer.enqueue(service, event3) - self.txn_ctrl.send.assert_called_with(service, [event]) + self.queuer.enqueue_event(service, event2) + self.queuer.enqueue_event(service, event3) + self.txn_ctrl.send.assert_called_with(service, [event], []) self.assertEquals(1, self.txn_ctrl.send.call_count) # Resolve the send event: expect the queued events to be sent d.callback(service) - self.txn_ctrl.send.assert_called_with(service, [event2, event3]) + self.txn_ctrl.send.assert_called_with(service, [event2, event3], []) self.assertEquals(2, self.txn_ctrl.send.call_count) def test_multiple_service_queues(self): @@ -239,21 +241,58 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase): send_return_list = [srv_1_defer, srv_2_defer] - def do_send(x, y): + def do_send(x, y, z): return make_deferred_yieldable(send_return_list.pop(0)) self.txn_ctrl.send = Mock(side_effect=do_send) # send events for different ASes and make sure they are sent - self.queuer.enqueue(srv1, srv_1_event) - self.queuer.enqueue(srv1, srv_1_event2) - self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event]) - self.queuer.enqueue(srv2, srv_2_event) - self.queuer.enqueue(srv2, srv_2_event2) - self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event]) + self.queuer.enqueue_event(srv1, srv_1_event) + self.queuer.enqueue_event(srv1, srv_1_event2) + self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event], []) + self.queuer.enqueue_event(srv2, srv_2_event) + self.queuer.enqueue_event(srv2, srv_2_event2) + self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event], []) # make sure callbacks for a service only send queued events for THAT # service srv_2_defer.callback(srv2) - self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2]) + self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2], []) self.assertEquals(3, self.txn_ctrl.send.call_count) + + def test_send_single_ephemeral_no_queue(self): + # Expect the event to be sent immediately. + service = Mock(id=4, name="service") + event_list = [Mock(name="event")] + self.queuer.enqueue_ephemeral(service, event_list) + self.txn_ctrl.send.assert_called_once_with(service, [], event_list) + + def test_send_multiple_ephemeral_no_queue(self): + # Expect the event to be sent immediately. 
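+        # (A single enqueue_ephemeral call can carry a whole batch of events;
+        # they are all forwarded in one txn_ctrl.send call.)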
+ service = Mock(id=4, name="service") + event_list = [Mock(name="event1"), Mock(name="event2"), Mock(name="event3")] + self.queuer.enqueue_ephemeral(service, event_list) + self.txn_ctrl.send.assert_called_once_with(service, [], event_list) + + def test_send_single_ephemeral_with_queue(self): + d = defer.Deferred() + self.txn_ctrl.send = Mock( + side_effect=lambda x, y, z: make_deferred_yieldable(d) + ) + service = Mock(id=4) + event_list_1 = [Mock(event_id="event1"), Mock(event_id="event2")] + event_list_2 = [Mock(event_id="event3"), Mock(event_id="event4")] + event_list_3 = [Mock(event_id="event5"), Mock(event_id="event6")] + + # Send an event and don't resolve it just yet. + self.queuer.enqueue_ephemeral(service, event_list_1) + # Send more events: expect send() to NOT be called multiple times. + self.queuer.enqueue_ephemeral(service, event_list_2) + self.queuer.enqueue_ephemeral(service, event_list_3) + self.txn_ctrl.send.assert_called_with(service, [], event_list_1) + self.assertEquals(1, self.txn_ctrl.send.call_count) + # Resolve txn_ctrl.send + d.callback(service) + # Expect the queued events to be sent + self.txn_ctrl.send.assert_called_with(service, [], event_list_2 + event_list_3) + self.assertEquals(2, self.txn_ctrl.send.call_count) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index c905a38930..c5c7987349 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -244,7 +244,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): service = Mock(id=self.as_list[0]["id"]) events = [Mock(event_id="e1"), Mock(event_id="e2")] txn = yield defer.ensureDeferred( - self.store.create_appservice_txn(service, events) + self.store.create_appservice_txn(service, events, []) ) self.assertEquals(txn.id, 1) self.assertEquals(txn.events, events) @@ -258,7 +258,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): yield self._insert_txn(service.id, 9644, events) yield self._insert_txn(service.id, 9645, events) txn = yield defer.ensureDeferred( - self.store.create_appservice_txn(service, events) + self.store.create_appservice_txn(service, events, []) ) self.assertEquals(txn.id, 9646) self.assertEquals(txn.events, events) @@ -270,7 +270,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): events = [Mock(event_id="e1"), Mock(event_id="e2")] yield self._set_last_txn(service.id, 9643) txn = yield defer.ensureDeferred( - self.store.create_appservice_txn(service, events) + self.store.create_appservice_txn(service, events, []) ) self.assertEquals(txn.id, 9644) self.assertEquals(txn.events, events) @@ -293,7 +293,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): yield self._insert_txn(self.as_list[3]["id"], 9643, events) txn = yield defer.ensureDeferred( - self.store.create_appservice_txn(service, events) + self.store.create_appservice_txn(service, events, []) ) self.assertEquals(txn.id, 9644) self.assertEquals(txn.events, events) -- cgit 1.5.1 From de5cafe980391ae6e2de1d38ac4e42dea182a304 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 21 Oct 2020 06:44:31 -0400 Subject: Add type hints to profile and base handlers. 
(#8609) --- changelog.d/8609.misc | 1 + mypy.ini | 4 +- synapse/handlers/_base.py | 20 ++++----- synapse/handlers/initial_sync.py | 8 +++- synapse/handlers/profile.py | 74 ++++++++++++++++++++----------- synapse/storage/databases/main/profile.py | 6 +-- 6 files changed, 72 insertions(+), 41 deletions(-) create mode 100644 changelog.d/8609.misc (limited to 'mypy.ini') diff --git a/changelog.d/8609.misc b/changelog.d/8609.misc new file mode 100644 index 0000000000..5e3f3c1993 --- /dev/null +++ b/changelog.d/8609.misc @@ -0,0 +1 @@ +Add type hints to profile and base handler. diff --git a/mypy.ini b/mypy.ini index b5db54ee3b..5e9f7b1259 100644 --- a/mypy.ini +++ b/mypy.ini @@ -15,8 +15,9 @@ files = synapse/events/builder.py, synapse/events/spamcheck.py, synapse/federation, - synapse/handlers/appservice.py, + synapse/handlers/_base.py, synapse/handlers/account_data.py, + synapse/handlers/appservice.py, synapse/handlers/auth.py, synapse/handlers/cas_handler.py, synapse/handlers/deactivate_account.py, @@ -32,6 +33,7 @@ files = synapse/handlers/pagination.py, synapse/handlers/password_policy.py, synapse/handlers/presence.py, + synapse/handlers/profile.py, synapse/handlers/read_marker.py, synapse/handlers/room.py, synapse/handlers/room_member.py, diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 0206320e96..bd8e71ae56 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +from typing import TYPE_CHECKING, Optional import synapse.state import synapse.storage @@ -22,6 +23,9 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.ratelimiting import Ratelimiter from synapse.types import UserID +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + logger = logging.getLogger(__name__) @@ -30,11 +34,7 @@ class BaseHandler: Common base class for the event handlers. """ - def __init__(self, hs): - """ - Args: - hs (synapse.server.HomeServer): - """ + def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() # type: synapse.storage.DataStore self.auth = hs.get_auth() self.notifier = hs.get_notifier() @@ -56,7 +56,7 @@ class BaseHandler: clock=self.clock, rate_hz=self.hs.config.rc_admin_redaction.per_second, burst_count=self.hs.config.rc_admin_redaction.burst_count, - ) + ) # type: Optional[Ratelimiter] else: self.admin_redaction_ratelimiter = None @@ -127,15 +127,15 @@ class BaseHandler: if guest_access != "can_join": if context: current_state_ids = await context.get_current_state_ids() - current_state = await self.store.get_events( + current_state_dict = await self.store.get_events( list(current_state_ids.values()) ) + current_state = list(current_state_dict.values()) else: - current_state = await self.state_handler.get_current_state( + current_state_map = await self.state_handler.get_current_state( event.room_id ) - - current_state = list(current_state.values()) + current_state = list(current_state_map.values()) logger.info("maybe_kick_guest_users %r", current_state) await self.kick_guest_users(current_state) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 98075f48d2..cb11754bf8 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -293,6 +293,10 @@ class InitialSyncHandler(BaseHandler): user_id, room_id, pagin_config, membership, is_peeking ) elif membership == Membership.LEAVE: + # The member_event_id will always be available if membership is set + # to leave. 
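+            # The assert also narrows member_event_id from Optional[str] to
+            # str, which _room_initial_sync_parted expects.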
+ assert member_event_id + result = await self._room_initial_sync_parted( user_id, room_id, pagin_config, membership, member_event_id, is_peeking ) @@ -315,7 +319,7 @@ class InitialSyncHandler(BaseHandler): user_id: str, room_id: str, pagin_config: PaginationConfig, - membership: Membership, + membership: str, member_event_id: str, is_peeking: bool, ) -> JsonDict: @@ -367,7 +371,7 @@ class InitialSyncHandler(BaseHandler): user_id: str, room_id: str, pagin_config: PaginationConfig, - membership: Membership, + membership: str, is_peeking: bool, ) -> JsonDict: current_state = await self.state.get_current_state(room_id=room_id) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index b78a12ad01..92700b589c 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -12,9 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import random +from typing import TYPE_CHECKING, Optional from synapse.api.errors import ( AuthError, @@ -25,10 +25,19 @@ from synapse.api.errors import ( SynapseError, ) from synapse.metrics.background_process_metrics import wrap_as_background_process -from synapse.types import UserID, create_requester, get_domain_from_id +from synapse.types import ( + JsonDict, + Requester, + UserID, + create_requester, + get_domain_from_id, +) from ._base import BaseHandler +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + logger = logging.getLogger(__name__) MAX_DISPLAYNAME_LEN = 256 @@ -45,7 +54,7 @@ class ProfileHandler(BaseHandler): PROFILE_UPDATE_MS = 60 * 1000 PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000 - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.federation = hs.get_federation_client() @@ -60,7 +69,7 @@ class ProfileHandler(BaseHandler): self._update_remote_profile_cache, self.PROFILE_UPDATE_MS ) - async def get_profile(self, user_id): + async def get_profile(self, user_id: str) -> JsonDict: target_user = UserID.from_string(user_id) if self.hs.is_mine(target_user): @@ -91,7 +100,7 @@ class ProfileHandler(BaseHandler): except HttpResponseException as e: raise e.to_synapse_error() - async def get_profile_from_cache(self, user_id): + async def get_profile_from_cache(self, user_id: str) -> JsonDict: """Get the profile information from our local cache. If the user is ours then the profile information will always be corect. Otherwise, it may be out of date/missing. @@ -115,7 +124,7 @@ class ProfileHandler(BaseHandler): profile = await self.store.get_from_remote_profile_cache(user_id) return profile or {} - async def get_displayname(self, target_user): + async def get_displayname(self, target_user: UserID) -> str: if self.hs.is_mine(target_user): try: displayname = await self.store.get_profile_displayname( @@ -143,15 +152,19 @@ class ProfileHandler(BaseHandler): return result["displayname"] async def set_displayname( - self, target_user, requester, new_displayname, by_admin=False - ): + self, + target_user: UserID, + requester: Requester, + new_displayname: str, + by_admin: bool = False, + ) -> None: """Set the displayname of a user Args: - target_user (UserID): the user whose displayname is to be changed. - requester (Requester): The user attempting to make this change. - new_displayname (str): The displayname to give this user. - by_admin (bool): Whether this change was made by an administrator. 
+ target_user: the user whose displayname is to be changed. + requester: The user attempting to make this change. + new_displayname: The displayname to give this user. + by_admin: Whether this change was made by an administrator. """ if not self.hs.is_mine(target_user): raise SynapseError(400, "User is not hosted on this homeserver") @@ -176,8 +189,9 @@ class ProfileHandler(BaseHandler): 400, "Displayname is too long (max %i)" % (MAX_DISPLAYNAME_LEN,) ) + displayname_to_set = new_displayname # type: Optional[str] if new_displayname == "": - new_displayname = None + displayname_to_set = None # If the admin changes the display name of a user, the requesting user cannot send # the join event to update the displayname in the rooms. @@ -185,7 +199,9 @@ class ProfileHandler(BaseHandler): if by_admin: requester = create_requester(target_user) - await self.store.set_profile_displayname(target_user.localpart, new_displayname) + await self.store.set_profile_displayname( + target_user.localpart, displayname_to_set + ) if self.hs.config.user_directory_search_all_users: profile = await self.store.get_profileinfo(target_user.localpart) @@ -195,7 +211,7 @@ class ProfileHandler(BaseHandler): await self._update_join_states(requester, target_user) - async def get_avatar_url(self, target_user): + async def get_avatar_url(self, target_user: UserID) -> str: if self.hs.is_mine(target_user): try: avatar_url = await self.store.get_profile_avatar_url( @@ -222,15 +238,19 @@ class ProfileHandler(BaseHandler): return result["avatar_url"] async def set_avatar_url( - self, target_user, requester, new_avatar_url, by_admin=False + self, + target_user: UserID, + requester: Requester, + new_avatar_url: str, + by_admin: bool = False, ): """Set a new avatar URL for a user. Args: - target_user (UserID): the user whose avatar URL is to be changed. - requester (Requester): The user attempting to make this change. - new_avatar_url (str): The avatar URL to give this user. - by_admin (bool): Whether this change was made by an administrator. + target_user: the user whose avatar URL is to be changed. + requester: The user attempting to make this change. + new_avatar_url: The avatar URL to give this user. + by_admin: Whether this change was made by an administrator. """ if not self.hs.is_mine(target_user): raise SynapseError(400, "User is not hosted on this homeserver") @@ -267,7 +287,7 @@ class ProfileHandler(BaseHandler): await self._update_join_states(requester, target_user) - async def on_profile_query(self, args): + async def on_profile_query(self, args: JsonDict) -> JsonDict: user = UserID.from_string(args["user_id"]) if not self.hs.is_mine(user): raise SynapseError(400, "User is not hosted on this homeserver") @@ -292,7 +312,9 @@ class ProfileHandler(BaseHandler): return response - async def _update_join_states(self, requester, target_user): + async def _update_join_states( + self, requester: Requester, target_user: UserID + ) -> None: if not self.hs.is_mine(target_user): return @@ -323,15 +345,17 @@ class ProfileHandler(BaseHandler): "Failed to update join event for room %s - %s", room_id, str(e) ) - async def check_profile_query_allowed(self, target_user, requester=None): + async def check_profile_query_allowed( + self, target_user: UserID, requester: Optional[UserID] = None + ) -> None: """Checks whether a profile query is allowed. If the 'require_auth_for_profile_requests' config flag is set to True and a 'requester' is provided, the query is only allowed if the two users share a room. 
        Args:
-            target_user (UserID): The owner of the queried profile.
-            requester (None|UserID): The user querying for the profile.
+            target_user: The owner of the queried profile.
+            requester: The user querying for the profile.
 
         Raises:
             SynapseError(403): The two users share no room, or one user couldn't
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index 1681caa1f0..a6d1eb908a 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from typing import Any, Dict, Optional
+from typing import Any, Dict, List, Optional
 
 from synapse.api.errors import StoreError
 from synapse.storage._base import SQLBaseStore
@@ -72,7 +72,7 @@ class ProfileWorkerStore(SQLBaseStore):
         )
 
     async def set_profile_displayname(
-        self, user_localpart: str, new_displayname: str
+        self, user_localpart: str, new_displayname: Optional[str]
     ) -> None:
         await self.db_pool.simple_update_one(
             table="profiles",
@@ -144,7 +144,7 @@ class ProfileWorkerStore(SQLBaseStore):
 
     async def get_remote_profile_cache_entries_that_expire(
         self, last_checked: int
-    ) -> Dict[str, str]:
+    ) -> List[Dict[str, str]]:
         """Get all users who haven't been checked since `last_checked`
         """
-- 
cgit 1.5.1


From a9f90fa73af8bc324aa4fbe2ca920fe5f47c4fee Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 22 Oct 2020 11:56:58 +0100
Subject: Type hints for RegistrationStore (#8615)

---
 changelog.d/8615.misc                          |   1 +
 mypy.ini                                       |   1 +
 synapse/storage/databases/main/__init__.py     |   1 -
 synapse/storage/databases/main/registration.py | 156 +++++++++++++------------
 4 files changed, 85 insertions(+), 74 deletions(-)
 create mode 100644 changelog.d/8615.misc

(limited to 'mypy.ini')

diff --git a/changelog.d/8615.misc b/changelog.d/8615.misc
new file mode 100644
index 0000000000..79fa7b7ff8
--- /dev/null
+++ b/changelog.d/8615.misc
@@ -0,0 +1 @@
+Type hints for `RegistrationStore`.
diff --git a/mypy.ini b/mypy.ini
index 5e9f7b1259..59d9074c3b 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -57,6 +57,7 @@ files =
   synapse/spam_checker_api,
   synapse/state,
   synapse/storage/databases/main/events.py,
+  synapse/storage/databases/main/registration.py,
   synapse/storage/databases/main/stream.py,
   synapse/storage/databases/main/ui_auth.py,
   synapse/storage/database.py,
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 9b16f45f3e..43660ec4fb 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -146,7 +146,6 @@ class DataStore(
             db_conn, "e2e_cross_signing_keys", "stream_id"
         )
 
-        self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
         self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
         self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
         self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 4c843b7679..b0329e17ec 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -16,29 +16,33 @@
 # limitations under the License.
import logging import re -from typing import Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from synapse.api.constants import UserTypes from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError from synapse.metrics.background_process_metrics import wrap_as_background_process -from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool -from synapse.storage.types import Cursor +from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore +from synapse.storage.databases.main.stats import StatsStore +from synapse.storage.types import Connection, Cursor +from synapse.storage.util.id_generators import IdGenerator from synapse.storage.util.sequence import build_sequence_generator from synapse.types import UserID from synapse.util.caches.descriptors import cached +if TYPE_CHECKING: + from synapse.server import HomeServer + THIRTY_MINUTES_IN_MS = 30 * 60 * 1000 logger = logging.getLogger(__name__) -class RegistrationWorkerStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs): +class RegistrationWorkerStore(CacheInvalidationWorkerStore): + def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.config = hs.config - self.clock = hs.get_clock() # Note: we don't check this sequence for consistency as we'd have to # call `find_max_generated_user_id_localpart` each time, which is @@ -55,7 +59,7 @@ class RegistrationWorkerStore(SQLBaseStore): # Create a background job for culling expired 3PID validity tokens if hs.config.run_background_tasks: - self.clock.looping_call( + self._clock.looping_call( self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS ) @@ -92,7 +96,7 @@ class RegistrationWorkerStore(SQLBaseStore): if not info: return False - now = self.clock.time_msec() + now = self._clock.time_msec() trial_duration_ms = self.config.mau_trial_days * 24 * 60 * 60 * 1000 is_trial = (now - info["creation_ts"] * 1000) < trial_duration_ms return is_trial @@ -257,7 +261,7 @@ class RegistrationWorkerStore(SQLBaseStore): return await self.db_pool.runInteraction( "get_users_expiring_soon", select_users_txn, - self.clock.time_msec(), + self._clock.time_msec(), self.config.account_validity.renew_at, ) @@ -328,13 +332,17 @@ class RegistrationWorkerStore(SQLBaseStore): await self.db_pool.runInteraction("set_server_admin", set_server_admin_txn) def _query_for_auth(self, txn, token): - sql = ( - "SELECT users.name, users.is_guest, users.shadow_banned, access_tokens.id as token_id," - " access_tokens.device_id, access_tokens.valid_until_ms" - " FROM users" - " INNER JOIN access_tokens on users.name = access_tokens.user_id" - " WHERE token = ?" - ) + sql = """ + SELECT users.name, + users.is_guest, + users.shadow_banned, + access_tokens.id as token_id, + access_tokens.device_id, + access_tokens.valid_until_ms + FROM users + INNER JOIN access_tokens on users.name = access_tokens.user_id + WHERE token = ? 
+ """ txn.execute(sql, (token,)) rows = self.db_pool.cursor_to_dict(txn) @@ -803,7 +811,7 @@ class RegistrationWorkerStore(SQLBaseStore): await self.db_pool.runInteraction( "cull_expired_threepid_validation_tokens", cull_expired_threepid_validation_tokens_txn, - self.clock.time_msec(), + self._clock.time_msec(), ) @wrap_as_background_process("account_validity_set_expiration_dates") @@ -890,10 +898,10 @@ class RegistrationWorkerStore(SQLBaseStore): class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"): super().__init__(database, db_conn, hs) - self.clock = hs.get_clock() + self._clock = hs.get_clock() self.config = hs.config self.db_pool.updates.register_background_index_update( @@ -1016,13 +1024,56 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): return 1 + async def set_user_deactivated_status( + self, user_id: str, deactivated: bool + ) -> None: + """Set the `deactivated` property for the provided user to the provided value. + + Args: + user_id: The ID of the user to set the status for. + deactivated: The value to set for `deactivated`. + """ + + await self.db_pool.runInteraction( + "set_user_deactivated_status", + self.set_user_deactivated_status_txn, + user_id, + deactivated, + ) + + def set_user_deactivated_status_txn(self, txn, user_id: str, deactivated: bool): + self.db_pool.simple_update_one_txn( + txn=txn, + table="users", + keyvalues={"name": user_id}, + updatevalues={"deactivated": 1 if deactivated else 0}, + ) + self._invalidate_cache_and_stream( + txn, self.get_user_deactivated_status, (user_id,) + ) + txn.call_after(self.is_guest.invalidate, (user_id,)) + + @cached() + async def is_guest(self, user_id: str) -> bool: + res = await self.db_pool.simple_select_one_onecol( + table="users", + keyvalues={"name": user_id}, + retcol="is_guest", + allow_none=True, + desc="is_guest", + ) + + return res if res else False + -class RegistrationStore(RegistrationBackgroundUpdateStore): - def __init__(self, database: DatabasePool, db_conn, hs): +class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): + def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"): super().__init__(database, db_conn, hs) self._ignore_unknown_session_error = hs.config.request_token_inhibit_3pid_errors + self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") + async def add_access_token_to_user( self, user_id: str, @@ -1138,19 +1189,19 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): def _register_user( self, txn, - user_id, - password_hash, - was_guest, - make_guest, - appservice_id, - create_profile_with_displayname, - admin, - user_type, - shadow_banned, + user_id: str, + password_hash: Optional[str], + was_guest: bool, + make_guest: bool, + appservice_id: Optional[str], + create_profile_with_displayname: Optional[str], + admin: bool, + user_type: Optional[str], + shadow_banned: bool, ): user_id_obj = UserID.from_string(user_id) - now = int(self.clock.time()) + now = int(self._clock.time()) try: if was_guest: @@ -1374,18 +1425,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): await self.db_pool.runInteraction("delete_access_token", f) - @cached() - async def is_guest(self, user_id: str) -> bool: - res = await self.db_pool.simple_select_one_onecol( - table="users", - keyvalues={"name": user_id}, - retcol="is_guest", - allow_none=True, - desc="is_guest", - 
) - - return res if res else False - async def add_user_pending_deactivation(self, user_id: str) -> None: """ Adds a user to the table of users who need to be parted from all the rooms they're @@ -1479,7 +1518,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): txn, table="threepid_validation_session", keyvalues={"session_id": session_id}, - updatevalues={"validated_at": self.clock.time_msec()}, + updatevalues={"validated_at": self._clock.time_msec()}, ) return next_link @@ -1547,35 +1586,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): start_or_continue_validation_session_txn, ) - async def set_user_deactivated_status( - self, user_id: str, deactivated: bool - ) -> None: - """Set the `deactivated` property for the provided user to the provided value. - - Args: - user_id: The ID of the user to set the status for. - deactivated: The value to set for `deactivated`. - """ - - await self.db_pool.runInteraction( - "set_user_deactivated_status", - self.set_user_deactivated_status_txn, - user_id, - deactivated, - ) - - def set_user_deactivated_status_txn(self, txn, user_id, deactivated): - self.db_pool.simple_update_one_txn( - txn=txn, - table="users", - keyvalues={"name": user_id}, - updatevalues={"deactivated": 1 if deactivated else 0}, - ) - self._invalidate_cache_and_stream( - txn, self.get_user_deactivated_status, (user_id,) - ) - txn.call_after(self.is_guest.invalidate, (user_id,)) - def find_max_generated_user_id_localpart(cur: Cursor) -> int: """ -- cgit 1.5.1 From 10f45d85bb355c66110b9221b0aa09010d2d03ad Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 26 Oct 2020 14:17:31 -0400 Subject: Add type hints for account validity handler (#8620) This also fixes a bug by fixing handling of an account which doesn't expire. --- changelog.d/8620.bugfix | 1 + mypy.ini | 1 + synapse/handlers/account_validity.py | 29 ++++++++++++++++++++------ synapse/handlers/profile.py | 4 ++-- synapse/storage/databases/main/profile.py | 4 ++-- synapse/storage/databases/main/registration.py | 4 ++-- 6 files changed, 31 insertions(+), 12 deletions(-) create mode 100644 changelog.d/8620.bugfix (limited to 'mypy.ini') diff --git a/changelog.d/8620.bugfix b/changelog.d/8620.bugfix new file mode 100644 index 0000000000..c1078a3fb5 --- /dev/null +++ b/changelog.d/8620.bugfix @@ -0,0 +1 @@ +Fix a bug where the account validity endpoint would silently fail if the user ID did not have an expiration time. It now returns a 400 error. 
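
As a rough sketch of the behaviour change (illustrative only; `handler` and `try_send_renewal` are assumed names, not part of the patch): a caller that previously saw a silent no-op for an account with no expiration time now gets a 400 error it can catch:

    from synapse.api.errors import SynapseError

    async def try_send_renewal(handler, user_id: str) -> bool:
        # Returns True if a renewal email was sent, False if the account
        # has no expiration time (the handler now raises a 400 for that case).
        try:
            await handler.send_renewal_email_to_user(user_id)
            return True
        except SynapseError as e:
            if e.code == 400:
                return False
            raise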
diff --git a/mypy.ini b/mypy.ini index 59d9074c3b..1fbd8decf8 100644 --- a/mypy.ini +++ b/mypy.ini @@ -17,6 +17,7 @@ files = synapse/federation, synapse/handlers/_base.py, synapse/handlers/account_data.py, + synapse/handlers/account_validity.py, synapse/handlers/appservice.py, synapse/handlers/auth.py, synapse/handlers/cas_handler.py, diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py index fd4f762f33..664d09da1c 100644 --- a/synapse/handlers/account_validity.py +++ b/synapse/handlers/account_validity.py @@ -18,19 +18,22 @@ import email.utils import logging from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText -from typing import List +from typing import TYPE_CHECKING, List -from synapse.api.errors import StoreError +from synapse.api.errors import StoreError, SynapseError from synapse.logging.context import make_deferred_yieldable from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.types import UserID from synapse.util import stringutils +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + logger = logging.getLogger(__name__) class AccountValidityHandler: - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.hs = hs self.config = hs.config self.store = self.hs.get_datastore() @@ -67,7 +70,7 @@ class AccountValidityHandler: self.clock.looping_call(self._send_renewal_emails, 30 * 60 * 1000) @wrap_as_background_process("send_renewals") - async def _send_renewal_emails(self): + async def _send_renewal_emails(self) -> None: """Gets the list of users whose account is expiring in the amount of time configured in the ``renew_at`` parameter from the ``account_validity`` configuration, and sends renewal emails to all of these users as long as they @@ -81,11 +84,25 @@ class AccountValidityHandler: user_id=user["user_id"], expiration_ts=user["expiration_ts_ms"] ) - async def send_renewal_email_to_user(self, user_id: str): + async def send_renewal_email_to_user(self, user_id: str) -> None: + """ + Send a renewal email for a specific user. + + Args: + user_id: The user ID to send a renewal email for. + + Raises: + SynapseError if the user is not set to renew. + """ expiration_ts = await self.store.get_expiration_ts_for_user(user_id) + + # If this user isn't set to be expired, raise an error. + if expiration_ts is None: + raise SynapseError(400, "User has no expiration time: %s" % (user_id,)) + await self._send_renewal_email(user_id, expiration_ts) - async def _send_renewal_email(self, user_id: str, expiration_ts: int): + async def _send_renewal_email(self, user_id: str, expiration_ts: int) -> None: """Sends out a renewal email to every email address attached to the given user with a unique link allowing them to renew their account. 
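
The profile changes in the next hunks widen several return types to `Optional`, which mypy then enforces at every call site. A minimal sketch of the resulting caller pattern (an assumed helper, not from the patch):

    from typing import Optional

    async def displayname_or_default(store, localpart: str) -> str:
        # get_profile_displayname may now return None, so mypy requires an
        # explicit fallback before the value can be used as a plain str.
        displayname = await store.get_profile_displayname(localpart)  # type: Optional[str]
        return displayname if displayname is not None else localpart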
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 3875e53c08..14348faaf3 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -131,7 +131,7 @@ class ProfileHandler(BaseHandler): profile = await self.store.get_from_remote_profile_cache(user_id) return profile or {} - async def get_displayname(self, target_user: UserID) -> str: + async def get_displayname(self, target_user: UserID) -> Optional[str]: if self.hs.is_mine(target_user): try: displayname = await self.store.get_profile_displayname( @@ -218,7 +218,7 @@ class ProfileHandler(BaseHandler): await self._update_join_states(requester, target_user) - async def get_avatar_url(self, target_user: UserID) -> str: + async def get_avatar_url(self, target_user: UserID) -> Optional[str]: if self.hs.is_mine(target_user): try: avatar_url = await self.store.get_profile_avatar_url( diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py index a6d1eb908a..0e25ca3d7a 100644 --- a/synapse/storage/databases/main/profile.py +++ b/synapse/storage/databases/main/profile.py @@ -39,7 +39,7 @@ class ProfileWorkerStore(SQLBaseStore): avatar_url=profile["avatar_url"], display_name=profile["displayname"] ) - async def get_profile_displayname(self, user_localpart: str) -> str: + async def get_profile_displayname(self, user_localpart: str) -> Optional[str]: return await self.db_pool.simple_select_one_onecol( table="profiles", keyvalues={"user_id": user_localpart}, @@ -47,7 +47,7 @@ class ProfileWorkerStore(SQLBaseStore): desc="get_profile_displayname", ) - async def get_profile_avatar_url(self, user_localpart: str) -> str: + async def get_profile_avatar_url(self, user_localpart: str) -> Optional[str]: return await self.db_pool.simple_select_one_onecol( table="profiles", keyvalues={"user_id": user_localpart}, diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index b0329e17ec..e7b17a7385 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -240,13 +240,13 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): desc="get_renewal_token_for_user", ) - async def get_users_expiring_soon(self) -> List[Dict[str, int]]: + async def get_users_expiring_soon(self) -> List[Dict[str, Any]]: """Selects users whose account will expire in the [now, now + renew_at] time window (see configuration for account_validity for information on what renew_at refers to). Returns: - A list of dictionaries mapping user ID to expiration time (in milliseconds). + A list of dictionaries, each with a user ID and expiration time (in milliseconds). """ def select_users_txn(txn, now_ms, renew_at): -- cgit 1.5.1 From 31d721fbf6655080235003b5576110d477fa2353 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 28 Oct 2020 11:12:21 -0400 Subject: Add type hints to application services. (#8655) --- changelog.d/8655.misc | 1 + mypy.ini | 4 ++ synapse/handlers/appservice.py | 75 +++++++++++---------- synapse/handlers/auth.py | 23 +++++-- synapse/storage/databases/main/appservice.py | 98 +++++++++++++++++----------- 5 files changed, 122 insertions(+), 79 deletions(-) create mode 100644 changelog.d/8655.misc (limited to 'mypy.ini') diff --git a/changelog.d/8655.misc b/changelog.d/8655.misc new file mode 100644 index 0000000000..b588bdd3e2 --- /dev/null +++ b/changelog.d/8655.misc @@ -0,0 +1 @@ +Add more type hints to the application services code. 
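
One pattern worth noting before the hunks below: where a variable's type differs across branches (as in `_make_exclusive_regex`), the code gives the `Optional` value a fresh name with an explicit annotation so mypy can follow it. A standalone sketch of the same idea (assumed names, not from the patch):

    import re
    from typing import Optional, Pattern

    def compile_or_none(expr: str) -> Optional[Pattern]:
        # An empty expression would match everything, so callers get None
        # instead and can treat "no pattern" as "matches nothing".
        return re.compile(expr) if expr else None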
diff --git a/mypy.ini b/mypy.ini index 1fbd8decf8..1ece2ba082 100644 --- a/mypy.ini +++ b/mypy.ini @@ -57,6 +57,7 @@ files = synapse/server_notices, synapse/spam_checker_api, synapse/state, + synapse/storage/databases/main/appservice.py, synapse/storage/databases/main/events.py, synapse/storage/databases/main/registration.py, synapse/storage/databases/main/stream.py, @@ -82,6 +83,9 @@ ignore_missing_imports = True [mypy-zope] ignore_missing_imports = True +[mypy-bcrypt] +ignore_missing_imports = True + [mypy-constantly] ignore_missing_imports = True diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 3ed29a2c16..9fc8444228 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -12,9 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging -from typing import Dict, List, Optional, Union +from typing import TYPE_CHECKING, Dict, List, Optional, Union from prometheus_client import Counter @@ -34,16 +33,20 @@ from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, ) -from synapse.types import Collection, JsonDict, RoomStreamToken, UserID +from synapse.storage.databases.main.directory import RoomAliasMapping +from synapse.types import Collection, JsonDict, RoomAlias, RoomStreamToken, UserID from synapse.util.metrics import Measure +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + logger = logging.getLogger(__name__) events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "") class ApplicationServicesHandler: - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() self.is_mine_id = hs.is_mine_id self.appservice_api = hs.get_application_service_api() @@ -247,7 +250,9 @@ class ApplicationServicesHandler: service, "presence", new_token ) - async def _handle_typing(self, service: ApplicationService, new_token: int): + async def _handle_typing( + self, service: ApplicationService, new_token: int + ) -> List[JsonDict]: typing_source = self.event_sources.sources["typing"] # Get the typing events from just before current typing, _ = await typing_source.get_new_events_as( @@ -259,7 +264,7 @@ class ApplicationServicesHandler: ) return typing - async def _handle_receipts(self, service: ApplicationService): + async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]: from_key = await self.store.get_type_stream_id_for_appservice( service, "read_receipt" ) @@ -271,7 +276,7 @@ class ApplicationServicesHandler: async def _handle_presence( self, service: ApplicationService, users: Collection[Union[str, UserID]] - ): + ) -> List[JsonDict]: events = [] # type: List[JsonDict] presence_source = self.event_sources.sources["presence"] from_key = await self.store.get_type_stream_id_for_appservice( @@ -301,11 +306,11 @@ class ApplicationServicesHandler: return events - async def query_user_exists(self, user_id): + async def query_user_exists(self, user_id: str) -> bool: """Check if any application service knows this user_id exists. Args: - user_id(str): The user to query if they exist on any AS. + user_id: The user to query if they exist on any AS. Returns: True if this user exists on at least one application service. 
""" @@ -316,11 +321,13 @@ class ApplicationServicesHandler: return True return False - async def query_room_alias_exists(self, room_alias): + async def query_room_alias_exists( + self, room_alias: RoomAlias + ) -> Optional[RoomAliasMapping]: """Check if an application service knows this room alias exists. Args: - room_alias(RoomAlias): The room alias to query. + room_alias: The room alias to query. Returns: namedtuple: with keys "room_id" and "servers" or None if no association can be found. @@ -336,10 +343,13 @@ class ApplicationServicesHandler: ) if is_known_alias: # the alias exists now so don't query more ASes. - result = await self.store.get_association_from_room_alias(room_alias) - return result + return await self.store.get_association_from_room_alias(room_alias) + + return None - async def query_3pe(self, kind, protocol, fields): + async def query_3pe( + self, kind: str, protocol: str, fields: Dict[bytes, List[bytes]] + ) -> List[JsonDict]: services = self._get_services_for_3pn(protocol) results = await make_deferred_yieldable( @@ -361,7 +371,9 @@ class ApplicationServicesHandler: return ret - async def get_3pe_protocols(self, only_protocol=None): + async def get_3pe_protocols( + self, only_protocol: Optional[str] = None + ) -> Dict[str, JsonDict]: services = self.store.get_app_services() protocols = {} # type: Dict[str, List[JsonDict]] @@ -379,7 +391,7 @@ class ApplicationServicesHandler: if info is not None: protocols[p].append(info) - def _merge_instances(infos): + def _merge_instances(infos: List[JsonDict]) -> JsonDict: if not infos: return {} @@ -394,19 +406,17 @@ class ApplicationServicesHandler: return combined - for p in protocols.keys(): - protocols[p] = _merge_instances(protocols[p]) + return {p: _merge_instances(protocols[p]) for p in protocols.keys()} - return protocols - - async def _get_services_for_event(self, event): + async def _get_services_for_event( + self, event: EventBase + ) -> List[ApplicationService]: """Retrieve a list of application services interested in this event. Args: - event(Event): The event to check. Can be None if alias_list is not. + event: The event to check. Can be None if alias_list is not. Returns: - list: A list of services interested in this - event based on the service regex. + A list of services interested in this event based on the service regex. """ services = self.store.get_app_services() @@ -420,17 +430,15 @@ class ApplicationServicesHandler: return interested_list - def _get_services_for_user(self, user_id): + def _get_services_for_user(self, user_id: str) -> List[ApplicationService]: services = self.store.get_app_services() - interested_list = [s for s in services if (s.is_interested_in_user(user_id))] - return interested_list + return [s for s in services if (s.is_interested_in_user(user_id))] - def _get_services_for_3pn(self, protocol): + def _get_services_for_3pn(self, protocol: str) -> List[ApplicationService]: services = self.store.get_app_services() - interested_list = [s for s in services if s.is_interested_in_protocol(protocol)] - return interested_list + return [s for s in services if s.is_interested_in_protocol(protocol)] - async def _is_unknown_user(self, user_id): + async def _is_unknown_user(self, user_id: str) -> bool: if not self.is_mine_id(user_id): # we don't know if they are unknown or not since it isn't one of our # users. We can't poke ASes. 
@@ -445,9 +453,8 @@ class ApplicationServicesHandler: service_list = [s for s in services if s.sender == user_id] return len(service_list) == 0 - async def _check_user_exists(self, user_id): + async def _check_user_exists(self, user_id: str) -> bool: unknown_user = await self._is_unknown_user(user_id) if unknown_user: - exists = await self.query_user_exists(user_id) - return exists + return await self.query_user_exists(user_id) return True diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index dd14ab69d7..276594f3d9 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -18,10 +18,20 @@ import logging import time import unicodedata import urllib.parse -from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + Iterable, + List, + Optional, + Tuple, + Union, +) import attr -import bcrypt # type: ignore[import] +import bcrypt import pymacaroons from synapse.api.constants import LoginType @@ -49,6 +59,9 @@ from synapse.util.threepids import canonicalise_email from ._base import BaseHandler +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + logger = logging.getLogger(__name__) @@ -149,11 +162,7 @@ class SsoLoginExtraAttributes: class AuthHandler(BaseHandler): SESSION_EXPIRE_MS = 48 * 60 * 60 * 1000 - def __init__(self, hs): - """ - Args: - hs (synapse.server.HomeServer): - """ + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.checkers = {} # type: Dict[str, UserInteractiveAuthChecker] diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 637a938bac..26eef6eb61 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -15,21 +15,31 @@ # limitations under the License. import logging import re -from typing import List +from typing import TYPE_CHECKING, List, Optional, Pattern, Tuple -from synapse.appservice import ApplicationService, AppServiceTransaction +from synapse.appservice import ( + ApplicationService, + ApplicationServiceState, + AppServiceTransaction, +) from synapse.config.appservice import load_appservices from synapse.events import EventBase from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool from synapse.storage.databases.main.events_worker import EventsWorkerStore +from synapse.storage.types import Connection from synapse.types import JsonDict from synapse.util import json_encoder +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + logger = logging.getLogger(__name__) -def _make_exclusive_regex(services_cache): +def _make_exclusive_regex( + services_cache: List[ApplicationService], +) -> Optional[Pattern]: # We precompile a regex constructed from all the regexes that the AS's # have registered for exclusive users. 
exclusive_user_regexes = [ @@ -39,17 +49,19 @@ def _make_exclusive_regex(services_cache): ] if exclusive_user_regexes: exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes) - exclusive_user_regex = re.compile(exclusive_user_regex) + exclusive_user_pattern = re.compile( + exclusive_user_regex + ) # type: Optional[Pattern] else: # We handle this case specially otherwise the constructed regex # will always match - exclusive_user_regex = None + exclusive_user_pattern = None - return exclusive_user_regex + return exclusive_user_pattern class ApplicationServiceWorkerStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"): self.services_cache = load_appservices( hs.hostname, hs.config.app_service_config_files ) @@ -60,7 +72,7 @@ class ApplicationServiceWorkerStore(SQLBaseStore): def get_app_services(self): return self.services_cache - def get_if_app_services_interested_in_user(self, user_id): + def get_if_app_services_interested_in_user(self, user_id: str) -> bool: """Check if the user is one associated with an app service (exclusively) """ if self.exclusive_user_regex: @@ -68,7 +80,7 @@ class ApplicationServiceWorkerStore(SQLBaseStore): else: return False - def get_app_service_by_user_id(self, user_id): + def get_app_service_by_user_id(self, user_id: str) -> Optional[ApplicationService]: """Retrieve an application service from their user ID. All application services have associated with them a particular user ID. @@ -77,35 +89,35 @@ class ApplicationServiceWorkerStore(SQLBaseStore): a user ID to an application service. Args: - user_id(str): The user ID to see if it is an application service. + user_id: The user ID to see if it is an application service. Returns: - synapse.appservice.ApplicationService or None. + The application service or None. """ for service in self.services_cache: if service.sender == user_id: return service return None - def get_app_service_by_token(self, token): + def get_app_service_by_token(self, token: str) -> Optional[ApplicationService]: """Get the application service with the given appservice token. Args: - token (str): The application service token. + token: The application service token. Returns: - synapse.appservice.ApplicationService or None. + The application service or None. """ for service in self.services_cache: if service.token == token: return service return None - def get_app_service_by_id(self, as_id): + def get_app_service_by_id(self, as_id: str) -> Optional[ApplicationService]: """Get the application service with the given appservice ID. Args: - as_id (str): The application service ID. + as_id: The application service ID. Returns: - synapse.appservice.ApplicationService or None. + The application service or None. """ for service in self.services_cache: if service.id == as_id: @@ -124,11 +136,13 @@ class ApplicationServiceStore(ApplicationServiceWorkerStore): class ApplicationServiceTransactionWorkerStore( ApplicationServiceWorkerStore, EventsWorkerStore ): - async def get_appservices_by_state(self, state): + async def get_appservices_by_state( + self, state: ApplicationServiceState + ) -> List[ApplicationService]: """Get a list of application services based on their state. Args: - state(ApplicationServiceState): The state to filter on. + state: The state to filter on. Returns: A list of ApplicationServices, which may be empty. 
""" @@ -145,13 +159,15 @@ class ApplicationServiceTransactionWorkerStore( services.append(service) return services - async def get_appservice_state(self, service): + async def get_appservice_state( + self, service: ApplicationService + ) -> Optional[ApplicationServiceState]: """Get the application service state. Args: - service(ApplicationService): The service whose state to set. + service: The service whose state to set. Returns: - An ApplicationServiceState. + An ApplicationServiceState or none. """ result = await self.db_pool.simple_select_one( "application_services_state", @@ -164,12 +180,14 @@ class ApplicationServiceTransactionWorkerStore( return result.get("state") return None - async def set_appservice_state(self, service, state) -> None: + async def set_appservice_state( + self, service: ApplicationService, state: ApplicationServiceState + ) -> None: """Set the application service state. Args: - service(ApplicationService): The service whose state to set. - state(ApplicationServiceState): The connectivity state to apply. + service: The service whose state to set. + state: The connectivity state to apply. """ await self.db_pool.simple_upsert( "application_services_state", {"as_id": service.id}, {"state": state} @@ -226,13 +244,14 @@ class ApplicationServiceTransactionWorkerStore( "create_appservice_txn", _create_appservice_txn ) - async def complete_appservice_txn(self, txn_id, service) -> None: + async def complete_appservice_txn( + self, txn_id: int, service: ApplicationService + ) -> None: """Completes an application service transaction. Args: - txn_id(str): The transaction ID being completed. - service(ApplicationService): The application service which was sent - this transaction. + txn_id: The transaction ID being completed. + service: The application service which was sent this transaction. """ txn_id = int(txn_id) @@ -242,7 +261,7 @@ class ApplicationServiceTransactionWorkerStore( # has probably missed some events), so whine loudly but still continue, # since it shouldn't fail completion of the transaction. last_txn_id = self._get_last_txn(txn, service.id) - if (last_txn_id + 1) != txn_id: + if (txn_id + 1) != txn_id: logger.error( "appservice: Completing a transaction which has an ID > 1 from " "the last ID sent to this AS. We've either dropped events or " @@ -272,12 +291,13 @@ class ApplicationServiceTransactionWorkerStore( "complete_appservice_txn", _complete_appservice_txn ) - async def get_oldest_unsent_txn(self, service): - """Get the oldest transaction which has not been sent for this - service. + async def get_oldest_unsent_txn( + self, service: ApplicationService + ) -> Optional[AppServiceTransaction]: + """Get the oldest transaction which has not been sent for this service. Args: - service(ApplicationService): The app service to get the oldest txn. + service: The app service to get the oldest txn. Returns: An AppServiceTransaction or None. 
""" @@ -313,7 +333,7 @@ class ApplicationServiceTransactionWorkerStore( service=service, id=entry["txn_id"], events=events, ephemeral=[] ) - def _get_last_txn(self, txn, service_id): + def _get_last_txn(self, txn, service_id: Optional[str]) -> int: txn.execute( "SELECT last_txn FROM application_services_state WHERE as_id=?", (service_id,), @@ -324,7 +344,7 @@ class ApplicationServiceTransactionWorkerStore( else: return int(last_txn_id[0]) # select 'last_txn' col - async def set_appservice_last_pos(self, pos) -> None: + async def set_appservice_last_pos(self, pos: int) -> None: def set_appservice_last_pos_txn(txn): txn.execute( "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,) @@ -334,7 +354,9 @@ class ApplicationServiceTransactionWorkerStore( "set_appservice_last_pos", set_appservice_last_pos_txn ) - async def get_new_events_for_appservice(self, current_id, limit): + async def get_new_events_for_appservice( + self, current_id: int, limit: int + ) -> Tuple[int, List[EventBase]]: """Get all new events for an appservice""" def get_new_events_for_appservice_txn(txn): @@ -394,7 +416,7 @@ class ApplicationServiceTransactionWorkerStore( ) async def set_type_stream_id_for_appservice( - self, service: ApplicationService, type: str, pos: int + self, service: ApplicationService, type: str, pos: Optional[int] ) -> None: if type not in ("read_receipt", "presence"): raise ValueError( -- cgit 1.5.1 From 243d427fbcb24c78c2df143767cd4636844fc82e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Nov 2020 12:13:48 +0000 Subject: Block clients from sending server ACLs that lock the local server out. (#8708) Fixes #4042 --- changelog.d/8708.misc | 1 + mypy.ini | 1 + synapse/events/validator.py | 27 +++++++++++++------- synapse/handlers/message.py | 3 +++ tests/handlers/test_message.py | 57 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 80 insertions(+), 9 deletions(-) create mode 100644 changelog.d/8708.misc (limited to 'mypy.ini') diff --git a/changelog.d/8708.misc b/changelog.d/8708.misc new file mode 100644 index 0000000000..be679fb0f8 --- /dev/null +++ b/changelog.d/8708.misc @@ -0,0 +1 @@ +Block attempts by clients to send server ACLs, or redactions of server ACLs, that would result in the local server being blocked from the room. diff --git a/mypy.ini b/mypy.ini index 1ece2ba082..fc9f8d8050 100644 --- a/mypy.ini +++ b/mypy.ini @@ -13,6 +13,7 @@ files = synapse/config, synapse/event_auth.py, synapse/events/builder.py, + synapse/events/validator.py, synapse/events/spamcheck.py, synapse/federation, synapse/handlers/_base.py, diff --git a/synapse/events/validator.py b/synapse/events/validator.py index 5f9af8529b..f8f3b1a31e 100644 --- a/synapse/events/validator.py +++ b/synapse/events/validator.py @@ -13,20 +13,26 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from typing import Union + from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes, Membership from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import EventFormatVersions +from synapse.config.homeserver import HomeServerConfig +from synapse.events import EventBase +from synapse.events.builder import EventBuilder from synapse.events.utils import validate_canonicaljson +from synapse.federation.federation_server import server_matches_acl_event from synapse.types import EventID, RoomID, UserID class EventValidator: - def validate_new(self, event, config): + def validate_new(self, event: EventBase, config: HomeServerConfig): """Validates the event has roughly the right format Args: - event (FrozenEvent): The event to validate. - config (Config): The homeserver's configuration. + event: The event to validate. + config: The homeserver's configuration. """ self.validate_builder(event) @@ -76,12 +82,18 @@ class EventValidator: if event.type == EventTypes.Retention: self._validate_retention(event) - def _validate_retention(self, event): + if event.type == EventTypes.ServerACL: + if not server_matches_acl_event(config.server_name, event): + raise SynapseError( + 400, "Can't create an ACL event that denies the local server" + ) + + def _validate_retention(self, event: EventBase): """Checks that an event that defines the retention policy for a room respects the format enforced by the spec. Args: - event (FrozenEvent): The event to validate. + event: The event to validate. """ if not event.is_state(): raise SynapseError(code=400, msg="must be a state event") @@ -116,13 +128,10 @@ class EventValidator: errcode=Codes.BAD_JSON, ) - def validate_builder(self, event): + def validate_builder(self, event: Union[EventBase, EventBuilder]): """Validates that the builder/event has roughly the right format. Only checks values that we expect a proto event to have, rather than all the fields an event would have - - Args: - event (EventBuilder|FrozenEvent) """ strings = ["room_id", "sender", "type"] diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index ca5602c13e..c6791fb912 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1138,6 +1138,9 @@ class EventCreationHandler: if original_event.room_id != event.room_id: raise SynapseError(400, "Cannot redact event from a different room") + if original_event.type == EventTypes.ServerACL: + raise AuthError(403, "Redacting server ACL events is not permitted") + prev_state_ids = await context.get_prev_state_ids() auth_events_ids = self.auth.compute_auth_events( event, prev_state_ids, for_verification=True diff --git a/tests/handlers/test_message.py b/tests/handlers/test_message.py index 2e0fea04af..8b57081cbe 100644 --- a/tests/handlers/test_message.py +++ b/tests/handlers/test_message.py @@ -154,3 +154,60 @@ class EventCreationTestCase(unittest.HomeserverTestCase): # Check that we've deduplicated the events. 
self.assertEqual(len(events), 2) self.assertEqual(events[0].event_id, events[1].event_id) + + +class ServerAclValidationTestCase(unittest.HomeserverTestCase): + servlets = [ + admin.register_servlets, + login.register_servlets, + room.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + self.user_id = self.register_user("tester", "foobar") + self.access_token = self.login("tester", "foobar") + self.room_id = self.helper.create_room_as(self.user_id, tok=self.access_token) + + def test_allow_server_acl(self): + """Test that sending an ACL that blocks everyone but ourselves works. + """ + + self.helper.send_state( + self.room_id, + EventTypes.ServerACL, + body={"allow": [self.hs.hostname]}, + tok=self.access_token, + expect_code=200, + ) + + def test_deny_server_acl_block_outselves(self): + """Test that sending an ACL that blocks ourselves does not work. + """ + self.helper.send_state( + self.room_id, + EventTypes.ServerACL, + body={}, + tok=self.access_token, + expect_code=400, + ) + + def test_deny_redact_server_acl(self): + """Test that attempting to redact an ACL is blocked. + """ + + body = self.helper.send_state( + self.room_id, + EventTypes.ServerACL, + body={"allow": [self.hs.hostname]}, + tok=self.access_token, + expect_code=200, + ) + event_id = body["event_id"] + + # Redaction of event should fail. + path = "/_matrix/client/r0/rooms/%s/redact/%s" % (self.room_id, event_id) + request, channel = self.make_request( + "POST", path, content={}, access_token=self.access_token + ) + self.render(request) + self.assertEqual(int(channel.result["code"]), 403) -- cgit 1.5.1 From 6fde6aa9c02d35e0a908437ea49b275df9b58427 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 23 Nov 2020 13:28:03 -0500 Subject: Properly report user-agent/IP during registration of SSO users. (#8784) This also expands type-hints to the SSO and registration code. Refactors the CAS code to more closely match OIDC/SAML. --- changelog.d/8784.misc | 1 + mypy.ini | 1 + synapse/handlers/cas_handler.py | 71 +++++++++---- synapse/handlers/oidc_handler.py | 2 +- synapse/handlers/register.py | 214 +++++++++++++++++++++------------------ synapse/handlers/saml_handler.py | 6 +- 6 files changed, 173 insertions(+), 122 deletions(-) create mode 100644 changelog.d/8784.misc (limited to 'mypy.ini') diff --git a/changelog.d/8784.misc b/changelog.d/8784.misc new file mode 100644 index 0000000000..18a4263398 --- /dev/null +++ b/changelog.d/8784.misc @@ -0,0 +1 @@ +Fix a bug introduced in v1.20.0 where the user-agent and IP address reported during user registration for CAS, OpenID Connect, and SAML were of the wrong form. diff --git a/mypy.ini b/mypy.ini index fc9f8d8050..0cf7c93f45 100644 --- a/mypy.ini +++ b/mypy.ini @@ -37,6 +37,7 @@ files = synapse/handlers/presence.py, synapse/handlers/profile.py, synapse/handlers/read_marker.py, + synapse/handlers/register.py, synapse/handlers/room.py, synapse/handlers/room_member.py, synapse/handlers/room_member_worker.py, diff --git a/synapse/handlers/cas_handler.py b/synapse/handlers/cas_handler.py index 048a3b3c0b..f4ea0a9767 100644 --- a/synapse/handlers/cas_handler.py +++ b/synapse/handlers/cas_handler.py @@ -14,7 +14,7 @@ # limitations under the License. 
import logging
 import urllib
-from typing import Dict, Optional, Tuple
+from typing import TYPE_CHECKING, Dict, Optional, Tuple
 from xml.etree import ElementTree as ET
 
 from twisted.web.client import PartialDownloadError
@@ -23,6 +23,9 @@ from synapse.api.errors import Codes, LoginError
 from synapse.http.site import SynapseRequest
 from synapse.types import UserID, map_username_to_mxid_localpart
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -31,10 +34,10 @@ class CasHandler:
     Utility class to handle the response from a CAS SSO service.
 
     Args:
-        hs (synapse.server.HomeServer)
+        hs
     """
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self._hostname = hs.hostname
         self._auth_handler = hs.get_auth_handler()
@@ -200,27 +203,57 @@ class CasHandler:
             args["session"] = session
 
         username, user_display_name = await self._validate_ticket(ticket, args)
 
-        localpart = map_username_to_mxid_localpart(username)
-        user_id = UserID(localpart, self._hostname).to_string()
-        registered_user_id = await self._auth_handler.check_user_exists(user_id)
+        # Pull out the user-agent and IP from the request.
+        user_agent = request.get_user_agent("")
+        ip_address = self.hs.get_ip_from_request(request)
+
+        # Get the matrix ID from the CAS username.
+        user_id = await self._map_cas_user_to_matrix_user(
+            username, user_display_name, user_agent, ip_address
+        )
 
         if session:
             await self._auth_handler.complete_sso_ui_auth(
-                registered_user_id, session, request,
+                user_id, session, request,
            )
-
         else:
-            if not registered_user_id:
-                # Pull out the user-agent and IP from the request.
-                user_agent = request.get_user_agent("")
-                ip_address = self.hs.get_ip_from_request(request)
-
-                registered_user_id = await self._registration_handler.register_user(
-                    localpart=localpart,
-                    default_display_name=user_display_name,
-                    user_agent_ips=(user_agent, ip_address),
-                )
+            # If this is not a UI auth request, then there must be a redirect URL.
+            assert client_redirect_url
 
             await self._auth_handler.complete_sso_login(
-                registered_user_id, request, client_redirect_url
+                user_id, request, client_redirect_url
             )
+
+    async def _map_cas_user_to_matrix_user(
+        self,
+        remote_user_id: str,
+        display_name: Optional[str],
+        user_agent: str,
+        ip_address: str,
+    ) -> str:
+        """
+        Given a CAS username, retrieve the user ID for it and possibly register the user.
+
+        Args:
+            remote_user_id: The username from the CAS response.
+            display_name: The display name from the CAS response.
+            user_agent: The user agent of the client making the request.
+            ip_address: The IP address of the client making the request.
+
+        Returns:
+            The user ID associated with this response.
+        """
+
+        localpart = map_username_to_mxid_localpart(remote_user_id)
+        user_id = UserID(localpart, self._hostname).to_string()
+        registered_user_id = await self._auth_handler.check_user_exists(user_id)
+
+        # If the user does not exist, register it.
+ if not registered_user_id: + registered_user_id = await self._registration_handler.register_user( + localpart=localpart, + default_display_name=display_name, + user_agent_ips=[(user_agent, ip_address)], + ) + + return registered_user_id diff --git a/synapse/handlers/oidc_handler.py b/synapse/handlers/oidc_handler.py index 4bfd8d5617..34de9109ea 100644 --- a/synapse/handlers/oidc_handler.py +++ b/synapse/handlers/oidc_handler.py @@ -925,7 +925,7 @@ class OidcHandler(BaseHandler): registered_user_id = await self._registration_handler.register_user( localpart=localpart, default_display_name=attributes["display_name"], - user_agent_ips=(user_agent, ip_address), + user_agent_ips=[(user_agent, ip_address)], ) await self.store.record_user_external_id( diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 252f700786..0d85fd0868 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -15,10 +15,12 @@ """Contains functions for registering clients.""" import logging +from typing import TYPE_CHECKING, List, Optional, Tuple from synapse import types from synapse.api.constants import MAX_USERID_LENGTH, EventTypes, JoinRules, LoginType from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError +from synapse.appservice import ApplicationService from synapse.config.server import is_threepid_reserved from synapse.http.servlet import assert_params_in_dict from synapse.replication.http.login import RegisterDeviceReplicationServlet @@ -32,16 +34,14 @@ from synapse.types import RoomAlias, UserID, create_requester from ._base import BaseHandler +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + logger = logging.getLogger(__name__) class RegistrationHandler(BaseHandler): - def __init__(self, hs): - """ - - Args: - hs (synapse.server.HomeServer): - """ + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.hs = hs self.auth = hs.get_auth() @@ -71,7 +71,10 @@ class RegistrationHandler(BaseHandler): self.session_lifetime = hs.config.session_lifetime async def check_username( - self, localpart, guest_access_token=None, assigned_user_id=None + self, + localpart: str, + guest_access_token: Optional[str] = None, + assigned_user_id: Optional[str] = None, ): if types.contains_invalid_mxid_characters(localpart): raise SynapseError( @@ -140,39 +143,45 @@ class RegistrationHandler(BaseHandler): async def register_user( self, - localpart=None, - password_hash=None, - guest_access_token=None, - make_guest=False, - admin=False, - threepid=None, - user_type=None, - default_display_name=None, - address=None, - bind_emails=[], - by_admin=False, - user_agent_ips=None, - ): + localpart: Optional[str] = None, + password_hash: Optional[str] = None, + guest_access_token: Optional[str] = None, + make_guest: bool = False, + admin: bool = False, + threepid: Optional[dict] = None, + user_type: Optional[str] = None, + default_display_name: Optional[str] = None, + address: Optional[str] = None, + bind_emails: List[str] = [], + by_admin: bool = False, + user_agent_ips: Optional[List[Tuple[str, str]]] = None, + ) -> str: """Registers a new client on the server. Args: localpart: The local part of the user ID to register. If None, one will be generated. - password_hash (str|None): The hashed password to assign to this user so they can + password_hash: The hashed password to assign to this user so they can login again. This can be None which means they cannot login again via a password (e.g. the user is an application service user). 
-            user_type (str|None): type of user. One of the values from
+            guest_access_token: The access token used when this was a guest
+                account.
+            make_guest: True if the new user should be guest,
+                false to add a regular user account.
+            admin: True if the user should be registered as a server admin.
+            threepid: The threepid used for registering, if any.
+            user_type: type of user. One of the values from
                 api.constants.UserTypes, or None for a normal user.
-            default_display_name (unicode|None): if set, the new user's displayname
+            default_display_name: if set, the new user's displayname
                 will be set to this. Defaults to 'localpart'.
-            address (str|None): the IP address used to perform the registration.
-            bind_emails (List[str]): list of emails to bind to this account.
-            by_admin (bool): True if this registration is being made via the
+            address: the IP address used to perform the registration.
+            bind_emails: list of emails to bind to this account.
+            by_admin: True if this registration is being made via the
                 admin api, otherwise False.
-            user_agent_ips (List[(str, str)]): Tuples of IP addresses and user-agents used
+            user_agent_ips: Tuples of IP addresses and user-agents used
                 during the registration process.
         Returns:
-            str: user_id
+            The registered user_id.
         Raises:
             SynapseError if there was a problem registering.
         """
@@ -236,8 +245,10 @@ class RegistrationHandler(BaseHandler):
         else:
             # autogen a sequential user ID
             fail_count = 0
-            user = None
-            while not user:
+            # If a default display name is not given, generate one.
+            generate_display_name = default_display_name is None
+            # This breaks on successful registration *or* errors after 10 failures.
+            while True:
                 # Fail after being unable to find a suitable ID a few times
                 if fail_count > 10:
                     raise SynapseError(500, "Unable to find a suitable guest user ID")
@@ -246,7 +257,7 @@ class RegistrationHandler(BaseHandler):
                 user = UserID(localpart, self.hs.hostname)
                 user_id = user.to_string()
                 self.check_user_id_not_appservice_exclusive(user_id)
-                if default_display_name is None:
+                if generate_display_name:
                     default_display_name = localpart
                 try:
                     await self.register_with_store(
@@ -262,8 +273,6 @@ class RegistrationHandler(BaseHandler):
                     break
                 except SynapseError:
                     # if user id is taken, just generate another
-                    user = None
-                    user_id = None
                     fail_count += 1
 
         if not self.hs.config.user_consent_at_registration:
@@ -295,7 +304,7 @@ class RegistrationHandler(BaseHandler):
 
         return user_id
 
-    async def _create_and_join_rooms(self, user_id: str):
+    async def _create_and_join_rooms(self, user_id: str) -> None:
         """
         Create the auto-join rooms and join or invite the user to them.
 
@@ -379,7 +388,7 @@ class RegistrationHandler(BaseHandler):
         except Exception as e:
             logger.error("Failed to join new user to %r: %r", r, e)
 
-    async def _join_rooms(self, user_id: str):
+    async def _join_rooms(self, user_id: str) -> None:
         """
         Join or invite the user to the auto-join rooms.
 
@@ -425,6 +434,9 @@ class RegistrationHandler(BaseHandler):
 
             # Send the invite, if necessary.
             if requires_invite:
+                # If an invite is required, there must be an auto-join user ID.
+                assert self.hs.config.registration.auto_join_user_id
+
                 await room_member_handler.update_membership(
                     requester=create_requester(
                         self.hs.config.registration.auto_join_user_id,
@@ -456,7 +468,7 @@ class RegistrationHandler(BaseHandler):
         except Exception as e:
             logger.error("Failed to join new user to %r: %r", r, e)
 
-    async def _auto_join_rooms(self, user_id: str):
+    async def _auto_join_rooms(self, user_id: str) -> None:
         """Automatically joins users to auto join rooms - creating the room in the first place
         if the user is the first to be created.
 
@@ -479,16 +491,16 @@ class RegistrationHandler(BaseHandler):
         else:
             await self._join_rooms(user_id)
 
-    async def post_consent_actions(self, user_id):
+    async def post_consent_actions(self, user_id: str) -> None:
         """A series of registration actions that can only be carried out once consent
         has been granted
 
         Args:
-            user_id (str): The user to join
+            user_id: The user to join
         """
         await self._auto_join_rooms(user_id)
 
-    async def appservice_register(self, user_localpart, as_token):
+    async def appservice_register(self, user_localpart: str, as_token: str) -> str:
         user = UserID(user_localpart, self.hs.hostname)
         user_id = user.to_string()
         service = self.store.get_app_service_by_token(as_token)
@@ -513,7 +525,9 @@ class RegistrationHandler(BaseHandler):
         )
         return user_id
 
-    def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None):
+    def check_user_id_not_appservice_exclusive(
+        self, user_id: str, allowed_appservice: Optional[ApplicationService] = None
+    ) -> None:
         # don't allow people to register the server notices mxid
         if self._server_notices_mxid is not None:
             if user_id == self._server_notices_mxid:
@@ -537,12 +551,12 @@ class RegistrationHandler(BaseHandler):
                 errcode=Codes.EXCLUSIVE,
             )
 
-    def check_registration_ratelimit(self, address):
+    def check_registration_ratelimit(self, address: Optional[str]) -> None:
         """A simple helper method to check whether the registration rate limit has been hit
         for a given IP address
 
         Args:
-            address (str|None): the IP address used to perform the registration. If this is
+            address: the IP address used to perform the registration. If this is
                 None, no ratelimiting will be performed.
 
         Raises:
@@ -553,42 +567,39 @@ class RegistrationHandler(BaseHandler):
 
         self.ratelimiter.ratelimit(address)
 
-    def register_with_store(
+    async def register_with_store(
         self,
-        user_id,
-        password_hash=None,
-        was_guest=False,
-        make_guest=False,
-        appservice_id=None,
-        create_profile_with_displayname=None,
-        admin=False,
-        user_type=None,
-        address=None,
-        shadow_banned=False,
-    ):
+        user_id: str,
+        password_hash: Optional[str] = None,
+        was_guest: bool = False,
+        make_guest: bool = False,
+        appservice_id: Optional[str] = None,
+        create_profile_with_displayname: Optional[str] = None,
+        admin: bool = False,
+        user_type: Optional[str] = None,
+        address: Optional[str] = None,
+        shadow_banned: bool = False,
+    ) -> None:
         """Register user in the datastore.
 
         Args:
-            user_id (str): The desired user ID to register.
-            password_hash (str|None): Optional. The password hash for this user.
-            was_guest (bool): Optional. Whether this is a guest account being
+            user_id: The desired user ID to register.
+            password_hash: Optional. The password hash for this user.
+            was_guest: Optional. Whether this is a guest account being
                 upgraded to a non-guest account.
-            make_guest (boolean): True if the the new user should be guest,
+            make_guest: True if the new user should be guest,
                 false to add a regular user account.
- appservice_id (str|None): The ID of the appservice registering the user. - create_profile_with_displayname (unicode|None): Optionally create a + appservice_id: The ID of the appservice registering the user. + create_profile_with_displayname: Optionally create a profile for the user, setting their displayname to the given value - admin (boolean): is an admin user? - user_type (str|None): type of user. One of the values from + admin: is an admin user? + user_type: type of user. One of the values from api.constants.UserTypes, or None for a normal user. - address (str|None): the IP address used to perform the registration. - shadow_banned (bool): Whether to shadow-ban the user - - Returns: - Awaitable + address: the IP address used to perform the registration. + shadow_banned: Whether to shadow-ban the user """ if self.hs.config.worker_app: - return self._register_client( + await self._register_client( user_id=user_id, password_hash=password_hash, was_guest=was_guest, @@ -601,7 +612,7 @@ class RegistrationHandler(BaseHandler): shadow_banned=shadow_banned, ) else: - return self.store.register_user( + await self.store.register_user( user_id=user_id, password_hash=password_hash, was_guest=was_guest, @@ -614,22 +625,24 @@ class RegistrationHandler(BaseHandler): ) async def register_device( - self, user_id, device_id, initial_display_name, is_guest=False - ): + self, + user_id: str, + device_id: Optional[str], + initial_display_name: Optional[str], + is_guest: bool = False, + ) -> Tuple[str, str]: """Register a device for a user and generate an access token. The access token will be limited by the homeserver's session_lifetime config. Args: - user_id (str): full canonical @user:id - device_id (str|None): The device ID to check, or None to generate - a new one. - initial_display_name (str|None): An optional display name for the - device. - is_guest (bool): Whether this is a guest account + user_id: full canonical @user:id + device_id: The device ID to check, or None to generate a new one. + initial_display_name: An optional display name for the device. + is_guest: Whether this is a guest account Returns: - tuple[str, str]: Tuple of device ID and access token + Tuple of device ID and access token """ if self.hs.config.worker_app: @@ -649,7 +662,7 @@ class RegistrationHandler(BaseHandler): ) valid_until_ms = self.clock.time_msec() + self.session_lifetime - device_id = await self.device_handler.check_device_registered( + registered_device_id = await self.device_handler.check_device_registered( user_id, device_id, initial_display_name ) if is_guest: @@ -659,20 +672,21 @@ class RegistrationHandler(BaseHandler): ) else: access_token = await self._auth_handler.get_access_token_for_user_id( - user_id, device_id=device_id, valid_until_ms=valid_until_ms + user_id, device_id=registered_device_id, valid_until_ms=valid_until_ms ) - return (device_id, access_token) + return (registered_device_id, access_token) - async def post_registration_actions(self, user_id, auth_result, access_token): + async def post_registration_actions( + self, user_id: str, auth_result: dict, access_token: Optional[str] + ) -> None: """A user has completed registration Args: - user_id (str): The user ID that consented - auth_result (dict): The authenticated credentials of the newly - registered user. - access_token (str|None): The access token of the newly logged in - device, or None if `inhibit_login` enabled. + user_id: The user ID that consented + auth_result: The authenticated credentials of the newly registered user. 
+ access_token: The access token of the newly logged in device, or + None if `inhibit_login` enabled. """ if self.hs.config.worker_app: await self._post_registration_client( @@ -698,19 +712,20 @@ class RegistrationHandler(BaseHandler): if auth_result and LoginType.TERMS in auth_result: await self._on_user_consented(user_id, self.hs.config.user_consent_version) - async def _on_user_consented(self, user_id, consent_version): + async def _on_user_consented(self, user_id: str, consent_version: str) -> None: """A user consented to the terms on registration Args: - user_id (str): The user ID that consented. - consent_version (str): version of the policy the user has - consented to. + user_id: The user ID that consented. + consent_version: version of the policy the user has consented to. """ logger.info("%s has consented to the privacy policy", user_id) await self.store.user_set_consent_version(user_id, consent_version) await self.post_consent_actions(user_id) - async def _register_email_threepid(self, user_id, threepid, token): + async def _register_email_threepid( + self, user_id: str, threepid: dict, token: Optional[str] + ) -> None: """Add an email address as a 3pid identifier Also adds an email pusher for the email address, if configured in the @@ -719,10 +734,9 @@ class RegistrationHandler(BaseHandler): Must be called on master. Args: - user_id (str): id of user - threepid (object): m.login.email.identity auth response - token (str|None): access_token for the user, or None if not logged - in. + user_id: id of user + threepid: m.login.email.identity auth response + token: access_token for the user, or None if not logged in. """ reqd = ("medium", "address", "validated_at") if any(x not in threepid for x in reqd): @@ -748,6 +762,8 @@ class RegistrationHandler(BaseHandler): # up when the access token is saved, but that's quite an # invasive change I'd rather do separately. user_tuple = await self.store.get_user_by_access_token(token) + # The token better still exist. + assert user_tuple token_id = user_tuple.token_id await self.pusher_pool.add_pusher( @@ -762,14 +778,14 @@ class RegistrationHandler(BaseHandler): data={}, ) - async def _register_msisdn_threepid(self, user_id, threepid): + async def _register_msisdn_threepid(self, user_id: str, threepid: dict) -> None: """Add a phone number as a 3pid identifier Must be called on master. 
Args: - user_id (str): id of user - threepid (object): m.login.msisdn auth response + user_id: id of user + threepid: m.login.msisdn auth response """ try: assert_params_in_dict(threepid, ["medium", "address", "validated_at"]) diff --git a/synapse/handlers/saml_handler.py b/synapse/handlers/saml_handler.py index f4e8cbeac8..37ab42f050 100644 --- a/synapse/handlers/saml_handler.py +++ b/synapse/handlers/saml_handler.py @@ -39,7 +39,7 @@ from synapse.util.async_helpers import Linearizer from synapse.util.iterutils import chunk_seq if TYPE_CHECKING: - import synapse.server + from synapse.server import HomeServer logger = logging.getLogger(__name__) @@ -56,7 +56,7 @@ class Saml2SessionData: class SamlHandler(BaseHandler): - def __init__(self, hs: "synapse.server.HomeServer"): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self._saml_client = Saml2Client(hs.config.saml2_sp_config) self._saml_idp_entityid = hs.config.saml2_idp_entityid @@ -330,7 +330,7 @@ class SamlHandler(BaseHandler): localpart=localpart, default_display_name=displayname, bind_emails=emails, - user_agent_ips=(user_agent, ip_address), + user_agent_ips=[(user_agent, ip_address)], ) await self.store.record_user_external_id( -- cgit 1.5.1 From 97b35ee259b062412ed2da8272dd200eb29ee93d Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 24 Nov 2020 12:53:00 +0000 Subject: Add a script to sign arbitrary json objects. (#8772) --- changelog.d/8772.misc | 1 + mypy.ini | 1 + scripts-dev/sign_json | 127 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 129 insertions(+) create mode 100644 changelog.d/8772.misc create mode 100755 scripts-dev/sign_json (limited to 'mypy.ini') diff --git a/changelog.d/8772.misc b/changelog.d/8772.misc new file mode 100644 index 0000000000..d74d0a3d5d --- /dev/null +++ b/changelog.d/8772.misc @@ -0,0 +1 @@ +Add a commandline script to sign arbitrary json objects. diff --git a/mypy.ini b/mypy.ini index 0cf7c93f45..f4f981e813 100644 --- a/mypy.ini +++ b/mypy.ini @@ -8,6 +8,7 @@ show_traceback = True mypy_path = stubs warn_unreachable = True files = + scripts-dev/sign_json, synapse/api, synapse/appservice, synapse/config, diff --git a/scripts-dev/sign_json b/scripts-dev/sign_json new file mode 100755 index 0000000000..44553fb79a --- /dev/null +++ b/scripts-dev/sign_json @@ -0,0 +1,127 @@ +#!/usr/bin/env python +# +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import argparse +import json +import sys +from json import JSONDecodeError + +import yaml +from signedjson.key import read_signing_keys +from signedjson.sign import sign_json + +from synapse.util import json_encoder + + +def main(): + parser = argparse.ArgumentParser( + description="""Adds a signature to a JSON object. 
+
+Example usage:
+
+    $ scripts-dev/sign_json -N test -k localhost.signing.key "{}"
+    {"signatures":{"test":{"ed25519:a_ZnZh":"LmPnml6iM0iR..."}}}
+""",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+    )
+
+    parser.add_argument(
+        "-N",
+        "--server-name",
+        help="Name to give as the local homeserver. If unspecified, will be "
+        "read from the config file.",
+    )
+
+    parser.add_argument(
+        "-k",
+        "--signing-key-path",
+        help="Path to the file containing the private ed25519 key to sign the "
+        "request with.",
+    )
+
+    parser.add_argument(
+        "-c",
+        "--config",
+        default="homeserver.yaml",
+        help=(
+            "Path to synapse config file, from which the server name and/or signing "
+            "key path will be read. Ignored if --server-name and --signing-key-path "
+            "are both given."
+        ),
+    )
+
+    input_args = parser.add_mutually_exclusive_group()
+
+    input_args.add_argument("input_data", nargs="?", help="Raw JSON to be signed.")
+
+    input_args.add_argument(
+        "-i",
+        "--input",
+        type=argparse.FileType("r"),
+        default=sys.stdin,
+        help=(
+            "A file from which to read the JSON to be signed. If neither --input nor "
+            "input_data are given, JSON will be read from stdin."
+        ),
+    )
+
+    parser.add_argument(
+        "-o",
+        "--output",
+        type=argparse.FileType("w"),
+        default=sys.stdout,
+        help="Where to write the signed JSON. Defaults to stdout.",
+    )
+
+    args = parser.parse_args()
+
+    if not args.server_name or not args.signing_key_path:
+        read_args_from_config(args)
+
+    with open(args.signing_key_path) as f:
+        key = read_signing_keys(f)[0]
+
+    json_to_sign = args.input_data
+    if json_to_sign is None:
+        json_to_sign = args.input.read()
+
+    try:
+        obj = json.loads(json_to_sign)
+    except JSONDecodeError as e:
+        print("Unable to parse input as JSON: %s" % e, file=sys.stderr)
+        sys.exit(1)
+
+    if not isinstance(obj, dict):
+        print("Input json was not an object", file=sys.stderr)
+        sys.exit(1)
+
+    sign_json(obj, args.server_name, key)
+    for c in json_encoder.iterencode(obj):
+        args.output.write(c)
+    args.output.write("\n")
+
+
+def read_args_from_config(args: argparse.Namespace) -> None:
+    with open(args.config, "r") as fh:
+        config = yaml.safe_load(fh)
+        if not args.server_name:
+            args.server_name = config["server_name"]
+        if not args.signing_key_path:
+            args.signing_key_path = config["signing_key_path"]
+
+
+if __name__ == "__main__":
+    main()
-- cgit 1.5.1


From f38676d16143e399b654504486cf8cbecad12a5d Mon Sep 17 00:00:00 2001
From: Patrick Cloke
Date: Wed, 25 Nov 2020 07:07:21 -0500
Subject: Add type hints to matrix federation client / agent. (#8806)

---
 changelog.d/8806.misc                              |   1 +
 mypy.ini                                           |   2 +
 synapse/http/federation/matrix_federation_agent.py | 100 ++++---
 synapse/http/federation/well_known_resolver.py     |  16 +-
 synapse/http/matrixfederationclient.py             | 304 +++++++++++----------
 synapse/server.py                                  |   3 +-
 6 files changed, 231 insertions(+), 195 deletions(-)
 create mode 100644 changelog.d/8806.misc
(limited to 'mypy.ini')

diff --git a/changelog.d/8806.misc b/changelog.d/8806.misc
new file mode 100644
index 0000000000..52457deb5e
--- /dev/null
+++ b/changelog.d/8806.misc
@@ -0,0 +1 @@
+Add type hints to matrix federation client and agent.
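For reference, the signing core of the `scripts-dev/sign_json` script added above boils down to two `signedjson` calls. A minimal standalone sketch, assuming a signing key file in the usual `ed25519 <key_id> <base64 seed>` format; the key file path, server name, and input object here are hypothetical placeholders:

    import json

    from signedjson.key import read_signing_keys
    from signedjson.sign import sign_json

    # Load the first signing key from the key file (hypothetical path).
    with open("localhost.signing.key") as f:
        key = read_signing_keys(f)[0]

    # sign_json adds a "signatures" mapping to the object in place,
    # keyed by server name and key ID.
    obj = {"foo": "bar"}
    sign_json(obj, "localhost", key)
    print(json.dumps(obj))
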
diff --git a/mypy.ini b/mypy.ini index f4f981e813..3e42235ac1 100644 --- a/mypy.ini +++ b/mypy.ini @@ -45,7 +45,9 @@ files = synapse/handlers/saml_handler.py, synapse/handlers/sync.py, synapse/handlers/ui_auth, + synapse/http/federation/matrix_federation_agent.py, synapse/http/federation/well_known_resolver.py, + synapse/http/matrixfederationclient.py, synapse/http/server.py, synapse/http/site.py, synapse/logging, diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 83d6196d4a..e77f9587d0 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -12,21 +12,25 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging -import urllib -from typing import List +import urllib.parse +from typing import List, Optional from netaddr import AddrFormatError, IPAddress from zope.interface import implementer from twisted.internet import defer from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS -from twisted.internet.interfaces import IStreamClientEndpoint -from twisted.web.client import Agent, HTTPConnectionPool +from twisted.internet.interfaces import ( + IProtocolFactory, + IReactorCore, + IStreamClientEndpoint, +) +from twisted.web.client import URI, Agent, HTTPConnectionPool from twisted.web.http_headers import Headers -from twisted.web.iweb import IAgent, IAgentEndpointFactory +from twisted.web.iweb import IAgent, IAgentEndpointFactory, IBodyProducer +from synapse.crypto.context_factory import FederationPolicyForHTTPS from synapse.http.federation.srv_resolver import Server, SrvResolver from synapse.http.federation.well_known_resolver import WellKnownResolver from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -44,30 +48,30 @@ class MatrixFederationAgent: Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.) Args: - reactor (IReactor): twisted reactor to use for underlying requests + reactor: twisted reactor to use for underlying requests - tls_client_options_factory (FederationPolicyForHTTPS|None): + tls_client_options_factory: factory to use for fetching client tls options, or none to disable TLS. - user_agent (bytes): + user_agent: The user agent header to use for federation requests. - _srv_resolver (SrvResolver|None): - SRVResolver impl to use for looking up SRV records. None to use a default - implementation. + _srv_resolver: + SrvResolver implementation to use for looking up SRV records. None + to use a default implementation. - _well_known_resolver (WellKnownResolver|None): + _well_known_resolver: WellKnownResolver to use to perform well-known lookups. None to use a default implementation. 
""" def __init__( self, - reactor, - tls_client_options_factory, - user_agent, - _srv_resolver=None, - _well_known_resolver=None, + reactor: IReactorCore, + tls_client_options_factory: Optional[FederationPolicyForHTTPS], + user_agent: bytes, + _srv_resolver: Optional[SrvResolver] = None, + _well_known_resolver: Optional[WellKnownResolver] = None, ): self._reactor = reactor self._clock = Clock(reactor) @@ -99,15 +103,20 @@ class MatrixFederationAgent: self._well_known_resolver = _well_known_resolver @defer.inlineCallbacks - def request(self, method, uri, headers=None, bodyProducer=None): + def request( + self, + method: bytes, + uri: bytes, + headers: Optional[Headers] = None, + bodyProducer: Optional[IBodyProducer] = None, + ) -> defer.Deferred: """ Args: - method (bytes): HTTP method: GET/POST/etc - uri (bytes): Absolute URI to be retrieved - headers (twisted.web.http_headers.Headers|None): - HTTP headers to send with the request, or None to - send no extra headers. - bodyProducer (twisted.web.iweb.IBodyProducer|None): + method: HTTP method: GET/POST/etc + uri: Absolute URI to be retrieved + headers: + HTTP headers to send with the request, or None to send no extra headers. + bodyProducer: An object which can generate bytes to make up the body of this request (for example, the properly encoded contents of a file for a file upload). Or None if the request is to have @@ -123,6 +132,9 @@ class MatrixFederationAgent: # explicit port. parsed_uri = urllib.parse.urlparse(uri) + # There must be a valid hostname. + assert parsed_uri.hostname + # If this is a matrix:// URI check if the server has delegated matrix # traffic using well-known delegation. # @@ -179,7 +191,12 @@ class MatrixHostnameEndpointFactory: """Factory for MatrixHostnameEndpoint for parsing to an Agent. """ - def __init__(self, reactor, tls_client_options_factory, srv_resolver): + def __init__( + self, + reactor: IReactorCore, + tls_client_options_factory: Optional[FederationPolicyForHTTPS], + srv_resolver: Optional[SrvResolver], + ): self._reactor = reactor self._tls_client_options_factory = tls_client_options_factory @@ -203,15 +220,20 @@ class MatrixHostnameEndpoint: resolution (i.e. via SRV). Does not check for well-known delegation. Args: - reactor (IReactor) - tls_client_options_factory (ClientTLSOptionsFactory|None): + reactor: twisted reactor to use for underlying requests + tls_client_options_factory: factory to use for fetching client tls options, or none to disable TLS. - srv_resolver (SrvResolver): The SRV resolver to use - parsed_uri (twisted.web.client.URI): The parsed URI that we're wanting - to connect to. + srv_resolver: The SRV resolver to use + parsed_uri: The parsed URI that we're wanting to connect to. 
""" - def __init__(self, reactor, tls_client_options_factory, srv_resolver, parsed_uri): + def __init__( + self, + reactor: IReactorCore, + tls_client_options_factory: Optional[FederationPolicyForHTTPS], + srv_resolver: SrvResolver, + parsed_uri: URI, + ): self._reactor = reactor self._parsed_uri = parsed_uri @@ -231,13 +253,13 @@ class MatrixHostnameEndpoint: self._srv_resolver = srv_resolver - def connect(self, protocol_factory): + def connect(self, protocol_factory: IProtocolFactory) -> defer.Deferred: """Implements IStreamClientEndpoint interface """ return run_in_background(self._do_connect, protocol_factory) - async def _do_connect(self, protocol_factory): + async def _do_connect(self, protocol_factory: IProtocolFactory) -> None: first_exception = None server_list = await self._resolve_server() @@ -303,20 +325,20 @@ class MatrixHostnameEndpoint: return [Server(host, 8448)] -def _is_ip_literal(host): +def _is_ip_literal(host: bytes) -> bool: """Test if the given host name is either an IPv4 or IPv6 literal. Args: - host (bytes) + host: The host name to check Returns: - bool + True if the hostname is an IP address literal. """ - host = host.decode("ascii") + host_str = host.decode("ascii") try: - IPAddress(host) + IPAddress(host_str) return True except AddrFormatError: return False diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py index 1cc666fbf6..5e08ef1664 100644 --- a/synapse/http/federation/well_known_resolver.py +++ b/synapse/http/federation/well_known_resolver.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import random import time @@ -21,10 +20,11 @@ from typing import Callable, Dict, Optional, Tuple import attr from twisted.internet import defer +from twisted.internet.interfaces import IReactorTime from twisted.web.client import RedirectAgent, readBody from twisted.web.http import stringToDatetime from twisted.web.http_headers import Headers -from twisted.web.iweb import IResponse +from twisted.web.iweb import IAgent, IResponse from synapse.logging.context import make_deferred_yieldable from synapse.util import Clock, json_decoder @@ -81,11 +81,11 @@ class WellKnownResolver: def __init__( self, - reactor, - agent, - user_agent, - well_known_cache=None, - had_well_known_cache=None, + reactor: IReactorTime, + agent: IAgent, + user_agent: bytes, + well_known_cache: Optional[TTLCache] = None, + had_well_known_cache: Optional[TTLCache] = None, ): self._reactor = reactor self._clock = Clock(reactor) @@ -127,7 +127,7 @@ class WellKnownResolver: with Measure(self._clock, "get_well_known"): result, cache_period = await self._fetch_well_known( server_name - ) # type: Tuple[Optional[bytes], float] + ) # type: Optional[bytes], float except _FetchWellKnownFailure as e: if prev_result and e.temporary: diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 7e17cdb73e..b2ccae90df 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -17,8 +17,9 @@ import cgi import logging import random import sys -import urllib +import urllib.parse from io import BytesIO +from typing import BinaryIO, Callable, Dict, List, Optional, Tuple, Union import attr import treq @@ -31,9 +32,10 @@ from twisted.internet import defer, protocol from twisted.internet.error import DNSLookupError from 
twisted.internet.interfaces import IReactorPluggableNameResolver, IReactorTime from twisted.internet.task import _EPSILON, Cooperator +from twisted.python.failure import Failure from twisted.web._newclient import ResponseDone from twisted.web.http_headers import Headers -from twisted.web.iweb import IResponse +from twisted.web.iweb import IBodyProducer, IResponse import synapse.metrics import synapse.util.retryutils @@ -54,6 +56,7 @@ from synapse.logging.opentracing import ( start_active_span, tags, ) +from synapse.types import JsonDict from synapse.util import json_decoder from synapse.util.async_helpers import timeout_deferred from synapse.util.metrics import Measure @@ -76,47 +79,44 @@ MAXINT = sys.maxsize _next_id = 1 +QueryArgs = Dict[str, Union[str, List[str]]] + + @attr.s(slots=True, frozen=True) class MatrixFederationRequest: - method = attr.ib() + method = attr.ib(type=str) """HTTP method - :type: str """ - path = attr.ib() + path = attr.ib(type=str) """HTTP path - :type: str """ - destination = attr.ib() + destination = attr.ib(type=str) """The remote server to send the HTTP request to. - :type: str""" + """ - json = attr.ib(default=None) + json = attr.ib(default=None, type=Optional[JsonDict]) """JSON to send in the body. - :type: dict|None """ - json_callback = attr.ib(default=None) + json_callback = attr.ib(default=None, type=Optional[Callable[[], JsonDict]]) """A callback to generate the JSON. - :type: func|None """ - query = attr.ib(default=None) + query = attr.ib(default=None, type=Optional[dict]) """Query arguments. - :type: dict|None """ - txn_id = attr.ib(default=None) + txn_id = attr.ib(default=None, type=Optional[str]) """Unique ID for this request (for logging) - :type: str|None """ uri = attr.ib(init=False, type=bytes) """The URI of this request """ - def __attrs_post_init__(self): + def __attrs_post_init__(self) -> None: global _next_id txn_id = "%s-O-%s" % (self.method, _next_id) _next_id = (_next_id + 1) % (MAXINT - 1) @@ -136,7 +136,7 @@ class MatrixFederationRequest: ) object.__setattr__(self, "uri", uri) - def get_json(self): + def get_json(self) -> Optional[JsonDict]: if self.json_callback: return self.json_callback() return self.json @@ -148,7 +148,7 @@ async def _handle_json_response( request: MatrixFederationRequest, response: IResponse, start_ms: int, -): +) -> JsonDict: """ Reads the JSON body of a response, with a timeout @@ -160,7 +160,7 @@ async def _handle_json_response( start_ms: Timestamp when request was made Returns: - dict: parsed JSON response + The parsed JSON response """ try: check_content_type_is_json(response.headers) @@ -266,27 +266,29 @@ class MatrixFederationHttpClient: self._cooperator = Cooperator(scheduler=schedule) async def _send_request_with_optional_trailing_slash( - self, request, try_trailing_slash_on_400=False, **send_request_args - ): + self, + request: MatrixFederationRequest, + try_trailing_slash_on_400: bool = False, + **send_request_args + ) -> IResponse: """Wrapper for _send_request which can optionally retry the request upon receiving a combination of a 400 HTTP response code and a 'M_UNRECOGNIZED' errcode. This is a workaround for Synapse <= v0.99.3 due to #3622. Args: - request (MatrixFederationRequest): details of request to be sent - try_trailing_slash_on_400 (bool): Whether on receiving a 400 + request: details of request to be sent + try_trailing_slash_on_400: Whether on receiving a 400 'M_UNRECOGNIZED' from the server to retry the request with a trailing slash appended to the request path. 
-            send_request_args (Dict): A dictionary of arguments to pass to
-                `_send_request()`.
+            send_request_args: A dictionary of arguments to pass to `_send_request()`.
 
         Raises:
             HttpResponseException: If we get an HTTP response code >= 300
                 (except 429).
 
         Returns:
-            Dict: Parsed JSON response body.
+            Parsed JSON response body.
         """
         try:
             response = await self._send_request(request, **send_request_args)
@@ -313,24 +315,26 @@ class MatrixFederationHttpClient:
 
     async def _send_request(
         self,
-        request,
-        retry_on_dns_fail=True,
-        timeout=None,
-        long_retries=False,
-        ignore_backoff=False,
-        backoff_on_404=False,
-    ):
+        request: MatrixFederationRequest,
+        retry_on_dns_fail: bool = True,
+        timeout: Optional[int] = None,
+        long_retries: bool = False,
+        ignore_backoff: bool = False,
+        backoff_on_404: bool = False,
+    ) -> IResponse:
         """
         Sends a request to the given server.
 
         Args:
-            request (MatrixFederationRequest): details of request to be sent
+            request: details of request to be sent
+
+            retry_on_dns_fail: true if the request should be retried on DNS failures
 
-            timeout (int|None): number of milliseconds to wait for the response headers
+            timeout: number of milliseconds to wait for the response headers
                 (including connecting to the server), *for each attempt*.
                 60s by default.
 
-            long_retries (bool): whether to use the long retry algorithm.
+            long_retries: whether to use the long retry algorithm.
 
                 The regular retry algorithm makes 4 attempts, with intervals
                 [0.5s, 1s, 2s].
@@ -346,14 +350,13 @@ class MatrixFederationHttpClient:
             NB: the long retry algorithm takes over 20 minutes to complete, with
             a default timeout of 60s!
 
-            ignore_backoff (bool): true to ignore the historical backoff data
+            ignore_backoff: true to ignore the historical backoff data
                 and try the request anyway.
 
-            backoff_on_404 (bool): Back off if we get a 404
+            backoff_on_404: Back off if we get a 404
 
         Returns:
-            twisted.web.client.Response: resolves with the HTTP
-            response object on success.
+            Resolves with the HTTP response object on success.
Raises: HttpResponseException: If we get an HTTP response code >= 300 @@ -404,7 +407,7 @@ class MatrixFederationHttpClient: ) # Inject the span into the headers - headers_dict = {} + headers_dict = {} # type: Dict[bytes, List[bytes]] inject_active_span_byte_dict(headers_dict, request.destination) headers_dict[b"User-Agent"] = [self.version_string_bytes] @@ -435,7 +438,7 @@ class MatrixFederationHttpClient: data = encode_canonical_json(json) producer = QuieterFileBodyProducer( BytesIO(data), cooperator=self._cooperator - ) + ) # type: Optional[IBodyProducer] else: producer = None auth_headers = self.build_auth_headers( @@ -524,14 +527,16 @@ class MatrixFederationHttpClient: ) body = None - e = HttpResponseException(response.code, response_phrase, body) + exc = HttpResponseException( + response.code, response_phrase, body + ) # Retry if the error is a 429 (Too Many Requests), # otherwise just raise a standard HttpResponseException if response.code == 429: - raise RequestSendFailed(e, can_retry=True) from e + raise RequestSendFailed(exc, can_retry=True) from exc else: - raise e + raise exc break except RequestSendFailed as e: @@ -582,22 +587,27 @@ class MatrixFederationHttpClient: return response def build_auth_headers( - self, destination, method, url_bytes, content=None, destination_is=None - ): + self, + destination: Optional[bytes], + method: bytes, + url_bytes: bytes, + content: Optional[JsonDict] = None, + destination_is: Optional[bytes] = None, + ) -> List[bytes]: """ Builds the Authorization headers for a federation request Args: - destination (bytes|None): The destination homeserver of the request. + destination: The destination homeserver of the request. May be None if the destination is an identity server, in which case destination_is must be non-None. - method (bytes): The HTTP method of the request - url_bytes (bytes): The URI path of the request - content (object): The body of the request - destination_is (bytes): As 'destination', but if the destination is an + method: The HTTP method of the request + url_bytes: The URI path of the request + content: The body of the request + destination_is: As 'destination', but if the destination is an identity server Returns: - list[bytes]: a list of headers to be added as "Authorization:" headers + A list of headers to be added as "Authorization:" headers """ request = { "method": method.decode("ascii"), @@ -629,33 +639,32 @@ class MatrixFederationHttpClient: async def put_json( self, - destination, - path, - args={}, - data={}, - json_data_callback=None, - long_retries=False, - timeout=None, - ignore_backoff=False, - backoff_on_404=False, - try_trailing_slash_on_400=False, - ): + destination: str, + path: str, + args: Optional[QueryArgs] = None, + data: Optional[JsonDict] = None, + json_data_callback: Optional[Callable[[], JsonDict]] = None, + long_retries: bool = False, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + backoff_on_404: bool = False, + try_trailing_slash_on_400: bool = False, + ) -> Union[JsonDict, list]: """ Sends the specified json data using PUT Args: - destination (str): The remote server to send the HTTP request - to. - path (str): The HTTP path. - args (dict): query params - data (dict): A dict containing the data that will be used as + destination: The remote server to send the HTTP request to. + path: The HTTP path. + args: query params + data: A dict containing the data that will be used as the request body. This will be encoded as JSON. 
- json_data_callback (callable): A callable returning the dict to + json_data_callback: A callable returning the dict to use as the request body. - long_retries (bool): whether to use the long retry algorithm. See + long_retries: whether to use the long retry algorithm. See docs on _send_request for details. - timeout (int|None): number of milliseconds to wait for the response. + timeout: number of milliseconds to wait for the response. self._default_timeout (60s) by default. Note that we may make several attempts to send the request; this @@ -663,19 +672,19 @@ class MatrixFederationHttpClient: *each* attempt (including connection time) as well as the time spent reading the response body after a 200 response. - ignore_backoff (bool): true to ignore the historical backoff data + ignore_backoff: true to ignore the historical backoff data and try the request anyway. - backoff_on_404 (bool): True if we should count a 404 response as + backoff_on_404: True if we should count a 404 response as a failure of the server (and should therefore back off future requests). - try_trailing_slash_on_400 (bool): True if on a 400 M_UNRECOGNIZED + try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED response we should try appending a trailing slash to the end of the request. Workaround for #3622 in Synapse <= v0.99.3. This will be attempted before backing off if backing off has been enabled. Returns: - dict|list: Succeeds when we get a 2xx HTTP response. The + Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. Raises: @@ -721,29 +730,28 @@ class MatrixFederationHttpClient: async def post_json( self, - destination, - path, - data={}, - long_retries=False, - timeout=None, - ignore_backoff=False, - args={}, - ): + destination: str, + path: str, + data: Optional[JsonDict] = None, + long_retries: bool = False, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + args: Optional[QueryArgs] = None, + ) -> Union[JsonDict, list]: """ Sends the specified json data using POST Args: - destination (str): The remote server to send the HTTP request - to. + destination: The remote server to send the HTTP request to. - path (str): The HTTP path. + path: The HTTP path. - data (dict): A dict containing the data that will be used as + data: A dict containing the data that will be used as the request body. This will be encoded as JSON. - long_retries (bool): whether to use the long retry algorithm. See + long_retries: whether to use the long retry algorithm. See docs on _send_request for details. - timeout (int|None): number of milliseconds to wait for the response. + timeout: number of milliseconds to wait for the response. self._default_timeout (60s) by default. Note that we may make several attempts to send the request; this @@ -751,10 +759,10 @@ class MatrixFederationHttpClient: *each* attempt (including connection time) as well as the time spent reading the response body after a 200 response. - ignore_backoff (bool): true to ignore the historical backoff data and + ignore_backoff: true to ignore the historical backoff data and try the request anyway. - args (dict): query params + args: query params Returns: dict|list: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. 
@@ -795,26 +803,25 @@ class MatrixFederationHttpClient: async def get_json( self, - destination, - path, - args=None, - retry_on_dns_fail=True, - timeout=None, - ignore_backoff=False, - try_trailing_slash_on_400=False, - ): + destination: str, + path: str, + args: Optional[QueryArgs] = None, + retry_on_dns_fail: bool = True, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + try_trailing_slash_on_400: bool = False, + ) -> Union[JsonDict, list]: """ GETs some json from the given host homeserver and path Args: - destination (str): The remote server to send the HTTP request - to. + destination: The remote server to send the HTTP request to. - path (str): The HTTP path. + path: The HTTP path. - args (dict|None): A dictionary used to create query strings, defaults to + args: A dictionary used to create query strings, defaults to None. - timeout (int|None): number of milliseconds to wait for the response. + timeout: number of milliseconds to wait for the response. self._default_timeout (60s) by default. Note that we may make several attempts to send the request; this @@ -822,14 +829,14 @@ class MatrixFederationHttpClient: *each* attempt (including connection time) as well as the time spent reading the response body after a 200 response. - ignore_backoff (bool): true to ignore the historical backoff data + ignore_backoff: true to ignore the historical backoff data and try the request anyway. - try_trailing_slash_on_400 (bool): True if on a 400 M_UNRECOGNIZED + try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED response we should try appending a trailing slash to the end of the request. Workaround for #3622 in Synapse <= v0.99.3. Returns: - dict|list: Succeeds when we get a 2xx HTTP response. The + Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. Raises: @@ -870,24 +877,23 @@ class MatrixFederationHttpClient: async def delete_json( self, - destination, - path, - long_retries=False, - timeout=None, - ignore_backoff=False, - args={}, - ): + destination: str, + path: str, + long_retries: bool = False, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + args: Optional[QueryArgs] = None, + ) -> Union[JsonDict, list]: """Send a DELETE request to the remote expecting some json response Args: - destination (str): The remote server to send the HTTP request - to. - path (str): The HTTP path. + destination: The remote server to send the HTTP request to. + path: The HTTP path. - long_retries (bool): whether to use the long retry algorithm. See + long_retries: whether to use the long retry algorithm. See docs on _send_request for details. - timeout (int|None): number of milliseconds to wait for the response. + timeout: number of milliseconds to wait for the response. self._default_timeout (60s) by default. Note that we may make several attempts to send the request; this @@ -895,12 +901,12 @@ class MatrixFederationHttpClient: *each* attempt (including connection time) as well as the time spent reading the response body after a 200 response. - ignore_backoff (bool): true to ignore the historical backoff data and + ignore_backoff: true to ignore the historical backoff data and try the request anyway. - args (dict): query params + args: query params Returns: - dict|list: Succeeds when we get a 2xx HTTP response. The + Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. 
Raises: @@ -938,25 +944,25 @@ class MatrixFederationHttpClient: async def get_file( self, - destination, - path, + destination: str, + path: str, output_stream, - args={}, - retry_on_dns_fail=True, - max_size=None, - ignore_backoff=False, - ): + args: Optional[QueryArgs] = None, + retry_on_dns_fail: bool = True, + max_size: Optional[int] = None, + ignore_backoff: bool = False, + ) -> Tuple[int, Dict[bytes, List[bytes]]]: """GETs a file from a given homeserver Args: - destination (str): The remote server to send the HTTP request to. - path (str): The HTTP path to GET. - output_stream (file): File to write the response body to. - args (dict): Optional dictionary used to create the query string. - ignore_backoff (bool): true to ignore the historical backoff data + destination: The remote server to send the HTTP request to. + path: The HTTP path to GET. + output_stream: File to write the response body to. + args: Optional dictionary used to create the query string. + ignore_backoff: true to ignore the historical backoff data and try the request anyway. Returns: - tuple[int, dict]: Resolves with an (int,dict) tuple of + Resolves with an (int,dict) tuple of the file length and a dict of the response headers. Raises: @@ -1005,13 +1011,15 @@ class MatrixFederationHttpClient: class _ReadBodyToFileProtocol(protocol.Protocol): - def __init__(self, stream, deferred, max_size): + def __init__( + self, stream: BinaryIO, deferred: defer.Deferred, max_size: Optional[int] + ): self.stream = stream self.deferred = deferred self.length = 0 self.max_size = max_size - def dataReceived(self, data): + def dataReceived(self, data: bytes) -> None: self.stream.write(data) self.length += len(data) if self.max_size is not None and self.length >= self.max_size: @@ -1025,14 +1033,16 @@ class _ReadBodyToFileProtocol(protocol.Protocol): self.deferred = defer.Deferred() self.transport.loseConnection() - def connectionLost(self, reason): + def connectionLost(self, reason: Failure) -> None: if reason.check(ResponseDone): self.deferred.callback(self.length) else: self.deferred.errback(reason) -def _readBodyToFile(response, stream, max_size): +def _readBodyToFile( + response: IResponse, stream: BinaryIO, max_size: Optional[int] +) -> defer.Deferred: d = defer.Deferred() response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size)) return d @@ -1049,13 +1059,13 @@ def _flatten_response_never_received(e): return repr(e) -def check_content_type_is_json(headers): +def check_content_type_is_json(headers: Headers) -> None: """ Check that a set of HTTP headers have a Content-Type header, and that it is application/json. 
Args: - headers (twisted.web.http_headers.Headers): headers to check + headers: headers to check Raises: RequestSendFailed: if the Content-Type header is missing or isn't JSON @@ -1080,7 +1090,7 @@ def check_content_type_is_json(headers): ) -def encode_query_args(args): +def encode_query_args(args: Optional[QueryArgs]) -> bytes: if args is None: return b"" @@ -1088,8 +1098,8 @@ def encode_query_args(args): for k, vs in args.items(): if isinstance(vs, str): vs = [vs] - encoded_args[k] = [v.encode("UTF-8") for v in vs] + encoded_args[k] = [v.encode("utf8") for v in vs] - query_bytes = urllib.parse.urlencode(encoded_args, True) + query_str = urllib.parse.urlencode(encoded_args, True) - return query_bytes.encode("utf8") + return query_str.encode("utf8") diff --git a/synapse/server.py b/synapse/server.py index 12a783de17..c82d8f9fad 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -27,7 +27,8 @@ import logging import os from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, TypeVar, cast -import twisted +import twisted.internet.base +import twisted.internet.tcp from twisted.mail.smtp import sendmail from twisted.web.iweb import IPolicyForHTTPS -- cgit 1.5.1 From 968939bdacc66be91aeba440a6b2ae7bc84731f1 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 25 Nov 2020 13:30:47 -0500 Subject: Add additional type hints to HTTP client. (#8812) This also removes some duplicated code between the simple HTTP client and matrix federation client. --- changelog.d/8806.misc | 2 +- changelog.d/8812.misc | 1 + mypy.ini | 3 +- synapse/http/client.py | 211 ++++++++++++++++++++------------- synapse/http/matrixfederationclient.py | 74 ++---------- 5 files changed, 142 insertions(+), 149 deletions(-) create mode 100644 changelog.d/8812.misc (limited to 'mypy.ini') diff --git a/changelog.d/8806.misc b/changelog.d/8806.misc index 52457deb5e..ee144846a5 100644 --- a/changelog.d/8806.misc +++ b/changelog.d/8806.misc @@ -1 +1 @@ -Add type hints to matrix federation client and agent. +Add type hints to HTTP abstractions. diff --git a/changelog.d/8812.misc b/changelog.d/8812.misc new file mode 100644 index 0000000000..ee144846a5 --- /dev/null +++ b/changelog.d/8812.misc @@ -0,0 +1 @@ +Add type hints to HTTP abstractions. diff --git a/mypy.ini b/mypy.ini index 3e42235ac1..a5503abe26 100644 --- a/mypy.ini +++ b/mypy.ini @@ -45,6 +45,7 @@ files = synapse/handlers/saml_handler.py, synapse/handlers/sync.py, synapse/handlers/ui_auth, + synapse/http/client.py, synapse/http/federation/matrix_federation_agent.py, synapse/http/federation/well_known_resolver.py, synapse/http/matrixfederationclient.py, @@ -109,7 +110,7 @@ ignore_missing_imports = True [mypy-opentracing] ignore_missing_imports = True -[mypy-OpenSSL] +[mypy-OpenSSL.*] ignore_missing_imports = True [mypy-netaddr] diff --git a/synapse/http/client.py b/synapse/http/client.py index f409368802..e5b13593f2 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -14,9 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import logging -import urllib +import urllib.parse from io import BytesIO from typing import ( + TYPE_CHECKING, Any, BinaryIO, Dict, @@ -31,7 +32,7 @@ from typing import ( import treq from canonicaljson import encode_canonical_json -from netaddr import IPAddress +from netaddr import IPAddress, IPSet from prometheus_client import Counter from zope.interface import implementer, provider @@ -39,6 +40,8 @@ from OpenSSL import SSL from OpenSSL.SSL import VERIFY_NONE from twisted.internet import defer, error as twisted_error, protocol, ssl from twisted.internet.interfaces import ( + IAddress, + IHostResolution, IReactorPluggableNameResolver, IResolutionReceiver, ) @@ -53,7 +56,7 @@ from twisted.web.client import ( ) from twisted.web.http import PotentialDataLoss from twisted.web.http_headers import Headers -from twisted.web.iweb import IResponse +from twisted.web.iweb import IAgent, IBodyProducer, IResponse from synapse.api.errors import Codes, HttpResponseException, SynapseError from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri @@ -63,6 +66,9 @@ from synapse.logging.opentracing import set_tag, start_active_span, tags from synapse.util import json_decoder from synapse.util.async_helpers import timeout_deferred +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + logger = logging.getLogger(__name__) outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"]) @@ -84,12 +90,19 @@ QueryParamValue = Union[str, bytes, Iterable[Union[str, bytes]]] QueryParams = Union[Mapping[str, QueryParamValue], Mapping[bytes, QueryParamValue]] -def check_against_blacklist(ip_address, ip_whitelist, ip_blacklist): +def check_against_blacklist( + ip_address: IPAddress, ip_whitelist: Optional[IPSet], ip_blacklist: IPSet +) -> bool: """ + Compares an IP address to allowed and disallowed IP sets. + Args: - ip_address (netaddr.IPAddress) - ip_whitelist (netaddr.IPSet) - ip_blacklist (netaddr.IPSet) + ip_address: The IP address to check + ip_whitelist: Allowed IP addresses. + ip_blacklist: Disallowed IP addresses. + + Returns: + True if the IP address is in the blacklist and not in the whitelist. """ if ip_address in ip_blacklist: if ip_whitelist is None or ip_address not in ip_whitelist: @@ -118,23 +131,30 @@ class IPBlacklistingResolver: addresses, preventing DNS rebinding attacks on URL preview. """ - def __init__(self, reactor, ip_whitelist, ip_blacklist): + def __init__( + self, + reactor: IReactorPluggableNameResolver, + ip_whitelist: Optional[IPSet], + ip_blacklist: IPSet, + ): """ Args: - reactor (twisted.internet.reactor) - ip_whitelist (netaddr.IPSet) - ip_blacklist (netaddr.IPSet) + reactor: The twisted reactor. + ip_whitelist: IP addresses to allow. + ip_blacklist: IP addresses to disallow. 
""" self._reactor = reactor self._ip_whitelist = ip_whitelist self._ip_blacklist = ip_blacklist - def resolveHostName(self, recv, hostname, portNumber=0): + def resolveHostName( + self, recv: IResolutionReceiver, hostname: str, portNumber: int = 0 + ) -> IResolutionReceiver: r = recv() - addresses = [] + addresses = [] # type: List[IAddress] - def _callback(): + def _callback() -> None: r.resolutionBegan(None) has_bad_ip = False @@ -161,15 +181,15 @@ class IPBlacklistingResolver: @provider(IResolutionReceiver) class EndpointReceiver: @staticmethod - def resolutionBegan(resolutionInProgress): + def resolutionBegan(resolutionInProgress: IHostResolution) -> None: pass @staticmethod - def addressResolved(address): + def addressResolved(address: IAddress) -> None: addresses.append(address) @staticmethod - def resolutionComplete(): + def resolutionComplete() -> None: _callback() self._reactor.nameResolver.resolveHostName( @@ -185,19 +205,29 @@ class BlacklistingAgentWrapper(Agent): directly (without an IP address lookup). """ - def __init__(self, agent, reactor, ip_whitelist=None, ip_blacklist=None): + def __init__( + self, + agent: IAgent, + ip_whitelist: Optional[IPSet] = None, + ip_blacklist: Optional[IPSet] = None, + ): """ Args: - agent (twisted.web.client.Agent): The Agent to wrap. - reactor (twisted.internet.reactor) - ip_whitelist (netaddr.IPSet) - ip_blacklist (netaddr.IPSet) + agent: The Agent to wrap. + ip_whitelist: IP addresses to allow. + ip_blacklist: IP addresses to disallow. """ self._agent = agent self._ip_whitelist = ip_whitelist self._ip_blacklist = ip_blacklist - def request(self, method, uri, headers=None, bodyProducer=None): + def request( + self, + method: bytes, + uri: bytes, + headers: Optional[Headers] = None, + bodyProducer: Optional[IBodyProducer] = None, + ) -> defer.Deferred: h = urllib.parse.urlparse(uri.decode("ascii")) try: @@ -226,23 +256,23 @@ class SimpleHttpClient: def __init__( self, - hs, - treq_args={}, - ip_whitelist=None, - ip_blacklist=None, - http_proxy=None, - https_proxy=None, + hs: "HomeServer", + treq_args: Dict[str, Any] = {}, + ip_whitelist: Optional[IPSet] = None, + ip_blacklist: Optional[IPSet] = None, + http_proxy: Optional[bytes] = None, + https_proxy: Optional[bytes] = None, ): """ Args: - hs (synapse.server.HomeServer) - treq_args (dict): Extra keyword arguments to be given to treq.request. - ip_blacklist (netaddr.IPSet): The IP addresses that are blacklisted that + hs + treq_args: Extra keyword arguments to be given to treq.request. + ip_blacklist: The IP addresses that are blacklisted that we may not request. - ip_whitelist (netaddr.IPSet): The whitelisted IP addresses, that we can + ip_whitelist: The whitelisted IP addresses, that we can request if it were otherwise caught in a blacklist. - http_proxy (bytes): proxy server to use for http connections. host[:port] - https_proxy (bytes): proxy server to use for https connections. host[:port] + http_proxy: proxy server to use for http connections. host[:port] + https_proxy: proxy server to use for https connections. host[:port] """ self.hs = hs @@ -306,7 +336,6 @@ class SimpleHttpClient: # by the DNS resolution. 
self.agent = BlacklistingAgentWrapper( self.agent, - self.reactor, ip_whitelist=self._ip_whitelist, ip_blacklist=self._ip_blacklist, ) @@ -397,7 +426,7 @@ class SimpleHttpClient: async def post_urlencoded_get_json( self, uri: str, - args: Mapping[str, Union[str, List[str]]] = {}, + args: Optional[Mapping[str, Union[str, List[str]]]] = None, headers: Optional[RawHeaders] = None, ) -> Any: """ @@ -422,9 +451,7 @@ class SimpleHttpClient: # TODO: Do we ever want to log message contents? logger.debug("post_urlencoded_get_json args: %s", args) - query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True).encode( - "utf8" - ) + query_bytes = encode_query_args(args) actual_headers = { b"Content-Type": [b"application/x-www-form-urlencoded"], @@ -432,7 +459,7 @@ class SimpleHttpClient: b"Accept": [b"application/json"], } if headers: - actual_headers.update(headers) + actual_headers.update(headers) # type: ignore response = await self.request( "POST", uri, headers=Headers(actual_headers), data=query_bytes @@ -479,7 +506,7 @@ class SimpleHttpClient: b"Accept": [b"application/json"], } if headers: - actual_headers.update(headers) + actual_headers.update(headers) # type: ignore response = await self.request( "POST", uri, headers=Headers(actual_headers), data=json_str @@ -495,7 +522,10 @@ class SimpleHttpClient: ) async def get_json( - self, uri: str, args: QueryParams = {}, headers: Optional[RawHeaders] = None, + self, + uri: str, + args: Optional[QueryParams] = None, + headers: Optional[RawHeaders] = None, ) -> Any: """Gets some json from the given URI. @@ -516,7 +546,7 @@ class SimpleHttpClient: """ actual_headers = {b"Accept": [b"application/json"]} if headers: - actual_headers.update(headers) + actual_headers.update(headers) # type: ignore body = await self.get_raw(uri, args, headers=headers) return json_decoder.decode(body.decode("utf-8")) @@ -525,7 +555,7 @@ class SimpleHttpClient: self, uri: str, json_body: Any, - args: QueryParams = {}, + args: Optional[QueryParams] = None, headers: RawHeaders = None, ) -> Any: """Puts some json to the given URI. @@ -546,9 +576,9 @@ class SimpleHttpClient: ValueError: if the response was not JSON """ - if len(args): - query_bytes = urllib.parse.urlencode(args, True) - uri = "%s?%s" % (uri, query_bytes) + if args: + query_str = urllib.parse.urlencode(args, True) + uri = "%s?%s" % (uri, query_str) json_str = encode_canonical_json(json_body) @@ -558,7 +588,7 @@ class SimpleHttpClient: b"Accept": [b"application/json"], } if headers: - actual_headers.update(headers) + actual_headers.update(headers) # type: ignore response = await self.request( "PUT", uri, headers=Headers(actual_headers), data=json_str @@ -574,7 +604,10 @@ class SimpleHttpClient: ) async def get_raw( - self, uri: str, args: QueryParams = {}, headers: Optional[RawHeaders] = None + self, + uri: str, + args: Optional[QueryParams] = None, + headers: Optional[RawHeaders] = None, ) -> bytes: """Gets raw text from the given URI. @@ -592,13 +625,13 @@ class SimpleHttpClient: HttpResponseException on a non-2xx HTTP response. 
""" - if len(args): - query_bytes = urllib.parse.urlencode(args, True) - uri = "%s?%s" % (uri, query_bytes) + if args: + query_str = urllib.parse.urlencode(args, True) + uri = "%s?%s" % (uri, query_str) actual_headers = {b"User-Agent": [self.user_agent]} if headers: - actual_headers.update(headers) + actual_headers.update(headers) # type: ignore response = await self.request("GET", uri, headers=Headers(actual_headers)) @@ -641,7 +674,7 @@ class SimpleHttpClient: actual_headers = {b"User-Agent": [self.user_agent]} if headers: - actual_headers.update(headers) + actual_headers.update(headers) # type: ignore response = await self.request("GET", url, headers=Headers(actual_headers)) @@ -649,12 +682,13 @@ class SimpleHttpClient: if ( b"Content-Length" in resp_headers + and max_size and int(resp_headers[b"Content-Length"][0]) > max_size ): - logger.warning("Requested URL is too large > %r bytes" % (self.max_size,)) + logger.warning("Requested URL is too large > %r bytes" % (max_size,)) raise SynapseError( 502, - "Requested file is too large > %r bytes" % (self.max_size,), + "Requested file is too large > %r bytes" % (max_size,), Codes.TOO_LARGE, ) @@ -668,7 +702,7 @@ class SimpleHttpClient: try: length = await make_deferred_yieldable( - _readBodyToFile(response, output_stream, max_size) + readBodyToFile(response, output_stream, max_size) ) except SynapseError: # This can happen e.g. because the body is too large. @@ -696,18 +730,16 @@ def _timeout_to_request_timed_out_error(f: Failure): return f -# XXX: FIXME: This is horribly copy-pasted from matrixfederationclient. -# The two should be factored out. - - class _ReadBodyToFileProtocol(protocol.Protocol): - def __init__(self, stream, deferred, max_size): + def __init__( + self, stream: BinaryIO, deferred: defer.Deferred, max_size: Optional[int] + ): self.stream = stream self.deferred = deferred self.length = 0 self.max_size = max_size - def dataReceived(self, data): + def dataReceived(self, data: bytes) -> None: self.stream.write(data) self.length += len(data) if self.max_size is not None and self.length >= self.max_size: @@ -721,7 +753,7 @@ class _ReadBodyToFileProtocol(protocol.Protocol): self.deferred = defer.Deferred() self.transport.loseConnection() - def connectionLost(self, reason): + def connectionLost(self, reason: Failure) -> None: if reason.check(ResponseDone): self.deferred.callback(self.length) elif reason.check(PotentialDataLoss): @@ -732,35 +764,48 @@ class _ReadBodyToFileProtocol(protocol.Protocol): self.deferred.errback(reason) -# XXX: FIXME: This is horribly copy-pasted from matrixfederationclient. -# The two should be factored out. +def readBodyToFile( + response: IResponse, stream: BinaryIO, max_size: Optional[int] +) -> defer.Deferred: + """ + Read a HTTP response body to a file-object. Optionally enforcing a maximum file size. + Args: + response: The HTTP response to read from. + stream: The file-object to write to. + max_size: The maximum file size to allow. + + Returns: + A Deferred which resolves to the length of the read body. + """ -def _readBodyToFile(response, stream, max_size): d = defer.Deferred() response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size)) return d -def encode_urlencode_args(args): - return {k: encode_urlencode_arg(v) for k, v in args.items()} +def encode_query_args(args: Optional[Mapping[str, Union[str, List[str]]]]) -> bytes: + """ + Encodes a map of query arguments to bytes which can be appended to a URL. + Args: + args: The query arguments, a mapping of string to string or list of strings. 
+ + Returns: + The query arguments encoded as bytes. + """ + if args is None: + return b"" -def encode_urlencode_arg(arg): - if isinstance(arg, str): - return arg.encode("utf-8") - elif isinstance(arg, list): - return [encode_urlencode_arg(i) for i in arg] - else: - return arg + encoded_args = {} + for k, vs in args.items(): + if isinstance(vs, str): + vs = [vs] + encoded_args[k] = [v.encode("utf8") for v in vs] + query_str = urllib.parse.urlencode(encoded_args, True) -def _print_ex(e): - if hasattr(e, "reasons") and e.reasons: - for ex in e.reasons: - _print_ex(ex) - else: - logger.exception(e) + return query_str.encode("utf8") class InsecureInterceptableContextFactory(ssl.ContextFactory): diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index b2ccae90df..4e27f93b7a 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -19,7 +19,7 @@ import random import sys import urllib.parse from io import BytesIO -from typing import BinaryIO, Callable, Dict, List, Optional, Tuple, Union +from typing import Callable, Dict, List, Optional, Tuple, Union import attr import treq @@ -28,26 +28,27 @@ from prometheus_client import Counter from signedjson.sign import sign_json from zope.interface import implementer -from twisted.internet import defer, protocol +from twisted.internet import defer from twisted.internet.error import DNSLookupError from twisted.internet.interfaces import IReactorPluggableNameResolver, IReactorTime from twisted.internet.task import _EPSILON, Cooperator -from twisted.python.failure import Failure -from twisted.web._newclient import ResponseDone from twisted.web.http_headers import Headers from twisted.web.iweb import IBodyProducer, IResponse import synapse.metrics import synapse.util.retryutils from synapse.api.errors import ( - Codes, FederationDeniedError, HttpResponseException, RequestSendFailed, - SynapseError, ) from synapse.http import QuieterFileBodyProducer -from synapse.http.client import BlacklistingAgentWrapper, IPBlacklistingResolver +from synapse.http.client import ( + BlacklistingAgentWrapper, + IPBlacklistingResolver, + encode_query_args, + readBodyToFile, +) from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent from synapse.logging.context import make_deferred_yieldable from synapse.logging.opentracing import ( @@ -250,9 +251,7 @@ class MatrixFederationHttpClient: # Use a BlacklistingAgentWrapper to prevent circumventing the IP # blacklist via IP literals in server names self.agent = BlacklistingAgentWrapper( - self.agent, - self.reactor, - ip_blacklist=hs.config.federation_ip_range_blacklist, + self.agent, ip_blacklist=hs.config.federation_ip_range_blacklist, ) self.clock = hs.get_clock() @@ -986,7 +985,7 @@ class MatrixFederationHttpClient: headers = dict(response.headers.getAllRawHeaders()) try: - d = _readBodyToFile(response, output_stream, max_size) + d = readBodyToFile(response, output_stream, max_size) d.addTimeout(self.default_timeout, self.reactor) length = await make_deferred_yieldable(d) except Exception as e: @@ -1010,44 +1009,6 @@ class MatrixFederationHttpClient: return (length, headers) -class _ReadBodyToFileProtocol(protocol.Protocol): - def __init__( - self, stream: BinaryIO, deferred: defer.Deferred, max_size: Optional[int] - ): - self.stream = stream - self.deferred = deferred - self.length = 0 - self.max_size = max_size - - def dataReceived(self, data: bytes) -> None: - self.stream.write(data) - self.length += len(data) - if 
self.max_size is not None and self.length >= self.max_size: - self.deferred.errback( - SynapseError( - 502, - "Requested file is too large > %r bytes" % (self.max_size,), - Codes.TOO_LARGE, - ) - ) - self.deferred = defer.Deferred() - self.transport.loseConnection() - - def connectionLost(self, reason: Failure) -> None: - if reason.check(ResponseDone): - self.deferred.callback(self.length) - else: - self.deferred.errback(reason) - - -def _readBodyToFile( - response: IResponse, stream: BinaryIO, max_size: Optional[int] -) -> defer.Deferred: - d = defer.Deferred() - response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size)) - return d - - def _flatten_response_never_received(e): if hasattr(e, "reasons"): reasons = ", ".join( @@ -1088,18 +1049,3 @@ def check_content_type_is_json(headers: Headers) -> None: ), can_retry=False, ) - - -def encode_query_args(args: Optional[QueryArgs]) -> bytes: - if args is None: - return b"" - - encoded_args = {} - for k, vs in args.items(): - if isinstance(vs, str): - vs = [vs] - encoded_args[k] = [v.encode("utf8") for v in vs] - - query_str = urllib.parse.urlencode(encoded_args, True) - - return query_str.encode("utf8") -- cgit 1.5.1 From ddc43436838e19a7dd16860389bd76c74578dae7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 1 Dec 2020 11:10:42 +0000 Subject: Add some tests for `password_auth_providers` (#8819) These things seemed to be completely untested, so I added a load of tests for them. --- changelog.d/8819.misc | 1 + mypy.ini | 1 + tests/handlers/test_password_providers.py | 486 ++++++++++++++++++++++++++++++ 3 files changed, 488 insertions(+) create mode 100644 changelog.d/8819.misc create mode 100644 tests/handlers/test_password_providers.py (limited to 'mypy.ini') diff --git a/changelog.d/8819.misc b/changelog.d/8819.misc new file mode 100644 index 0000000000..a5793273a5 --- /dev/null +++ b/changelog.d/8819.misc @@ -0,0 +1 @@ +Add tests for `password_auth_provider`s. diff --git a/mypy.ini b/mypy.ini index a5503abe26..3c8d303064 100644 --- a/mypy.ini +++ b/mypy.ini @@ -80,6 +80,7 @@ files = synapse/util/metrics.py, tests/replication, tests/test_utils, + tests/handlers/test_password_providers.py, tests/rest/client/v2_alpha/test_auth.py, tests/util/test_stream_change_cache.py diff --git a/tests/handlers/test_password_providers.py b/tests/handlers/test_password_providers.py new file mode 100644 index 0000000000..edfab8a13a --- /dev/null +++ b/tests/handlers/test_password_providers.py @@ -0,0 +1,486 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
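+#
+# A password auth provider module (as named in the `password_providers` config
+# section) is expected to expose a static `parse_config` method and an
+# `__init__(config, account_handler)` constructor, plus either
+# `check_password(user_id, password)` or, for custom login types,
+# `get_supported_login_types()` and `check_auth(username, login_type,
+# login_dict)`. The dummy providers below implement exactly that shape.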
+ +"""Tests for the password_auth_provider interface""" + +from typing import Any, Type, Union + +from mock import Mock + +from twisted.internet import defer + +import synapse +from synapse.rest.client.v1 import login +from synapse.rest.client.v2_alpha import devices +from synapse.types import JsonDict + +from tests import unittest +from tests.server import FakeChannel +from tests.unittest import override_config + +# (possibly experimental) login flows we expect to appear in the list after the normal +# ones +ADDITIONAL_LOGIN_FLOWS = [{"type": "uk.half-shot.msc2778.login.application_service"}] + +# a mock instance which the dummy auth providers delegate to, so we can see what's going +# on +mock_password_provider = Mock() + + +class PasswordOnlyAuthProvider: + """A password_provider which only implements `check_password`.""" + + @staticmethod + def parse_config(self): + pass + + def __init__(self, config, account_handler): + pass + + def check_password(self, *args): + return mock_password_provider.check_password(*args) + + +class CustomAuthProvider: + """A password_provider which implements a custom login type.""" + + @staticmethod + def parse_config(self): + pass + + def __init__(self, config, account_handler): + pass + + def get_supported_login_types(self): + return {"test.login_type": ["test_field"]} + + def check_auth(self, *args): + return mock_password_provider.check_auth(*args) + + +def providers_config(*providers: Type[Any]) -> dict: + """Returns a config dict that will enable the given password auth providers""" + return { + "password_providers": [ + {"module": "%s.%s" % (__name__, provider.__qualname__), "config": {}} + for provider in providers + ] + } + + +class PasswordAuthProviderTests(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + devices.register_servlets, + ] + + def setUp(self): + # we use a global mock device, so make sure we are starting with a clean slate + mock_password_provider.reset_mock() + super().setUp() + + @override_config(providers_config(PasswordOnlyAuthProvider)) + def test_password_only_auth_provider_login(self): + # login flows should only have m.login.password + flows = self._get_login_flows() + self.assertEqual(flows, [{"type": "m.login.password"}] + ADDITIONAL_LOGIN_FLOWS) + + # check_password must return an awaitable + mock_password_provider.check_password.return_value = defer.succeed(True) + channel = self._send_password_login("u", "p") + self.assertEqual(channel.code, 200, channel.result) + self.assertEqual("@u:test", channel.json_body["user_id"]) + mock_password_provider.check_password.assert_called_once_with("@u:test", "p") + mock_password_provider.reset_mock() + + # login with mxid should work too + channel = self._send_password_login("@u:bz", "p") + self.assertEqual(channel.code, 200, channel.result) + self.assertEqual("@u:bz", channel.json_body["user_id"]) + mock_password_provider.check_password.assert_called_once_with("@u:bz", "p") + mock_password_provider.reset_mock() + + # try a weird username / pass. 
Honestly it's unclear what we *expect* to happen + # in these cases, but at least we can guard against the API changing + # unexpectedly + channel = self._send_password_login(" USER🙂NAME ", " pASS\U0001F622word ") + self.assertEqual(channel.code, 200, channel.result) + self.assertEqual("@ USER🙂NAME :test", channel.json_body["user_id"]) + mock_password_provider.check_password.assert_called_once_with( + "@ USER🙂NAME :test", " pASS😢word " + ) + + @override_config(providers_config(PasswordOnlyAuthProvider)) + def test_password_only_auth_provider_ui_auth(self): + """UI Auth should delegate correctly to the password provider""" + + # create the user, otherwise access doesn't work + module_api = self.hs.get_module_api() + self.get_success(module_api.register_user("u")) + + # log in twice, to get two devices + mock_password_provider.check_password.return_value = defer.succeed(True) + tok1 = self.login("u", "p") + self.login("u", "p", device_id="dev2") + mock_password_provider.reset_mock() + + # have the auth provider deny the request to start with + mock_password_provider.check_password.return_value = defer.succeed(False) + + # make the initial request which returns a 401 + session = self._start_delete_device_session(tok1, "dev2") + mock_password_provider.check_password.assert_not_called() + + # Make another request providing the UI auth flow. + channel = self._authed_delete_device(tok1, "dev2", session, "u", "p") + self.assertEqual(channel.code, 401) # XXX why not a 403? + self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN") + mock_password_provider.check_password.assert_called_once_with("@u:test", "p") + mock_password_provider.reset_mock() + + # Finally, check the request goes through when we allow it + mock_password_provider.check_password.return_value = defer.succeed(True) + channel = self._authed_delete_device(tok1, "dev2", session, "u", "p") + self.assertEqual(channel.code, 200) + mock_password_provider.check_password.assert_called_once_with("@u:test", "p") + + @override_config(providers_config(PasswordOnlyAuthProvider)) + def test_local_user_fallback_login(self): + """rejected login should fall back to local db""" + self.register_user("localuser", "localpass") + + # check_password must return an awaitable + mock_password_provider.check_password.return_value = defer.succeed(False) + channel = self._send_password_login("u", "p") + self.assertEqual(channel.code, 403, channel.result) + + channel = self._send_password_login("localuser", "localpass") + self.assertEqual(channel.code, 200, channel.result) + self.assertEqual("@localuser:test", channel.json_body["user_id"]) + + @override_config(providers_config(PasswordOnlyAuthProvider)) + def test_local_user_fallback_ui_auth(self): + """rejected login should fall back to local db""" + self.register_user("localuser", "localpass") + + # have the auth provider deny the request + mock_password_provider.check_password.return_value = defer.succeed(False) + + # log in twice, to get two devices + tok1 = self.login("localuser", "localpass") + self.login("localuser", "localpass", device_id="dev2") + mock_password_provider.check_password.reset_mock() + + # first delete should give a 401 + session = self._start_delete_device_session(tok1, "dev2") + mock_password_provider.check_password.assert_not_called() + + # Wrong password + channel = self._authed_delete_device(tok1, "dev2", session, "localuser", "xxx") + self.assertEqual(channel.code, 401) # XXX why not a 403? 
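+        # (UI auth reports the failure as a 401 with the errcode in the body;
+        # the same session remains usable for the retry below)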
+ self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN") + mock_password_provider.check_password.assert_called_once_with( + "@localuser:test", "xxx" + ) + mock_password_provider.reset_mock() + + # Right password + channel = self._authed_delete_device( + tok1, "dev2", session, "localuser", "localpass" + ) + self.assertEqual(channel.code, 200) + mock_password_provider.check_password.assert_called_once_with( + "@localuser:test", "localpass" + ) + + @override_config( + { + **providers_config(PasswordOnlyAuthProvider), + "password_config": {"localdb_enabled": False}, + } + ) + def test_no_local_user_fallback_login(self): + """localdb_enabled can block login with the local password + """ + self.register_user("localuser", "localpass") + + # check_password must return an awaitable + mock_password_provider.check_password.return_value = defer.succeed(False) + channel = self._send_password_login("localuser", "localpass") + self.assertEqual(channel.code, 403) + self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN") + mock_password_provider.check_password.assert_called_once_with( + "@localuser:test", "localpass" + ) + + @override_config( + { + **providers_config(PasswordOnlyAuthProvider), + "password_config": {"localdb_enabled": False}, + } + ) + def test_no_local_user_fallback_ui_auth(self): + """localdb_enabled can block ui auth with the local password + """ + self.register_user("localuser", "localpass") + + # allow login via the auth provider + mock_password_provider.check_password.return_value = defer.succeed(True) + + # log in twice, to get two devices + tok1 = self.login("localuser", "p") + self.login("localuser", "p", device_id="dev2") + mock_password_provider.check_password.reset_mock() + + # first delete should give a 401 + session = self._start_delete_device_session(tok1, "dev2") + mock_password_provider.check_password.assert_not_called() + + # now try deleting with the local password + mock_password_provider.check_password.return_value = defer.succeed(False) + channel = self._authed_delete_device( + tok1, "dev2", session, "localuser", "localpass" + ) + self.assertEqual(channel.code, 401) # XXX why not a 403? + self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN") + mock_password_provider.check_password.assert_called_once_with( + "@localuser:test", "localpass" + ) + + @override_config( + { + **providers_config(PasswordOnlyAuthProvider), + "password_config": {"enabled": False}, + } + ) + def test_password_auth_disabled(self): + """password auth doesn't work if it's disabled across the board""" + # login flows should be empty + flows = self._get_login_flows() + self.assertEqual(flows, ADDITIONAL_LOGIN_FLOWS) + + # login shouldn't work and should be rejected with a 400 ("unknown login type") + channel = self._send_password_login("u", "p") + self.assertEqual(channel.code, 400, channel.result) + mock_password_provider.check_password.assert_not_called() + + @override_config(providers_config(CustomAuthProvider)) + def test_custom_auth_provider_login(self): + # login flows should have the custom flow and m.login.password, since we + # haven't disabled local password lookup. 
+ # (password must come first, because reasons) + flows = self._get_login_flows() + self.assertEqual( + flows, + [{"type": "m.login.password"}, {"type": "test.login_type"}] + + ADDITIONAL_LOGIN_FLOWS, + ) + + # login with missing param should be rejected + channel = self._send_login("test.login_type", "u") + self.assertEqual(channel.code, 400, channel.result) + mock_password_provider.check_auth.assert_not_called() + + mock_password_provider.check_auth.return_value = defer.succeed("@user:bz") + channel = self._send_login("test.login_type", "u", test_field="y") + self.assertEqual(channel.code, 200, channel.result) + self.assertEqual("@user:bz", channel.json_body["user_id"]) + mock_password_provider.check_auth.assert_called_once_with( + "u", "test.login_type", {"test_field": "y"} + ) + mock_password_provider.reset_mock() + + # try a weird username. Again, it's unclear what we *expect* to happen + # in these cases, but at least we can guard against the API changing + # unexpectedly + mock_password_provider.check_auth.return_value = defer.succeed( + "@ MALFORMED! :bz" + ) + channel = self._send_login("test.login_type", " USER🙂NAME ", test_field=" abc ") + self.assertEqual(channel.code, 200, channel.result) + self.assertEqual("@ MALFORMED! :bz", channel.json_body["user_id"]) + mock_password_provider.check_auth.assert_called_once_with( + " USER🙂NAME ", "test.login_type", {"test_field": " abc "} + ) + + @override_config(providers_config(CustomAuthProvider)) + def test_custom_auth_provider_ui_auth(self): + # register the user and log in twice, to get two devices + self.register_user("localuser", "localpass") + tok1 = self.login("localuser", "localpass") + self.login("localuser", "localpass", device_id="dev2") + + # make the initial request which returns a 401 + channel = self._delete_device(tok1, "dev2") + self.assertEqual(channel.code, 401) + # Ensure that flows are what is expected. + self.assertIn({"stages": ["m.login.password"]}, channel.json_body["flows"]) + self.assertIn({"stages": ["test.login_type"]}, channel.json_body["flows"]) + session = channel.json_body["session"] + + # missing param + body = { + "auth": { + "type": "test.login_type", + "identifier": {"type": "m.id.user", "user": "localuser"}, + # FIXME "identifier" is ignored + # https://github.com/matrix-org/synapse/issues/5665 + "user": "localuser", + "session": session, + }, + } + + channel = self._delete_device(tok1, "dev2", body) + self.assertEqual(channel.code, 400) + # there's a perfectly good M_MISSING_PARAM errcode, but heaven forfend we should + # use it... 
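+        # ... so we can only match on the human-readable error string here.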
+ self.assertIn("Missing parameters", channel.json_body["error"]) + mock_password_provider.check_auth.assert_not_called() + mock_password_provider.reset_mock() + + # right params, but authing as the wrong user + mock_password_provider.check_auth.return_value = defer.succeed("@user:bz") + body["auth"]["test_field"] = "foo" + channel = self._delete_device(tok1, "dev2", body) + self.assertEqual(channel.code, 403) + self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN") + mock_password_provider.check_auth.assert_called_once_with( + "localuser", "test.login_type", {"test_field": "foo"} + ) + mock_password_provider.reset_mock() + + # and finally, succeed + mock_password_provider.check_auth.return_value = defer.succeed( + "@localuser:test" + ) + channel = self._delete_device(tok1, "dev2", body) + self.assertEqual(channel.code, 200) + mock_password_provider.check_auth.assert_called_once_with( + "localuser", "test.login_type", {"test_field": "foo"} + ) + + @override_config(providers_config(CustomAuthProvider)) + def test_custom_auth_provider_callback(self): + callback = Mock(return_value=defer.succeed(None)) + + mock_password_provider.check_auth.return_value = defer.succeed( + ("@user:bz", callback) + ) + channel = self._send_login("test.login_type", "u", test_field="y") + self.assertEqual(channel.code, 200, channel.result) + self.assertEqual("@user:bz", channel.json_body["user_id"]) + mock_password_provider.check_auth.assert_called_once_with( + "u", "test.login_type", {"test_field": "y"} + ) + + # check the args to the callback + callback.assert_called_once() + call_args, call_kwargs = callback.call_args + # should be one positional arg + self.assertEqual(len(call_args), 1) + self.assertEqual(call_args[0]["user_id"], "@user:bz") + for p in ["user_id", "access_token", "device_id", "home_server"]: + self.assertIn(p, call_args[0]) + + @override_config( + {**providers_config(CustomAuthProvider), "password_config": {"enabled": False}} + ) + def test_custom_auth_password_disabled(self): + """Test login with a custom auth provider where password login is disabled""" + self.register_user("localuser", "localpass") + + flows = self._get_login_flows() + self.assertEqual(flows, [{"type": "test.login_type"}] + ADDITIONAL_LOGIN_FLOWS) + + # login shouldn't work and should be rejected with a 400 ("unknown login type") + channel = self._send_password_login("localuser", "localpass") + self.assertEqual(channel.code, 400, channel.result) + mock_password_provider.check_auth.assert_not_called() + + @override_config( + { + **providers_config(CustomAuthProvider), + "password_config": {"localdb_enabled": False}, + } + ) + def test_custom_auth_no_local_user_fallback(self): + """Test login with a custom auth provider where the local db is disabled""" + self.register_user("localuser", "localpass") + + flows = self._get_login_flows() + self.assertEqual(flows, [{"type": "test.login_type"}] + ADDITIONAL_LOGIN_FLOWS) + + # password login shouldn't work and should be rejected with a 400 + # ("unknown login type") + channel = self._send_password_login("localuser", "localpass") + self.assertEqual(channel.code, 400, channel.result) + + test_custom_auth_no_local_user_fallback.skip = "currently broken" + + def _get_login_flows(self) -> JsonDict: + _, channel = self.make_request("GET", "/_matrix/client/r0/login") + self.assertEqual(channel.code, 200, channel.result) + return channel.json_body["flows"] + + def _send_password_login(self, user: str, password: str) -> FakeChannel: + return self._send_login(type="m.login.password", 
user=user, password=password) + + def _send_login(self, type, user, **params) -> FakeChannel: + params.update({"user": user, "type": type}) + _, channel = self.make_request("POST", "/_matrix/client/r0/login", params) + return channel + + def _start_delete_device_session(self, access_token, device_id) -> str: + """Make an initial delete device request, and return the UI Auth session ID""" + channel = self._delete_device(access_token, device_id) + self.assertEqual(channel.code, 401) + # Ensure that flows are what is expected. + self.assertIn({"stages": ["m.login.password"]}, channel.json_body["flows"]) + return channel.json_body["session"] + + def _authed_delete_device( + self, + access_token: str, + device_id: str, + session: str, + user_id: str, + password: str, + ) -> FakeChannel: + """Make a delete device request, authenticating with the given uid/password""" + return self._delete_device( + access_token, + device_id, + { + "auth": { + "type": "m.login.password", + "identifier": {"type": "m.id.user", "user": user_id}, + # FIXME "identifier" is ignored + # https://github.com/matrix-org/synapse/issues/5665 + "user": user_id, + "password": password, + "session": session, + }, + }, + ) + + def _delete_device( + self, access_token: str, device: str, body: Union[JsonDict, bytes] = b"", + ) -> FakeChannel: + """Delete an individual device.""" + _, channel = self.make_request( + "DELETE", "devices/" + device, body, access_token=access_token + ) + return channel -- cgit 1.5.1
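
For reference, the `providers_config` helper in the new test file mirrors the
`password_providers` section of a homeserver config. A minimal sketch of what
`providers_config(PasswordOnlyAuthProvider)` expands to (the module path comes
from the test module's own `__name__`):

    {
        "password_providers": [
            {
                "module": "tests.handlers.test_password_providers.PasswordOnlyAuthProvider",
                "config": {},
            }
        ]
    }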