diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/app/appservice.py | 4 | ||||
-rw-r--r-- | synapse/app/client_reader.py | 4 | ||||
-rw-r--r-- | synapse/app/federation_reader.py | 4 | ||||
-rw-r--r-- | synapse/app/media_repository.py | 4 | ||||
-rw-r--r-- | synapse/app/pusher.py | 4 | ||||
-rw-r--r-- | synapse/app/synchrotron.py | 2 | ||||
-rw-r--r-- | synapse/config/password_auth_providers.py | 9 | ||||
-rw-r--r-- | synapse/config/repository.py | 2 | ||||
-rw-r--r-- | synapse/handlers/auth.py | 2 | ||||
-rw-r--r-- | synapse/handlers/message.py | 15 | ||||
-rw-r--r-- | synapse/http/server.py | 23 | ||||
-rw-r--r-- | synapse/metrics/__init__.py | 16 | ||||
-rw-r--r-- | synapse/metrics/process_collector.py | 65 | ||||
-rw-r--r-- | synapse/rest/media/v1/download_resource.py | 3 | ||||
-rw-r--r-- | synapse/rest/media/v1/thumbnail_resource.py | 3 | ||||
-rw-r--r-- | synapse/storage/prepare_database.py | 2 | ||||
-rw-r--r-- | synapse/storage/pusher.py | 13 | ||||
-rw-r--r-- | synapse/storage/schema/delta/38/postgres_fts_gist.sql | 17 | ||||
-rw-r--r-- | synapse/storage/search.py | 27 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 154 |
20 files changed, 134 insertions, 239 deletions
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 4e62a84b28..dd9ee406a1 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -34,6 +34,8 @@ from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string +from synapse import events + from twisted.internet import reactor, defer from twisted.web.resource import Resource @@ -151,6 +153,8 @@ def start(config_options): setup_logging(config.worker_log_config, config.worker_log_file) + events.USE_FROZEN_DICTS = config.use_frozen_dicts + database_engine = create_engine(config.database_config) if config.notify_appservices: diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 9fccc73db3..0086a2977e 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -41,6 +41,8 @@ from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string from synapse.crypto import context_factory +from synapse import events + from twisted.internet import reactor, defer from twisted.web.resource import Resource @@ -165,6 +167,8 @@ def start(config_options): setup_logging(config.worker_log_config, config.worker_log_file) + events.USE_FROZEN_DICTS = config.use_frozen_dicts + database_engine = create_engine(config.database_config) tls_server_context_factory = context_factory.ServerContextFactory(config) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 1f5ae1937e..b5f59a9931 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -39,6 +39,8 @@ from synapse.api.urls import FEDERATION_PREFIX from synapse.federation.transport.server import TransportLayerServer from synapse.crypto import context_factory +from synapse import events + from twisted.internet import reactor, defer from twisted.web.resource import Resource @@ -156,6 +158,8 @@ def start(config_options): setup_logging(config.worker_log_config, config.worker_log_file) + events.USE_FROZEN_DICTS = config.use_frozen_dicts + database_engine = create_engine(config.database_config) tls_server_context_factory = context_factory.ServerContextFactory(config) diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 6e5ec01c6c..44c19a1bef 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -41,6 +41,8 @@ from synapse.api.urls import ( ) from synapse.crypto import context_factory +from synapse import events + from twisted.internet import reactor, defer from twisted.web.resource import Resource @@ -162,6 +164,8 @@ def start(config_options): setup_logging(config.worker_log_config, config.worker_log_file) + events.USE_FROZEN_DICTS = config.use_frozen_dicts + database_engine = create_engine(config.database_config) tls_server_context_factory = context_factory.ServerContextFactory(config) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 1a6f5507a9..a0e765c54f 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -36,6 +36,8 @@ from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string +from synapse import events + from twisted.internet import reactor, defer from twisted.web.resource import Resource @@ -239,6 +241,8 @@ def start(config_options): setup_logging(config.worker_log_config, config.worker_log_file) + events.USE_FROZEN_DICTS = config.use_frozen_dicts + if config.start_pushers: sys.stderr.write( "\nThe pushers must be disabled in the main synapse process" diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 64b209ffe6..bf1b995dc2 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -446,6 +446,8 @@ def start(config_options): setup_logging(config.worker_log_config, config.worker_log_file) + synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts + database_engine = create_engine(config.database_config) ss = SynchrotronServer( diff --git a/synapse/config/password_auth_providers.py b/synapse/config/password_auth_providers.py index f6d9bb1c62..1f438d2bb3 100644 --- a/synapse/config/password_auth_providers.py +++ b/synapse/config/password_auth_providers.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import Config +from ._base import Config, ConfigError import importlib @@ -39,7 +39,12 @@ class PasswordAuthProviderConfig(Config): module = importlib.import_module(module) provider_class = getattr(module, clz) - provider_config = provider_class.parse_config(provider["config"]) + try: + provider_config = provider_class.parse_config(provider["config"]) + except Exception as e: + raise ConfigError( + "Failed to parse config for %r: %r" % (provider['module'], e) + ) self.password_providers.append((provider_class, provider_config)) def default_config(self, **kwargs): diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 8810079848..2c6f57168e 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -167,6 +167,8 @@ class ContentRepositoryConfig(Config): # - '10.0.0.0/8' # - '172.16.0.0/12' # - '192.168.0.0/16' + # - '100.64.0.0/10' + # - '169.254.0.0/16' # # List of IP address CIDR ranges that the URL preview spider is allowed # to access even if they are specified in url_preview_ip_range_blacklist. diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 3635521230..3851b35889 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -653,7 +653,7 @@ class AuthHandler(BaseHandler): Returns: Hashed password (str). """ - return bcrypt.hashpw(password + self.hs.config.password_pepper, + return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper, bcrypt.gensalt(self.bcrypt_rounds)) def validate_hash(self, password, stored_hash): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index abfa8c65a4..81df45177a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -34,6 +34,7 @@ from ._base import BaseHandler from canonicaljson import encode_canonical_json import logging +import random logger = logging.getLogger(__name__) @@ -415,6 +416,20 @@ class MessageHandler(BaseHandler): builder.room_id, ) + # We want to limit the max number of prev events we point to in our + # new event + if len(latest_ret) > 10: + # Sort by reverse depth, so we point to the most recent. + latest_ret.sort(key=lambda a: -a[2]) + new_latest_ret = latest_ret[:5] + + # We also randomly point to some of the older events, to make + # sure that we don't completely ignore the older events. + if latest_ret[5:]: + sample_size = min(5, len(latest_ret[5:])) + new_latest_ret.extend(random.sample(latest_ret[5:], sample_size)) + latest_ret = new_latest_ret + if latest_ret: depth = max([d for _, _, d in latest_ret]) + 1 else: diff --git a/synapse/http/server.py b/synapse/http/server.py index 168e53ce0c..14715878c5 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -392,17 +392,30 @@ def respond_with_json_bytes(request, code, json_bytes, send_cors=False, request.setHeader(b"Content-Length", b"%d" % (len(json_bytes),)) if send_cors: - request.setHeader("Access-Control-Allow-Origin", "*") - request.setHeader("Access-Control-Allow-Methods", - "GET, POST, PUT, DELETE, OPTIONS") - request.setHeader("Access-Control-Allow-Headers", - "Origin, X-Requested-With, Content-Type, Accept") + set_cors_headers(request) request.write(json_bytes) finish_request(request) return NOT_DONE_YET +def set_cors_headers(request): + """Set the CORs headers so that javascript running in a web browsers can + use this API + + Args: + request (twisted.web.http.Request): The http request to add CORs to. + """ + request.setHeader("Access-Control-Allow-Origin", "*") + request.setHeader( + "Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS" + ) + request.setHeader( + "Access-Control-Allow-Headers", + "Origin, X-Requested-With, Content-Type, Accept" + ) + + def finish_request(request): """ Finish writing the response to the request. diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 7041da25ce..2265e6e8d6 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -111,18 +111,20 @@ def render_all(): return "\n".join(strs) -reactor_metrics = get_metrics_for("reactor") -tick_time = reactor_metrics.register_distribution("tick_time") -pending_calls_metric = reactor_metrics.register_distribution("pending_calls") +register_process_collector(get_metrics_for("process")) + -gc_time = reactor_metrics.register_distribution("gc_time", labels=["gen"]) -gc_unreachable = reactor_metrics.register_counter("gc_unreachable", labels=["gen"]) +python_metrics = get_metrics_for("python") -reactor_metrics.register_callback( +gc_time = python_metrics.register_distribution("gc_time", labels=["gen"]) +gc_unreachable = python_metrics.register_counter("gc_unreachable_total", labels=["gen"]) +python_metrics.register_callback( "gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"] ) -register_process_collector(get_metrics_for("process")) +reactor_metrics = get_metrics_for("python.twisted.reactor") +tick_time = reactor_metrics.register_distribution("tick_time") +pending_calls_metric = reactor_metrics.register_distribution("pending_calls") def runUntilCurrentTimer(func): diff --git a/synapse/metrics/process_collector.py b/synapse/metrics/process_collector.py index 0e95582368..6fec3de399 100644 --- a/synapse/metrics/process_collector.py +++ b/synapse/metrics/process_collector.py @@ -13,12 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Because otherwise 'resource' collides with synapse.metrics.resource -from __future__ import absolute_import - import os -import stat -from resource import getrusage, RUSAGE_SELF TICKS_PER_SEC = 100 @@ -29,16 +24,6 @@ HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") HAVE_PROC_SELF_LIMITS = os.path.exists("/proc/self/limits") HAVE_PROC_SELF_FD = os.path.exists("/proc/self/fd") -TYPES = { - stat.S_IFSOCK: "SOCK", - stat.S_IFLNK: "LNK", - stat.S_IFREG: "REG", - stat.S_IFBLK: "BLK", - stat.S_IFDIR: "DIR", - stat.S_IFCHR: "CHR", - stat.S_IFIFO: "FIFO", -} - # Field indexes from /proc/self/stat, taken from the proc(5) manpage STAT_FIELDS = { "utime": 14, @@ -49,9 +34,7 @@ STAT_FIELDS = { } -rusage = None stats = {} -fd_counts = None # In order to report process_start_time_seconds we need to know the # machine's boot time, because the value in /proc/self/stat is relative to @@ -65,9 +48,6 @@ if HAVE_PROC_STAT: def update_resource_metrics(): - global rusage - rusage = getrusage(RUSAGE_SELF) - if HAVE_PROC_SELF_STAT: global stats with open("/proc/self/stat") as s: @@ -80,52 +60,17 @@ def update_resource_metrics(): # we've lost the first two fields in PID and COMMAND above stats[name] = int(raw_stats[index - 3]) - global fd_counts - fd_counts = _process_fds() - - -def _process_fds(): - counts = {(k,): 0 for k in TYPES.values()} - counts[("other",)] = 0 +def _count_fds(): # Not every OS will have a /proc/self/fd directory if not HAVE_PROC_SELF_FD: - return counts - - for fd in os.listdir("/proc/self/fd"): - try: - s = os.stat("/proc/self/fd/%s" % (fd)) - fmt = stat.S_IFMT(s.st_mode) - if fmt in TYPES: - t = TYPES[fmt] - else: - t = "other" + return 0 - counts[(t,)] += 1 - except OSError: - # the dirh itself used by listdir() is usually missing by now - pass - - return counts + return len(os.listdir("/proc/self/fd")) def register_process_collector(process_metrics): - # Legacy synapse-invented metric names - - resource_metrics = process_metrics.make_subspace("resource") - - resource_metrics.register_collector(update_resource_metrics) - - # msecs - resource_metrics.register_callback("utime", lambda: rusage.ru_utime * 1000) - resource_metrics.register_callback("stime", lambda: rusage.ru_stime * 1000) - - # kilobytes - resource_metrics.register_callback("maxrss", lambda: rusage.ru_maxrss * 1024) - - process_metrics.register_callback("fds", _process_fds, labels=["type"]) - - # New prometheus-standard metric names + process_metrics.register_collector(update_resource_metrics) if HAVE_PROC_SELF_STAT: process_metrics.register_callback( @@ -158,7 +103,7 @@ def register_process_collector(process_metrics): if HAVE_PROC_SELF_FD: process_metrics.register_callback( "open_fds", - lambda: sum(fd_counts.values()) + lambda: _count_fds() ) if HAVE_PROC_SELF_LIMITS: diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index a45ee9483e..dfb87ffd15 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -15,7 +15,7 @@ from ._base import parse_media_id, respond_with_file, respond_404 from twisted.web.resource import Resource -from synapse.http.server import request_handler +from synapse.http.server import request_handler, set_cors_headers from twisted.web.server import NOT_DONE_YET from twisted.internet import defer @@ -45,6 +45,7 @@ class DownloadResource(Resource): @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): + set_cors_headers(request) request.setHeader( "Content-Security-Policy", "default-src 'none';" diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py index 0b9e1de1a7..d8f54adc99 100644 --- a/synapse/rest/media/v1/thumbnail_resource.py +++ b/synapse/rest/media/v1/thumbnail_resource.py @@ -17,7 +17,7 @@ from ._base import parse_media_id, respond_404, respond_with_file from twisted.web.resource import Resource from synapse.http.servlet import parse_string, parse_integer -from synapse.http.server import request_handler +from synapse.http.server import request_handler, set_cors_headers from twisted.web.server import NOT_DONE_YET from twisted.internet import defer @@ -48,6 +48,7 @@ class ThumbnailResource(Resource): @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): + set_cors_headers(request) server_name, media_id, _ = parse_media_id(request) width = parse_integer(request, "width") height = parse_integer(request, "height") diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index d2c0aebe48..6576a30098 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 37 +SCHEMA_VERSION = 38 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 8f5f8f24a9..8cc9f0353b 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -137,17 +137,8 @@ class PusherStore(SQLBaseStore): @cachedInlineCallbacks(num_args=1, max_entries=15000) def get_if_user_has_pusher(self, user_id): - result = yield self._simple_select_many_batch( - table='pushers', - keyvalues={ - 'user_name': 'user_id', - }, - retcol='user_name', - desc='get_if_user_has_pusher', - allow_none=True, - ) - - defer.returnValue(bool(result)) + # This only exists for the cachedList decorator + raise NotImplementedError() @cachedList(cached_method_name="get_if_user_has_pusher", list_name="user_ids", num_args=1, inlineCallbacks=True) diff --git a/synapse/storage/schema/delta/38/postgres_fts_gist.sql b/synapse/storage/schema/delta/38/postgres_fts_gist.sql new file mode 100644 index 0000000000..f090a7b75a --- /dev/null +++ b/synapse/storage/schema/delta/38/postgres_fts_gist.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + INSERT into background_updates (update_name, progress_json) + VALUES ('event_search_postgres_gist', '{}'); diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 12941d1775..8f2b3c4435 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -31,6 +31,7 @@ class SearchStore(BackgroundUpdateStore): EVENT_SEARCH_UPDATE_NAME = "event_search" EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order" + EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist" def __init__(self, hs): super(SearchStore, self).__init__(hs) @@ -41,6 +42,10 @@ class SearchStore(BackgroundUpdateStore): self.EVENT_SEARCH_ORDER_UPDATE_NAME, self._background_reindex_search_order ) + self.register_background_update_handler( + self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME, + self._background_reindex_gist_search + ) @defer.inlineCallbacks def _background_reindex_search(self, progress, batch_size): @@ -140,6 +145,28 @@ class SearchStore(BackgroundUpdateStore): defer.returnValue(result) @defer.inlineCallbacks + def _background_reindex_gist_search(self, progress, batch_size): + def create_index(conn): + conn.rollback() + conn.set_session(autocommit=True) + c = conn.cursor() + + c.execute( + "CREATE INDEX CONCURRENTLY event_search_fts_idx_gist" + " ON event_search USING GIST (vector)" + ) + + c.execute("DROP INDEX event_search_fts_idx") + + conn.set_session(autocommit=False) + + if isinstance(self.database_engine, PostgresEngine): + yield self.runWithConnection(create_index) + + yield self._end_background_update(self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME) + defer.returnValue(1) + + @defer.inlineCallbacks def _background_reindex_search_order(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 5055c04b24..adab520c78 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -16,13 +16,12 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached -from twisted.internet import defer, reactor +from twisted.internet import defer from canonicaljson import encode_canonical_json from collections import namedtuple -import itertools import logging import ujson as json @@ -50,20 +49,6 @@ class TransactionStore(SQLBaseStore): def __init__(self, hs): super(TransactionStore, self).__init__(hs) - # New transactions that are currently in flights - self.inflight_transactions = {} - - # Newly delievered transactions that *weren't* persisted while in flight - self.new_delivered_transactions = {} - - # Newly delivered transactions that *were* persisted while in flight - self.update_delivered_transactions = {} - - self.last_transaction = {} - - reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) - self._clock.looping_call(self._persist_in_mem_txns, 1000) - self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) def get_received_txn_response(self, transaction_id, origin): @@ -148,46 +133,7 @@ class TransactionStore(SQLBaseStore): Returns: list: A list of previous transaction ids. """ - - auto_id = self._transaction_id_gen.get_next() - - txn_row = _TransactionRow( - id=auto_id, - transaction_id=transaction_id, - destination=destination, - ts=origin_server_ts, - response_code=0, - response_json=None, - ) - - self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - - prev_txn = self.last_transaction.get(destination) - if prev_txn: - return defer.succeed(prev_txn) - else: - return self.runInteraction( - "_get_prevs_txn", - self._get_prevs_txn, - destination, - ) - - def _get_prevs_txn(self, txn, destination): - # First we find out what the prev_txns should be. - # Since we know that we are only sending one transaction at a time, - # we can simply take the last one. - query = ( - "SELECT * FROM sent_transactions" - " WHERE destination = ?" - " ORDER BY id DESC LIMIT 1" - ) - - txn.execute(query, (destination,)) - results = self.cursor_to_dict(txn) - - prev_txns = [r["transaction_id"] for r in results] - - return prev_txns + return defer.succeed([]) def delivered_txn(self, transaction_id, destination, code, response_dict): """Persists the response for an outgoing transaction. @@ -198,52 +144,7 @@ class TransactionStore(SQLBaseStore): code (int) response_json (str) """ - - txn_row = self.inflight_transactions.get( - destination, {} - ).pop(transaction_id, None) - - self.last_transaction[destination] = transaction_id - - if txn_row: - d = self.new_delivered_transactions.setdefault(destination, {}) - d[transaction_id] = txn_row._replace( - response_code=code, - response_json=None, # For now, don't persist response - ) - else: - d = self.update_delivered_transactions.setdefault(destination, {}) - # For now, don't persist response - d[transaction_id] = _UpdateTransactionRow(code, None) - - def get_transactions_after(self, transaction_id, destination): - """Get all transactions after a given local transaction_id. - - Args: - transaction_id (str) - destination (str) - - Returns: - list: A list of dicts - """ - return self.runInteraction( - "get_transactions_after", - self._get_transactions_after, transaction_id, destination - ) - - def _get_transactions_after(self, txn, transaction_id, destination): - query = ( - "SELECT * FROM sent_transactions" - " WHERE destination = ? AND id >" - " (" - " SELECT id FROM sent_transactions" - " WHERE transaction_id = ? AND destination = ?" - " )" - ) - - txn.execute(query, (destination, transaction_id, destination)) - - return self.cursor_to_dict(txn) + pass @cached(max_entries=10000) def get_destination_retry_timings(self, destination): @@ -339,58 +240,11 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) - @defer.inlineCallbacks - def _persist_in_mem_txns(self): - try: - inflight = self.inflight_transactions - new_delivered = self.new_delivered_transactions - update_delivered = self.update_delivered_transactions - - self.inflight_transactions = {} - self.new_delivered_transactions = {} - self.update_delivered_transactions = {} - - full_rows = [ - row._asdict() - for txn_map in itertools.chain(inflight.values(), new_delivered.values()) - for row in txn_map.values() - ] - - def f(txn): - if full_rows: - self._simple_insert_many_txn( - txn=txn, - table="sent_transactions", - values=full_rows - ) - - for dest, txn_map in update_delivered.items(): - for txn_id, update_row in txn_map.items(): - self._simple_update_one_txn( - txn, - table="sent_transactions", - keyvalues={ - "transaction_id": txn_id, - "destination": dest, - }, - updatevalues={ - "response_code": update_row.response_code, - "response_json": None, # For now, don't persist response - } - ) - - if full_rows or update_delivered: - yield self.runInteraction("_persist_in_mem_txns", f) - except: - logger.exception("Failed to persist transactions!") - def _cleanup_transactions(self): now = self._clock.time_msec() month_ago = now - 30 * 24 * 60 * 60 * 1000 - six_hours_ago = now - 6 * 60 * 60 * 1000 def _cleanup_transactions_txn(txn): txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,)) - txn.execute("DELETE FROM sent_transactions WHERE ts < ?", (six_hours_ago,)) - return self.runInteraction("_persist_in_mem_txns", _cleanup_transactions_txn) + return self.runInteraction("_cleanup_transactions", _cleanup_transactions_txn) |