diff options
author | Brendan Abolivier <babolivier@matrix.org> | 2019-11-26 17:53:57 +0000 |
---|---|---|
committer | Brendan Abolivier <babolivier@matrix.org> | 2019-11-26 17:53:57 +0000 |
commit | 9e937c28ee2013d01716a4647ddb8df34c7ec3cd (patch) | |
tree | ddb7c3a62227bfa760b9c9c8fcd78359f4bc4636 /synapse/storage/_base.py | |
parent | Don't restrict the tests to v1 rooms (diff) | |
parent | Merge pull request #6420 from matrix-org/erikj/fix_find_next_generated_user_i... (diff) | |
download | synapse-9e937c28ee2013d01716a4647ddb8df34c7ec3cd.tar.xz |
Merge branch 'develop' into babolivier/message_retention
Diffstat (limited to 'synapse/storage/_base.py')
-rw-r--r-- | synapse/storage/_base.py | 62 |
1 files changed, 46 insertions, 16 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1a2b7ebe25..459901ac60 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -361,14 +361,11 @@ class SQLBaseStore(object): expiration_ts, ) - self._simple_insert_txn( + self._simple_upsert_txn( txn, "account_validity", - values={ - "user_id": user_id, - "expiration_ts_ms": expiration_ts, - "email_sent": False, - }, + keyvalues={"user_id": user_id}, + values={"expiration_ts_ms": expiration_ts, "email_sent": False}, ) def start_profiling(self): @@ -412,16 +409,15 @@ class SQLBaseStore(object): i = 0 N = 5 while True: + cursor = LoggingTransaction( + conn.cursor(), + name, + self.database_engine, + after_callbacks, + exception_callbacks, + ) try: - txn = conn.cursor() - txn = LoggingTransaction( - txn, - name, - self.database_engine, - after_callbacks, - exception_callbacks, - ) - r = func(txn, *args, **kwargs) + r = func(cursor, *args, **kwargs) conn.commit() return r except self.database_engine.module.OperationalError as e: @@ -459,6 +455,40 @@ class SQLBaseStore(object): ) continue raise + finally: + # we're either about to retry with a new cursor, or we're about to + # release the connection. Once we release the connection, it could + # get used for another query, which might do a conn.rollback(). + # + # In the latter case, even though that probably wouldn't affect the + # results of this transaction, python's sqlite will reset all + # statements on the connection [1], which will make our cursor + # invalid [2]. + # + # In any case, continuing to read rows after commit()ing seems + # dubious from the PoV of ACID transactional semantics + # (sqlite explicitly says that once you commit, you may see rows + # from subsequent updates.) + # + # In psycopg2, cursors are essentially a client-side fabrication - + # all the data is transferred to the client side when the statement + # finishes executing - so in theory we could go on streaming results + # from the cursor, but attempting to do so would make us + # incompatible with sqlite, so let's make sure we're not doing that + # by closing the cursor. + # + # (*named* cursors in psycopg2 are different and are proper server- + # side things, but (a) we don't use them and (b) they are implicitly + # closed by ending the transaction anyway.) + # + # In short, if we haven't finished with the cursor yet, that's a + # problem waiting to bite us. + # + # TL;DR: we're done with the cursor, so we can close it. + # + # [1]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/connection.c#L465 + # [2]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/cursor.c#L236 + cursor.close() except Exception as e: logger.debug("[TXN FAIL] {%s} %s", name, e) raise @@ -854,7 +884,7 @@ class SQLBaseStore(object): allvalues.update(values) latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values) - sql = ("INSERT INTO %s (%s) VALUES (%s) " "ON CONFLICT (%s) DO %s") % ( + sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % ( table, ", ".join(k for k in allvalues), ", ".join("?" for _ in allvalues), |