diff options
author | Erik Johnston <erik@matrix.org> | 2016-05-09 13:05:09 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-05-09 13:05:09 +0100 |
commit | def64d6ef32ea1b2acc92e470db307e97c6eac05 (patch) | |
tree | 8d29d7d432a5b703d15f1176ae7e720b1caa5805 | |
parent | Add bulk fetch storage API (diff) | |
parent | Merge pull request #764 from matrix-org/erikj/replication_logging (diff) | |
download | synapse-def64d6ef32ea1b2acc92e470db307e97c6eac05.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/ignore_user
-rw-r--r-- | synapse/federation/federation_server.py | 5 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 3 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 47 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 2 | ||||
-rw-r--r-- | synapse/replication/resource.py | 9 | ||||
-rw-r--r-- | synapse/rest/__init__.py | 2 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/openid.py | 96 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 4 | ||||
-rw-r--r-- | synapse/storage/events.py | 160 | ||||
-rw-r--r-- | synapse/storage/openid.py | 32 | ||||
-rw-r--r-- | synapse/storage/registration.py | 3 | ||||
-rw-r--r-- | synapse/storage/schema/delta/32/openid.sql | 9 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 165 |
13 files changed, 479 insertions, 58 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 429ab6ddec..f1d231b9d8 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -388,6 +388,11 @@ class FederationServer(FederationBase): }) @log_function + def on_openid_userinfo(self, token): + ts_now_ms = self._clock.time_msec() + return self.store.get_user_id_for_open_id_token(token, ts_now_ms) + + @log_function def _get_persisted_pdu(self, origin, event_id, do_auth=True): """ Get a PDU from the database with given origin and id. diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 1928da03b3..5787f854d4 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -20,6 +20,7 @@ from .persistence import TransactionActions from .units import Transaction from synapse.api.errors import HttpResponseException +from synapse.util.async import run_on_reactor from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext from synapse.util.retryutils import ( @@ -199,6 +200,8 @@ class TransactionQueue(object): @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): + yield run_on_reactor() + # list of (pending_pdu, deferred, order) if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 3e552b6c44..5b6c7d11dd 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.api.errors import Codes, SynapseError from synapse.http.server import JsonResource -from synapse.http.servlet import parse_json_object_from_request +from synapse.http.servlet import parse_json_object_from_request, parse_string from synapse.util.ratelimitutils import FederationRateLimiter import functools @@ -448,6 +448,50 @@ class On3pidBindServlet(BaseFederationServlet): return code +class OpenIdUserInfo(BaseFederationServlet): + """ + Exchange a bearer token for information about a user. + + The response format should be compatible with: + http://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse + + GET /openid/userinfo?access_token=ABDEFGH HTTP/1.1 + + HTTP/1.1 200 OK + Content-Type: application/json + + { + "sub": "@userpart:example.org", + } + """ + + PATH = "/openid/userinfo" + + @defer.inlineCallbacks + def on_GET(self, request): + token = parse_string(request, "access_token") + if token is None: + defer.returnValue((401, { + "errcode": "M_MISSING_TOKEN", "error": "Access Token required" + })) + return + + user_id = yield self.handler.on_openid_userinfo(token) + + if user_id is None: + defer.returnValue((401, { + "errcode": "M_UNKNOWN_TOKEN", + "error": "Access Token unknown or expired" + })) + + defer.returnValue((200, {"sub": user_id})) + + # Avoid doing remote HS authorization checks which are done by default by + # BaseFederationServlet. + def _wrap(self, code): + return code + + SERVLET_CLASSES = ( FederationSendServlet, FederationPullServlet, @@ -468,6 +512,7 @@ SERVLET_CLASSES = ( FederationClientKeysClaimServlet, FederationThirdPartyInviteExchangeServlet, On3pidBindServlet, + OpenIdUserInfo, ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d0c8f1328b..639567953a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -168,7 +168,7 @@ class PresenceHandler(BaseHandler): # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. self.clock.call_later( - 0 * 1000, + 30 * 1000, self.clock.looping_call, self._handle_timeouts, 5000, diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index ff78c60f13..69ad1de863 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -159,6 +159,15 @@ class ReplicationResource(Resource): result = yield self.notifier.wait_for_replication(replicate, timeout) + for stream_name, stream_content in result.items(): + logger.info( + "Replicating %d rows of %s from %s -> %s", + len(stream_content["rows"]), + stream_name, + stream_content["position"], + request_streams.get(stream_name), + ) + request.write(json.dumps(result, ensure_ascii=False)) finish_request(request) diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index e805cb9111..8b223e032b 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -45,6 +45,7 @@ from synapse.rest.client.v2_alpha import ( tags, account_data, report_event, + openid, ) from synapse.http.server import JsonResource @@ -88,3 +89,4 @@ class ClientRestResource(JsonResource): tags.register_servlets(hs, client_resource) account_data.register_servlets(hs, client_resource) report_event.register_servlets(hs, client_resource) + openid.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/openid.py b/synapse/rest/client/v2_alpha/openid.py new file mode 100644 index 0000000000..aa1cae8e1e --- /dev/null +++ b/synapse/rest/client/v2_alpha/openid.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ._base import client_v2_patterns + +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.api.errors import AuthError +from synapse.util.stringutils import random_string + +from twisted.internet import defer + +import logging + +logger = logging.getLogger(__name__) + + +class IdTokenServlet(RestServlet): + """ + Get a bearer token that may be passed to a third party to confirm ownership + of a matrix user id. + + The format of the response could be made compatible with the format given + in http://openid.net/specs/openid-connect-core-1_0.html#TokenResponse + + But instead of returning a signed "id_token" the response contains the + name of the issuing matrix homeserver. This means that for now the third + party will need to check the validity of the "id_token" against the + federation /openid/userinfo endpoint of the homeserver. + + Request: + + POST /user/{user_id}/openid/request_token?access_token=... HTTP/1.1 + + {} + + Response: + + HTTP/1.1 200 OK + { + "access_token": "ABDEFGH", + "token_type": "Bearer", + "matrix_server_name": "example.com", + "expires_in": 3600, + } + """ + PATTERNS = client_v2_patterns( + "/user/(?P<user_id>[^/]*)/openid/request_token" + ) + + EXPIRES_MS = 3600 * 1000 + + def __init__(self, hs): + super(IdTokenServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.server_name = hs.config.server_name + + @defer.inlineCallbacks + def on_POST(self, request, user_id): + requester = yield self.auth.get_user_by_req(request) + if user_id != requester.user.to_string(): + raise AuthError(403, "Cannot request tokens for other users.") + + # Parse the request body to make sure it's JSON, but ignore the contents + # for now. + parse_json_object_from_request(request) + + token = random_string(24) + ts_valid_until_ms = self.clock.time_msec() + self.EXPIRES_MS + + yield self.store.insert_open_id_token(token, ts_valid_until_ms, user_id) + + defer.returnValue((200, { + "access_token": token, + "token_type": "Bearer", + "matrix_server_name": self.server_name, + "expires_in": self.EXPIRES_MS / 1000, + })) + + +def register_servlets(hs, http_server): + IdTokenServlet(hs).register(http_server) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 7122b0cbb1..d970fde9e8 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -44,6 +44,7 @@ from .receipts import ReceiptsStore from .search import SearchStore from .tags import TagsStore from .account_data import AccountDataStore +from .openid import OpenIdStore from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator @@ -81,7 +82,8 @@ class DataStore(RoomMemberStore, RoomStore, SearchStore, TagsStore, AccountDataStore, - EventPushActionsStore + EventPushActionsStore, + OpenIdStore, ): def __init__(self, db_conn, hs): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0307b2af3c..a27b7919cd 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,12 +19,14 @@ from twisted.internet import defer, reactor from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event +from synapse.util.async import ObservableDeferred from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logutils import log_function from synapse.api.constants import EventTypes from canonicaljson import encode_canonical_json -from collections import namedtuple +from collections import deque, namedtuple + import logging import math @@ -50,6 +52,93 @@ EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events +class _EventPeristenceQueue(object): + """Queues up events so that they can be persisted in bulk with only one + concurrent transaction per room. + """ + + _EventPersistQueueItem = namedtuple("_EventPersistQueueItem", ( + "events_and_contexts", "current_state", "backfilled", "deferred", + )) + + def __init__(self): + self._event_persist_queues = {} + self._currently_persisting_rooms = set() + + def add_to_queue(self, room_id, events_and_contexts, backfilled, current_state): + """Add events to the queue, with the given persist_event options. + """ + queue = self._event_persist_queues.setdefault(room_id, deque()) + if queue: + end_item = queue[-1] + if end_item.current_state or current_state: + # We perist events with current_state set to True one at a time + pass + if end_item.backfilled == backfilled: + end_item.events_and_contexts.extend(events_and_contexts) + return end_item.deferred.observe() + + deferred = ObservableDeferred(defer.Deferred()) + + queue.append(self._EventPersistQueueItem( + events_and_contexts=events_and_contexts, + backfilled=backfilled, + current_state=current_state, + deferred=deferred, + )) + + return deferred.observe() + + def handle_queue(self, room_id, per_item_callback): + """Attempts to handle the queue for a room if not already being handled. + + The given callback will be invoked with for each item in the queue,1 + of type _EventPersistQueueItem. The per_item_callback will continuously + be called with new items, unless the queue becomnes empty. The return + value of the function will be given to the deferreds waiting on the item, + exceptions will be passed to the deferres as well. + + This function should therefore be called whenever anything is added + to the queue. + + If another callback is currently handling the queue then it will not be + invoked. + """ + + if room_id in self._currently_persisting_rooms: + return + + self._currently_persisting_rooms.add(room_id) + + @defer.inlineCallbacks + def handle_queue_loop(): + try: + queue = self._get_drainining_queue(room_id) + for item in queue: + try: + ret = yield per_item_callback(item) + item.deferred.callback(ret) + except Exception as e: + item.deferred.errback(e) + finally: + queue = self._event_persist_queues.pop(room_id, None) + if queue: + self._event_persist_queues[room_id] = queue + self._currently_persisting_rooms.discard(room_id) + + preserve_fn(handle_queue_loop)() + + def _get_drainining_queue(self, room_id): + queue = self._event_persist_queues.setdefault(room_id, deque()) + + try: + while True: + yield queue.popleft() + except IndexError: + # Queue has been drained. + pass + + class EventsStore(SQLBaseStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" @@ -59,19 +148,72 @@ class EventsStore(SQLBaseStore): self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts ) - @defer.inlineCallbacks + self._event_persist_queue = _EventPeristenceQueue() + def persist_events(self, events_and_contexts, backfilled=False): """ Write events to the database Args: events_and_contexts: list of tuples of (event, context) backfilled: ? + """ + partitioned = {} + for event, ctx in events_and_contexts: + partitioned.setdefault(event.room_id, []).append((event, ctx)) + + deferreds = [] + for room_id, evs_ctxs in partitioned.items(): + d = self._event_persist_queue.add_to_queue( + room_id, evs_ctxs, + backfilled=backfilled, + current_state=None, + ) + deferreds.append(d) - Returns: Tuple of stream_orderings where the first is the minimum and - last is the maximum stream ordering assigned to the events when - persisting. + for room_id in partitioned.keys(): + self._maybe_start_persisting(room_id) - """ + return defer.gatherResults(deferreds, consumeErrors=True) + + @defer.inlineCallbacks + @log_function + def persist_event(self, event, context, current_state=None, backfilled=False): + deferred = self._event_persist_queue.add_to_queue( + event.room_id, [(event, context)], + backfilled=backfilled, + current_state=current_state, + ) + + self._maybe_start_persisting(event.room_id) + + yield deferred + + max_persisted_id = yield self._stream_id_gen.get_current_token() + defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) + + def _maybe_start_persisting(self, room_id): + @defer.inlineCallbacks + def persisting_queue(item): + if item.current_state: + for event, context in item.events_and_contexts: + # There should only ever be one item in + # events_and_contexts when current_state is + # not None + yield self._persist_event( + event, context, + current_state=item.current_state, + backfilled=item.backfilled, + ) + else: + yield self._persist_events( + item.events_and_contexts, + backfilled=item.backfilled, + ) + + self._event_persist_queue.handle_queue(room_id, persisting_queue) + + @defer.inlineCallbacks + def _persist_events(self, events_and_contexts, backfilled=False): if not events_and_contexts: return @@ -118,8 +260,7 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks @log_function - def persist_event(self, event, context, current_state=None, backfilled=False): - + def _persist_event(self, event, context, current_state=None, backfilled=False): try: with self._stream_id_gen.get_next() as stream_ordering: with self._state_groups_id_gen.get_next() as state_group_id: @@ -136,9 +277,6 @@ class EventsStore(SQLBaseStore): except _RollbackButIsFineException: pass - max_persisted_id = yield self._stream_id_gen.get_current_token() - defer.returnValue((stream_ordering, max_persisted_id)) - @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False, diff --git a/synapse/storage/openid.py b/synapse/storage/openid.py new file mode 100644 index 0000000000..5dabb607bd --- /dev/null +++ b/synapse/storage/openid.py @@ -0,0 +1,32 @@ +from ._base import SQLBaseStore + + +class OpenIdStore(SQLBaseStore): + def insert_open_id_token(self, token, ts_valid_until_ms, user_id): + return self._simple_insert( + table="open_id_tokens", + values={ + "token": token, + "ts_valid_until_ms": ts_valid_until_ms, + "user_id": user_id, + }, + desc="insert_open_id_token" + ) + + def get_user_id_for_open_id_token(self, token, ts_now_ms): + def get_user_id_for_token_txn(txn): + sql = ( + "SELECT user_id FROM open_id_tokens" + " WHERE token = ? AND ? <= ts_valid_until_ms" + ) + + txn.execute(sql, (token, ts_now_ms)) + + rows = txn.fetchall() + if not rows: + return None + else: + return rows[0][0] + return self.runInteraction( + "get_user_id_for_token", get_user_id_for_token_txn + ) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 7af0cae6a5..bda84a744a 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -101,6 +101,7 @@ class RegistrationStore(SQLBaseStore): make_guest, appservice_id ) + self.get_user_by_id.invalidate((user_id,)) self.is_guest.invalidate((user_id,)) def _register( @@ -156,6 +157,7 @@ class RegistrationStore(SQLBaseStore): (next_id, user_id, token,) ) + @cached() def get_user_by_id(self, user_id): return self._simple_select_one( table="users", @@ -193,6 +195,7 @@ class RegistrationStore(SQLBaseStore): }, { 'password_hash': password_hash }) + self.get_user_by_id.invalidate((user_id,)) @defer.inlineCallbacks def user_delete_access_tokens(self, user_id, except_token_ids=[]): diff --git a/synapse/storage/schema/delta/32/openid.sql b/synapse/storage/schema/delta/32/openid.sql new file mode 100644 index 0000000000..36f37b11c8 --- /dev/null +++ b/synapse/storage/schema/delta/32/openid.sql @@ -0,0 +1,9 @@ + +CREATE TABLE open_id_tokens ( + token TEXT NOT NULL PRIMARY KEY, + ts_valid_until_ms bigint NOT NULL, + user_id TEXT NOT NULL, + UNIQUE (token) +); + +CREATE index open_id_tokens_ts_valid_until_ms ON open_id_tokens(ts_valid_until_ms); diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index d338dfcf0a..6c7481a728 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -16,16 +16,56 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached +from twisted.internet import defer, reactor + from canonicaljson import encode_canonical_json + +from collections import namedtuple + +import itertools import logging logger = logging.getLogger(__name__) +_TransactionRow = namedtuple( + "_TransactionRow", ( + "id", "transaction_id", "destination", "ts", "response_code", + "response_json", + ) +) + +_UpdateTransactionRow = namedtuple( + "_TransactionRow", ( + "response_code", "response_json", + ) +) + + class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ + def __init__(self, hs): + super(TransactionStore, self).__init__(hs) + + # New transactions that are currently in flights + self.inflight_transactions = {} + + # Newly delievered transactions that *weren't* persisted while in flight + self.new_delivered_transactions = {} + + # Newly delivered transactions that *were* persisted while in flight + self.update_delivered_transactions = {} + + self.last_transaction = {} + + reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) + hs.get_clock().looping_call( + self._persist_in_mem_txns, + 1000, + ) + def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -108,17 +148,30 @@ class TransactionStore(SQLBaseStore): list: A list of previous transaction ids. """ - return self.runInteraction( - "prep_send_transaction", - self._prep_send_transaction, - transaction_id, destination, origin_server_ts + auto_id = self._transaction_id_gen.get_next() + + txn_row = _TransactionRow( + id=auto_id, + transaction_id=transaction_id, + destination=destination, + ts=origin_server_ts, + response_code=0, + response_json=None, ) - def _prep_send_transaction(self, txn, transaction_id, destination, - origin_server_ts): + self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - next_id = self._transaction_id_gen.get_next() + prev_txn = self.last_transaction.get(destination) + if prev_txn: + return defer.succeed(prev_txn) + else: + return self.runInteraction( + "_get_prevs_txn", + self._get_prevs_txn, + destination, + ) + def _get_prevs_txn(self, txn, destination): # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. @@ -133,23 +186,6 @@ class TransactionStore(SQLBaseStore): prev_txns = [r["transaction_id"] for r in results] - # Actually add the new transaction to the sent_transactions table. - - self._simple_insert_txn( - txn, - table="sent_transactions", - values={ - "id": next_id, - "transaction_id": transaction_id, - "destination": destination, - "ts": origin_server_ts, - "response_code": 0, - "response_json": None, - } - ) - - # TODO Update the tx id -> pdu id mapping - return prev_txns def delivered_txn(self, transaction_id, destination, code, response_dict): @@ -161,27 +197,23 @@ class TransactionStore(SQLBaseStore): code (int) response_json (str) """ - return self.runInteraction( - "delivered_txn", - self._delivered_txn, - transaction_id, destination, code, - buffer(encode_canonical_json(response_dict)), - ) - def _delivered_txn(self, txn, transaction_id, destination, - code, response_json): - self._simple_update_one_txn( - txn, - table="sent_transactions", - keyvalues={ - "transaction_id": transaction_id, - "destination": destination, - }, - updatevalues={ - "response_code": code, - "response_json": None, # For now, don't persist response_json - } - ) + txn_row = self.inflight_transactions.get( + destination, {} + ).pop(transaction_id, None) + + self.last_transaction[destination] = transaction_id + + if txn_row: + d = self.new_delivered_transactions.setdefault(destination, {}) + d[transaction_id] = txn_row._replace( + response_code=code, + response_json=None, # For now, don't persist response + ) + else: + d = self.update_delivered_transactions.setdefault(destination, {}) + # For now, don't persist response + d[transaction_id] = _UpdateTransactionRow(code, None) def get_transactions_after(self, transaction_id, destination): """Get all transactions after a given local transaction_id. @@ -305,3 +337,48 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) + + @defer.inlineCallbacks + def _persist_in_mem_txns(self): + try: + inflight = self.inflight_transactions + new_delivered = self.new_delivered_transactions + update_delivered = self.update_delivered_transactions + + self.inflight_transactions = {} + self.new_delivered_transactions = {} + self.update_delivered_transactions = {} + + full_rows = [ + row._asdict() + for txn_map in itertools.chain(inflight.values(), new_delivered.values()) + for row in txn_map.values() + ] + + def f(txn): + if full_rows: + self._simple_insert_many_txn( + txn=txn, + table="sent_transactions", + values=full_rows + ) + + for dest, txn_map in update_delivered.items(): + for txn_id, update_row in txn_map.items(): + self._simple_update_one_txn( + txn, + table="sent_transactions", + keyvalues={ + "transaction_id": txn_id, + "destination": dest, + }, + updatevalues={ + "response_code": update_row.response_code, + "response_json": None, # For now, don't persist response + } + ) + + if full_rows or update_delivered: + yield self.runInteraction("_persist_in_mem_txns", f) + except: + logger.exception("Failed to persist transactions!") |