summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/federation/sender/__init__.py7
-rw-r--r--synapse/federation/sender/per_destination_queue.py17
-rw-r--r--synapse/federation/sender/transaction_manager.py26
-rw-r--r--synapse/handlers/events.py49
-rw-r--r--synapse/handlers/initial_sync.py80
-rw-r--r--synapse/handlers/pagination.py56
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py109
7 files changed, 193 insertions, 151 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py

index 4662008bfd..5276c1734f 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py
@@ -108,8 +108,6 @@ class FederationSender(object): ), ) - self._order = 1 - self._is_processing = False self._last_poked_id = -1 @@ -272,9 +270,6 @@ class FederationSender(object): # a transaction in progress. If we do, stick it in the pending_pdus # table and we'll get back to it later. - order = self._order - self._order += 1 - destinations = set(destinations) destinations.discard(self.server_name) logger.debug("Sending to: %s", str(destinations)) @@ -286,7 +281,7 @@ class FederationSender(object): sent_pdus_destination_dist_count.inc() for destination in destinations: - self._get_per_destination_queue(destination).send_pdu(pdu, order) + self._get_per_destination_queue(destination).send_pdu(pdu) async def send_read_receipt(self, receipt: ReadReceipt) -> None: """Send a RR to any other servers in the room diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index c09ffcaf4c..f1534d431d 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py
@@ -92,8 +92,8 @@ class PerDestinationQueue(object): self._destination = destination self.transmission_loop_running = False - # a list of tuples of (pending pdu, order) - self._pending_pdus = [] # type: List[Tuple[EventBase, int]] + # a list of pending PDUs + self._pending_pdus = [] # type: List[EventBase] # XXX this is never actually used: see # https://github.com/matrix-org/synapse/issues/7549 @@ -132,14 +132,13 @@ class PerDestinationQueue(object): + len(self._pending_edus_keyed) ) - def send_pdu(self, pdu: EventBase, order: int) -> None: + def send_pdu(self, pdu: EventBase) -> None: """Add a PDU to the queue, and start the transmission loop if necessary Args: pdu: pdu to send - order """ - self._pending_pdus.append((pdu, order)) + self._pending_pdus.append(pdu) self.attempt_new_transaction() def send_presence(self, states: Iterable[UserPresenceState]) -> None: @@ -185,7 +184,7 @@ class PerDestinationQueue(object): returns immediately. Otherwise kicks off the process of sending a transaction in the background. """ - # list of (pending_pdu, deferred, order) + if self.transmission_loop_running: # XXX: this can get stuck on by a never-ending # request at which point pending_pdus just keeps growing. @@ -210,7 +209,7 @@ class PerDestinationQueue(object): ) async def _transaction_transmission_loop(self) -> None: - pending_pdus = [] # type: List[Tuple[EventBase, int]] + pending_pdus = [] # type: List[EventBase] try: self.transmission_loop_running = True @@ -373,13 +372,13 @@ class PerDestinationQueue(object): "TX [%s] Failed to send transaction: %s", self._destination, e ) - for p, _ in pending_pdus: + for p in pending_pdus: logger.info( "Failed to send event %s to %s", p.event_id, self._destination ) except Exception: logger.exception("TX [%s] Failed to send transaction", self._destination) - for p, _ in pending_pdus: + for p in pending_pdus: logger.info( "Failed to send event %s to %s", p.event_id, self._destination ) diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 9bd534a313..0ebc70d57d 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py
@@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, List, Tuple +from typing import TYPE_CHECKING, List from synapse.api.errors import HttpResponseException from synapse.events import EventBase @@ -53,11 +53,17 @@ class TransactionManager(object): @measure_func("_send_new_transaction") async def send_new_transaction( - self, - destination: str, - pending_pdus: List[Tuple[EventBase, int]], - pending_edus: List[Edu], - ): + self, destination: str, pdus: List[EventBase], edus: List[Edu], + ) -> bool: + """ + Args: + destination: The destination to send to (e.g. 'example.org') + pdus: In-order list of PDUs to send + edus: List of EDUs to send + + Returns: + True iff the transaction was successful + """ # Make a transaction-sending opentracing span. This span follows on from # all the edus in that transaction. This needs to be done since there is @@ -67,7 +73,7 @@ class TransactionManager(object): span_contexts = [] keep_destination = whitelisted_homeserver(destination) - for edu in pending_edus: + for edu in edus: context = edu.get_context() if context: span_contexts.append(extract_text_map(json_decoder.decode(context))) @@ -75,12 +81,6 @@ class TransactionManager(object): edu.strip_context() with start_active_span_follows_from("send_transaction", span_contexts): - - # Sort based on the order field - pending_pdus.sort(key=lambda t: t[1]) - pdus = [x[0] for x in pending_pdus] - edus = pending_edus - success = True logger.debug("TX [%s] _attempt_new_transaction", destination) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 1924636c4d..b05e32f457 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py
@@ -15,29 +15,30 @@ import logging import random +from typing import TYPE_CHECKING, Iterable, List, Optional from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, SynapseError from synapse.events import EventBase from synapse.handlers.presence import format_user_presence_state from synapse.logging.utils import log_function -from synapse.types import UserID +from synapse.streams.config import PaginationConfig +from synapse.types import JsonDict, UserID from synapse.visibility import filter_events_for_client from ._base import BaseHandler +if TYPE_CHECKING: + from synapse.server import HomeServer + + logger = logging.getLogger(__name__) class EventStreamHandler(BaseHandler): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super(EventStreamHandler, self).__init__(hs) - # Count of active streams per user - self._streams_per_user = {} - # Grace timers per user to delay the "stopped" signal - self._stop_timer_per_user = {} - self.distributor = hs.get_distributor() self.distributor.declare("started_user_eventstream") self.distributor.declare("stopped_user_eventstream") @@ -52,14 +53,14 @@ class EventStreamHandler(BaseHandler): @log_function async def get_stream( self, - auth_user_id, - pagin_config, - timeout=0, - as_client_event=True, - affect_presence=True, - room_id=None, - is_guest=False, - ): + auth_user_id: str, + pagin_config: PaginationConfig, + timeout: int = 0, + as_client_event: bool = True, + affect_presence: bool = True, + room_id: Optional[str] = None, + is_guest: bool = False, + ) -> JsonDict: """Fetches the events stream for a given user. """ @@ -98,7 +99,7 @@ class EventStreamHandler(BaseHandler): # When the user joins a new room, or another user joins a currently # joined room, we need to send down presence for those users. - to_add = [] + to_add = [] # type: List[JsonDict] for event in events: if not isinstance(event, EventBase): continue @@ -110,7 +111,7 @@ class EventStreamHandler(BaseHandler): # Send down presence for everyone in the room. users = await self.state.get_current_users_in_room( event.room_id - ) + ) # type: Iterable[str] else: users = [event.state_key] @@ -144,20 +145,22 @@ class EventStreamHandler(BaseHandler): class EventHandler(BaseHandler): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super(EventHandler, self).__init__(hs) self.storage = hs.get_storage() - async def get_event(self, user, room_id, event_id): + async def get_event( + self, user: UserID, room_id: Optional[str], event_id: str + ) -> Optional[EventBase]: """Retrieve a single specified event. Args: - user (synapse.types.UserID): The user requesting the event - room_id (str|None): The expected room id. We'll return None if the + user: The user requesting the event + room_id: The expected room id. We'll return None if the event's room does not match. - event_id (str): The event ID to obtain. + event_id: The event ID to obtain. Returns: - dict: An event, or None if there is no event matching this ID. + An event, or None if there is no event matching this ID. Raises: SynapseError if there was a problem retrieving this event, or AuthError if the user does not have the rights to inspect this diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index ae6bd1d352..d5ddc583ad 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py
@@ -14,6 +14,7 @@ # limitations under the License. import logging +from typing import TYPE_CHECKING from twisted.internet import defer @@ -22,8 +23,9 @@ from synapse.api.errors import SynapseError from synapse.events.validator import EventValidator from synapse.handlers.presence import format_user_presence_state from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.storage.roommember import RoomsForUser from synapse.streams.config import PaginationConfig -from synapse.types import StreamToken, UserID +from synapse.types import JsonDict, Requester, StreamToken, UserID from synapse.util import unwrapFirstError from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.response_cache import ResponseCache @@ -31,11 +33,15 @@ from synapse.visibility import filter_events_for_client from ._base import BaseHandler +if TYPE_CHECKING: + from synapse.server import HomeServer + + logger = logging.getLogger(__name__) class InitialSyncHandler(BaseHandler): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super(InitialSyncHandler, self).__init__(hs) self.hs = hs self.state = hs.get_state_handler() @@ -48,27 +54,25 @@ class InitialSyncHandler(BaseHandler): def snapshot_all_rooms( self, - user_id=None, - pagin_config=None, - as_client_event=True, - include_archived=False, - ): + user_id: str, + pagin_config: PaginationConfig, + as_client_event: bool = True, + include_archived: bool = False, + ) -> JsonDict: """Retrieve a snapshot of all rooms the user is invited or has joined. This snapshot may include messages for all rooms where the user is joined, depending on the pagination config. Args: - user_id (str): The ID of the user making the request. - pagin_config (synapse.api.streams.PaginationConfig): The pagination - config used to determine how many messages *PER ROOM* to return. - as_client_event (bool): True to get events in client-server format. - include_archived (bool): True to get rooms that the user has left + user_id: The ID of the user making the request. + pagin_config: The pagination config used to determine how many + messages *PER ROOM* to return. + as_client_event: True to get events in client-server format. + include_archived: True to get rooms that the user has left Returns: - A list of dicts with "room_id" and "membership" keys for all rooms - the user is currently invited or joined in on. Rooms where the user - is joined on, may return a "messages" key with messages, depending - on the specified PaginationConfig. + A JsonDict with the same format as the response to `/intialSync` + API """ key = ( user_id, @@ -91,11 +95,11 @@ class InitialSyncHandler(BaseHandler): async def _snapshot_all_rooms( self, - user_id=None, - pagin_config=None, - as_client_event=True, - include_archived=False, - ): + user_id: str, + pagin_config: PaginationConfig, + as_client_event: bool = True, + include_archived: bool = False, + ) -> JsonDict: memberships = [Membership.INVITE, Membership.JOIN] if include_archived: @@ -134,7 +138,7 @@ class InitialSyncHandler(BaseHandler): if limit is None: limit = 10 - async def handle_room(event): + async def handle_room(event: RoomsForUser): d = { "room_id": event.room_id, "membership": event.membership, @@ -251,17 +255,18 @@ class InitialSyncHandler(BaseHandler): return ret - async def room_initial_sync(self, requester, room_id, pagin_config=None): + async def room_initial_sync( + self, requester: Requester, room_id: str, pagin_config: PaginationConfig + ) -> JsonDict: """Capture the a snapshot of a room. If user is currently a member of the room this will be what is currently in the room. If the user left the room this will be what was in the room when they left. Args: - requester(Requester): The user to get a snapshot for. - room_id(str): The room to get a snapshot of. - pagin_config(synapse.streams.config.PaginationConfig): - The pagination config used to determine how many messages to - return. + requester: The user to get a snapshot for. + room_id: The room to get a snapshot of. + pagin_config: The pagination config used to determine how many + messages to return. Raises: AuthError if the user wasn't in the room. Returns: @@ -305,8 +310,14 @@ class InitialSyncHandler(BaseHandler): return result async def _room_initial_sync_parted( - self, user_id, room_id, pagin_config, membership, member_event_id, is_peeking - ): + self, + user_id: str, + room_id: str, + pagin_config: PaginationConfig, + membership: Membership, + member_event_id: str, + is_peeking: bool, + ) -> JsonDict: room_state = await self.state_store.get_state_for_events([member_event_id]) room_state = room_state[member_event_id] @@ -350,8 +361,13 @@ class InitialSyncHandler(BaseHandler): } async def _room_initial_sync_joined( - self, user_id, room_id, pagin_config, membership, is_peeking - ): + self, + user_id: str, + room_id: str, + pagin_config: PaginationConfig, + membership: Membership, + is_peeking: bool, + ) -> JsonDict: current_state = await self.state.get_current_state(room_id=room_id) # TODO: These concurrently diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 5a1aa7d830..63d7edff87 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py
@@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Any, Dict, Optional +from typing import TYPE_CHECKING, Any, Dict, Optional, Set from twisted.python.failure import Failure @@ -30,6 +30,10 @@ from synapse.util.async_helpers import ReadWriteLock from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client +if TYPE_CHECKING: + from synapse.server import HomeServer + + logger = logging.getLogger(__name__) @@ -68,7 +72,7 @@ class PaginationHandler(object): paginating during a purge. """ - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.hs = hs self.auth = hs.get_auth() self.store = hs.get_datastore() @@ -78,9 +82,9 @@ class PaginationHandler(object): self._server_name = hs.hostname self.pagination_lock = ReadWriteLock() - self._purges_in_progress_by_room = set() + self._purges_in_progress_by_room = set() # type: Set[str] # map from purge id to PurgeStatus - self._purges_by_id = {} + self._purges_by_id = {} # type: Dict[str, PurgeStatus] self._event_serializer = hs.get_event_client_serializer() self._retention_default_max_lifetime = hs.config.retention_default_max_lifetime @@ -102,7 +106,9 @@ class PaginationHandler(object): job["longest_max_lifetime"], ) - async def purge_history_for_rooms_in_range(self, min_ms, max_ms): + async def purge_history_for_rooms_in_range( + self, min_ms: Optional[int], max_ms: Optional[int] + ): """Purge outdated events from rooms within the given retention range. If a default retention policy is defined in the server's configuration and its @@ -110,10 +116,10 @@ class PaginationHandler(object): retention policy. Args: - min_ms (int|None): Duration in milliseconds that define the lower limit of + min_ms: Duration in milliseconds that define the lower limit of the range to handle (exclusive). If None, it means that the range has no lower limit. - max_ms (int|None): Duration in milliseconds that define the upper limit of + max_ms: Duration in milliseconds that define the upper limit of the range to handle (inclusive). If None, it means that the range has no upper limit. """ @@ -220,18 +226,19 @@ class PaginationHandler(object): "_purge_history", self._purge_history, purge_id, room_id, token, True, ) - def start_purge_history(self, room_id, token, delete_local_events=False): + def start_purge_history( + self, room_id: str, token: str, delete_local_events: bool = False + ) -> str: """Start off a history purge on a room. Args: - room_id (str): The room to purge from - - token (str): topological token to delete events before - delete_local_events (bool): True to delete local events as well as + room_id: The room to purge from + token: topological token to delete events before + delete_local_events: True to delete local events as well as remote ones Returns: - str: unique ID for this purge transaction. + unique ID for this purge transaction. """ if room_id in self._purges_in_progress_by_room: raise SynapseError( @@ -284,14 +291,11 @@ class PaginationHandler(object): self.hs.get_reactor().callLater(24 * 3600, clear_purge) - def get_purge_status(self, purge_id): + def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]: """Get the current status of an active purge Args: - purge_id (str): purge_id returned by start_purge_history - - Returns: - PurgeStatus|None + purge_id: purge_id returned by start_purge_history """ return self._purges_by_id.get(purge_id) @@ -312,8 +316,8 @@ class PaginationHandler(object): async def get_messages( self, requester: Requester, - room_id: Optional[str] = None, - pagin_config: Optional[PaginationConfig] = None, + room_id: str, + pagin_config: PaginationConfig, as_client_event: bool = True, event_filter: Optional[Filter] = None, ) -> Dict[str, Any]: @@ -368,11 +372,15 @@ class PaginationHandler(object): # If they have left the room then clamp the token to be before # they left the room, to save the effort of loading from the # database. + + # This is only None if the room is world_readable, in which + # case "JOIN" would have been returned. + assert member_event_id + leave_token = await self.store.get_topological_token_for_event( member_event_id ) - leave_token = RoomStreamToken.parse(leave_token) - if leave_token.topological < max_topo: + if RoomStreamToken.parse(leave_token).topological < max_topo: source_config.from_key = str(leave_token) await self.hs.get_handlers().federation_handler.maybe_backfill( @@ -419,8 +427,8 @@ class PaginationHandler(object): ) if state_ids: - state = await self.store.get_events(list(state_ids.values())) - state = state.values() + state_dict = await self.store.get_events(list(state_ids.values())) + state = state_dict.values() time_now = self.clock.time_msec() diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index cc0b15ae07..09af033233 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -25,6 +25,7 @@ from twisted.enterprise.adbapi import Connection from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import make_in_list_sql_clause +from synapse.storage.types import Cursor from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import cached, cachedList @@ -45,8 +46,9 @@ class DeviceKeyLookupResult: # key) and "signatures" (a signature of the structure by the ed25519 key) key_json = attr.ib(type=Optional[str]) - # cross-signing sigs - signatures = attr.ib(type=Optional[Dict], default=None) + # cross-signing sigs on this device. + # dict from (signing user_id)->(signing device_id)->sig + signatures = attr.ib(type=Optional[Dict[str, Dict[str, str]]], factory=dict) class EndToEndKeyWorkerStore(SQLBaseStore): @@ -133,7 +135,10 @@ class EndToEndKeyWorkerStore(SQLBaseStore): include_all_devices: bool = False, include_deleted_devices: bool = False, ) -> Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]]: - """Fetch a list of device keys, together with their cross-signatures. + """Fetch a list of device keys + + Any cross-signatures made on the keys by the owner of the device are also + included. Args: query_list: List of pairs of user_ids and device_ids. Device id can be None @@ -154,22 +159,51 @@ class EndToEndKeyWorkerStore(SQLBaseStore): result = await self.db_pool.runInteraction( "get_e2e_device_keys", - self._get_e2e_device_keys_and_signatures_txn, + self._get_e2e_device_keys_txn, query_list, include_all_devices, include_deleted_devices, ) + # get the (user_id, device_id) tuples to look up cross-signatures for + signature_query = ( + (user_id, device_id) + for user_id, dev in result.items() + for device_id, d in dev.items() + if d is not None + ) + + for batch in batch_iter(signature_query, 50): + cross_sigs_result = await self.db_pool.runInteraction( + "get_e2e_cross_signing_signatures", + self._get_e2e_cross_signing_signatures_for_devices_txn, + batch, + ) + + # add each cross-signing signature to the correct device in the result dict. + for (user_id, key_id, device_id, signature) in cross_sigs_result: + target_device_result = result[user_id][device_id] + target_device_signatures = target_device_result.signatures + + signing_user_signatures = target_device_signatures.setdefault( + user_id, {} + ) + signing_user_signatures[key_id] = signature + log_kv(result) return result - def _get_e2e_device_keys_and_signatures_txn( + def _get_e2e_device_keys_txn( self, txn, query_list, include_all_devices=False, include_deleted_devices=False ) -> Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]]: + """Get information on devices from the database + + The results include the device's keys and self-signatures, but *not* any + cross-signing signatures which have been added subsequently (for which, see + get_e2e_device_keys_and_signatures) + """ query_clauses = [] query_params = [] - signature_query_clauses = [] - signature_query_params = [] if include_all_devices is False: include_deleted_devices = False @@ -180,20 +214,12 @@ class EndToEndKeyWorkerStore(SQLBaseStore): for (user_id, device_id) in query_list: query_clause = "user_id = ?" query_params.append(user_id) - signature_query_clause = "target_user_id = ?" - signature_query_params.append(user_id) if device_id is not None: query_clause += " AND device_id = ?" query_params.append(device_id) - signature_query_clause += " AND target_device_id = ?" - signature_query_params.append(device_id) - - signature_query_clause += " AND user_id = ?" - signature_query_params.append(user_id) query_clauses.append(query_clause) - signature_query_clauses.append(signature_query_clause) sql = ( "SELECT user_id, device_id, " @@ -221,41 +247,36 @@ class EndToEndKeyWorkerStore(SQLBaseStore): for user_id, device_id in deleted_devices: result.setdefault(user_id, {})[device_id] = None - # get signatures on the device - signature_sql = ("SELECT * FROM e2e_cross_signing_signatures WHERE %s") % ( - " OR ".join("(" + q + ")" for q in signature_query_clauses) - ) + return result - txn.execute(signature_sql, signature_query_params) - rows = self.db_pool.cursor_to_dict(txn) - - # add each cross-signing signature to the correct device in the result dict. - for row in rows: - signing_user_id = row["user_id"] - signing_key_id = row["key_id"] - target_user_id = row["target_user_id"] - target_device_id = row["target_device_id"] - signature = row["signature"] - - target_user_result = result.get(target_user_id) - if not target_user_result: - continue + def _get_e2e_cross_signing_signatures_for_devices_txn( + self, txn: Cursor, device_query: Iterable[Tuple[str, str]] + ) -> List[Tuple[str, str, str, str]]: + """Get cross-signing signatures for a given list of devices - target_device_result = target_user_result.get(target_device_id) - if not target_device_result: - # note that target_device_result will be None for deleted devices. - continue + Returns signatures made by the owners of the devices. - target_device_signatures = target_device_result.signatures - if target_device_signatures is None: - target_device_signatures = target_device_result.signatures = {} + Returns: a list of results; each entry in the list is a tuple of + (user_id, key_id, target_device_id, signature). + """ + signature_query_clauses = [] + signature_query_params = [] - signing_user_signatures = target_device_signatures.setdefault( - signing_user_id, {} + for (user_id, device_id) in device_query: + signature_query_clauses.append( + "target_user_id = ? AND target_device_id = ? AND user_id = ?" ) - signing_user_signatures[signing_key_id] = signature + signature_query_params.extend([user_id, device_id, user_id]) - return result + signature_sql = """ + SELECT user_id, key_id, target_device_id, signature + FROM e2e_cross_signing_signatures WHERE %s + """ % ( + " OR ".join("(" + q + ")" for q in signature_query_clauses) + ) + + txn.execute(signature_sql, signature_query_params) + return txn.fetchall() async def get_e2e_one_time_keys( self, user_id: str, device_id: str, key_ids: List[str]