summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/_base.py51
-rw-r--r--synapse/storage/data_stores/main/receipts.py2
2 files changed, 43 insertions, 10 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 6b8a9cd89a..459901ac60 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -409,16 +409,15 @@ class SQLBaseStore(object):
             i = 0
             N = 5
             while True:
+                cursor = LoggingTransaction(
+                    conn.cursor(),
+                    name,
+                    self.database_engine,
+                    after_callbacks,
+                    exception_callbacks,
+                )
                 try:
-                    txn = conn.cursor()
-                    txn = LoggingTransaction(
-                        txn,
-                        name,
-                        self.database_engine,
-                        after_callbacks,
-                        exception_callbacks,
-                    )
-                    r = func(txn, *args, **kwargs)
+                    r = func(cursor, *args, **kwargs)
                     conn.commit()
                     return r
                 except self.database_engine.module.OperationalError as e:
@@ -456,6 +455,40 @@ class SQLBaseStore(object):
                                 )
                             continue
                     raise
+                finally:
+                    # we're either about to retry with a new cursor, or we're about to
+                    # release the connection. Once we release the connection, it could
+                    # get used for another query, which might do a conn.rollback().
+                    #
+                    # In the latter case, even though that probably wouldn't affect the
+                    # results of this transaction, python's sqlite will reset all
+                    # statements on the connection [1], which will make our cursor
+                    # invalid [2].
+                    #
+                    # In any case, continuing to read rows after commit()ing seems
+                    # dubious from the PoV of ACID transactional semantics
+                    # (sqlite explicitly says that once you commit, you may see rows
+                    # from subsequent updates.)
+                    #
+                    # In psycopg2, cursors are essentially a client-side fabrication -
+                    # all the data is transferred to the client side when the statement
+                    # finishes executing - so in theory we could go on streaming results
+                    # from the cursor, but attempting to do so would make us
+                    # incompatible with sqlite, so let's make sure we're not doing that
+                    # by closing the cursor.
+                    #
+                    # (*named* cursors in psycopg2 are different and are proper server-
+                    # side things, but (a) we don't use them and (b) they are implicitly
+                    # closed by ending the transaction anyway.)
+                    #
+                    # In short, if we haven't finished with the cursor yet, that's a
+                    # problem waiting to bite us.
+                    #
+                    # TL;DR: we're done with the cursor, so we can close it.
+                    #
+                    # [1]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/connection.c#L465
+                    # [2]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/cursor.c#L236
+                    cursor.close()
         except Exception as e:
             logger.debug("[TXN FAIL] {%s} %s", name, e)
             raise
diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index 0c24430f28..8b17334ff4 100644
--- a/synapse/storage/data_stores/main/receipts.py
+++ b/synapse/storage/data_stores/main/receipts.py
@@ -280,7 +280,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
                 args.append(limit)
             txn.execute(sql, args)
 
-            return (r[0:5] + (json.loads(r[5]),) for r in txn)
+            return list(r[0:5] + (json.loads(r[5]),) for r in txn)
 
         return self.runInteraction(
             "get_all_updated_receipts", get_all_updated_receipts_txn