From 9728c305a34a1f9546d2ce0ef4c54352dc55a16d Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 18 Dec 2014 14:49:22 +0000 Subject: after a few rethinks, a working implementation of pushers. --- synapse/storage/_base.py | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) (limited to 'synapse/storage/_base.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 4881f03368..eb8cc4a9f3 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -195,6 +195,51 @@ class SQLBaseStore(object): txn.execute(sql, values.values()) return txn.lastrowid + def _simple_upsert(self, table, keyvalues, values): + """ + :param table: The table to upsert into + :param keyvalues: Dict of the unique key tables and their new values + :param values: Dict of all the nonunique columns and their new values + :return: A deferred + """ + return self.runInteraction( + "_simple_upsert", + self._simple_upsert_txn, table, keyvalues, values + ) + + def _simple_upsert_txn(self, txn, table, keyvalues, values): + # Try to update + sql = "UPDATE %s SET %s WHERE %s" % ( + table, + ", ".join("%s = ?" % (k) for k in values), + " AND ".join("%s = ?" % (k) for k in keyvalues) + ) + sqlargs = values.values() + keyvalues.values() + logger.debug( + "[SQL] %s Args=%s", + sql, sqlargs, + ) + + txn.execute(sql, sqlargs) + if txn.rowcount == 0: + # We didn't update and rows so insert a new one + allvalues = {} + allvalues.update(keyvalues) + allvalues.update(values) + + sql = "INSERT INTO %s (%s) VALUES (%s)" % ( + table, + ", ".join(k for k in allvalues), + ", ".join("?" for _ in allvalues) + ) + logger.debug( + "[SQL] %s Args=%s", + sql, keyvalues.values(), + ) + txn.execute(sql, allvalues.values()) + + + def _simple_select_one(self, table, keyvalues, retcols, allow_none=False): """Executes a SELECT query on the named table, which is expected to -- cgit 1.4.1 From b1b85753d759f7127fbb1c4a95005fffd3da7f4d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Jan 2015 15:50:17 +0000 Subject: Add support for storing rejected events in EventContext and data stores --- synapse/events/snapshot.py | 1 + synapse/storage/__init__.py | 11 ++++++++--- synapse/storage/_base.py | 21 +++++++++++++-------- synapse/storage/rejections.py | 33 +++++++++++++++++++++++++++++++++ synapse/storage/schema/delta/v12.sql | 21 +++++++++++++++++++++ synapse/storage/schema/im.sql | 7 +++++++ 6 files changed, 83 insertions(+), 11 deletions(-) create mode 100644 synapse/storage/rejections.py create mode 100644 synapse/storage/schema/delta/v12.sql (limited to 'synapse/storage/_base.py') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 6bbba8d6ba..7e98bdef28 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -20,3 +20,4 @@ class EventContext(object): self.current_state = current_state self.auth_events = auth_events self.state_group = None + self.rejected = False diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 4beb951b9f..015fcc8775 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -30,6 +30,7 @@ from .transactions import TransactionStore from .keys import KeyStore from .event_federation import EventFederationStore from .media_repository import MediaRepositoryStore +from .rejections import RejectionsStore from .state import StateStore from .signatures import SignatureStore @@ -66,7 +67,7 @@ SCHEMAS = [ # Remember to update this number every time an incompatible change is made to # database schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 11 +SCHEMA_VERSION = 12 class _RollbackButIsFineException(Exception): @@ -82,6 +83,7 @@ class DataStore(RoomMemberStore, RoomStore, DirectoryStore, KeyStore, StateStore, SignatureStore, EventFederationStore, MediaRepositoryStore, + RejectionsStore, ): def __init__(self, hs): @@ -224,6 +226,9 @@ class DataStore(RoomMemberStore, RoomStore, if not outlier: self._store_state_groups_txn(txn, event, context) + if context.rejected: + self._store_rejections_txn(txn, event.event_id, context.rejected) + if current_state: txn.execute( "DELETE FROM current_state_events WHERE room_id = ?", @@ -262,7 +267,7 @@ class DataStore(RoomMemberStore, RoomStore, or_replace=True, ) - if is_new_state: + if is_new_state and not context.rejected: self._simple_insert_txn( txn, "current_state_events", @@ -288,7 +293,7 @@ class DataStore(RoomMemberStore, RoomStore, or_ignore=True, ) - if not backfilled: + if not backfilled and not context.rejected: self._simple_insert_txn( txn, table="state_forward_extremities", diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index f660fc6eaf..2075a018b2 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -458,10 +458,12 @@ class SQLBaseStore(object): return [e for e in events if e] def _get_event_txn(self, txn, event_id, check_redacted=True, - get_prev_content=False): + get_prev_content=False, allow_rejected=False): sql = ( - "SELECT internal_metadata, json, r.event_id FROM event_json as e " + "SELECT internal_metadata, json, r.event_id, reason " + "FROM event_json as e " "LEFT JOIN redactions as r ON e.event_id = r.redacts " + "LEFT JOIN rejections as rej on rej.event_id = e.event_id " "WHERE e.event_id = ? " "LIMIT 1 " ) @@ -473,13 +475,16 @@ class SQLBaseStore(object): if not res: return None - internal_metadata, js, redacted = res + internal_metadata, js, redacted, rejected_reason = res - return self._get_event_from_row_txn( - txn, internal_metadata, js, redacted, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - ) + if allow_rejected or not rejected_reason: + return self._get_event_from_row_txn( + txn, internal_metadata, js, redacted, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + ) + else: + return None def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False): diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py new file mode 100644 index 0000000000..7d38b31f44 --- /dev/null +++ b/synapse/storage/rejections.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore + +import logging + +logger = logging.getLogger(__name__) + + +class RejectionsStore(SQLBaseStore): + def _store_rejections_txn(self, txn, event_id, reason): + self._simple_insert_txn( + txn, + table="rejections", + values={ + "event_id": event_id, + "reason": reason, + "last_failure": self._clock.time_msec(), + } + ) diff --git a/synapse/storage/schema/delta/v12.sql b/synapse/storage/schema/delta/v12.sql new file mode 100644 index 0000000000..bd2a8b1bb5 --- /dev/null +++ b/synapse/storage/schema/delta/v12.sql @@ -0,0 +1,21 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS rejections( + event_id TEXT NOT NULL, + reason TEXT NOT NULL, + last_check TEXT NOT NULL, + CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE +); diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index dd00c1cd2f..bc7c6b6ed5 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -123,3 +123,10 @@ CREATE TABLE IF NOT EXISTS room_hosts( ); CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id); + +CREATE TABLE IF NOT EXISTS rejections( + event_id TEXT NOT NULL, + reason TEXT NOT NULL, + last_check TEXT NOT NULL, + CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE +); -- cgit 1.4.1 From 0cbb6b0f5235e4501a0fb360e881d152644a17cd Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 28 Jan 2015 14:44:41 +0000 Subject: Google doc style --- synapse/storage/_base.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'synapse/storage/_base.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 4f172d3967..809c81f47f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -195,10 +195,11 @@ class SQLBaseStore(object): def _simple_upsert(self, table, keyvalues, values): """ - :param table: The table to upsert into - :param keyvalues: Dict of the unique key tables and their new values - :param values: Dict of all the nonunique columns and their new values - :return: A deferred + Args: + table (str): The table to upsert into + keyvalues (dict): The unique key tables and their new values + values (dict): The nonunique columns and their new values + Returns: A deferred """ return self.runInteraction( "_simple_upsert", -- cgit 1.4.1 From fb0928097a0dc1606aebb9aed8f070bcea304178 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 28 Jan 2015 14:48:07 +0000 Subject: More magic commas (including the place I copied it from...) --- synapse/storage/_base.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/storage/_base.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 809c81f47f..9261c999cb 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -210,8 +210,8 @@ class SQLBaseStore(object): # Try to update sql = "UPDATE %s SET %s WHERE %s" % ( table, - ", ".join("%s = ?" % (k) for k in values), - " AND ".join("%s = ?" % (k) for k in keyvalues) + ", ".join("%s = ?" % (k,) for k in values), + " AND ".join("%s = ?" % (k,) for k in keyvalues) ) sqlargs = values.values() + keyvalues.values() logger.debug( @@ -390,8 +390,8 @@ class SQLBaseStore(object): if updatevalues: update_sql = "UPDATE %s SET %s WHERE %s" % ( table, - ", ".join("%s = ?" % (k) for k in updatevalues), - " AND ".join("%s = ?" % (k) for k in keyvalues) + ", ".join("%s = ?" % (k,) for k in updatevalues), + " AND ".join("%s = ?" % (k,) for k in keyvalues) ) def func(txn): -- cgit 1.4.1 From 6d485dd1c727e7ecfe3991066bd058794ae05051 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 28 Jan 2015 14:48:42 +0000 Subject: unnecessary newlines --- synapse/storage/_base.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'synapse/storage/_base.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 9261c999cb..4e8bd3faa9 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -237,8 +237,6 @@ class SQLBaseStore(object): ) txn.execute(sql, allvalues.values()) - - def _simple_select_one(self, table, keyvalues, retcols, allow_none=False): """Executes a SELECT query on the named table, which is expected to -- cgit 1.4.1 From 2f4cb04f455d24d0086b37bc363137e995d908d5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 14:48:11 +0000 Subject: Be more specific in naming columns in selects. --- synapse/storage/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/_base.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1f5e74a16a..b350fd61f1 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -504,7 +504,7 @@ class SQLBaseStore(object): def _get_event_txn(self, txn, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False): sql = ( - "SELECT internal_metadata, json, r.event_id, reason " + "SELECT e.internal_metadata, e.json, r.event_id, rej.reason " "FROM event_json as e " "LEFT JOIN redactions as r ON e.event_id = r.redacts " "LEFT JOIN rejections as rej on rej.event_id = e.event_id " -- cgit 1.4.1