From d7a0496f3ec534076121632352f44733253e1e16 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Mar 2015 15:59:48 +0000 Subject: Convert storage layer to be mysql compatible --- synapse/storage/event_federation.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) (limited to 'synapse/storage/event_federation.py') diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 2deda8ac50..5d66b2f24c 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -242,7 +242,6 @@ class EventFederationStore(SQLBaseStore): "room_id": room_id, "min_depth": depth, }, - or_replace=True, ) def _handle_prev_events(self, txn, outlier, event_id, prev_events, @@ -262,7 +261,6 @@ class EventFederationStore(SQLBaseStore): "room_id": room_id, "is_state": 0, }, - or_ignore=True, ) # Update the extremities table if this is not an outlier. @@ -281,19 +279,19 @@ class EventFederationStore(SQLBaseStore): # We only insert as a forward extremity the new event if there are # no other events that reference it as a prev event query = ( - "INSERT OR IGNORE INTO %(table)s (event_id, room_id) " - "SELECT ?, ? WHERE NOT EXISTS (" - "SELECT 1 FROM %(event_edges)s WHERE " - "prev_event_id = ? " - ")" - ) % { - "table": "event_forward_extremities", - "event_edges": "event_edges", - } + "SELECT 1 FROM event_edges WHERE prev_event_id = ?" + ) - logger.debug("query: %s", query) + txn.execute(query, (event_id,)) + + if not txn.fetchone(): + query = ( + "INSERT INTO event_forward_extremities" + " (event_id, room_id)" + " VALUES (?, ?)" + ) - txn.execute(query, (event_id, room_id, event_id)) + txn.execute(query, (event_id, room_id)) # Insert all the prev_events as a backwards thing, they'll get # deleted in a second if they're incorrect anyway. @@ -306,7 +304,6 @@ class EventFederationStore(SQLBaseStore): "event_id": e_id, "room_id": room_id, }, - or_ignore=True, ) # Also delete from the backwards extremities table all ones that -- cgit 1.4.1 From 58d83399663a080c123d2f112b4f4d84accbc638 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 Apr 2015 13:53:20 +0100 Subject: Add support for postgres instead of mysql. Change sql accourdingly. blob + varbinary -> bytea. No support for UNSIGNED or CREATE INDEX IF NOT EXISTS. 
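The type substitutions named in the subject above recur throughout the full_schemas/16 diffs that follow. As a rough summary only (the mapping and helper below are illustrative, not code that exists in the tree):

    # Illustrative summary of the column-type substitutions applied by hand
    # in the schema diffs below; there is no such helper in synapse.
    TYPE_SUBSTITUTIONS = {
        "BIGINT UNSIGNED": "BIGINT",   # PostgreSQL has no UNSIGNED modifier
        "LONGBLOB": "bytea",           # binary blobs become bytea
        "VARBINARY(512)": "bytea",
        "TINYINT": "SMALLINT",         # smallest PostgreSQL integer type
    }

    def port_schema_sql(sql):
        """Apply the substitutions to one schema string (illustration only)."""
        for mysql_type, pg_type in TYPE_SUBSTITUTIONS.items():
            sql = sql.replace(mysql_type, pg_type)
        # CREATE INDEX IF NOT EXISTS is also unsupported by PostgreSQL at this
        # point, so the diffs drop the IF NOT EXISTS clause as well.
        return sql.replace("CREATE INDEX IF NOT EXISTS", "CREATE INDEX")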
--- synapse/app/homeserver.py | 2 + synapse/storage/__init__.py | 15 +++----- synapse/storage/_base.py | 2 +- synapse/storage/engines/__init__.py | 2 + synapse/storage/event_federation.py | 10 ++--- synapse/storage/events.py | 4 +- synapse/storage/room.py | 34 ++++++++++++----- .../full_schemas/16/application_services.sql | 10 ++--- .../storage/schema/full_schemas/16/event_edges.sql | 26 ++++++------- .../schema/full_schemas/16/event_signatures.sql | 16 ++++---- synapse/storage/schema/full_schemas/16/im.sql | 44 +++++++++++----------- synapse/storage/schema/full_schemas/16/keys.sql | 8 ++-- .../schema/full_schemas/16/media_repository.sql | 8 ++-- .../storage/schema/full_schemas/16/presence.sql | 4 +- synapse/storage/schema/full_schemas/16/push.sql | 30 +++++++-------- .../storage/schema/full_schemas/16/redactions.sql | 4 +- synapse/storage/schema/full_schemas/16/state.sql | 10 ++--- .../schema/full_schemas/16/transactions.sql | 24 ++++++------ synapse/storage/schema/full_schemas/16/users.sql | 10 ++--- synapse/storage/schema/schema_version.sql | 14 +++---- synapse/storage/stream.py | 16 ++++---- 21 files changed, 153 insertions(+), 140 deletions(-) (limited to 'synapse/storage/event_federation.py') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index a47e548d66..033011e1d7 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -373,6 +373,8 @@ def setup(config_options): "use_unicode": True, "collation": "utf8mb4_bin", }) + elif name == "psycopg2": + pass elif name == "sqlite3": db_config.setdefault("args", {}).update({ "cp_min": 1, diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index b46cafd25e..272420194d 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -236,7 +236,7 @@ def _setup_new_database(cur, database_engine): cur.execute( database_engine.convert_param_style( - "REPLACE INTO schema_version (version, upgraded)" + "INSERT INTO schema_version (version, upgraded)" " VALUES (?,?)" ), (max_current_ver, False,) @@ -432,14 +432,11 @@ def executescript(txn, schema_path): def _get_or_create_schema_state(txn, database_engine): - try: - # Bluntly try creating the schema_version tables. - schema_path = os.path.join( - dir_path, "schema", "schema_version.sql", - ) - executescript(txn, schema_path) - except: - pass + # Bluntly try creating the schema_version tables. + schema_path = os.path.join( + dir_path, "schema", "schema_version.sql", + ) + executescript(txn, schema_path) txn.execute("SELECT version, upgraded FROM schema_version") row = txn.fetchone() diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e30514cd5e..fa5199104a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -330,7 +330,7 @@ class SQLBaseStore(object): continue raise except Exception as e: - logger.debug("[TXN FAIL] {%s}", name, e) + logger.debug("[TXN FAIL] {%s} %s", name, e) raise finally: end = time.time() * 1000 diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index 29702be923..548d4e1b42 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -14,6 +14,7 @@ # limitations under the License. 
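The engine classes registered below isolate the per-database behaviour behind a small interface. The PostgresEngine implementation itself is not part of this diff; judging from how engines are used elsewhere in these patches (convert_param_style in storage/__init__.py, is_connection_closed and module.OperationalError in _base.py), a minimal engine needs roughly the following shape. This is a sketch under those assumptions, not the actual class:

    import psycopg2


    class PostgresEngineSketch(object):
        """Assumed shape of a database engine wrapper (illustration only)."""

        def __init__(self, database_module):
            # The DB-API module is kept so callers can reach driver-specific
            # exceptions, e.g. engine.module.OperationalError.
            self.module = database_module

        def convert_param_style(self, sql):
            # synapse writes SQL with "?" placeholders; psycopg2 expects "%s".
            return sql.replace("?", "%s")

        def is_connection_closed(self, conn):
            # psycopg2 connections expose a non-zero `closed` attribute once
            # the connection has been closed.
            return bool(conn.closed)


    engine = PostgresEngineSketch(psycopg2)  # assumed wiring from the config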
from .maria import MariaEngine +from .postgres import PostgresEngine from .sqlite3 import Sqlite3Engine import importlib @@ -22,6 +23,7 @@ import importlib SUPPORTED_MODULE = { "sqlite3": Sqlite3Engine, "mysql.connector": MariaEngine, + "psycopg2": PostgresEngine, } diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 79ad5ddc9c..54a3c9d805 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -153,7 +153,7 @@ class EventFederationStore(SQLBaseStore): results = self._get_prev_events_and_state( txn, event_id, - is_state=1, + is_state=True, ) return [(e_id, h, ) for e_id, h, _ in results] @@ -164,7 +164,7 @@ class EventFederationStore(SQLBaseStore): } if is_state is not None: - keyvalues["is_state"] = is_state + keyvalues["is_state"] = bool(is_state) res = self._simple_select_list_txn( txn, @@ -259,7 +259,7 @@ class EventFederationStore(SQLBaseStore): "event_id": event_id, "prev_event_id": e_id, "room_id": room_id, - "is_state": 0, + "is_state": False, }, ) @@ -397,7 +397,7 @@ class EventFederationStore(SQLBaseStore): query = ( "SELECT prev_event_id FROM event_edges " - "WHERE room_id = ? AND event_id = ? AND is_state = 0 " + "WHERE room_id = ? AND event_id = ? AND is_state = ? " "LIMIT ?" ) @@ -406,7 +406,7 @@ class EventFederationStore(SQLBaseStore): for event_id in front: txn.execute( query, - (room_id, event_id, limit - len(event_results)) + (room_id, event_id, False, limit - len(event_results)) ) for e_id, in txn.fetchall(): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a2e87c27ce..9fe2effb4b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -188,12 +188,12 @@ class EventsStore(SQLBaseStore): ) sql = ( - "UPDATE events SET outlier = 0" + "UPDATE events SET outlier = ?" " WHERE event_id = ?" ) txn.execute( sql, - (event.event_id,) + (False, event.event_id,) ) return diff --git a/synapse/storage/room.py b/synapse/storage/room.py index a1a76280fe..48ebb33057 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -105,14 +105,12 @@ class RoomStore(SQLBaseStore): # We use non printing ascii character US (\x1F) as a separator sql = ( - "SELECT r.room_id, n.name, t.topic, " - "group_concat(a.room_alias, '\x1F') " - "FROM rooms AS r " - "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id " - "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id " - "INNER JOIN room_aliases AS a ON a.room_id = r.room_id " - "WHERE r.is_public = ? " - "GROUP BY r.room_id " + "SELECT r.room_id, max(n.name), max(t.topic)" + " FROM rooms AS r" + " LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id" + " LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id" + " WHERE r.is_public = ?" + " GROUP BY r.room_id" ) % { "topic": topic_subquery, "name": name_subquery, @@ -120,7 +118,22 @@ class RoomStore(SQLBaseStore): txn.execute(sql, (is_public,)) - return txn.fetchall() + rows = txn.fetchall() + + for i, row in enumerate(rows): + room_id = row[0] + aliases = self._simple_select_onecol_txn( + txn, + table="room_aliases", + keyvalues={ + "room_id": room_id + }, + retcol="room_alias", + ) + + rows[i] = list(row) + [aliases] + + return rows rows = yield self.runInteraction( "get_rooms", f @@ -131,9 +144,10 @@ class RoomStore(SQLBaseStore): "room_id": r[0], "name": r[1], "topic": r[2], - "aliases": r[3].split("\x1F"), + "aliases": r[3], } for r in rows + if r[3] # We only return rooms that have at least one alias. 
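The get_rooms rewrite just above also drops the group_concat(..., '\x1F') trick: group_concat is an SQLite/MySQL aggregate with no direct PostgreSQL counterpart, so the aliases are now fetched per room with _simple_select_onecol_txn, which works on every supported engine. On PostgreSQL alone the same result could be had in one query with array_agg; a sketch of that alternative (not what the patch does, and simplified to the alias part only):

    # PostgreSQL-only alternative: aggregate the aliases in SQL rather than
    # issuing one extra query per room. Not used by the patch, since it
    # would not run on SQLite.
    PG_ALIASES_SQL = """
        SELECT r.room_id, array_agg(a.room_alias)
          FROM rooms AS r
          INNER JOIN room_aliases AS a ON a.room_id = r.room_id
         WHERE r.is_public = ?
         GROUP BY r.room_id
    """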
] defer.returnValue(ret) diff --git a/synapse/storage/schema/full_schemas/16/application_services.sql b/synapse/storage/schema/full_schemas/16/application_services.sql index bc709df92d..f08c5bcf76 100644 --- a/synapse/storage/schema/full_schemas/16/application_services.sql +++ b/synapse/storage/schema/full_schemas/16/application_services.sql @@ -14,7 +14,7 @@ */ CREATE TABLE IF NOT EXISTS application_services( - id BIGINT UNSIGNED PRIMARY KEY, + id BIGINT PRIMARY KEY, url VARCHAR(150), token VARCHAR(150), hs_token VARCHAR(150), @@ -23,8 +23,8 @@ CREATE TABLE IF NOT EXISTS application_services( ); CREATE TABLE IF NOT EXISTS application_services_regex( - id BIGINT UNSIGNED PRIMARY KEY, - as_id BIGINT UNSIGNED NOT NULL, + id BIGINT PRIMARY KEY, + as_id BIGINT NOT NULL, namespace INTEGER, /* enum[room_id|room_alias|user_id] */ regex VARCHAR(150), FOREIGN KEY(as_id) REFERENCES application_services(id) @@ -39,10 +39,10 @@ CREATE TABLE IF NOT EXISTS application_services_state( CREATE TABLE IF NOT EXISTS application_services_txns( as_id VARCHAR(150) NOT NULL, txn_id INTEGER NOT NULL, - event_ids LONGBLOB NOT NULL, + event_ids bytea NOT NULL, UNIQUE(as_id, txn_id) ); -CREATE INDEX IF NOT EXISTS application_services_txns_id ON application_services_txns ( +CREATE INDEX application_services_txns_id ON application_services_txns ( as_id ); diff --git a/synapse/storage/schema/full_schemas/16/event_edges.sql b/synapse/storage/schema/full_schemas/16/event_edges.sql index bdb1109094..05d0874f0d 100644 --- a/synapse/storage/schema/full_schemas/16/event_edges.sql +++ b/synapse/storage/schema/full_schemas/16/event_edges.sql @@ -19,8 +19,8 @@ CREATE TABLE IF NOT EXISTS event_forward_extremities( UNIQUE (event_id, room_id) ); -CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id); -CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id); +CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id); +CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_backward_extremities( @@ -29,8 +29,8 @@ CREATE TABLE IF NOT EXISTS event_backward_extremities( UNIQUE (event_id, room_id) ); -CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id); -CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id); +CREATE INDEX ev_b_extrem_room ON event_backward_extremities(room_id); +CREATE INDEX ev_b_extrem_id ON event_backward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_edges( @@ -41,8 +41,8 @@ CREATE TABLE IF NOT EXISTS event_edges( UNIQUE (event_id, prev_event_id, room_id, is_state) ); -CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id); -CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id); +CREATE INDEX ev_edges_id ON event_edges(event_id); +CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id); CREATE TABLE IF NOT EXISTS room_depth( @@ -51,17 +51,17 @@ CREATE TABLE IF NOT EXISTS room_depth( UNIQUE (room_id) ); -CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id); +CREATE INDEX room_depth_room ON room_depth(room_id); create TABLE IF NOT EXISTS event_destinations( event_id VARCHAR(150) NOT NULL, destination VARCHAR(150) NOT NULL, - delivered_ts BIGINT UNSIGNED DEFAULT 0, -- or 0 if not delivered + delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered UNIQUE (event_id, destination) ); -CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id); +CREATE INDEX event_destinations_id ON 
event_destinations(event_id); CREATE TABLE IF NOT EXISTS state_forward_extremities( @@ -72,10 +72,10 @@ CREATE TABLE IF NOT EXISTS state_forward_extremities( UNIQUE (event_id, room_id) ); -CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities( +CREATE INDEX st_extrem_keys ON state_forward_extremities( room_id, type, state_key ); -CREATE INDEX IF NOT EXISTS st_extrem_id ON state_forward_extremities(event_id); +CREATE INDEX st_extrem_id ON state_forward_extremities(event_id); CREATE TABLE IF NOT EXISTS event_auth( @@ -85,5 +85,5 @@ CREATE TABLE IF NOT EXISTS event_auth( UNIQUE (event_id, auth_id, room_id) ); -CREATE INDEX IF NOT EXISTS evauth_edges_id ON event_auth(event_id); -CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id); +CREATE INDEX evauth_edges_id ON event_auth(event_id); +CREATE INDEX evauth_edges_auth_id ON event_auth(auth_id); diff --git a/synapse/storage/schema/full_schemas/16/event_signatures.sql b/synapse/storage/schema/full_schemas/16/event_signatures.sql index 09886f607c..4291827368 100644 --- a/synapse/storage/schema/full_schemas/16/event_signatures.sql +++ b/synapse/storage/schema/full_schemas/16/event_signatures.sql @@ -16,40 +16,40 @@ CREATE TABLE IF NOT EXISTS event_content_hashes ( event_id VARCHAR(150), algorithm VARCHAR(150), - hash LONGBLOB, + hash bytea, UNIQUE (event_id, algorithm) ); -CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes(event_id); +CREATE INDEX event_content_hashes_id ON event_content_hashes(event_id); CREATE TABLE IF NOT EXISTS event_reference_hashes ( event_id VARCHAR(150), algorithm VARCHAR(150), - hash LONGBLOB, + hash bytea, UNIQUE (event_id, algorithm) ); -CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes(event_id); +CREATE INDEX event_reference_hashes_id ON event_reference_hashes(event_id); CREATE TABLE IF NOT EXISTS event_signatures ( event_id VARCHAR(150), signature_name VARCHAR(150), key_id VARCHAR(150), - signature LONGBLOB, + signature bytea, UNIQUE (event_id, signature_name, key_id) ); -CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures(event_id); +CREATE INDEX event_signatures_id ON event_signatures(event_id); CREATE TABLE IF NOT EXISTS event_edge_hashes( event_id VARCHAR(150), prev_event_id VARCHAR(150), algorithm VARCHAR(150), - hash LONGBLOB, + hash bytea, UNIQUE (event_id, prev_event_id, algorithm) ); -CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes(event_id); +CREATE INDEX event_edge_hashes_id ON event_edge_hashes(event_id); diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql index 19f0f34143..a661fc160c 100644 --- a/synapse/storage/schema/full_schemas/16/im.sql +++ b/synapse/storage/schema/full_schemas/16/im.sql @@ -14,33 +14,33 @@ */ CREATE TABLE IF NOT EXISTS events( - stream_ordering BIGINT UNSIGNED PRIMARY KEY, - topological_ordering BIGINT UNSIGNED NOT NULL, + stream_ordering BIGINT PRIMARY KEY, + topological_ordering BIGINT NOT NULL, event_id VARCHAR(150) NOT NULL, type VARCHAR(150) NOT NULL, room_id VARCHAR(150) NOT NULL, - content LONGBLOB NOT NULL, - unrecognized_keys LONGBLOB, + content bytea NOT NULL, + unrecognized_keys bytea, processed BOOL NOT NULL, outlier BOOL NOT NULL, - depth BIGINT UNSIGNED DEFAULT 0 NOT NULL, + depth BIGINT DEFAULT 0 NOT NULL, UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering); -CREATE INDEX IF NOT EXISTS events_topological_ordering ON events 
(topological_ordering); -CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id); +CREATE INDEX events_stream_ordering ON events (stream_ordering); +CREATE INDEX events_topological_ordering ON events (topological_ordering); +CREATE INDEX events_room_id ON events (room_id); CREATE TABLE IF NOT EXISTS event_json( event_id VARCHAR(150) NOT NULL, room_id VARCHAR(150) NOT NULL, - internal_metadata LONGBLOB NOT NULL, - json LONGBLOB NOT NULL, + internal_metadata bytea NOT NULL, + json bytea NOT NULL, UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id); +CREATE INDEX event_json_room_id ON event_json(room_id); CREATE TABLE IF NOT EXISTS state_events( @@ -52,9 +52,9 @@ CREATE TABLE IF NOT EXISTS state_events( UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id); -CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type); -CREATE INDEX IF NOT EXISTS state_events_state_key ON state_events (state_key); +CREATE INDEX state_events_room_id ON state_events (room_id); +CREATE INDEX state_events_type ON state_events (type); +CREATE INDEX state_events_state_key ON state_events (state_key); CREATE TABLE IF NOT EXISTS current_state_events( @@ -66,9 +66,9 @@ CREATE TABLE IF NOT EXISTS current_state_events( UNIQUE (room_id, type, state_key) ); -CREATE INDEX IF NOT EXISTS current_state_events_room_id ON current_state_events (room_id); -CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (type); -CREATE INDEX IF NOT EXISTS current_state_events_state_key ON current_state_events (state_key); +CREATE INDEX current_state_events_room_id ON current_state_events (room_id); +CREATE INDEX current_state_events_type ON current_state_events (type); +CREATE INDEX current_state_events_state_key ON current_state_events (state_key); CREATE TABLE IF NOT EXISTS room_memberships( event_id VARCHAR(150) NOT NULL, @@ -79,8 +79,8 @@ CREATE TABLE IF NOT EXISTS room_memberships( UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS room_memberships_room_id ON room_memberships (room_id); -CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id); +CREATE INDEX room_memberships_room_id ON room_memberships (room_id); +CREATE INDEX room_memberships_user_id ON room_memberships (user_id); CREATE TABLE IF NOT EXISTS feedback( event_id VARCHAR(150) NOT NULL, @@ -98,7 +98,7 @@ CREATE TABLE IF NOT EXISTS topics( UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS topics_room_id ON topics(room_id); +CREATE INDEX topics_room_id ON topics(room_id); CREATE TABLE IF NOT EXISTS room_names( event_id VARCHAR(150) NOT NULL, @@ -107,7 +107,7 @@ CREATE TABLE IF NOT EXISTS room_names( UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS room_names_room_id ON room_names(room_id); +CREATE INDEX room_names_room_id ON room_names(room_id); CREATE TABLE IF NOT EXISTS rooms( room_id VARCHAR(150) PRIMARY KEY NOT NULL, @@ -121,4 +121,4 @@ CREATE TABLE IF NOT EXISTS room_hosts( UNIQUE (room_id, host) ); -CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id); +CREATE INDEX room_hosts_room_id ON room_hosts (room_id); diff --git a/synapse/storage/schema/full_schemas/16/keys.sql b/synapse/storage/schema/full_schemas/16/keys.sql index 35f141c288..459b510427 100644 --- a/synapse/storage/schema/full_schemas/16/keys.sql +++ b/synapse/storage/schema/full_schemas/16/keys.sql @@ -16,8 +16,8 @@ CREATE TABLE IF NOT EXISTS server_tls_certificates( server_name VARCHAR(150), -- Server name. 
fingerprint VARCHAR(150), -- Certificate fingerprint. from_server VARCHAR(150), -- Which key server the certificate was fetched from. - ts_added_ms BIGINT UNSIGNED, -- When the certifcate was added. - tls_certificate LONGBLOB, -- DER encoded x509 certificate. + ts_added_ms BIGINT, -- When the certifcate was added. + tls_certificate bytea, -- DER encoded x509 certificate. UNIQUE (server_name, fingerprint) ); @@ -25,7 +25,7 @@ CREATE TABLE IF NOT EXISTS server_signature_keys( server_name VARCHAR(150), -- Server name. key_id VARCHAR(150), -- Key version. from_server VARCHAR(150), -- Which key server the key was fetched form. - ts_added_ms BIGINT UNSIGNED, -- When the key was added. - verify_key LONGBLOB, -- NACL verification key. + ts_added_ms BIGINT, -- When the key was added. + verify_key bytea, -- NACL verification key. UNIQUE (server_name, key_id) ); diff --git a/synapse/storage/schema/full_schemas/16/media_repository.sql b/synapse/storage/schema/full_schemas/16/media_repository.sql index 014bce4aeb..0e819fca38 100644 --- a/synapse/storage/schema/full_schemas/16/media_repository.sql +++ b/synapse/storage/schema/full_schemas/16/media_repository.sql @@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS local_media_repository ( media_id VARCHAR(150), -- The id used to refer to the media. media_type VARCHAR(150), -- The MIME-type of the media. media_length INTEGER, -- Length of the media in bytes. - created_ts BIGINT UNSIGNED, -- When the content was uploaded in ms. + created_ts BIGINT, -- When the content was uploaded in ms. upload_name VARCHAR(150), -- The name the media was uploaded with. user_id VARCHAR(150), -- The user who uploaded the file. UNIQUE (media_id) @@ -35,14 +35,14 @@ CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails ( ) ); -CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id +CREATE INDEX local_media_repository_thumbnails_media_id ON local_media_repository_thumbnails (media_id); CREATE TABLE IF NOT EXISTS remote_media_cache ( media_origin VARCHAR(150), -- The remote HS the media came from. media_id VARCHAR(150), -- The id used to refer to the media on that server. media_type VARCHAR(150), -- The MIME-type of the media. - created_ts BIGINT UNSIGNED, -- When the content was uploaded in ms. + created_ts BIGINT, -- When the content was uploaded in ms. upload_name VARCHAR(150), -- The name the media was uploaded with. media_length INTEGER, -- Length of the media in bytes. filesystem_id VARCHAR(150), -- The name used to store the media on disk. 
@@ -64,5 +64,5 @@ CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails ( ) ); -CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id +CREATE INDEX remote_media_cache_thumbnails_media_id ON remote_media_cache_thumbnails (media_id); diff --git a/synapse/storage/schema/full_schemas/16/presence.sql b/synapse/storage/schema/full_schemas/16/presence.sql index fbe5b0af6c..9c41be296e 100644 --- a/synapse/storage/schema/full_schemas/16/presence.sql +++ b/synapse/storage/schema/full_schemas/16/presence.sql @@ -16,7 +16,7 @@ CREATE TABLE IF NOT EXISTS presence( user_id VARCHAR(150) NOT NULL, state VARCHAR(20), status_msg VARCHAR(150), - mtime BIGINT UNSIGNED, -- miliseconds since last state change + mtime BIGINT, -- miliseconds since last state change UNIQUE (user_id) ); @@ -37,4 +37,4 @@ CREATE TABLE IF NOT EXISTS presence_list( UNIQUE (user_id, observed_user_id) ); -CREATE INDEX IF NOT EXISTS presence_list_user_id ON presence_list (user_id); +CREATE INDEX presence_list_user_id ON presence_list (user_id); diff --git a/synapse/storage/schema/full_schemas/16/push.sql b/synapse/storage/schema/full_schemas/16/push.sql index 33300736f9..5c0c7bc201 100644 --- a/synapse/storage/schema/full_schemas/16/push.sql +++ b/synapse/storage/schema/full_schemas/16/push.sql @@ -22,52 +22,52 @@ CREATE TABLE IF NOT EXISTS rejections( -- Push notification endpoints that users have configured CREATE TABLE IF NOT EXISTS pushers ( - id BIGINT UNSIGNED PRIMARY KEY, + id BIGINT PRIMARY KEY, user_name VARCHAR(150) NOT NULL, profile_tag VARCHAR(32) NOT NULL, kind VARCHAR(8) NOT NULL, app_id VARCHAR(64) NOT NULL, app_display_name VARCHAR(64) NOT NULL, device_display_name VARCHAR(128) NOT NULL, - pushkey VARBINARY(512) NOT NULL, - ts BIGINT UNSIGNED NOT NULL, + pushkey bytea NOT NULL, + ts BIGINT NOT NULL, lang VARCHAR(8), - data LONGBLOB, + data bytea, last_token TEXT, - last_success BIGINT UNSIGNED, - failing_since BIGINT UNSIGNED, + last_success BIGINT, + failing_since BIGINT, UNIQUE (app_id, pushkey) ); CREATE TABLE IF NOT EXISTS push_rules ( - id BIGINT UNSIGNED PRIMARY KEY, + id BIGINT PRIMARY KEY, user_name VARCHAR(150) NOT NULL, rule_id VARCHAR(150) NOT NULL, - priority_class TINYINT NOT NULL, + priority_class SMALLINT NOT NULL, priority INTEGER NOT NULL DEFAULT 0, conditions VARCHAR(150) NOT NULL, actions VARCHAR(150) NOT NULL, UNIQUE(user_name, rule_id) ); -CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name); +CREATE INDEX push_rules_user_name on push_rules (user_name); CREATE TABLE IF NOT EXISTS user_filters( user_id VARCHAR(150), - filter_id BIGINT UNSIGNED, - filter_json LONGBLOB + filter_id BIGINT, + filter_json bytea ); -CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters( +CREATE INDEX user_filters_by_user_id_filter_id ON user_filters( user_id, filter_id ); CREATE TABLE IF NOT EXISTS push_rules_enable ( - id BIGINT UNSIGNED PRIMARY KEY, + id BIGINT PRIMARY KEY, user_name VARCHAR(150) NOT NULL, rule_id VARCHAR(150) NOT NULL, - enabled TINYINT, + enabled SMALLINT, UNIQUE(user_name, rule_id) ); -CREATE INDEX IF NOT EXISTS push_rules_enable_user_name on push_rules_enable (user_name); +CREATE INDEX push_rules_enable_user_name on push_rules_enable (user_name); diff --git a/synapse/storage/schema/full_schemas/16/redactions.sql b/synapse/storage/schema/full_schemas/16/redactions.sql index b81451eab4..492fd22033 100644 --- a/synapse/storage/schema/full_schemas/16/redactions.sql +++ b/synapse/storage/schema/full_schemas/16/redactions.sql @@ -18,5 +18,5 @@ CREATE 
TABLE IF NOT EXISTS redactions ( UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id); -CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts); +CREATE INDEX redactions_event_id ON redactions (event_id); +CREATE INDEX redactions_redacts ON redactions (redacts); diff --git a/synapse/storage/schema/full_schemas/16/state.sql b/synapse/storage/schema/full_schemas/16/state.sql index 8c51610396..3c54595e64 100644 --- a/synapse/storage/schema/full_schemas/16/state.sql +++ b/synapse/storage/schema/full_schemas/16/state.sql @@ -14,7 +14,7 @@ */ CREATE TABLE IF NOT EXISTS state_groups( - id BIGINT UNSIGNED PRIMARY KEY, + id BIGINT PRIMARY KEY, room_id VARCHAR(150) NOT NULL, event_id VARCHAR(150) NOT NULL ); @@ -33,8 +33,8 @@ CREATE TABLE IF NOT EXISTS event_to_state_groups( UNIQUE (event_id) ); -CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id); +CREATE INDEX state_groups_id ON state_groups(id); -CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state(state_group); -CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state(room_id, type, state_key); -CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups(event_id); +CREATE INDEX state_groups_state_id ON state_groups_state(state_group); +CREATE INDEX state_groups_state_tuple ON state_groups_state(room_id, type, state_key); +CREATE INDEX event_to_state_groups_id ON event_to_state_groups(event_id); diff --git a/synapse/storage/schema/full_schemas/16/transactions.sql b/synapse/storage/schema/full_schemas/16/transactions.sql index f381e67603..bc64064936 100644 --- a/synapse/storage/schema/full_schemas/16/transactions.sql +++ b/synapse/storage/schema/full_schemas/16/transactions.sql @@ -16,32 +16,32 @@ CREATE TABLE IF NOT EXISTS received_transactions( transaction_id VARCHAR(150), origin VARCHAR(150), - ts BIGINT UNSIGNED, + ts BIGINT, response_code INTEGER, - response_json LONGBLOB, - has_been_referenced BOOL default 0, -- Whether thishas been referenced by a prev_tx + response_json bytea, + has_been_referenced smallint default 0, -- Whether thishas been referenced by a prev_tx UNIQUE (transaction_id, origin) ); -CREATE INDEX IF NOT EXISTS transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0; +CREATE INDEX transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0; -- Stores what transactions we've sent, what their response was (if we got one) and whether we have -- since referenced the transaction in another outgoing transaction CREATE TABLE IF NOT EXISTS sent_transactions( - id BIGINT UNSIGNED PRIMARY KEY, -- This is used to apply insertion ordering + id BIGINT PRIMARY KEY, -- This is used to apply insertion ordering transaction_id VARCHAR(150), destination VARCHAR(150), response_code INTEGER DEFAULT 0, - response_json LONGBLOB, - ts BIGINT UNSIGNED + response_json bytea, + ts BIGINT ); -CREATE INDEX IF NOT EXISTS sent_transaction_dest ON sent_transactions(destination); -CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id); +CREATE INDEX sent_transaction_dest ON sent_transactions(destination); +CREATE INDEX sent_transaction_txn_id ON sent_transactions(transaction_id); -- So that we can do an efficient look up of all transactions that have yet to be successfully -- sent. 
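A pattern worth noting in these schema diffs: flag columns declared BOOL with an integer default (has_been_referenced above, admin in users.sql below) become smallint, while columns that stay BOOL (outlier, is_state) are no longer compared against literal 0/1 in the SQL text. Instead the Python code binds True/False, so each engine stores and compares its own boolean representation, as in the stream.py changes further down. A minimal illustration of that binding style:

    # Literals like "outlier = 0" assume SQLite's integer booleans; binding
    # a Python bool lets SQLite, MySQL and PostgreSQL each use their own
    # representation.
    sql = (
        "SELECT event_id FROM events"
        " WHERE room_id = ? AND outlier = ?"
    )
    args = ("!room:example.org", False)
    # txn.execute(sql, args)  -- as done in the stream.py changes below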
-CREATE INDEX IF NOT EXISTS sent_transaction_sent ON sent_transactions(response_code); +CREATE INDEX sent_transaction_sent ON sent_transactions(response_code); -- For sent transactions only. @@ -53,11 +53,11 @@ CREATE TABLE IF NOT EXISTS transaction_id_to_pdu( UNIQUE (transaction_id, destination) ); -CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); +CREATE INDEX transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); -- To track destination health CREATE TABLE IF NOT EXISTS destinations( destination VARCHAR(150) PRIMARY KEY, - retry_last_ts BIGINT UNSIGNED, + retry_last_ts BIGINT, retry_interval INTEGER ); diff --git a/synapse/storage/schema/full_schemas/16/users.sql b/synapse/storage/schema/full_schemas/16/users.sql index d0011c04b4..267284d07d 100644 --- a/synapse/storage/schema/full_schemas/16/users.sql +++ b/synapse/storage/schema/full_schemas/16/users.sql @@ -15,17 +15,17 @@ CREATE TABLE IF NOT EXISTS users( name VARCHAR(150), password_hash VARCHAR(150), - creation_ts BIGINT UNSIGNED, - admin BOOL DEFAULT 0 NOT NULL, + creation_ts BIGINT, + admin SMALLINT DEFAULT 0 NOT NULL, UNIQUE(name) ); CREATE TABLE IF NOT EXISTS access_tokens( - id BIGINT UNSIGNED PRIMARY KEY, + id BIGINT PRIMARY KEY, user_id VARCHAR(150) NOT NULL, device_id VARCHAR(150), token VARCHAR(150) NOT NULL, - last_used BIGINT UNSIGNED, + last_used BIGINT, UNIQUE(token) ); @@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS user_ips ( device_id VARCHAR(150), ip VARCHAR(150) NOT NULL, user_agent VARCHAR(150) NOT NULL, - last_seen BIGINT UNSIGNED NOT NULL + last_seen BIGINT NOT NULL ); CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user); diff --git a/synapse/storage/schema/schema_version.sql b/synapse/storage/schema/schema_version.sql index e7fa6fe569..d9494611e0 100644 --- a/synapse/storage/schema/schema_version.sql +++ b/synapse/storage/schema/schema_version.sql @@ -14,16 +14,14 @@ */ CREATE TABLE IF NOT EXISTS schema_version( - `Lock` CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. - `version` INTEGER NOT NULL, - `upgraded` BOOL NOT NULL, -- Whether we reached this version from an upgrade or an initial schema. - CHECK (`Lock`='X') + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + version INTEGER NOT NULL, + upgraded BOOL NOT NULL, -- Whether we reached this version from an upgrade or an initial schema. + CHECK (Lock='X') ); CREATE TABLE IF NOT EXISTS applied_schema_deltas( - `version` INTEGER NOT NULL, - `file` VARCHAR(150) NOT NULL, + version INTEGER NOT NULL, + file VARCHAR(150) NOT NULL, UNIQUE(version, file) ); - -CREATE INDEX IF NOT EXISTS schema_deltas_ver ON applied_schema_deltas(version); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 57c2e4dfeb..df6de7cbcd 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -240,7 +240,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT e.event_id, e.stream_ordering FROM events AS e WHERE " - "(e.outlier = 0 AND (room_id IN (%(current)s)) OR " + "(e.outlier = ? AND (room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " "AND e.stream_ordering > ? AND e.stream_ordering <= ? 
" "ORDER BY stream_ordering ASC LIMIT %(limit)d " @@ -251,7 +251,7 @@ class StreamStore(SQLBaseStore): } def f(txn): - txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,)) + txn.execute(sql, (False, user_id, user_id, from_id.stream, to_id.stream,)) rows = self.cursor_to_dict(txn) @@ -283,7 +283,7 @@ class StreamStore(SQLBaseStore): # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. - args = [room_id] + args = [False, room_id] if direction == 'b': order = "DESC" bounds = _StreamToken.parse(from_key).upper_bound() @@ -307,7 +307,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT * FROM events" - " WHERE outlier = 0 AND room_id = ? AND %(bounds)s" + " WHERE outlier = ? AND room_id = ? AND %(bounds)s" " ORDER BY topological_ordering %(order)s," " stream_ordering %(order)s %(limit)s" ) % { @@ -358,7 +358,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT stream_ordering, topological_ordering, event_id" " FROM events" - " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0" + " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?" " ORDER BY topological_ordering DESC, stream_ordering DESC" " LIMIT ?" ) @@ -368,17 +368,17 @@ class StreamStore(SQLBaseStore): "SELECT stream_ordering, topological_ordering, event_id" " FROM events" " WHERE room_id = ? AND stream_ordering > ?" - " AND stream_ordering <= ? AND outlier = 0" + " AND stream_ordering <= ? AND outlier = ?" " ORDER BY topological_ordering DESC, stream_ordering DESC" " LIMIT ?" ) def get_recent_events_for_room_txn(txn): if from_token is None: - txn.execute(sql, (room_id, end_token.stream, limit,)) + txn.execute(sql, (room_id, end_token.stream, False, limit,)) else: txn.execute(sql, ( - room_id, from_token.stream, end_token.stream, limit + room_id, from_token.stream, end_token.stream, False, limit )) rows = self.cursor_to_dict(txn) -- cgit 1.4.1 From 23c639ff325c454a7e4957be0b7852776de0fb58 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 May 2015 10:17:19 +0100 Subject: Split a storage function in two so that we don't have to do extra work. --- synapse/federation/federation_server.py | 4 ++-- synapse/state.py | 7 +------ synapse/storage/event_federation.py | 11 +++++++++++ 3 files changed, 14 insertions(+), 8 deletions(-) (limited to 'synapse/storage/event_federation.py') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 25c0014f97..2b46188c91 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -417,13 +417,13 @@ class FederationServer(FederationBase): pdu.internal_metadata.outlier = True elif min_depth and pdu.depth > min_depth: if get_missing and prevs - seen: - latest_tuples = yield self.store.get_latest_events_in_room( + latest = yield self.store.get_latest_event_ids_in_room( pdu.room_id ) # We add the prev events that we have seen to the latest # list to ensure the remote server doesn't give them to us - latest = set(e_id for e_id, _, _ in latest_tuples) + latest = set(latest) latest |= seen missing_events = yield self.get_missing_events( diff --git a/synapse/state.py b/synapse/state.py index ba2500d61c..9dddb77d5b 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -86,12 +86,7 @@ class StateHandler(object): If `event_type` is specified, then the method returns only the one event (or None) with that `event_type` and `state_key`. 
""" - events = yield self.store.get_latest_events_in_room(room_id) - - event_ids = [ - e_id - for e_id, _, _ in events - ] + event_ids = yield self.store.get_latest_event_ids_in_room(room_id) cache = None if self._state_cache is not None: diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 54a3c9d805..8bbb42c27e 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -96,11 +96,22 @@ class EventFederationStore(SQLBaseStore): room_id, ) + def get_latest_event_ids_in_room(self, room_id): + return self._simple_select_onecol( + table="event_forward_extremities", + keyvalues={ + "room_id": room_id, + }, + retcol="event_id", + desc="get_latest_events_in_room", + ) + def _get_latest_events_in_room(self, txn, room_id): sql = ( "SELECT e.event_id, e.depth FROM events as e " "INNER JOIN event_forward_extremities as f " "ON e.event_id = f.event_id " + "AND e.room_id = f.room_id " "WHERE f.room_id = ?" ) -- cgit 1.4.1 From 4011cf1c424708616023493bb2dba89a8554f2d0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 May 2015 13:06:26 +0100 Subject: Cache latest_event_ids_in_room --- synapse/storage/event_federation.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/storage/event_federation.py') diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 8bbb42c27e..fbbcce754b 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached from syutil.base64util import encode_base64 import logging @@ -96,6 +96,7 @@ class EventFederationStore(SQLBaseStore): room_id, ) + @cached() def get_latest_event_ids_in_room(self, room_id): return self._simple_select_onecol( table="event_forward_extremities", @@ -329,6 +330,8 @@ class EventFederationStore(SQLBaseStore): ) txn.execute(query) + self.get_latest_event_ids_in_room.invalidate(room_id) + def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occurred before (and including) the events in event_list. 
Return a list of max size `limit` -- cgit 1.4.1 From d9cc5de9e580c8a0de92352ec50fa62fb32b0b95 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 10:24:10 +0100 Subject: Correctly name transaction --- synapse/storage/event_federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/event_federation.py') diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index fbbcce754b..68f39bd684 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -104,7 +104,7 @@ class EventFederationStore(SQLBaseStore): "room_id": room_id, }, retcol="event_id", - desc="get_latest_events_in_room", + desc="get_latest_event_ids_in_room", ) def _get_latest_events_in_room(self, txn, room_id): -- cgit 1.4.1 From a9aea68fd568182185e8d0ae478c56df8ac6be49 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 May 2015 14:57:08 +0100 Subject: Invalidate the caches from the correct thread --- synapse/storage/event_federation.py | 10 ++++++---- synapse/storage/events.py | 39 ++++++++++++++++++++++++------------- synapse/storage/room.py | 4 ++-- synapse/storage/roommember.py | 8 +++++--- synapse/storage/signatures.py | 12 ++++++------ synapse/storage/state.py | 2 +- 6 files changed, 46 insertions(+), 29 deletions(-) (limited to 'synapse/storage/event_federation.py') diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 68f39bd684..3cd3fbdc9b 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -241,7 +241,7 @@ class EventFederationStore(SQLBaseStore): return int(min_depth) if min_depth is not None else None - def _update_min_depth_for_room_txn(self, txn, room_id, depth): + def _update_min_depth_for_room_txn(self, txn, invalidates, room_id, depth): min_depth = self._get_min_depth_interaction(txn, room_id) do_insert = depth < min_depth if min_depth else True @@ -256,8 +256,8 @@ class EventFederationStore(SQLBaseStore): }, ) - def _handle_prev_events(self, txn, outlier, event_id, prev_events, - room_id): + def _handle_prev_events(self, txn, invalidates, outlier, event_id, + prev_events, room_id): """ For the given event, update the event edges table and forward and backward extremities tables. 
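Caching get_latest_event_ids_in_room (two commits up) means every code path that changes a room's forward extremities now has to invalidate that cache entry, which is what the remaining commits below are wiring through. The semantics assumed of the @cached() decorator here are the usual memoise-per-argument-plus-invalidate pair; a minimal self-contained sketch of that idea, not the real implementation:

    def cached_sketch(func):
        """Assumed behaviour of @cached(): memoise per argument and expose
        an .invalidate() handle so writers can evict stale entries."""
        cache = {}

        def wrapper(key):
            if key not in cache:
                cache[key] = func(key)
            return cache[key]

        wrapper.invalidate = lambda key: cache.pop(key, None)
        return wrapper

    @cached_sketch
    def get_latest_event_ids(room_id):
        # stand-in for the real database query
        return ["$event-for-%s" % room_id]

    get_latest_event_ids("!room:example.org")             # cached after first call
    get_latest_event_ids.invalidate("!room:example.org")  # evict after a write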
@@ -330,7 +330,9 @@ class EventFederationStore(SQLBaseStore): ) txn.execute(query) - self.get_latest_event_ids_in_room.invalidate(room_id) + invalidates.append(( + self.get_latest_event_ids_in_room.invalidate, room_id + )) def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occurred before (and diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a3c260ddc4..b2ab4b02f3 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -42,7 +42,7 @@ class EventsStore(SQLBaseStore): stream_ordering = self.min_token try: - yield self.runInteraction( + invalidates = yield self.runInteraction( "persist_event", self._persist_event_txn, event=event, @@ -52,6 +52,11 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) + for invalidated in invalidates: + invalidated_callback = invalidated[0] + invalidated_args = invalidated[1:] + invalidated_callback(*invalidated_args) + except _RollbackButIsFineException: pass @@ -91,9 +96,10 @@ class EventsStore(SQLBaseStore): def _persist_event_txn(self, txn, event, context, backfilled, stream_ordering=None, is_new_state=True, current_state=None): + invalidates = [] # Remove the any existing cache entries for the event_id - self._invalidate_get_event_cache(event.event_id) + invalidates.append((self._invalidate_get_event_cache, event.event_id)) if stream_ordering is None: with self._stream_id_gen.get_next_txn(txn) as stream_ordering: @@ -150,10 +156,11 @@ class EventsStore(SQLBaseStore): outlier = event.internal_metadata.is_outlier() if not outlier: - self._store_state_groups_txn(txn, event, context) + self._store_state_groups_txn(txn, invalidates, event, context) self._update_min_depth_for_room_txn( txn, + invalidates, event.room_id, event.depth ) @@ -199,6 +206,7 @@ class EventsStore(SQLBaseStore): self._handle_prev_events( txn, + invalidates, outlier=outlier, event_id=event.event_id, prev_events=event.prev_events, @@ -206,13 +214,13 @@ class EventsStore(SQLBaseStore): ) if event.type == EventTypes.Member: - self._store_room_member_txn(txn, event) + self._store_room_member_txn(txn, invalidates, event) elif event.type == EventTypes.Name: - self._store_room_name_txn(txn, event) + self._store_room_name_txn(txn, invalidates, event) elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, event) + self._store_room_topic_txn(txn, invalidates, event) elif event.type == EventTypes.Redaction: - self._store_redaction(txn, event) + self._store_redaction(txn, invalidates, event) event_dict = { k: v @@ -281,19 +289,22 @@ class EventsStore(SQLBaseStore): ) if context.rejected: - self._store_rejections_txn(txn, event.event_id, context.rejected) + self._store_rejections_txn( + txn, invalidates, event.event_id, context.rejected + ) for hash_alg, hash_base64 in event.hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_event_content_hash_txn( - txn, event.event_id, hash_alg, hash_bytes, + txn, invalidates, event.event_id, hash_alg, hash_bytes, ) for prev_event_id, prev_hashes in event.prev_events: for alg, hash_base64 in prev_hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_prev_event_hash_txn( - txn, event.event_id, prev_event_id, alg, hash_bytes + txn, invalidates, event.event_id, prev_event_id, alg, + hash_bytes ) for auth_id, _ in event.auth_events: @@ -309,7 +320,7 @@ class EventsStore(SQLBaseStore): (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) self._store_event_reference_hash_txn( - 
txn, event.event_id, ref_alg, ref_hash_bytes + txn, invalidates, event.event_id, ref_alg, ref_hash_bytes ) if event.is_state(): @@ -356,9 +367,11 @@ class EventsStore(SQLBaseStore): } ) - def _store_redaction(self, txn, event): + return invalidates + + def _store_redaction(self, txn, invalidates, event): # invalidate the cache for the redacted event - self._invalidate_get_event_cache(event.redacts) + invalidates.append((self._invalidate_get_event_cache, event.redacts)) txn.execute( "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", (event.event_id, event.redacts) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index f956377632..d42d7ff0e3 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -162,7 +162,7 @@ class RoomStore(SQLBaseStore): defer.returnValue(ret) - def _store_room_topic_txn(self, txn, event): + def _store_room_topic_txn(self, txn, invalidates, event): if hasattr(event, "content") and "topic" in event.content: self._simple_insert_txn( txn, @@ -174,7 +174,7 @@ class RoomStore(SQLBaseStore): }, ) - def _store_room_name_txn(self, txn, event): + def _store_room_name_txn(self, txn, invalidates, event): if hasattr(event, "content") and "name" in event.content: self._simple_insert_txn( txn, diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 09fb77a194..117da817ba 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -35,7 +35,7 @@ RoomsForUser = namedtuple( class RoomMemberStore(SQLBaseStore): - def _store_room_member_txn(self, txn, event): + def _store_room_member_txn(self, txn, invalidates, event): """Store a room member in the database. """ try: @@ -64,8 +64,10 @@ class RoomMemberStore(SQLBaseStore): } ) - self.get_rooms_for_user.invalidate(target_user_id) - self.get_joined_hosts_for_room.invalidate(event.room_id) + invalidates.extend([ + (self.get_rooms_for_user.invalidate, target_user_id), + (self.get_joined_hosts_for_room.invalidate, event.room_id), + ]) def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. 
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index f051828630..e3979846e7 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -39,8 +39,8 @@ class SignatureStore(SQLBaseStore): txn.execute(query, (event_id, )) return dict(txn.fetchall()) - def _store_event_content_hash_txn(self, txn, event_id, algorithm, - hash_bytes): + def _store_event_content_hash_txn(self, txn, invalidates, event_id, + algorithm, hash_bytes): """Store a hash for a Event Args: txn (cursor): @@ -101,8 +101,8 @@ class SignatureStore(SQLBaseStore): txn.execute(query, (event_id, )) return {k: v for k, v in txn.fetchall()} - def _store_event_reference_hash_txn(self, txn, event_id, algorithm, - hash_bytes): + def _store_event_reference_hash_txn(self, txn, invalidates, event_id, + algorithm, hash_bytes): """Store a hash for a PDU Args: txn (cursor): @@ -184,8 +184,8 @@ class SignatureStore(SQLBaseStore): hashes[algorithm] = hash_bytes return results - def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id, - algorithm, hash_bytes): + def _store_prev_event_hash_txn(self, txn, invalidates, event_id, + prev_event_id, algorithm, hash_bytes): self._simple_insert_txn( txn, "event_edge_hashes", diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 7e55e8bed6..35d11c27cc 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -82,7 +82,7 @@ class StateStore(SQLBaseStore): f, ) - def _store_state_groups_txn(self, txn, event, context): + def _store_state_groups_txn(self, txn, invalidates, event, context): if context.current_state is None: return -- cgit 1.4.1 From 43c2e8deae5f7e2b339ab5c131391231886cad09 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 May 2015 15:13:25 +0100 Subject: Add support for using executemany --- synapse/storage/_base.py | 54 ++++++++++++++++++++++++++++--------- synapse/storage/event_federation.py | 40 ++++++++++++++------------- synapse/storage/events.py | 46 +++++++++++++++++-------------- synapse/storage/state.py | 16 ++++++----- tests/storage/test_base.py | 4 +-- 5 files changed, 99 insertions(+), 61 deletions(-) (limited to 'synapse/storage/event_federation.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e01c61d08d..b7c3cf03c8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -160,18 +160,23 @@ class LoggingTransaction(object): def __setattr__(self, name, value): setattr(self.txn, name, value) - def execute(self, sql, *args, **kwargs): + def execute(self, sql, *args): + self._do_execute(self.txn.execute, sql, *args) + + def executemany(self, sql, *args): + self._do_execute(self.txn.executemany, sql, *args) + + def _do_execute(self, func, sql, *args): # TODO(paul): Maybe use 'info' and 'debug' for values? 
sql_logger.debug("[SQL] {%s} %s", self.name, sql) sql = self.database_engine.convert_param_style(sql) - if args and args[0]: + if args: try: sql_logger.debug( - "[SQL values] {%s} " + ", ".join(("<%r>",) * len(args[0])), - self.name, - *args[0] + "[SQL values] {%s} %r", + self.name, args[0] ) except: # Don't let logging failures stop SQL from working @@ -180,8 +185,8 @@ class LoggingTransaction(object): start = time.time() * 1000 try: - return self.txn.execute( - sql, *args, **kwargs + return func( + sql, *args ) except Exception as e: logger.debug("[SQL FAIL] {%s} %s", self.name, e) @@ -434,18 +439,41 @@ class SQLBaseStore(object): @log_function def _simple_insert_txn(self, txn, table, values): + keys, vals = zip(*values.items()) + sql = "INSERT INTO %s (%s) VALUES(%s)" % ( table, - ", ".join(k for k in values), - ", ".join("?" for k in values) + ", ".join(k for k in keys), + ", ".join("?" for _ in keys) ) - logger.debug( - "[SQL] %s Args=%s", - sql, values.values(), + txn.execute(sql, vals) + + def _simple_insert_many_txn(self, txn, table, values): + if not values: + return + + keys, vals = zip(*[ + zip( + *(sorted(i.items(), key=lambda kv: kv[0])) + ) + for i in values + if i + ]) + + for k in keys: + if k != keys[0]: + raise RuntimeError( + "All items must have the same keys" + ) + + sql = "INSERT INTO %s (%s) VALUES(%s)" % ( + table, + ", ".join(k for k in keys[0]), + ", ".join("?" for _ in keys[0]) ) - txn.execute(sql, values.values()) + txn.executemany(sql, vals) def _simple_upsert(self, table, keyvalues, values, insertion_values={}, desc="_simple_upsert", lock=True): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 68f39bd684..0aca4ba17b 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -262,18 +262,19 @@ class EventFederationStore(SQLBaseStore): For the given event, update the event edges table and forward and backward extremities tables. """ - for e_id, _ in prev_events: - # TODO (erikj): This could be done as a bulk insert - self._simple_insert_txn( - txn, + self._simple_insert_many_txn( + txn, table="event_edges", - values={ - "event_id": event_id, - "prev_event_id": e_id, - "room_id": room_id, - "is_state": False, - }, - ) + values=[ + { + "event_id": event_id, + "prev_event_id": e_id, + "room_id": room_id, + "is_state": False, + } + for e_id, _ in prev_events + ], + ) # Update the extremities table if this is not an outlier. if not outlier: @@ -307,16 +308,17 @@ class EventFederationStore(SQLBaseStore): # Insert all the prev_events as a backwards thing, they'll get # deleted in a second if they're incorrect anyway. 
-        for e_id, _ in prev_events:
-            # TODO (erikj): This could be done as a bulk insert
-            self._simple_insert_txn(
-                txn,
-                table="event_backward_extremities",
-                values={
+        self._simple_insert_many_txn(
+            txn,
+            table="event_backward_extremities",
+            values=[
+                {
                     "event_id": e_id,
                     "room_id": room_id,
-                },
-            )
+                }
+                for e_id, _ in prev_events
+            ],
+        )
 
         # Also delete from the backwards extremities table all ones that
         # reference events that we have already seen
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index a3c260ddc4..84e446a99c 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -113,17 +113,19 @@ class EventsStore(SQLBaseStore):
             keyvalues={"room_id": event.room_id},
         )
 
-        for s in current_state:
-            self._simple_insert_txn(
-                txn,
-                "current_state_events",
+        self._simple_insert_many_txn(
+            txn,
+            "current_state_events",
+            [
                 {
                     "event_id": s.event_id,
                     "room_id": s.room_id,
                     "type": s.type,
                     "state_key": s.state_key,
-                },
-            )
+                }
+                for s in current_state
+            ],
+        )
 
         if event.is_state() and is_new_state:
             if not backfilled and not context.rejected:
@@ -296,16 +298,18 @@ class EventsStore(SQLBaseStore):
                     txn, event.event_id, prev_event_id, alg, hash_bytes
                 )
 
-        for auth_id, _ in event.auth_events:
-            self._simple_insert_txn(
-                txn,
-                table="event_auth",
-                values={
+        self._simple_insert_many_txn(
+            txn,
+            table="event_auth",
+            values=[
+                {
                     "event_id": event.event_id,
                     "room_id": event.room_id,
                     "auth_id": auth_id,
-                },
-            )
+                }
+                for auth_id, _ in event.auth_events
+            ],
+        )
 
         (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
         self._store_event_reference_hash_txn(
@@ -330,17 +334,19 @@ class EventsStore(SQLBaseStore):
             vals,
         )
 
-        for e_id, h in event.prev_state:
-            self._simple_insert_txn(
-                txn,
-                table="event_edges",
-                values={
+        self._simple_insert_many_txn(
+            txn,
+            table="event_edges",
+            values=[
+                {
                     "event_id": event.event_id,
                     "prev_event_id": e_id,
                     "room_id": event.room_id,
                     "is_state": True,
-                },
-            )
+                }
+                for e_id, h in event.prev_state
+            ],
+        )
 
         if is_new_state and not context.rejected:
             self._simple_upsert_txn(
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 7e55e8bed6..dbc0e49c1f 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -104,18 +104,20 @@ class StateStore(SQLBaseStore):
             },
         )
 
-        for state in state_events.values():
-            self._simple_insert_txn(
-                txn,
-                table="state_groups_state",
-                values={
+        self._simple_insert_many_txn(
+            txn,
+            table="state_groups_state",
+            values=[
+                {
                     "state_group": state_group,
                     "room_id": state.room_id,
                     "type": state.type,
                     "state_key": state.state_key,
                     "event_id": state.event_id,
-                },
-            )
+                }
+                for state in state_events.values()
+            ],
+        )
 
         self._simple_insert_txn(
             txn,
diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py
index a64d2b821e..8c348ecc95 100644
--- a/tests/storage/test_base.py
+++ b/tests/storage/test_base.py
@@ -67,7 +67,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
 
         self.mock_txn.execute.assert_called_with(
             "INSERT INTO tablename (columname) VALUES(?)",
-            ["Value"]
+            ("Value",)
         )
 
     @defer.inlineCallbacks
@@ -82,7 +82,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
 
         self.mock_txn.execute.assert_called_with(
             "INSERT INTO tablename (colA, colB, colC) VALUES(?, ?, ?)",
-            [1, 2, 3]
+            (1, 2, 3,)
         )
 
     @defer.inlineCallbacks
-- cgit 1.4.1


From bdcd7693c8b954c9a7895339d4727c17221d4d9d Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 5 May 2015 15:14:48 +0100
Subject: Fix indentation

---
 synapse/storage/event_federation.py | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

(limited to 'synapse/storage/event_federation.py')

diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 0aca4ba17b..36b1feac60 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -264,16 +264,16 @@ class EventFederationStore(SQLBaseStore):
         """
         self._simple_insert_many_txn(
             txn,
-                table="event_edges",
-                values=[
-                    {
-                        "event_id": event_id,
-                        "prev_event_id": e_id,
-                        "room_id": room_id,
-                        "is_state": False,
-                    }
-                    for e_id, _ in prev_events
-                ],
+            table="event_edges",
+            values=[
+                {
+                    "event_id": event_id,
+                    "prev_event_id": e_id,
+                    "room_id": room_id,
+                    "is_state": False,
+                }
+                for e_id, _ in prev_events
+            ],
         )
 
         # Update the extremities table if this is not an outlier.
-- cgit 1.4.1


From d18f37e026a02b4e899bc96e600850007a613189 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Tue, 5 May 2015 17:32:21 +0100
Subject: Collect the invalidate callbacks on the transaction object rather than passing around a separate list

---
 synapse/storage/_base.py | 18 ++++++++++----
 synapse/storage/event_federation.py | 10 ++++----
 synapse/storage/events.py | 48 ++++++++++++++++---------------------
 synapse/storage/room.py | 4 ++--
 synapse/storage/roommember.py | 8 +++----
 synapse/storage/signatures.py | 12 +++++-----
 synapse/storage/state.py | 2 +-
 7 files changed, 51 insertions(+), 51 deletions(-)

(limited to 'synapse/storage/event_federation.py')

diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 579ed56377..ccf9697fa3 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -185,12 +185,16 @@ class LoggingTransaction(object):
     """An object that almost-transparently proxies for the 'txn' object
     passed to the constructor. Adds logging and metrics to the .execute()
     method."""
-    __slots__ = ["txn", "name", "database_engine"]
+    __slots__ = ["txn", "name", "database_engine", "after_callbacks"]
 
-    def __init__(self, txn, name, database_engine):
+    def __init__(self, txn, name, database_engine, after_callbacks):
         object.__setattr__(self, "txn", txn)
         object.__setattr__(self, "name", name)
         object.__setattr__(self, "database_engine", database_engine)
+        object.__setattr__(self, "after_callbacks", after_callbacks)
+
+    def call_after(self, callback, *args):
+        self.after_callbacks.append((callback, args))
 
     def __getattr__(self, name):
         return getattr(self.txn, name)
@@ -336,6 +340,8 @@ class SQLBaseStore(object):
 
         start_time = time.time() * 1000
 
+        after_callbacks = []
+
         def inner_func(conn, *args, **kwargs):
             with LoggingContext("runInteraction") as context:
                 if self.database_engine.is_connection_closed(conn):
@@ -360,10 +366,10 @@ class SQLBaseStore(object):
                 while True:
                     try:
                         txn = conn.cursor()
-                        return func(
-                            LoggingTransaction(txn, name, self.database_engine),
-                            *args, **kwargs
+                        txn = LoggingTransaction(
+                            txn, name, self.database_engine, after_callbacks
                         )
+                        return func(txn, *args, **kwargs)
                     except self.database_engine.module.OperationalError as e:
                         # This can happen if the database disappears mid
                         # transaction.
@@ -412,6 +418,8 @@ class SQLBaseStore(object):
         result = yield self._db_pool.runWithConnection(
             inner_func, *args, **kwargs
         )
+        for after_callback, after_args in after_callbacks:
+            after_callback(*after_args)
         defer.returnValue(result)
 
     def cursor_to_dict(self, cursor):
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 3cd3fbdc9b..893344eff3 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -241,7 +241,7 @@ class EventFederationStore(SQLBaseStore):
 
         return int(min_depth) if min_depth is not None else None
 
-    def _update_min_depth_for_room_txn(self, txn, invalidates, room_id, depth):
+    def _update_min_depth_for_room_txn(self, txn, room_id, depth):
         min_depth = self._get_min_depth_interaction(txn, room_id)
 
         do_insert = depth < min_depth if min_depth else True
@@ -256,8 +256,8 @@ class EventFederationStore(SQLBaseStore):
                 },
             )
 
-    def _handle_prev_events(self, txn, invalidates, outlier, event_id,
-                            prev_events, room_id):
+    def _handle_prev_events(self, txn, outlier, event_id, prev_events,
+                            room_id):
         """
         For the given event, update the event edges table and forward and
         backward extremities tables.
@@ -330,9 +330,9 @@ class EventFederationStore(SQLBaseStore):
             )
             txn.execute(query)
 
-            invalidates.append((
+            txn.call_after(
                 self.get_latest_event_ids_in_room.invalidate, room_id
-            ))
+            )
 
     def get_backfill_events(self, room_id, event_list, limit):
         """Get a list of Events for a given topic that occurred before (and
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 7dc49ceed6..17f9d27289 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -42,7 +42,7 @@ class EventsStore(SQLBaseStore):
             stream_ordering = self.min_token
 
         try:
-            invalidates = yield self.runInteraction(
+            yield self.runInteraction(
                 "persist_event",
                 self._persist_event_txn,
                 event=event,
@@ -52,11 +52,6 @@ class EventsStore(SQLBaseStore):
                 is_new_state=is_new_state,
                 current_state=current_state,
             )
-            for invalidated in invalidates:
-                invalidated_callback = invalidated[0]
-                invalidated_args = invalidated[1:]
-                invalidated_callback(*invalidated_args)
-
         except _RollbackButIsFineException:
             pass
 
@@ -96,10 +91,9 @@ class EventsStore(SQLBaseStore):
     def _persist_event_txn(self, txn, event, context, backfilled,
                            stream_ordering=None, is_new_state=True,
                            current_state=None):
-        invalidates = []
 
         # Remove the any existing cache entries for the event_id
-        invalidates.append((self._invalidate_get_event_cache, event.event_id))
+        txn.call_after(self._invalidate_get_event_cache, event.event_id)
 
         if stream_ordering is None:
             with self._stream_id_gen.get_next_txn(txn) as stream_ordering:
@@ -121,10 +115,12 @@ class EventsStore(SQLBaseStore):
 
             for s in current_state:
                 if s.type == EventTypes.Member:
-                    invalidates.extend([
-                        (self.get_rooms_for_user.invalidate, s.state_key),
-                        (self.get_joined_hosts_for_room.invalidate, s.room_id),
-                    ])
+                    txn.call_after(
+                        self.get_rooms_for_user.invalidate, s.state_key
+                    )
+                    txn.call_after(
+                        self.get_joined_hosts_for_room.invalidate, s.room_id
+                    )
 
             self._simple_insert_txn(
                 txn,
                 "current_state_events",
@@ -161,11 +157,10 @@ class EventsStore(SQLBaseStore):
         outlier = event.internal_metadata.is_outlier()
 
         if not outlier:
-            self._store_state_groups_txn(txn, invalidates, event, context)
+            self._store_state_groups_txn(txn, event, context)
 
             self._update_min_depth_for_room_txn(
                 txn,
-                invalidates,
                 event.room_id,
                 event.depth
            )
@@ -207,11 +202,10 @@ class EventsStore(SQLBaseStore):
                 sql,
                 (False, event.event_id,)
             )
-            return invalidates
+            return
 
         self._handle_prev_events(
             txn,
-            invalidates,
             outlier=outlier,
             event_id=event.event_id,
             prev_events=event.prev_events,
@@ -219,13 +213,13 @@ class EventsStore(SQLBaseStore):
         )
 
         if event.type == EventTypes.Member:
-            self._store_room_member_txn(txn, invalidates, event)
+            self._store_room_member_txn(txn, event)
         elif event.type == EventTypes.Name:
-            self._store_room_name_txn(txn, invalidates, event)
+            self._store_room_name_txn(txn, event)
         elif event.type == EventTypes.Topic:
-            self._store_room_topic_txn(txn, invalidates, event)
+            self._store_room_topic_txn(txn, event)
         elif event.type == EventTypes.Redaction:
-            self._store_redaction(txn, invalidates, event)
+            self._store_redaction(txn, event)
 
         event_dict = {
             k: v
@@ -295,20 +289,20 @@ class EventsStore(SQLBaseStore):
 
         if context.rejected:
             self._store_rejections_txn(
-                txn, invalidates, event.event_id, context.rejected
+                txn, event.event_id, context.rejected
             )
 
         for hash_alg, hash_base64 in event.hashes.items():
             hash_bytes = decode_base64(hash_base64)
             self._store_event_content_hash_txn(
-                txn, invalidates, event.event_id, hash_alg, hash_bytes,
+                txn, event.event_id, hash_alg, hash_bytes,
             )
 
         for prev_event_id, prev_hashes in event.prev_events:
             for alg, hash_base64 in prev_hashes.items():
                 hash_bytes = decode_base64(hash_base64)
                 self._store_prev_event_hash_txn(
-                    txn, invalidates, event.event_id, prev_event_id, alg,
+                    txn, event.event_id, prev_event_id, alg,
                     hash_bytes
                 )
 
@@ -325,7 +319,7 @@ class EventsStore(SQLBaseStore):
 
         (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
         self._store_event_reference_hash_txn(
-            txn, invalidates, event.event_id, ref_alg, ref_hash_bytes
+            txn, event.event_id, ref_alg, ref_hash_bytes
         )
 
         if event.is_state():
@@ -372,11 +366,11 @@ class EventsStore(SQLBaseStore):
                 }
             )
 
-        return invalidates
+        return
 
-    def _store_redaction(self, txn, invalidates, event):
+    def _store_redaction(self, txn, event):
         # invalidate the cache for the redacted event
-        invalidates.append((self._invalidate_get_event_cache, event.redacts))
+        txn.call_after(self._invalidate_get_event_cache, event.redacts)
         txn.execute(
             "INSERT INTO redactions (event_id, redacts) VALUES (?,?)",
             (event.event_id, event.redacts)
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index d42d7ff0e3..f956377632 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -162,7 +162,7 @@ class RoomStore(SQLBaseStore):
 
         defer.returnValue(ret)
 
-    def _store_room_topic_txn(self, txn, invalidates, event):
+    def _store_room_topic_txn(self, txn, event):
         if hasattr(event, "content") and "topic" in event.content:
             self._simple_insert_txn(
                 txn,
@@ -174,7 +174,7 @@ class RoomStore(SQLBaseStore):
                 },
             )
 
-    def _store_room_name_txn(self, txn, invalidates, event):
+    def _store_room_name_txn(self, txn, event):
         if hasattr(event, "content") and "name" in event.content:
             self._simple_insert_txn(
                 txn,
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 117da817ba..839c74f63a 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -35,7 +35,7 @@ RoomsForUser = namedtuple(
 
 class RoomMemberStore(SQLBaseStore):
 
-    def _store_room_member_txn(self, txn, invalidates, event):
+    def _store_room_member_txn(self, txn, event):
         """Store a room member in the database.
         """
         try:
@@ -64,10 +64,8 @@ class RoomMemberStore(SQLBaseStore):
             }
         )
 
-        invalidates.extend([
-            (self.get_rooms_for_user.invalidate, target_user_id),
-            (self.get_joined_hosts_for_room.invalidate, event.room_id),
-        ])
+        txn.call_after(self.get_rooms_for_user.invalidate, target_user_id)
+        txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id)
 
     def get_room_member(self, user_id, room_id):
         """Retrieve the current state of a room member.
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index e3979846e7..f051828630 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -39,8 +39,8 @@ class SignatureStore(SQLBaseStore):
         txn.execute(query, (event_id, ))
         return dict(txn.fetchall())
 
-    def _store_event_content_hash_txn(self, txn, invalidates, event_id,
-                                      algorithm, hash_bytes):
+    def _store_event_content_hash_txn(self, txn, event_id, algorithm,
+                                      hash_bytes):
         """Store a hash for a Event
         Args:
             txn (cursor):
@@ -101,8 +101,8 @@ class SignatureStore(SQLBaseStore):
         txn.execute(query, (event_id, ))
         return {k: v for k, v in txn.fetchall()}
 
-    def _store_event_reference_hash_txn(self, txn, invalidates, event_id,
-                                        algorithm, hash_bytes):
+    def _store_event_reference_hash_txn(self, txn, event_id, algorithm,
+                                        hash_bytes):
         """Store a hash for a PDU
         Args:
             txn (cursor):
@@ -184,8 +184,8 @@ class SignatureStore(SQLBaseStore):
                 hashes[algorithm] = hash_bytes
         return results
 
-    def _store_prev_event_hash_txn(self, txn, invalidates, event_id,
-                                   prev_event_id, algorithm, hash_bytes):
+    def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id,
+                                   algorithm, hash_bytes):
         self._simple_insert_txn(
             txn,
             "event_edge_hashes",
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 35d11c27cc..7e55e8bed6 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -82,7 +82,7 @@ class StateStore(SQLBaseStore):
             f,
         )
 
-    def _store_state_groups_txn(self, txn, invalidates, event, context):
+    def _store_state_groups_txn(self, txn, event, context):
         if context.current_state is None:
             return
-- cgit 1.4.1