summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/federation_server.py5
-rw-r--r--synapse/federation/transaction_queue.py3
-rw-r--r--synapse/federation/transport/server.py47
-rw-r--r--synapse/handlers/presence.py2
-rw-r--r--synapse/replication/resource.py9
-rw-r--r--synapse/rest/__init__.py2
-rw-r--r--synapse/rest/client/v2_alpha/openid.py96
-rw-r--r--synapse/storage/__init__.py4
-rw-r--r--synapse/storage/events.py160
-rw-r--r--synapse/storage/openid.py32
-rw-r--r--synapse/storage/registration.py3
-rw-r--r--synapse/storage/schema/delta/32/openid.sql9
-rw-r--r--synapse/storage/transactions.py165
13 files changed, 479 insertions, 58 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 429ab6ddec..f1d231b9d8 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -388,6 +388,11 @@ class FederationServer(FederationBase):
         })
 
     @log_function
+    def on_openid_userinfo(self, token):
+        ts_now_ms = self._clock.time_msec()
+        return self.store.get_user_id_for_open_id_token(token, ts_now_ms)
+
+    @log_function
     def _get_persisted_pdu(self, origin, event_id, do_auth=True):
         """ Get a PDU from the database with given origin and id.
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 1928da03b3..5787f854d4 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -20,6 +20,7 @@ from .persistence import TransactionActions
 from .units import Transaction
 
 from synapse.api.errors import HttpResponseException
+from synapse.util.async import run_on_reactor
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.retryutils import (
@@ -199,6 +200,8 @@ class TransactionQueue(object):
     @defer.inlineCallbacks
     @log_function
     def _attempt_new_transaction(self, destination):
+        yield run_on_reactor()
+
         # list of (pending_pdu, deferred, order)
         if destination in self.pending_transactions:
             # XXX: pending_transactions can get stuck on by a never-ending
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 3e552b6c44..5b6c7d11dd 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
 from synapse.api.urls import FEDERATION_PREFIX as PREFIX
 from synapse.api.errors import Codes, SynapseError
 from synapse.http.server import JsonResource
-from synapse.http.servlet import parse_json_object_from_request
+from synapse.http.servlet import parse_json_object_from_request, parse_string
 from synapse.util.ratelimitutils import FederationRateLimiter
 
 import functools
@@ -448,6 +448,50 @@ class On3pidBindServlet(BaseFederationServlet):
         return code
 
 
+class OpenIdUserInfo(BaseFederationServlet):
+    """
+    Exchange a bearer token for information about a user.
+
+    The response format should be compatible with:
+        http://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse
+
+    GET /openid/userinfo?access_token=ABDEFGH HTTP/1.1
+
+    HTTP/1.1 200 OK
+    Content-Type: application/json
+
+    {
+        "sub": "@userpart:example.org",
+    }
+    """
+
+    PATH = "/openid/userinfo"
+
+    @defer.inlineCallbacks
+    def on_GET(self, request):
+        token = parse_string(request, "access_token")
+        if token is None:
+            defer.returnValue((401, {
+                "errcode": "M_MISSING_TOKEN", "error": "Access Token required"
+            }))
+            return
+
+        user_id = yield self.handler.on_openid_userinfo(token)
+
+        if user_id is None:
+            defer.returnValue((401, {
+                "errcode": "M_UNKNOWN_TOKEN",
+                "error": "Access Token unknown or expired"
+            }))
+
+        defer.returnValue((200, {"sub": user_id}))
+
+    # Avoid doing remote HS authorization checks which are done by default by
+    # BaseFederationServlet.
+    def _wrap(self, code):
+        return code
+
+
 SERVLET_CLASSES = (
     FederationSendServlet,
     FederationPullServlet,
@@ -468,6 +512,7 @@ SERVLET_CLASSES = (
     FederationClientKeysClaimServlet,
     FederationThirdPartyInviteExchangeServlet,
     On3pidBindServlet,
+    OpenIdUserInfo,
 )
 
 
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index d0c8f1328b..639567953a 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -168,7 +168,7 @@ class PresenceHandler(BaseHandler):
         # The initial delay is to allow disconnected clients a chance to
         # reconnect before we treat them as offline.
         self.clock.call_later(
-            0 * 1000,
+            30 * 1000,
             self.clock.looping_call,
             self._handle_timeouts,
             5000,
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index ff78c60f13..69ad1de863 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -159,6 +159,15 @@ class ReplicationResource(Resource):
 
         result = yield self.notifier.wait_for_replication(replicate, timeout)
 
+        for stream_name, stream_content in result.items():
+            logger.info(
+                "Replicating %d rows of %s from %s -> %s",
+                len(stream_content["rows"]),
+                stream_name,
+                stream_content["position"],
+                request_streams.get(stream_name),
+            )
+
         request.write(json.dumps(result, ensure_ascii=False))
         finish_request(request)
 
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index e805cb9111..8b223e032b 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -45,6 +45,7 @@ from synapse.rest.client.v2_alpha import (
     tags,
     account_data,
     report_event,
+    openid,
 )
 
 from synapse.http.server import JsonResource
@@ -88,3 +89,4 @@ class ClientRestResource(JsonResource):
         tags.register_servlets(hs, client_resource)
         account_data.register_servlets(hs, client_resource)
         report_event.register_servlets(hs, client_resource)
+        openid.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v2_alpha/openid.py b/synapse/rest/client/v2_alpha/openid.py
new file mode 100644
index 0000000000..aa1cae8e1e
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/openid.py
@@ -0,0 +1,96 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ._base import client_v2_patterns
+
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.api.errors import AuthError
+from synapse.util.stringutils import random_string
+
+from twisted.internet import defer
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class IdTokenServlet(RestServlet):
+    """
+    Get a bearer token that may be passed to a third party to confirm ownership
+    of a matrix user id.
+
+    The format of the response could be made compatible with the format given
+    in http://openid.net/specs/openid-connect-core-1_0.html#TokenResponse
+
+    But instead of returning a signed "id_token" the response contains the
+    name of the issuing matrix homeserver. This means that for now the third
+    party will need to check the validity of the "id_token" against the
+    federation /openid/userinfo endpoint of the homeserver.
+
+    Request:
+
+    POST /user/{user_id}/openid/request_token?access_token=... HTTP/1.1
+
+    {}
+
+    Response:
+
+    HTTP/1.1 200 OK
+    {
+        "access_token": "ABDEFGH",
+        "token_type": "Bearer",
+        "matrix_server_name": "example.com",
+        "expires_in": 3600,
+    }
+    """
+    PATTERNS = client_v2_patterns(
+        "/user/(?P<user_id>[^/]*)/openid/request_token"
+    )
+
+    EXPIRES_MS = 3600 * 1000
+
+    def __init__(self, hs):
+        super(IdTokenServlet, self).__init__()
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+        self.server_name = hs.config.server_name
+
+    @defer.inlineCallbacks
+    def on_POST(self, request, user_id):
+        requester = yield self.auth.get_user_by_req(request)
+        if user_id != requester.user.to_string():
+            raise AuthError(403, "Cannot request tokens for other users.")
+
+        # Parse the request body to make sure it's JSON, but ignore the contents
+        # for now.
+        parse_json_object_from_request(request)
+
+        token = random_string(24)
+        ts_valid_until_ms = self.clock.time_msec() + self.EXPIRES_MS
+
+        yield self.store.insert_open_id_token(token, ts_valid_until_ms, user_id)
+
+        defer.returnValue((200, {
+            "access_token": token,
+            "token_type": "Bearer",
+            "matrix_server_name": self.server_name,
+            "expires_in": self.EXPIRES_MS / 1000,
+        }))
+
+
+def register_servlets(hs, http_server):
+    IdTokenServlet(hs).register(http_server)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 7122b0cbb1..d970fde9e8 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -44,6 +44,7 @@ from .receipts import ReceiptsStore
 from .search import SearchStore
 from .tags import TagsStore
 from .account_data import AccountDataStore
+from .openid import OpenIdStore
 
 from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
 
@@ -81,7 +82,8 @@ class DataStore(RoomMemberStore, RoomStore,
                 SearchStore,
                 TagsStore,
                 AccountDataStore,
-                EventPushActionsStore
+                EventPushActionsStore,
+                OpenIdStore,
                 ):
 
     def __init__(self, db_conn, hs):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 0307b2af3c..a27b7919cd 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,12 +19,14 @@ from twisted.internet import defer, reactor
 from synapse.events import FrozenEvent, USE_FROZEN_DICTS
 from synapse.events.utils import prune_event
 
+from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
 
 from canonicaljson import encode_canonical_json
-from collections import namedtuple
+from collections import deque, namedtuple
+
 
 import logging
 import math
@@ -50,6 +52,93 @@ EVENT_QUEUE_ITERATIONS = 3  # No. times we block waiting for requests for events
 EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events
 
 
+class _EventPeristenceQueue(object):
+    """Queues up events so that they can be persisted in bulk with only one
+    concurrent transaction per room.
+    """
+
+    _EventPersistQueueItem = namedtuple("_EventPersistQueueItem", (
+        "events_and_contexts", "current_state", "backfilled", "deferred",
+    ))
+
+    def __init__(self):
+        self._event_persist_queues = {}
+        self._currently_persisting_rooms = set()
+
+    def add_to_queue(self, room_id, events_and_contexts, backfilled, current_state):
+        """Add events to the queue, with the given persist_event options.
+        """
+        queue = self._event_persist_queues.setdefault(room_id, deque())
+        if queue:
+            end_item = queue[-1]
+            if end_item.current_state or current_state:
+                # We perist events with current_state set to True one at a time
+                pass
+            if end_item.backfilled == backfilled:
+                end_item.events_and_contexts.extend(events_and_contexts)
+                return end_item.deferred.observe()
+
+        deferred = ObservableDeferred(defer.Deferred())
+
+        queue.append(self._EventPersistQueueItem(
+            events_and_contexts=events_and_contexts,
+            backfilled=backfilled,
+            current_state=current_state,
+            deferred=deferred,
+        ))
+
+        return deferred.observe()
+
+    def handle_queue(self, room_id, per_item_callback):
+        """Attempts to handle the queue for a room if not already being handled.
+
+        The given callback will be invoked with for each item in the queue,1
+        of type _EventPersistQueueItem. The per_item_callback will continuously
+        be called with new items, unless the queue becomnes empty. The return
+        value of the function will be given to the deferreds waiting on the item,
+        exceptions will be passed to the deferres as well.
+
+        This function should therefore be called whenever anything is added
+        to the queue.
+
+        If another callback is currently handling the queue then it will not be
+        invoked.
+        """
+
+        if room_id in self._currently_persisting_rooms:
+            return
+
+        self._currently_persisting_rooms.add(room_id)
+
+        @defer.inlineCallbacks
+        def handle_queue_loop():
+            try:
+                queue = self._get_drainining_queue(room_id)
+                for item in queue:
+                    try:
+                        ret = yield per_item_callback(item)
+                        item.deferred.callback(ret)
+                    except Exception as e:
+                        item.deferred.errback(e)
+            finally:
+                queue = self._event_persist_queues.pop(room_id, None)
+                if queue:
+                    self._event_persist_queues[room_id] = queue
+                self._currently_persisting_rooms.discard(room_id)
+
+        preserve_fn(handle_queue_loop)()
+
+    def _get_drainining_queue(self, room_id):
+        queue = self._event_persist_queues.setdefault(room_id, deque())
+
+        try:
+            while True:
+                yield queue.popleft()
+        except IndexError:
+            # Queue has been drained.
+            pass
+
+
 class EventsStore(SQLBaseStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
 
@@ -59,19 +148,72 @@ class EventsStore(SQLBaseStore):
             self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
         )
 
-    @defer.inlineCallbacks
+        self._event_persist_queue = _EventPeristenceQueue()
+
     def persist_events(self, events_and_contexts, backfilled=False):
         """
         Write events to the database
         Args:
             events_and_contexts: list of tuples of (event, context)
             backfilled: ?
+        """
+        partitioned = {}
+        for event, ctx in events_and_contexts:
+            partitioned.setdefault(event.room_id, []).append((event, ctx))
+
+        deferreds = []
+        for room_id, evs_ctxs in partitioned.items():
+            d = self._event_persist_queue.add_to_queue(
+                room_id, evs_ctxs,
+                backfilled=backfilled,
+                current_state=None,
+            )
+            deferreds.append(d)
 
-        Returns: Tuple of stream_orderings where the first is the minimum and
-            last is the maximum stream ordering assigned to the events when
-            persisting.
+        for room_id in partitioned.keys():
+            self._maybe_start_persisting(room_id)
 
-        """
+        return defer.gatherResults(deferreds, consumeErrors=True)
+
+    @defer.inlineCallbacks
+    @log_function
+    def persist_event(self, event, context, current_state=None, backfilled=False):
+        deferred = self._event_persist_queue.add_to_queue(
+            event.room_id, [(event, context)],
+            backfilled=backfilled,
+            current_state=current_state,
+        )
+
+        self._maybe_start_persisting(event.room_id)
+
+        yield deferred
+
+        max_persisted_id = yield self._stream_id_gen.get_current_token()
+        defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
+
+    def _maybe_start_persisting(self, room_id):
+        @defer.inlineCallbacks
+        def persisting_queue(item):
+            if item.current_state:
+                for event, context in item.events_and_contexts:
+                    # There should only ever be one item in
+                    # events_and_contexts when current_state is
+                    # not None
+                    yield self._persist_event(
+                        event, context,
+                        current_state=item.current_state,
+                        backfilled=item.backfilled,
+                    )
+            else:
+                yield self._persist_events(
+                    item.events_and_contexts,
+                    backfilled=item.backfilled,
+                )
+
+        self._event_persist_queue.handle_queue(room_id, persisting_queue)
+
+    @defer.inlineCallbacks
+    def _persist_events(self, events_and_contexts, backfilled=False):
         if not events_and_contexts:
             return
 
@@ -118,8 +260,7 @@ class EventsStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     @log_function
-    def persist_event(self, event, context, current_state=None, backfilled=False):
-
+    def _persist_event(self, event, context, current_state=None, backfilled=False):
         try:
             with self._stream_id_gen.get_next() as stream_ordering:
                 with self._state_groups_id_gen.get_next() as state_group_id:
@@ -136,9 +277,6 @@ class EventsStore(SQLBaseStore):
         except _RollbackButIsFineException:
             pass
 
-        max_persisted_id = yield self._stream_id_gen.get_current_token()
-        defer.returnValue((stream_ordering, max_persisted_id))
-
     @defer.inlineCallbacks
     def get_event(self, event_id, check_redacted=True,
                   get_prev_content=False, allow_rejected=False,
diff --git a/synapse/storage/openid.py b/synapse/storage/openid.py
new file mode 100644
index 0000000000..5dabb607bd
--- /dev/null
+++ b/synapse/storage/openid.py
@@ -0,0 +1,32 @@
+from ._base import SQLBaseStore
+
+
+class OpenIdStore(SQLBaseStore):
+    def insert_open_id_token(self, token, ts_valid_until_ms, user_id):
+        return self._simple_insert(
+            table="open_id_tokens",
+            values={
+                "token": token,
+                "ts_valid_until_ms": ts_valid_until_ms,
+                "user_id": user_id,
+            },
+            desc="insert_open_id_token"
+        )
+
+    def get_user_id_for_open_id_token(self, token, ts_now_ms):
+        def get_user_id_for_token_txn(txn):
+            sql = (
+                "SELECT user_id FROM open_id_tokens"
+                " WHERE token = ? AND ? <= ts_valid_until_ms"
+            )
+
+            txn.execute(sql, (token, ts_now_ms))
+
+            rows = txn.fetchall()
+            if not rows:
+                return None
+            else:
+                return rows[0][0]
+        return self.runInteraction(
+            "get_user_id_for_token", get_user_id_for_token_txn
+        )
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 7af0cae6a5..bda84a744a 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -101,6 +101,7 @@ class RegistrationStore(SQLBaseStore):
             make_guest,
             appservice_id
         )
+        self.get_user_by_id.invalidate((user_id,))
         self.is_guest.invalidate((user_id,))
 
     def _register(
@@ -156,6 +157,7 @@ class RegistrationStore(SQLBaseStore):
                 (next_id, user_id, token,)
             )
 
+    @cached()
     def get_user_by_id(self, user_id):
         return self._simple_select_one(
             table="users",
@@ -193,6 +195,7 @@ class RegistrationStore(SQLBaseStore):
         }, {
             'password_hash': password_hash
         })
+        self.get_user_by_id.invalidate((user_id,))
 
     @defer.inlineCallbacks
     def user_delete_access_tokens(self, user_id, except_token_ids=[]):
diff --git a/synapse/storage/schema/delta/32/openid.sql b/synapse/storage/schema/delta/32/openid.sql
new file mode 100644
index 0000000000..36f37b11c8
--- /dev/null
+++ b/synapse/storage/schema/delta/32/openid.sql
@@ -0,0 +1,9 @@
+
+CREATE TABLE open_id_tokens (
+    token TEXT NOT NULL PRIMARY KEY,
+    ts_valid_until_ms bigint NOT NULL,
+    user_id TEXT NOT NULL,
+    UNIQUE (token)
+);
+
+CREATE index open_id_tokens_ts_valid_until_ms ON open_id_tokens(ts_valid_until_ms);
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index d338dfcf0a..6c7481a728 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -16,16 +16,56 @@
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached
 
+from twisted.internet import defer, reactor
+
 from canonicaljson import encode_canonical_json
+
+from collections import namedtuple
+
+import itertools
 import logging
 
 logger = logging.getLogger(__name__)
 
 
+_TransactionRow = namedtuple(
+    "_TransactionRow", (
+        "id", "transaction_id", "destination", "ts", "response_code",
+        "response_json",
+    )
+)
+
+_UpdateTransactionRow = namedtuple(
+    "_TransactionRow", (
+        "response_code", "response_json",
+    )
+)
+
+
 class TransactionStore(SQLBaseStore):
     """A collection of queries for handling PDUs.
     """
 
+    def __init__(self, hs):
+        super(TransactionStore, self).__init__(hs)
+
+        # New transactions that are currently in flights
+        self.inflight_transactions = {}
+
+        # Newly delievered transactions that *weren't* persisted while in flight
+        self.new_delivered_transactions = {}
+
+        # Newly delivered transactions that *were* persisted while in flight
+        self.update_delivered_transactions = {}
+
+        self.last_transaction = {}
+
+        reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns)
+        hs.get_clock().looping_call(
+            self._persist_in_mem_txns,
+            1000,
+        )
+
     def get_received_txn_response(self, transaction_id, origin):
         """For an incoming transaction from a given origin, check if we have
         already responded to it. If so, return the response code and response
@@ -108,17 +148,30 @@ class TransactionStore(SQLBaseStore):
             list: A list of previous transaction ids.
         """
 
-        return self.runInteraction(
-            "prep_send_transaction",
-            self._prep_send_transaction,
-            transaction_id, destination, origin_server_ts
+        auto_id = self._transaction_id_gen.get_next()
+
+        txn_row = _TransactionRow(
+            id=auto_id,
+            transaction_id=transaction_id,
+            destination=destination,
+            ts=origin_server_ts,
+            response_code=0,
+            response_json=None,
         )
 
-    def _prep_send_transaction(self, txn, transaction_id, destination,
-                               origin_server_ts):
+        self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row
 
-        next_id = self._transaction_id_gen.get_next()
+        prev_txn = self.last_transaction.get(destination)
+        if prev_txn:
+            return defer.succeed(prev_txn)
+        else:
+            return self.runInteraction(
+                "_get_prevs_txn",
+                self._get_prevs_txn,
+                destination,
+            )
 
+    def _get_prevs_txn(self, txn, destination):
         # First we find out what the prev_txns should be.
         # Since we know that we are only sending one transaction at a time,
         # we can simply take the last one.
@@ -133,23 +186,6 @@ class TransactionStore(SQLBaseStore):
 
         prev_txns = [r["transaction_id"] for r in results]
 
-        # Actually add the new transaction to the sent_transactions table.
-
-        self._simple_insert_txn(
-            txn,
-            table="sent_transactions",
-            values={
-                "id": next_id,
-                "transaction_id": transaction_id,
-                "destination": destination,
-                "ts": origin_server_ts,
-                "response_code": 0,
-                "response_json": None,
-            }
-        )
-
-        # TODO Update the tx id -> pdu id mapping
-
         return prev_txns
 
     def delivered_txn(self, transaction_id, destination, code, response_dict):
@@ -161,27 +197,23 @@ class TransactionStore(SQLBaseStore):
             code (int)
             response_json (str)
         """
-        return self.runInteraction(
-            "delivered_txn",
-            self._delivered_txn,
-            transaction_id, destination, code,
-            buffer(encode_canonical_json(response_dict)),
-        )
 
-    def _delivered_txn(self, txn, transaction_id, destination,
-                       code, response_json):
-        self._simple_update_one_txn(
-            txn,
-            table="sent_transactions",
-            keyvalues={
-                "transaction_id": transaction_id,
-                "destination": destination,
-            },
-            updatevalues={
-                "response_code": code,
-                "response_json": None,  # For now, don't persist response_json
-            }
-        )
+        txn_row = self.inflight_transactions.get(
+            destination, {}
+        ).pop(transaction_id, None)
+
+        self.last_transaction[destination] = transaction_id
+
+        if txn_row:
+            d = self.new_delivered_transactions.setdefault(destination, {})
+            d[transaction_id] = txn_row._replace(
+                response_code=code,
+                response_json=None,  # For now, don't persist response
+            )
+        else:
+            d = self.update_delivered_transactions.setdefault(destination, {})
+            # For now, don't persist response
+            d[transaction_id] = _UpdateTransactionRow(code, None)
 
     def get_transactions_after(self, transaction_id, destination):
         """Get all transactions after a given local transaction_id.
@@ -305,3 +337,48 @@ class TransactionStore(SQLBaseStore):
 
         txn.execute(query, (self._clock.time_msec(),))
         return self.cursor_to_dict(txn)
+
+    @defer.inlineCallbacks
+    def _persist_in_mem_txns(self):
+        try:
+            inflight = self.inflight_transactions
+            new_delivered = self.new_delivered_transactions
+            update_delivered = self.update_delivered_transactions
+
+            self.inflight_transactions = {}
+            self.new_delivered_transactions = {}
+            self.update_delivered_transactions = {}
+
+            full_rows = [
+                row._asdict()
+                for txn_map in itertools.chain(inflight.values(), new_delivered.values())
+                for row in txn_map.values()
+            ]
+
+            def f(txn):
+                if full_rows:
+                    self._simple_insert_many_txn(
+                        txn=txn,
+                        table="sent_transactions",
+                        values=full_rows
+                    )
+
+                for dest, txn_map in update_delivered.items():
+                    for txn_id, update_row in txn_map.items():
+                        self._simple_update_one_txn(
+                            txn,
+                            table="sent_transactions",
+                            keyvalues={
+                                "transaction_id": txn_id,
+                                "destination": dest,
+                            },
+                            updatevalues={
+                                "response_code": update_row.response_code,
+                                "response_json": None,  # For now, don't persist response
+                            }
+                        )
+
+            if full_rows or update_delivered:
+                yield self.runInteraction("_persist_in_mem_txns", f)
+        except:
+            logger.exception("Failed to persist transactions!")