diff options
author | Erik Johnston <erik@matrix.org> | 2016-05-06 15:59:20 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-05-06 15:59:20 +0100 |
commit | 012cb5416cded6453e676696d74cab871db4aaf8 (patch) | |
tree | 71f21e4d6d4a6fd4037cfbf0a200667d29f46247 /synapse/storage | |
parent | Delete old pushers (diff) | |
parent | Merge pull request #767 from matrix-org/erikj/transaction_txn (diff) | |
download | synapse-012cb5416cded6453e676696d74cab871db4aaf8.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/push_actions_delete
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 5 | ||||
-rw-r--r-- | synapse/storage/openid.py | 32 | ||||
-rw-r--r-- | synapse/storage/prepare_database.py | 2 | ||||
-rw-r--r-- | synapse/storage/room.py | 18 | ||||
-rw-r--r-- | synapse/storage/schema/delta/32/openid.sql | 9 | ||||
-rw-r--r-- | synapse/storage/schema/delta/32/reports.sql | 25 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 165 |
7 files changed, 210 insertions, 46 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 045ae6c03f..d970fde9e8 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -44,6 +44,7 @@ from .receipts import ReceiptsStore from .search import SearchStore from .tags import TagsStore from .account_data import AccountDataStore +from .openid import OpenIdStore from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator @@ -81,7 +82,8 @@ class DataStore(RoomMemberStore, RoomStore, SearchStore, TagsStore, AccountDataStore, - EventPushActionsStore + EventPushActionsStore, + OpenIdStore, ): def __init__(self, db_conn, hs): @@ -114,6 +116,7 @@ class DataStore(RoomMemberStore, RoomStore, self._state_groups_id_gen = StreamIdGenerator(db_conn, "state_groups", "id") self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id") + self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id") self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id") self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id") self._push_rules_stream_id_gen = ChainedIdGenerator( diff --git a/synapse/storage/openid.py b/synapse/storage/openid.py new file mode 100644 index 0000000000..5dabb607bd --- /dev/null +++ b/synapse/storage/openid.py @@ -0,0 +1,32 @@ +from ._base import SQLBaseStore + + +class OpenIdStore(SQLBaseStore): + def insert_open_id_token(self, token, ts_valid_until_ms, user_id): + return self._simple_insert( + table="open_id_tokens", + values={ + "token": token, + "ts_valid_until_ms": ts_valid_until_ms, + "user_id": user_id, + }, + desc="insert_open_id_token" + ) + + def get_user_id_for_open_id_token(self, token, ts_now_ms): + def get_user_id_for_token_txn(txn): + sql = ( + "SELECT user_id FROM open_id_tokens" + " WHERE token = ? AND ? <= ts_valid_until_ms" + ) + + txn.execute(sql, (token, ts_now_ms)) + + rows = txn.fetchall() + if not rows: + return None + else: + return rows[0][0] + return self.runInteraction( + "get_user_id_for_token", get_user_id_for_token_txn + ) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 57f14fd12b..c8487c8838 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 31 +SCHEMA_VERSION = 32 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 70aa64fb31..26933e593a 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -23,6 +23,7 @@ from .engines import PostgresEngine, Sqlite3Engine import collections import logging +import ujson as json logger = logging.getLogger(__name__) @@ -221,3 +222,20 @@ class RoomStore(SQLBaseStore): aliases.extend(e.content['aliases']) defer.returnValue((name, aliases)) + + def add_event_report(self, room_id, event_id, user_id, reason, content, + received_ts): + next_id = self._event_reports_id_gen.get_next() + return self._simple_insert( + table="event_reports", + values={ + "id": next_id, + "received_ts": received_ts, + "room_id": room_id, + "event_id": event_id, + "user_id": user_id, + "reason": reason, + "content": json.dumps(content), + }, + desc="add_event_report" + ) diff --git a/synapse/storage/schema/delta/32/openid.sql b/synapse/storage/schema/delta/32/openid.sql new file mode 100644 index 0000000000..36f37b11c8 --- /dev/null +++ b/synapse/storage/schema/delta/32/openid.sql @@ -0,0 +1,9 @@ + +CREATE TABLE open_id_tokens ( + token TEXT NOT NULL PRIMARY KEY, + ts_valid_until_ms bigint NOT NULL, + user_id TEXT NOT NULL, + UNIQUE (token) +); + +CREATE index open_id_tokens_ts_valid_until_ms ON open_id_tokens(ts_valid_until_ms); diff --git a/synapse/storage/schema/delta/32/reports.sql b/synapse/storage/schema/delta/32/reports.sql new file mode 100644 index 0000000000..d13609776f --- /dev/null +++ b/synapse/storage/schema/delta/32/reports.sql @@ -0,0 +1,25 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE event_reports( + id BIGINT NOT NULL PRIMARY KEY, + received_ts BIGINT NOT NULL, + room_id TEXT NOT NULL, + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + reason TEXT, + content TEXT +); diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index d338dfcf0a..6c7481a728 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -16,16 +16,56 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached +from twisted.internet import defer, reactor + from canonicaljson import encode_canonical_json + +from collections import namedtuple + +import itertools import logging logger = logging.getLogger(__name__) +_TransactionRow = namedtuple( + "_TransactionRow", ( + "id", "transaction_id", "destination", "ts", "response_code", + "response_json", + ) +) + +_UpdateTransactionRow = namedtuple( + "_TransactionRow", ( + "response_code", "response_json", + ) +) + + class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ + def __init__(self, hs): + super(TransactionStore, self).__init__(hs) + + # New transactions that are currently in flights + self.inflight_transactions = {} + + # Newly delievered transactions that *weren't* persisted while in flight + self.new_delivered_transactions = {} + + # Newly delivered transactions that *were* persisted while in flight + self.update_delivered_transactions = {} + + self.last_transaction = {} + + reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) + hs.get_clock().looping_call( + self._persist_in_mem_txns, + 1000, + ) + def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -108,17 +148,30 @@ class TransactionStore(SQLBaseStore): list: A list of previous transaction ids. """ - return self.runInteraction( - "prep_send_transaction", - self._prep_send_transaction, - transaction_id, destination, origin_server_ts + auto_id = self._transaction_id_gen.get_next() + + txn_row = _TransactionRow( + id=auto_id, + transaction_id=transaction_id, + destination=destination, + ts=origin_server_ts, + response_code=0, + response_json=None, ) - def _prep_send_transaction(self, txn, transaction_id, destination, - origin_server_ts): + self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - next_id = self._transaction_id_gen.get_next() + prev_txn = self.last_transaction.get(destination) + if prev_txn: + return defer.succeed(prev_txn) + else: + return self.runInteraction( + "_get_prevs_txn", + self._get_prevs_txn, + destination, + ) + def _get_prevs_txn(self, txn, destination): # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. @@ -133,23 +186,6 @@ class TransactionStore(SQLBaseStore): prev_txns = [r["transaction_id"] for r in results] - # Actually add the new transaction to the sent_transactions table. - - self._simple_insert_txn( - txn, - table="sent_transactions", - values={ - "id": next_id, - "transaction_id": transaction_id, - "destination": destination, - "ts": origin_server_ts, - "response_code": 0, - "response_json": None, - } - ) - - # TODO Update the tx id -> pdu id mapping - return prev_txns def delivered_txn(self, transaction_id, destination, code, response_dict): @@ -161,27 +197,23 @@ class TransactionStore(SQLBaseStore): code (int) response_json (str) """ - return self.runInteraction( - "delivered_txn", - self._delivered_txn, - transaction_id, destination, code, - buffer(encode_canonical_json(response_dict)), - ) - def _delivered_txn(self, txn, transaction_id, destination, - code, response_json): - self._simple_update_one_txn( - txn, - table="sent_transactions", - keyvalues={ - "transaction_id": transaction_id, - "destination": destination, - }, - updatevalues={ - "response_code": code, - "response_json": None, # For now, don't persist response_json - } - ) + txn_row = self.inflight_transactions.get( + destination, {} + ).pop(transaction_id, None) + + self.last_transaction[destination] = transaction_id + + if txn_row: + d = self.new_delivered_transactions.setdefault(destination, {}) + d[transaction_id] = txn_row._replace( + response_code=code, + response_json=None, # For now, don't persist response + ) + else: + d = self.update_delivered_transactions.setdefault(destination, {}) + # For now, don't persist response + d[transaction_id] = _UpdateTransactionRow(code, None) def get_transactions_after(self, transaction_id, destination): """Get all transactions after a given local transaction_id. @@ -305,3 +337,48 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) + + @defer.inlineCallbacks + def _persist_in_mem_txns(self): + try: + inflight = self.inflight_transactions + new_delivered = self.new_delivered_transactions + update_delivered = self.update_delivered_transactions + + self.inflight_transactions = {} + self.new_delivered_transactions = {} + self.update_delivered_transactions = {} + + full_rows = [ + row._asdict() + for txn_map in itertools.chain(inflight.values(), new_delivered.values()) + for row in txn_map.values() + ] + + def f(txn): + if full_rows: + self._simple_insert_many_txn( + txn=txn, + table="sent_transactions", + values=full_rows + ) + + for dest, txn_map in update_delivered.items(): + for txn_id, update_row in txn_map.items(): + self._simple_update_one_txn( + txn, + table="sent_transactions", + keyvalues={ + "transaction_id": txn_id, + "destination": dest, + }, + updatevalues={ + "response_code": update_row.response_code, + "response_json": None, # For now, don't persist response + } + ) + + if full_rows or update_delivered: + yield self.runInteraction("_persist_in_mem_txns", f) + except: + logger.exception("Failed to persist transactions!") |