diff options
Diffstat (limited to 'synapse')
33 files changed, 279 insertions, 97 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 139221ad34..448e45e00f 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -69,7 +69,7 @@ class FederationSenderSlaveStore( self.federation_out_pos_startup = self._get_federation_out_pos(db_conn) def _get_federation_out_pos(self, db_conn): - sql = "SELECT stream_id FROM federation_stream_position" " WHERE type = ?" + sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?" sql = self.database_engine.convert_param_style(sql) txn = db_conn.cursor() diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 3e25bf5747..57174da021 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -185,7 +185,7 @@ class ApplicationServiceApi(SimpleHttpClient): if not _is_valid_3pe_metadata(info): logger.warning( - "query_3pe_protocol to %s did not return a" " valid result", uri + "query_3pe_protocol to %s did not return a valid result", uri ) return None diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index e77d3387ff..ca43e96bd1 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -134,7 +134,7 @@ def _load_appservice(hostname, as_info, config_filename): for regex_obj in as_info["namespaces"][ns]: if not isinstance(regex_obj, dict): raise ValueError( - "Expected namespace entry in %s to be an object," " but got %s", + "Expected namespace entry in %s to be an object, but got %s", ns, regex_obj, ) diff --git a/synapse/config/room_directory.py b/synapse/config/room_directory.py index 7c9f05bde4..7ac7699676 100644 --- a/synapse/config/room_directory.py +++ b/synapse/config/room_directory.py @@ -170,7 +170,7 @@ class _RoomDirectoryRule(object): self.action = action else: raise ConfigError( - "%s rules can only have action of 'allow'" " or 'deny'" % (option_name,) + "%s rules can only have action of 'allow' or 'deny'" % (option_name,) ) self._alias_matches_all = alias == "*" diff --git a/synapse/config/server.py b/synapse/config/server.py index 00d01c43af..11336d7549 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -223,7 +223,7 @@ class ServerConfig(Config): self.federation_ip_range_blacklist.update(["0.0.0.0", "::"]) except Exception as e: raise ConfigError( - "Invalid range(s) provided in " "federation_ip_range_blacklist: %s" % e + "Invalid range(s) provided in federation_ip_range_blacklist: %s" % e ) if self.public_baseurl is not None: @@ -787,14 +787,14 @@ class ServerConfig(Config): "--print-pidfile", action="store_true", default=None, - help="Print the path to the pidfile just" " before daemonizing", + help="Print the path to the pidfile just before daemonizing", ) server_group.add_argument( "--manhole", metavar="PORT", dest="manhole", type=int, - help="Turn on the twisted telnet manhole" " service on the given port.", + help="Turn on the twisted telnet manhole service on the given port.", ) diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 44edcabed4..d68b4bd670 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -44,7 +44,7 @@ class TransactionActions(object): response code and response body. """ if not transaction.transaction_id: - raise RuntimeError("Cannot persist a transaction with no " "transaction_id") + raise RuntimeError("Cannot persist a transaction with no transaction_id") return self.store.get_received_txn_response(transaction.transaction_id, origin) @@ -56,7 +56,7 @@ class TransactionActions(object): Deferred """ if not transaction.transaction_id: - raise RuntimeError("Cannot persist a transaction with no " "transaction_id") + raise RuntimeError("Cannot persist a transaction with no transaction_id") return self.store.set_received_txn_response( transaction.transaction_id, origin, code, response diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 2b2ee8612a..4ebb0e8bc0 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -49,7 +49,7 @@ sent_pdus_destination_dist_count = Counter( sent_pdus_destination_dist_total = Counter( "synapse_federation_client_sent_pdu_destinations:total", - "" "Total number of PDUs queued for sending across all destinations", + "Total number of PDUs queued for sending across all destinations", ) diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index 67b3e1ab6e..5fed626d5b 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -84,7 +84,7 @@ class TransactionManager(object): txn_id = str(self._next_txn_id) logger.debug( - "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)", + "TX [%s] {%s} Attempting new transaction (pdus: %d, edus: %d)", destination, txn_id, len(pdus), @@ -103,7 +103,7 @@ class TransactionManager(object): self._next_txn_id += 1 logger.info( - "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)", + "TX [%s] {%s} Sending transaction [%s], (PDUs: %d, EDUs: %d)", destination, txn_id, transaction.transaction_id, diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 69051101a6..a07d2f1a17 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -119,7 +119,7 @@ class DirectoryHandler(BaseHandler): if not service.is_interested_in_alias(room_alias.to_string()): raise SynapseError( 400, - "This application service has not reserved" " this kind of alias.", + "This application service has not reserved this kind of alias.", errcode=Codes.EXCLUSIVE, ) else: diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index f09a0b73c8..28c12753c1 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -30,6 +30,7 @@ from twisted.internet import defer from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace +from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.types import ( UserID, get_domain_from_id, @@ -53,6 +54,12 @@ class E2eKeysHandler(object): self._edu_updater = SigningKeyEduUpdater(hs, self) + self._is_master = hs.config.worker_app is None + if not self._is_master: + self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client( + hs + ) + federation_registry = hs.get_federation_registry() # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec @@ -191,9 +198,15 @@ class E2eKeysHandler(object): # probably be tracking their device lists. However, we haven't # done an initial sync on the device list so we do it now. try: - user_devices = yield self.device_handler.device_list_updater.user_device_resync( - user_id - ) + if self._is_master: + user_devices = yield self.device_handler.device_list_updater.user_device_resync( + user_id + ) + else: + user_devices = yield self._user_device_resync_client( + user_id=user_id + ) + user_devices = user_devices["devices"] for device in user_devices: results[user_id] = {device["device_id"]: device["keys"]} diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index e9a5e46ced..13fcb408a6 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -96,7 +96,7 @@ def parse_boolean_from_args(args, name, default=None, required=False): return {b"true": True, b"false": False}[args[name][0]] except Exception: message = ( - "Boolean query parameter %r must be one of" " ['true', 'false']" + "Boolean query parameter %r must be one of ['true', 'false']" ) % (name,) raise SynapseError(400, message) else: diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py index 334ddaf39a..ffa7b20ca8 100644 --- a/synapse/logging/_structured.py +++ b/synapse/logging/_structured.py @@ -261,6 +261,18 @@ def parse_drain_configs( ) +class StoppableLogPublisher(LogPublisher): + """ + A log publisher that can tell its observers to shut down any external + communications. + """ + + def stop(self): + for obs in self._observers: + if hasattr(obs, "stop"): + obs.stop() + + def setup_structured_logging( hs, config, @@ -336,7 +348,7 @@ def setup_structured_logging( # We should never get here, but, just in case, throw an error. raise ConfigError("%s drain type cannot be configured" % (observer.type,)) - publisher = LogPublisher(*observers) + publisher = StoppableLogPublisher(*observers) log_filter = LogLevelFilterPredicate() for namespace, namespace_config in log_config.get( diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index 76ce7d8808..05fc64f409 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -17,25 +17,29 @@ Log formatters that output terse JSON. """ +import json import sys +import traceback from collections import deque from ipaddress import IPv4Address, IPv6Address, ip_address from math import floor -from typing import IO +from typing import IO, Optional import attr -from simplejson import dumps from zope.interface import implementer from twisted.application.internet import ClientService +from twisted.internet.defer import Deferred from twisted.internet.endpoints import ( HostnameEndpoint, TCP4ClientEndpoint, TCP6ClientEndpoint, ) +from twisted.internet.interfaces import IPushProducer, ITransport from twisted.internet.protocol import Factory, Protocol from twisted.logger import FileLogObserver, ILogObserver, Logger -from twisted.python.failure import Failure + +_encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":")) def flatten_event(event: dict, metadata: dict, include_time: bool = False): @@ -141,12 +145,50 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb def formatEvent(_event: dict) -> str: flattened = flatten_event(_event, metadata) - return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n" + return _encoder.encode(flattened) + "\n" return FileLogObserver(outFile, formatEvent) @attr.s +@implementer(IPushProducer) +class LogProducer(object): + """ + An IPushProducer that writes logs from its buffer to its transport when it + is resumed. + + Args: + buffer: Log buffer to read logs from. + transport: Transport to write to. + """ + + transport = attr.ib(type=ITransport) + _buffer = attr.ib(type=deque) + _paused = attr.ib(default=False, type=bool, init=False) + + def pauseProducing(self): + self._paused = True + + def stopProducing(self): + self._paused = True + self._buffer = None + + def resumeProducing(self): + self._paused = False + + while self._paused is False and (self._buffer and self.transport.connected): + try: + event = self._buffer.popleft() + self.transport.write(_encoder.encode(event).encode("utf8")) + self.transport.write(b"\n") + except Exception: + # Something has gone wrong writing to the transport -- log it + # and break out of the while. + traceback.print_exc(file=sys.__stderr__) + break + + +@attr.s @implementer(ILogObserver) class TerseJSONToTCPLogObserver(object): """ @@ -165,8 +207,9 @@ class TerseJSONToTCPLogObserver(object): metadata = attr.ib(type=dict) maximum_buffer = attr.ib(type=int) _buffer = attr.ib(default=attr.Factory(deque), type=deque) - _writer = attr.ib(default=None) + _connection_waiter = attr.ib(default=None, type=Optional[Deferred]) _logger = attr.ib(default=attr.Factory(Logger)) + _producer = attr.ib(default=None, type=Optional[LogProducer]) def start(self) -> None: @@ -187,38 +230,43 @@ class TerseJSONToTCPLogObserver(object): factory = Factory.forProtocol(Protocol) self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor()) self._service.startService() + self._connect() - def _write_loop(self) -> None: + def stop(self): + self._service.stopService() + + def _connect(self) -> None: """ - Implement the write loop. + Triggers an attempt to connect then write to the remote if not already writing. """ - if self._writer: + if self._connection_waiter: return - self._writer = self._service.whenConnected() + self._connection_waiter = self._service.whenConnected(failAfterFailures=1) + + @self._connection_waiter.addErrback + def fail(r): + r.printTraceback(file=sys.__stderr__) + self._connection_waiter = None + self._connect() - @self._writer.addBoth + @self._connection_waiter.addCallback def writer(r): - if isinstance(r, Failure): - r.printTraceback(file=sys.__stderr__) - self._writer = None - self.hs.get_reactor().callLater(1, self._write_loop) + # We have a connection. If we already have a producer, and its + # transport is the same, just trigger a resumeProducing. + if self._producer and r.transport is self._producer.transport: + self._producer.resumeProducing() return - try: - for event in self._buffer: - r.transport.write( - dumps(event, ensure_ascii=False, separators=(",", ":")).encode( - "utf8" - ) - ) - r.transport.write(b"\n") - self._buffer.clear() - except Exception as e: - sys.__stderr__.write("Failed writing out logs with %s\n" % (str(e),)) - - self._writer = False - self.hs.get_reactor().callLater(1, self._write_loop) + # If the producer is still producing, stop it. + if self._producer: + self._producer.stopProducing() + + # Make a new producer and start it. + self._producer = LogProducer(buffer=self._buffer, transport=r.transport) + r.transport.registerProducer(self._producer, True) + self._producer.resumeProducing() + self._connection_waiter = None def _handle_pressure(self) -> None: """ @@ -277,4 +325,4 @@ class TerseJSONToTCPLogObserver(object): self._logger.failure("Failed clearing backpressure") # Try and write immediately. - self._write_loop() + self._connect() diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index e994037be6..d0879b0490 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -246,7 +246,7 @@ class HttpPusher(object): # fixed, we don't suddenly deliver a load # of old notifications. logger.warning( - "Giving up on a notification to user %s, " "pushkey %s", + "Giving up on a notification to user %s, pushkey %s", self.user_id, self.pushkey, ) @@ -299,8 +299,7 @@ class HttpPusher(object): # for sanity, we only remove the pushkey if it # was the one we actually sent... logger.warning( - ("Ignoring rejected pushkey %s because we" " didn't send it"), - pk, + ("Ignoring rejected pushkey %s because we didn't send it"), pk, ) else: logger.info("Pushkey %s was rejected: removing", pk) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 1d15a06a58..b13b646bfd 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -43,7 +43,7 @@ logger = logging.getLogger(__name__) MESSAGE_FROM_PERSON_IN_ROOM = ( - "You have a message on %(app)s from %(person)s " "in the %(room)s room..." + "You have a message on %(app)s from %(person)s in the %(room)s room..." ) MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..." MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..." @@ -55,7 +55,7 @@ MESSAGES_FROM_PERSON_AND_OTHERS = ( "You have messages on %(app)s from %(person)s and others..." ) INVITE_FROM_PERSON_TO_ROOM = ( - "%(person)s has invited you to join the " "%(room)s room on %(app)s..." + "%(person)s has invited you to join the %(room)s room on %(app)s..." ) INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..." diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 81b85352b1..28dbc6fcba 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -14,7 +14,14 @@ # limitations under the License. from synapse.http.server import JsonResource -from synapse.replication.http import federation, login, membership, register, send_event +from synapse.replication.http import ( + devices, + federation, + login, + membership, + register, + send_event, +) REPLICATION_PREFIX = "/_synapse/replication" @@ -30,3 +37,4 @@ class ReplicationRestResource(JsonResource): federation.register_servlets(hs, self) login.register_servlets(hs, self) register.register_servlets(hs, self) + devices.register_servlets(hs, self) diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py new file mode 100644 index 0000000000..e32aac0a25 --- /dev/null +++ b/synapse/replication/http/devices.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from synapse.replication.http._base import ReplicationEndpoint + +logger = logging.getLogger(__name__) + + +class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint): + """Ask master to resync the device list for a user by contacting their + server. + + This must happen on master so that the results can be correctly cached in + the database and streamed to workers. + + Request format: + + POST /_synapse/replication/user_device_resync/:user_id + + {} + + Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id` + response, e.g.: + + { + "user_id": "@alice:example.org", + "devices": [ + { + "device_id": "JLAFKJWSCS", + "keys": { ... }, + "device_display_name": "Alice's Mobile Phone" + } + ] + } + """ + + NAME = "user_device_resync" + PATH_ARGS = ("user_id",) + CACHE = False + + def __init__(self, hs): + super(ReplicationUserDevicesResyncRestServlet, self).__init__(hs) + + self.device_list_updater = hs.get_device_handler().device_list_updater + self.store = hs.get_datastore() + self.clock = hs.get_clock() + + @staticmethod + def _serialize_payload(user_id): + return {} + + async def _handle_request(self, request, user_id): + user_devices = await self.device_list_updater.user_device_resync(user_id) + + return 200, user_devices + + +def register_servlets(hs, http_server): + ReplicationUserDevicesResyncRestServlet(hs).register(http_server) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 9e45429d49..8512923eae 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -88,8 +88,7 @@ TagAccountDataStreamRow = namedtuple( "TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict ) AccountDataStreamRow = namedtuple( - "AccountDataStream", - ("user_id", "room_id", "data_type", "data"), # str # str # str # dict + "AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str ) GroupsStreamRow = namedtuple( "GroupsStreamRow", @@ -421,8 +420,8 @@ class AccountDataStream(Stream): results = list(room_results) results.extend( - (stream_id, user_id, None, account_data_type, content) - for stream_id, user_id, account_data_type, content in global_results + (stream_id, user_id, None, account_data_type) + for stream_id, user_id, account_data_type in global_results ) return results diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 15c15a12f5..a23d6f5c75 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -122,7 +122,7 @@ class PreviewUrlResource(DirectServeResource): pattern = entry[attrib] value = getattr(url_tuple, attrib) logger.debug( - "Matching attrib '%s' with value '%s' against" " pattern '%s'", + "Matching attrib '%s' with value '%s' against pattern '%s'", attrib, value, pattern, diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py index 415e9c17d8..5736c56032 100644 --- a/synapse/server_notices/consent_server_notices.py +++ b/synapse/server_notices/consent_server_notices.py @@ -54,7 +54,7 @@ class ConsentServerNotices(object): ) if "body" not in self._server_notice_content: raise ConfigError( - "user_consent server_notice_consent must contain a 'body' " "key." + "user_consent server_notice_consent must contain a 'body' key." ) self._consent_uri_builder = ConsentURIBuilder(hs.config) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index ab596fa68d..459901ac60 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -409,16 +409,15 @@ class SQLBaseStore(object): i = 0 N = 5 while True: + cursor = LoggingTransaction( + conn.cursor(), + name, + self.database_engine, + after_callbacks, + exception_callbacks, + ) try: - txn = conn.cursor() - txn = LoggingTransaction( - txn, - name, - self.database_engine, - after_callbacks, - exception_callbacks, - ) - r = func(txn, *args, **kwargs) + r = func(cursor, *args, **kwargs) conn.commit() return r except self.database_engine.module.OperationalError as e: @@ -456,6 +455,40 @@ class SQLBaseStore(object): ) continue raise + finally: + # we're either about to retry with a new cursor, or we're about to + # release the connection. Once we release the connection, it could + # get used for another query, which might do a conn.rollback(). + # + # In the latter case, even though that probably wouldn't affect the + # results of this transaction, python's sqlite will reset all + # statements on the connection [1], which will make our cursor + # invalid [2]. + # + # In any case, continuing to read rows after commit()ing seems + # dubious from the PoV of ACID transactional semantics + # (sqlite explicitly says that once you commit, you may see rows + # from subsequent updates.) + # + # In psycopg2, cursors are essentially a client-side fabrication - + # all the data is transferred to the client side when the statement + # finishes executing - so in theory we could go on streaming results + # from the cursor, but attempting to do so would make us + # incompatible with sqlite, so let's make sure we're not doing that + # by closing the cursor. + # + # (*named* cursors in psycopg2 are different and are proper server- + # side things, but (a) we don't use them and (b) they are implicitly + # closed by ending the transaction anyway.) + # + # In short, if we haven't finished with the cursor yet, that's a + # problem waiting to bite us. + # + # TL;DR: we're done with the cursor, so we can close it. + # + # [1]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/connection.c#L465 + # [2]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/cursor.c#L236 + cursor.close() except Exception as e: logger.debug("[TXN FAIL] {%s} %s", name, e) raise @@ -851,7 +884,7 @@ class SQLBaseStore(object): allvalues.update(values) latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values) - sql = ("INSERT INTO %s (%s) VALUES (%s) " "ON CONFLICT (%s) DO %s") % ( + sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % ( table, ", ".join(k for k in allvalues), ", ".join("?" for _ in allvalues), diff --git a/synapse/storage/data_stores/main/account_data.py b/synapse/storage/data_stores/main/account_data.py index 6afbfc0d74..22093484ed 100644 --- a/synapse/storage/data_stores/main/account_data.py +++ b/synapse/storage/data_stores/main/account_data.py @@ -184,14 +184,14 @@ class AccountDataWorkerStore(SQLBaseStore): current_id(int): The position to fetch up to. Returns: A deferred pair of lists of tuples of stream_id int, user_id string, - room_id string, type string, and content string. + room_id string, and type string. """ if last_room_id == current_id and last_global_id == current_id: return defer.succeed(([], [])) def get_updated_account_data_txn(txn): sql = ( - "SELECT stream_id, user_id, account_data_type, content" + "SELECT stream_id, user_id, account_data_type" " FROM account_data WHERE ? < stream_id AND stream_id <= ?" " ORDER BY stream_id ASC LIMIT ?" ) @@ -199,7 +199,7 @@ class AccountDataWorkerStore(SQLBaseStore): global_results = txn.fetchall() sql = ( - "SELECT stream_id, user_id, room_id, account_data_type, content" + "SELECT stream_id, user_id, room_id, account_data_type" " FROM room_account_data WHERE ? < stream_id AND stream_id <= ?" " ORDER BY stream_id ASC LIMIT ?" ) diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py index 96cd0fb77a..a23744f11c 100644 --- a/synapse/storage/data_stores/main/deviceinbox.py +++ b/synapse/storage/data_stores/main/deviceinbox.py @@ -380,7 +380,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) devices = list(messages_by_device.keys()) if len(devices) == 1 and devices[0] == "*": # Handle wildcard device_ids. - sql = "SELECT device_id FROM devices" " WHERE user_id = ?" + sql = "SELECT device_id FROM devices WHERE user_id = ?" txn.execute(sql, (user_id,)) message_json = json.dumps(messages_by_device["*"]) for row in txn: diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py index 073412a78d..d8ad59ad93 100644 --- a/synapse/storage/data_stores/main/end_to_end_keys.py +++ b/synapse/storage/data_stores/main/end_to_end_keys.py @@ -138,9 +138,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore): result.setdefault(user_id, {})[device_id] = None # get signatures on the device - signature_sql = ( - "SELECT * " " FROM e2e_cross_signing_signatures " " WHERE %s" - ) % (" OR ".join("(" + q + ")" for q in signature_query_clauses)) + signature_sql = ("SELECT * FROM e2e_cross_signing_signatures WHERE %s") % ( + " OR ".join("(" + q + ")" for q in signature_query_clauses) + ) txn.execute(signature_sql, signature_query_params) rows = self.cursor_to_dict(txn) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 878f7568a6..627c0b67f1 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -713,9 +713,7 @@ class EventsStore( metadata_json = encode_json(event.internal_metadata.get_dict()) - sql = ( - "UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?" - ) + sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?" txn.execute(sql, (metadata_json, event.event_id)) # Add an entry to the ex_outlier_stream table to replicate the @@ -732,7 +730,7 @@ class EventsStore( }, ) - sql = "UPDATE events SET outlier = ?" " WHERE event_id = ?" + sql = "UPDATE events SET outlier = ? WHERE event_id = ?" txn.execute(sql, (False, event.event_id)) # Update the event_backward_extremities table now that this @@ -1479,7 +1477,7 @@ class EventsStore( # We do joins against events_to_purge for e.g. calculating state # groups to purge, etc., so lets make an index. - txn.execute("CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)") + txn.execute("CREATE INDEX events_to_purge_id ON events_to_purge(event_id)") txn.execute("SELECT event_id, should_delete FROM events_to_purge") event_rows = txn.fetchall() diff --git a/synapse/storage/data_stores/main/filtering.py b/synapse/storage/data_stores/main/filtering.py index a2a2a67927..f05ace299a 100644 --- a/synapse/storage/data_stores/main/filtering.py +++ b/synapse/storage/data_stores/main/filtering.py @@ -55,7 +55,7 @@ class FilteringStore(SQLBaseStore): if filter_id_response is not None: return filter_id_response[0] - sql = "SELECT MAX(filter_id) FROM user_filters " "WHERE user_id = ?" + sql = "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?" txn.execute(sql, (user_localpart,)) max_id = txn.fetchone()[0] if max_id is None: diff --git a/synapse/storage/data_stores/main/media_repository.py b/synapse/storage/data_stores/main/media_repository.py index 84b5f3ad5e..0f2887bdce 100644 --- a/synapse/storage/data_stores/main/media_repository.py +++ b/synapse/storage/data_stores/main/media_repository.py @@ -337,7 +337,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): if len(media_ids) == 0: return - sql = "DELETE FROM local_media_repository_url_cache" " WHERE media_id = ?" + sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?" def _delete_url_cache_txn(txn): txn.executemany(sql, [(media_id,) for media_id in media_ids]) @@ -365,11 +365,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): return def _delete_url_cache_media_txn(txn): - sql = "DELETE FROM local_media_repository" " WHERE media_id = ?" + sql = "DELETE FROM local_media_repository WHERE media_id = ?" txn.executemany(sql, [(media_id,) for media_id in media_ids]) - sql = "DELETE FROM local_media_repository_thumbnails" " WHERE media_id = ?" + sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?" txn.executemany(sql, [(media_id,) for media_id in media_ids]) diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py index 0c24430f28..8b17334ff4 100644 --- a/synapse/storage/data_stores/main/receipts.py +++ b/synapse/storage/data_stores/main/receipts.py @@ -280,7 +280,7 @@ class ReceiptsWorkerStore(SQLBaseStore): args.append(limit) txn.execute(sql, args) - return (r[0:5] + (json.loads(r[5]),) for r in txn) + return list(r[0:5] + (json.loads(r[5]),) for r in txn) return self.runInteraction( "get_all_updated_receipts", get_all_updated_receipts_txn diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py index ee1b2b2bbf..6a594c160c 100644 --- a/synapse/storage/data_stores/main/registration.py +++ b/synapse/storage/data_stores/main/registration.py @@ -377,9 +377,7 @@ class RegistrationWorkerStore(SQLBaseStore): """ def f(txn): - sql = ( - "SELECT name, password_hash FROM users" " WHERE lower(name) = lower(?)" - ) + sql = "SELECT name, password_hash FROM users WHERE lower(name) = lower(?)" txn.execute(sql, (user_id,)) return dict(txn) diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 8780fdd989..9ae4a913a1 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -616,7 +616,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): def _get_max_topological_txn(self, txn, room_id): txn.execute( - "SELECT MAX(topological_ordering) FROM events" " WHERE room_id = ?", + "SELECT MAX(topological_ordering) FROM events WHERE room_id = ?", (room_id,), ) diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py index 10d1887f75..aa24339717 100644 --- a/synapse/storage/data_stores/main/tags.py +++ b/synapse/storage/data_stores/main/tags.py @@ -83,9 +83,7 @@ class TagsWorkerStore(AccountDataWorkerStore): ) def get_tag_content(txn, tag_ids): - sql = ( - "SELECT tag, content" " FROM room_tags" " WHERE user_id=? AND room_id=?" - ) + sql = "SELECT tag, content FROM room_tags WHERE user_id=? AND room_id=?" results = [] for stream_id, user_id, room_id in tag_ids: txn.execute(sql, (user_id, room_id)) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 2e7753820e..731e1c9d9c 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -447,7 +447,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams) # Mark as done. cur.execute( database_engine.convert_param_style( - "INSERT INTO applied_module_schemas (module_name, file)" " VALUES (?,?)" + "INSERT INTO applied_module_schemas (module_name, file) VALUES (?,?)" ), (modname, name), ) diff --git a/synapse/streams/config.py b/synapse/streams/config.py index 02994ab2a5..cd56cd91ed 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -88,9 +88,12 @@ class PaginationConfig(object): raise SynapseError(400, "Invalid request.") def __repr__(self): - return ( - "PaginationConfig(from_tok=%r, to_tok=%r," " direction=%r, limit=%r)" - ) % (self.from_token, self.to_token, self.direction, self.limit) + return ("PaginationConfig(from_tok=%r, to_tok=%r, direction=%r, limit=%r)") % ( + self.from_token, + self.to_token, + self.direction, + self.limit, + ) def get_source_config(self, source_name): keyname = "%s_key" % source_name |